Circuit Breakers, Timeout Mechanisms, and Resource Boundaries
MangaAssist context: Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: a useful answer in under 3 seconds at a scale of 1M messages/day.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration |
| Task | 2.1 — Implement agentic AI solutions and tool integrations |
| Skill | 2.1.3 — Develop safeguarded AI workflows to ensure controlled FM behavior |
| Focus Areas | Circuit breakers, Lambda timeouts, IAM resource boundaries, Step Functions stopping conditions, token budget guardrails |
Mind Map — Safeguard Patterns for Controlled FM Behavior
mindmap
root((Safeguarded AI Workflows))
Circuit Breakers
Closed State
Normal operation
Failure counter tracks errors
Threshold triggers transition
Open State
All requests rejected immediately
Fallback responses served
Recovery timer countdown
Half-Open State
Limited probe requests allowed
Success resets to closed
Failure returns to open
Configuration
Failure threshold 5 errors
Recovery timeout 30 seconds
Half-open max probes 3
Timeout Mechanisms
Lambda Timeouts
Hard limit 15 minutes max
Soft timeout at 80 percent
Graceful degradation path
API Gateway Timeouts
WebSocket idle 10 minutes
Integration timeout 29 seconds
Custom timeout per route
Bedrock InvokeModel Timeout
SDK read timeout 60 seconds
Connection timeout 10 seconds
Retry with exponential backoff
Streaming Timeouts
First byte timeout 5 seconds
Inter-chunk timeout 2 seconds
Total stream timeout 30 seconds
IAM Resource Boundaries
Least Privilege Policies
Model-specific ARN restrictions
Region-locked access
Action-level granularity
Service Control Policies
Organization-wide FM limits
Deny expensive models
Enforce tagging requirements
Resource Tags
Cost center allocation
Environment separation
Team-level boundaries
Step Functions Controls
Stopping Conditions
MaxConcurrency limits
TimeoutSeconds per state
HeartbeatSeconds monitoring
Error Handling
Retry with backoff
Catch with fallback
ResultPath for error context
Cost Guards
Execution time limits
State transition caps
Map state concurrency
Token Budget Guardrails
Input Token Limits
Max prompt length 4096 tokens
Context window management
Truncation strategies
Output Token Limits
MaxTokens parameter
Stop sequences
Budget per conversation turn
Cost Ceiling
Daily spend limit
Per-user token quota
Alert at 80 percent threshold
1. Circuit Breaker Patterns for FM Calls
Why Circuit Breakers Matter for FM Services
Foundation Model APIs are external dependencies with variable latency and availability. Without circuit breakers, a degraded Bedrock endpoint can cascade failures through the entire MangaAssist stack: API Gateway queues requests, ECS tasks exhaust connections, DynamoDB session writes spike, and users see hanging WebSocket connections. A circuit breaker isolates the failure, serves cached or fallback responses, and allows the system to recover gracefully.
State Machine: Closed, Open, Half-Open
CLOSED STATE (Normal Operation)
|
| failure_count >= threshold (5 errors in 60s)
v
OPEN STATE (Requests Blocked)
|
| recovery_timeout elapsed (30 seconds)
v
HALF-OPEN STATE (Probe Mode)
|
|--- probe succeeds --> CLOSED STATE (reset counters)
|--- probe fails -----> OPEN STATE (restart timer)
Closed State: Every FM request passes through. The breaker tracks consecutive failures. When MangaAssist sends a product recommendation query to Claude Sonnet and it succeeds, the failure counter resets. If Bedrock returns a ThrottlingException or ModelTimeoutException, the counter increments. Once failures hit the threshold (e.g., 5 within 60 seconds), the breaker trips to Open.
Open State: All FM requests are immediately rejected without contacting Bedrock. MangaAssist returns a cached response from ElastiCache Redis or a graceful fallback message. This prevents wasting compute and money on requests that will likely fail, and protects the 3-second latency target. After a recovery timeout (e.g., 30 seconds), the breaker transitions to Half-Open.
Half-Open State: A limited number of probe requests (e.g., 3) are forwarded to Bedrock. If they succeed, the breaker returns to Closed and normal traffic resumes. If any probe fails, the breaker returns to Open and the recovery timer restarts. This prevents flooding a recovering service.
MangaAssist-Specific Thresholds
| Parameter | Value | Rationale |
|---|---|---|
| Failure threshold | 5 errors | Tolerates brief Bedrock blips without tripping |
| Failure window | 60 seconds | Distinguishes burst errors from sustained outage |
| Recovery timeout | 30 seconds | Bedrock throttling typically resolves in 15-20s |
| Half-open probes | 3 requests | Validates recovery without overwhelming endpoint |
| Probe timeout | 5 seconds | Quick validation — full 3s target + 2s buffer |
| Fallback TTL | 300 seconds | Cache popular manga responses for 5 minutes |
Circuit Breaker for Multiple FM Models
MangaAssist uses both Sonnet (complex queries) and Haiku (simple lookups). Each model needs its own circuit breaker instance because they have independent availability:
Sonnet Circuit Breaker ── [CLOSED] ── Routes complex queries
Haiku Circuit Breaker ── [OPEN] ── Fallback to cached responses
(Sonnet still operational)
When the Haiku breaker opens, MangaAssist can optionally promote simple queries to Sonnet (at higher cost) rather than serving only cached responses. This tiered fallback strategy maintains service quality.
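A minimal sketch of this tiered routing, assuming the `CircuitBreaker` and `CircuitOpenError` classes defined in Section 6; `invoke_haiku`, `invoke_sonnet`, and `serve_cached` are hypothetical stand-ins for the Bedrock and Redis calls shown later:

```python
def route_simple_query(prompt: str,
                       haiku_breaker: "CircuitBreaker",
                       sonnet_breaker: "CircuitBreaker") -> dict:
    """Prefer Haiku for simple lookups; promote to Sonnet if Haiku is down."""
    try:
        return haiku_breaker.call(invoke_haiku, prompt)       # cheap path
    except CircuitOpenError:
        try:
            # Tiered fallback: pay Sonnet prices rather than degrade to cache.
            return sonnet_breaker.call(invoke_sonnet, prompt)
        except CircuitOpenError:
            return serve_cached(prompt)                        # last resort
```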
2. Lambda Timeout Mechanisms and Graceful Degradation
Lambda Timeout Architecture
AWS Lambda enforces a hard timeout ceiling of 15 minutes. For MangaAssist's real-time chatbot, Lambda functions processing FM calls should have much tighter timeouts:
| Lambda Function | Timeout | Rationale |
|---|---|---|
| `manga-chat-orchestrator` | 30 seconds | Full RAG pipeline: retrieve + generate |
| `manga-bedrock-invoke` | 15 seconds | Single Bedrock InvokeModel call |
| `manga-embedding-generator` | 10 seconds | Embedding for vector search |
| `manga-session-manager` | 5 seconds | DynamoDB read/write operations |
| `manga-fallback-handler` | 3 seconds | Serve cached response quickly |
Soft Timeout vs Hard Timeout
Hard Timeout: Lambda kills the execution. The response is lost. The user sees an error or timeout message through the WebSocket.
Soft Timeout: Application-level timer fires before the Lambda hard timeout. The function catches the signal and returns a partial or fallback response. This is the preferred pattern for user-facing interactions.
Timeline for manga-bedrock-invoke (15s hard timeout):
|-------- Bedrock call --------|
0s 5s 10s 12s 13s 15s
| | | | |
First byte Soft timeout | Grace Hard
expected fires (80%) | period timeout
|
Return partial
or fallback
The soft timeout fires at 80% of the hard timeout (12 seconds for a 15-second function). This leaves a 3-second grace period to write session state to DynamoDB, log the timeout event to CloudWatch, and return a fallback response through the WebSocket.
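A minimal handler sketch of the 80% soft-timeout pattern using the Lambda context object's `get_remaining_time_in_millis()`; `invoke_bedrock` and `build_fallback` are hypothetical helpers:

```python
import json
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

GRACE_MS = 3_000                             # reserve ~3s for state + fallback
_pool = ThreadPoolExecutor(max_workers=1)    # reused across warm invocations

def handler(event, context):
    # Soft budget = remaining time minus the grace period (~80% of 15s).
    budget_s = (context.get_remaining_time_in_millis() - GRACE_MS) / 1000.0
    future = _pool.submit(invoke_bedrock, event["prompt"])   # hypothetical call
    try:
        body = future.result(timeout=budget_s)
    except FuturesTimeout:
        # Soft timeout fired: log, persist state, answer before the hard kill.
        body = build_fallback(event)                         # hypothetical helper
    return {"statusCode": 200, "body": json.dumps(body)}
```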
Graceful Degradation Tiers
When timeouts occur, MangaAssist degrades service quality rather than failing completely:
| Tier | Trigger | Response Quality | Latency |
|---|---|---|---|
| Tier 0 | Normal | Full Sonnet response with recommendations | < 3s |
| Tier 1 | Sonnet slow | Switch to Haiku mid-request | 3-5s |
| Tier 2 | All FM slow | Serve cached similar response from Redis | < 1s |
| Tier 3 | Full timeout | Static fallback with apology + retry button | < 0.5s |
| Tier 4 | Lambda crash | API Gateway default response | < 0.1s |
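A tiny selector sketch for these tiers, assuming the breaker and cache health flags are computed upstream (names are illustrative):

```python
def select_tier(sonnet_ok: bool, haiku_ok: bool, cache_ok: bool) -> int:
    """Map component health to the degradation tiers in the table above."""
    if sonnet_ok:
        return 0          # full Sonnet response
    if haiku_ok:
        return 1          # switch to Haiku
    if cache_ok:
        return 2          # cached similar response from Redis
    return 3              # static fallback with apology + retry button
```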
3. IAM Policy Design for Least-Privilege FM Access
Principle of Least Privilege for Bedrock
Every MangaAssist component that touches Bedrock should have the minimum IAM permissions required. This prevents accidental or malicious use of expensive models, limits blast radius if credentials are compromised, and enforces cost boundaries at the infrastructure level.
Policy Layering Strategy
Organization SCP (Deny expensive models org-wide)
└── Account-level policy (Allow only approved regions)
└── Role-level policy (Allow specific models + actions)
└── Session policy (Temporary further restrictions)
└── Resource tags (Cost allocation + environment)
IAM Policy: Bedrock Invoke — Production Chatbot Role
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockInvokeApprovedModels",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
],
"Condition": {
"StringEquals": {
"aws:RequestedRegion": "us-east-1"
},
"NumericLessThanEquals": {
"bedrock:MaxTokens": "4096"
}
}
},
{
"Sid": "DenyExpensiveModels",
"Effect": "Deny",
"Action": "bedrock:InvokeModel",
"Resource": [
"arn:aws:bedrock:*::foundation-model/anthropic.claude-3-opus*",
"arn:aws:bedrock:*::foundation-model/anthropic.claude-3.5-sonnet*"
]
},
{
"Sid": "AllowBedrockGuardrails",
"Effect": "Allow",
"Action": [
"bedrock:ApplyGuardrail"
],
"Resource": "arn:aws:bedrock:us-east-1:123456789012:guardrail/manga-content-filter-*"
},
{
"Sid": "AllowCloudWatchLogging",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/manga-*"
}
]
}
IAM Policy: Service Control Policy (Organization-Level)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyBedrockOutsideApprovedRegions",
"Effect": "Deny",
"Action": "bedrock:*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2"
]
}
}
},
{
"Sid": "EnforceBedrockTagging",
"Effect": "Deny",
"Action": "bedrock:InvokeModel",
"Resource": "*",
"Condition": {
"Null": {
"aws:RequestTag/CostCenter": "true",
"aws:RequestTag/Environment": "true"
}
}
}
]
}
IAM Permission Boundaries for Development Teams
| Role | Allowed Models | Allowed Actions | Max Tokens | Region Lock |
|---|---|---|---|---|
| `manga-prod-chatbot` | Sonnet, Haiku | Invoke, InvokeStream | 4096 | us-east-1 |
| `manga-dev-sandbox` | Haiku only | Invoke | 1024 | us-east-1 |
| `manga-eval-pipeline` | Sonnet, Haiku | Invoke, GetModelEvaluation | 8192 | us-east-1, us-west-2 |
| `manga-admin` | All approved | Full Bedrock | 8192 | us-east-1, us-west-2 |
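Layer 4 of the policy stack (session policies) can be applied at AssumeRole time; a sketch using STS, with an illustrative role ARN, that further restricts `manga-dev-sandbox` to Haiku only for one 15-minute session:

```python
import json
import boto3

sts = boto3.client("sts")

# Session policy: effective permissions become the intersection of the role
# policy and this document, so access can only narrow, never widen.
session_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "bedrock:InvokeModel",
        "Resource": "arn:aws:bedrock:us-east-1::foundation-model/"
                    "anthropic.claude-3-haiku-20240307-v1:0",
    }],
}

creds = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/manga-dev-sandbox",  # illustrative
    RoleSessionName="haiku-only-task",
    Policy=json.dumps(session_policy),
    DurationSeconds=900,
)["Credentials"]
```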
4. Step Functions Stopping Conditions and Heartbeat Patterns
Why Stopping Conditions Matter
Step Functions state machines coordinate multi-step AI workflows in MangaAssist: retrieving context from OpenSearch, invoking Bedrock, validating output with guardrails, and writing results back. Without stopping conditions, a stuck state (e.g., Bedrock hanging) keeps the execution running indefinitely, accumulating costs at $0.025 per 1,000 state transitions.
TimeoutSeconds vs HeartbeatSeconds
TimeoutSeconds: The maximum duration a Task state will wait for completion. If the timeout expires, the state fails with a States.Timeout error that can be caught and handled.
HeartbeatSeconds: The interval within which a long-running task must send a heartbeat signal. If no heartbeat arrives within this window, the state fails with a States.Timeout error. This detects tasks that are alive but stuck. Heartbeats apply to activity tasks and callback-pattern (.waitForTaskToken) integrations, which report in through the SendTaskHeartbeat API.
TimeoutSeconds = 60 (total budget)
HeartbeatSeconds = 15 (must check in every 15s)
Timeline:
0s 15s 30s 45s 60s
|------|------|------|------|
HB1 HB2 HB3 HB4 TIMEOUT
OK OK MISS → States.Timeout error
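On the worker side, a callback-pattern task resets the heartbeat window via `SendTaskHeartbeat`; a sketch, where `do_chunk` is a hypothetical unit of work that returns a JSON result string when finished:

```python
import time
import boto3

sfn = boto3.client("stepfunctions")

def long_running_worker(task_token: str, do_chunk) -> None:
    """Heartbeat every ~10s so a HeartbeatSeconds=15 state can tell
    'slow but alive' apart from 'stuck'."""
    while True:
        result = do_chunk()                                   # hypothetical work
        if result is not None:
            sfn.send_task_success(taskToken=task_token, output=result)
            return
        sfn.send_task_heartbeat(taskToken=task_token)         # resets the window
        time.sleep(10)
```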
Step Functions ASL: Safeguarded RAG Pipeline
{
"Comment": "MangaAssist Safeguarded RAG Pipeline with Stopping Conditions",
"StartAt": "ValidateInput",
"TimeoutSeconds": 120,
"States": {
"ValidateInput": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-validate-input",
"TimeoutSeconds": 5,
"ResultPath": "$.validation",
"Next": "CheckValidation",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleValidationError",
"ResultPath": "$.error"
}
]
},
"CheckValidation": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.validation.isValid",
"BooleanEquals": true,
"Next": "RetrieveContext"
}
],
"Default": "ReturnValidationError"
},
"RetrieveContext": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-retrieve-context",
"TimeoutSeconds": 10,
"HeartbeatSeconds": 5,
"ResultPath": "$.context",
"Next": "InvokeFoundationModel",
"Retry": [
{
"ErrorEquals": ["States.Timeout"],
"IntervalSeconds": 2,
"MaxAttempts": 1,
"BackoffRate": 1.0
}
],
"Catch": [
{
"ErrorEquals": ["States.Timeout"],
"Next": "ServeCachedResponse",
"ResultPath": "$.error"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleRetrievalError",
"ResultPath": "$.error"
}
]
},
"InvokeFoundationModel": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-bedrock-invoke",
"TimeoutSeconds": 20,
"HeartbeatSeconds": 8,
"ResultPath": "$.fmResponse",
"Next": "ApplyGuardrails",
"Retry": [
{
"ErrorEquals": ["BedrockThrottling"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 2.0
},
{
"ErrorEquals": ["States.Timeout"],
"IntervalSeconds": 5,
"MaxAttempts": 1,
"BackoffRate": 1.0
}
],
"Catch": [
{
"ErrorEquals": ["States.Timeout", "BedrockThrottling"],
"Next": "FallbackToHaiku",
"ResultPath": "$.error"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleFMError",
"ResultPath": "$.error"
}
]
},
"FallbackToHaiku": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-bedrock-invoke-haiku",
"TimeoutSeconds": 10,
"HeartbeatSeconds": 5,
"ResultPath": "$.fmResponse",
"Next": "ApplyGuardrails",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ServeCachedResponse",
"ResultPath": "$.error"
}
]
},
"ApplyGuardrails": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-apply-guardrails",
"TimeoutSeconds": 5,
"ResultPath": "$.guardrailResult",
"Next": "CheckGuardrailResult",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleGuardrailError",
"ResultPath": "$.error"
}
]
},
"CheckGuardrailResult": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.guardrailResult.passed",
"BooleanEquals": true,
"Next": "ReturnResponse"
}
],
"Default": "ReturnFilteredResponse"
},
"ReturnResponse": {
"Type": "Succeed"
},
"ReturnFilteredResponse": {
"Type": "Pass",
"Result": {
"message": "I can help with manga recommendations. Could you rephrase your question?",
"filtered": true
},
"End": true
},
"ServeCachedResponse": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-serve-cached",
"TimeoutSeconds": 3,
"End": true,
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ReturnStaticFallback",
"ResultPath": "$.error"
}
]
},
"ReturnStaticFallback": {
"Type": "Pass",
"Result": {
"message": "Our manga assistant is temporarily busy. Please try again in a moment.",
"fallback": true
},
"End": true
},
"ReturnValidationError": {
"Type": "Pass",
"Result": {
"message": "Please provide a valid manga-related question.",
"error": "validation_failed"
},
"End": true
},
"HandleValidationError": { "Type": "Pass", "Result": {"error": "validation_error"}, "End": true },
"HandleRetrievalError": { "Type": "Pass", "Result": {"error": "retrieval_error"}, "End": true },
"HandleFMError": { "Type": "Pass", "Result": {"error": "fm_error"}, "End": true },
"HandleGuardrailError": { "Type": "Pass", "Result": {"error": "guardrail_error"}, "End": true }
}
}
Stopping Condition Summary Table
| State | TimeoutSeconds | HeartbeatSeconds | Retry | Fallback Path |
|---|---|---|---|---|
| ValidateInput | 5 | — | 2 attempts | HandleValidationError |
| RetrieveContext | 10 | 5 | 1 attempt | ServeCachedResponse |
| InvokeFoundationModel | 20 | 8 | 3 (throttle), 1 (timeout) | FallbackToHaiku |
| FallbackToHaiku | 10 | 5 | — | ServeCachedResponse |
| ApplyGuardrails | 5 | — | — | HandleGuardrailError |
| ServeCachedResponse | 3 | — | — | ReturnStaticFallback |
Execution-Level vs State-Level Timeouts
The state machine itself has "TimeoutSeconds": 120 — a global budget for the entire workflow. Individual states have their own budgets. This dual-level approach ensures that even if individual retries and fallbacks chain together, the total execution never exceeds 2 minutes.
5. Token Budget Guardrails as Safety Mechanisms
The Cost Risk Without Token Budgets
At MangaAssist's scale of 1M messages/day, uncontrolled token usage is a financial risk:
| Scenario | Input Tokens/Msg | Output Tokens/Msg | Daily Cost (Sonnet) | Monthly Cost |
|---|---|---|---|---|
| Controlled (avg) | 800 | 400 | $8,400 | $252,000 |
| Uncontrolled (prompt injection) | 4,000 | 4,096 | $73,440 | $2,203,200 |
| With token budget (capped) | 800 | 1,024 | $17,760 | $532,800 |

(Daily cost = 1M messages × input tokens × $3 per 1M + 1M messages × output tokens × $15 per 1M; monthly assumes 30 days.)
Token budgets prevent a single malicious or poorly constructed prompt from consuming excessive resources.
Token Budget Strategy
Per-Request Budget:
Input: max 2,048 tokens (truncate context if needed)
Output: max 1,024 tokens (set MaxTokens parameter)
Per-Session Budget (conversation):
Total: max 20,000 tokens across all turns
Action: After limit, require new session
Per-User Budget (daily):
Total: max 100,000 tokens per user per day
Action: After 80%, warn user; at 100%, serve cached only
System Budget (hourly):
Total: max 50M tokens per hour across all users
Action: At 80%, route new requests to Haiku; at 100%, cached only
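The per-user scope can be enforced with a single atomic Redis counter; a minimal sketch (key names match the orchestrator code in Section 6, the host is illustrative):

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def try_consume_user_budget(user_id: str, tokens: int,
                            daily_limit: int = 100_000) -> bool:
    """Atomically reserve tokens against the per-user daily budget.
    INCRBY is atomic, so concurrent Lambdas cannot double-spend."""
    key = f"tokens:user:{user_id}:daily"
    used = r.incrby(key, tokens)
    if used == tokens:               # first write today: start the 24h clock
        r.expire(key, 86_400)
    if used > daily_limit:
        r.decrby(key, tokens)        # roll back the failed reservation
        return False
    return True
```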
Token Counting and Enforcement Points
| Enforcement Point | Mechanism | Latency Impact |
|---|---|---|
| API Gateway | Request size limit (256KB) | < 1ms |
| Lambda pre-processing | tiktoken token count | ~5ms |
| Bedrock InvokeModel | `max_tokens` parameter | None (API param) |
| Redis token counter | Atomic increment per user/session | ~1ms |
| CloudWatch alarm | Hourly aggregation alert | Async (no latency) |
6. Production Python Code
CircuitBreaker Class
import time
import threading
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Any, Callable, Dict
logger = logging.getLogger("manga_assist.circuit_breaker")
class CircuitState(Enum):
"""Circuit breaker states for FM call protection."""
CLOSED = "closed" # Normal operation — requests flow through
OPEN = "open" # Failure detected — requests blocked
HALF_OPEN = "half_open" # Recovery probe — limited requests allowed
@dataclass
class CircuitBreakerConfig:
"""Configuration for a circuit breaker instance."""
failure_threshold: int = 5 # Errors before tripping
failure_window_seconds: float = 60 # Window to count failures
recovery_timeout_seconds: float = 30 # Time in OPEN before HALF_OPEN
half_open_max_probes: int = 3 # Probes allowed in HALF_OPEN
probe_timeout_seconds: float = 5 # Timeout for probe requests
model_id: str = "" # Which FM model this protects
name: str = "default" # Human-readable breaker name
class CircuitBreaker:
"""
Circuit breaker for Bedrock FM calls in MangaAssist.
Protects the system from cascading failures when Bedrock
endpoints are degraded. Tracks failures per model and
provides fast-fail with fallback responses.
Usage:
breaker = CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5,
recovery_timeout_seconds=30,
name="sonnet-breaker",
model_id="anthropic.claude-3-sonnet-20240229-v1:0"
))
try:
response = breaker.call(bedrock_invoke, prompt=user_query)
except CircuitOpenError:
response = serve_cached_fallback(user_query)
"""
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self._state = CircuitState.CLOSED
self._failure_timestamps: list[float] = []
self._last_failure_time: float = 0
self._open_since: float = 0
self._half_open_probes: int = 0
self._half_open_successes: int = 0
self._lock = threading.Lock()
self._metrics = {
"total_calls": 0,
"total_failures": 0,
"total_rejections": 0,
"total_fallbacks": 0,
"state_transitions": [],
}
logger.info(
f"CircuitBreaker '{config.name}' initialized for model "
f"'{config.model_id}' — threshold={config.failure_threshold}, "
f"recovery={config.recovery_timeout_seconds}s"
)
@property
def state(self) -> CircuitState:
"""Current breaker state, auto-transitioning OPEN -> HALF_OPEN if recovery elapsed."""
with self._lock:
if self._state == CircuitState.OPEN:
elapsed = time.time() - self._open_since
if elapsed >= self.config.recovery_timeout_seconds:
self._transition_to(CircuitState.HALF_OPEN)
return self._state
def _transition_to(self, new_state: CircuitState) -> None:
"""Internal state transition with logging and metrics."""
old_state = self._state
self._state = new_state
transition = {
"from": old_state.value,
"to": new_state.value,
"timestamp": time.time(),
}
self._metrics["state_transitions"].append(transition)
logger.warning(
f"CircuitBreaker '{self.config.name}': "
f"{old_state.value} -> {new_state.value}"
)
if new_state == CircuitState.HALF_OPEN:
self._half_open_probes = 0
self._half_open_successes = 0
elif new_state == CircuitState.CLOSED:
self._failure_timestamps.clear()
def _count_recent_failures(self) -> int:
"""Count failures within the configured window."""
cutoff = time.time() - self.config.failure_window_seconds
self._failure_timestamps = [
t for t in self._failure_timestamps if t > cutoff
]
return len(self._failure_timestamps)
def call(self, func: Callable, *args, **kwargs) -> Any:
"""
Execute a function through the circuit breaker.
Args:
func: The function to call (e.g., bedrock_invoke)
*args, **kwargs: Arguments to pass to the function
Returns:
The function's return value if successful
Raises:
CircuitOpenError: If the breaker is OPEN and blocking requests
"""
current_state = self.state # Triggers auto-transition check
self._metrics["total_calls"] += 1
if current_state == CircuitState.OPEN:
self._metrics["total_rejections"] += 1
logger.info(
f"CircuitBreaker '{self.config.name}': "
f"OPEN — rejecting request (recovery in "
f"{self._time_until_half_open():.1f}s)"
)
raise CircuitOpenError(
breaker_name=self.config.name,
model_id=self.config.model_id,
time_until_recovery=self._time_until_half_open(),
)
if current_state == CircuitState.HALF_OPEN:
with self._lock:
if self._half_open_probes >= self.config.half_open_max_probes:
raise CircuitOpenError(
breaker_name=self.config.name,
model_id=self.config.model_id,
time_until_recovery=0,
)
self._half_open_probes += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure(e)
raise
def _on_success(self) -> None:
"""Handle successful call — potentially close the breaker."""
with self._lock:
if self._state == CircuitState.HALF_OPEN:
self._half_open_successes += 1
if self._half_open_successes >= self.config.half_open_max_probes:
self._transition_to(CircuitState.CLOSED)
logger.info(
f"CircuitBreaker '{self.config.name}': "
f"Recovery confirmed — back to CLOSED"
)
def _on_failure(self, error: Exception) -> None:
"""Handle failed call — potentially open the breaker."""
with self._lock:
now = time.time()
self._failure_timestamps.append(now)
self._last_failure_time = now
self._metrics["total_failures"] += 1
if self._state == CircuitState.HALF_OPEN:
self._transition_to(CircuitState.OPEN)
self._open_since = now
logger.warning(
f"CircuitBreaker '{self.config.name}': "
f"Probe failed — returning to OPEN"
)
elif self._state == CircuitState.CLOSED:
recent = self._count_recent_failures()
if recent >= self.config.failure_threshold:
self._transition_to(CircuitState.OPEN)
self._open_since = now
logger.error(
f"CircuitBreaker '{self.config.name}': "
f"Threshold reached ({recent} failures) — OPEN"
)
def _time_until_half_open(self) -> float:
"""Seconds remaining until OPEN -> HALF_OPEN transition."""
if self._state != CircuitState.OPEN:
return 0
elapsed = time.time() - self._open_since
return max(0, self.config.recovery_timeout_seconds - elapsed)
def get_metrics(self) -> Dict[str, Any]:
"""Return current metrics for monitoring."""
return {
**self._metrics,
"current_state": self._state.value,
"breaker_name": self.config.name,
"model_id": self.config.model_id,
"recent_failures": self._count_recent_failures(),
}
def force_open(self) -> None:
"""Manually trip the breaker (e.g., from admin console)."""
with self._lock:
self._transition_to(CircuitState.OPEN)
self._open_since = time.time()
def force_close(self) -> None:
"""Manually reset the breaker (e.g., after confirming recovery)."""
with self._lock:
self._transition_to(CircuitState.CLOSED)
class CircuitOpenError(Exception):
"""Raised when a circuit breaker is OPEN and blocking requests."""
def __init__(self, breaker_name: str, model_id: str, time_until_recovery: float):
self.breaker_name = breaker_name
self.model_id = model_id
self.time_until_recovery = time_until_recovery
super().__init__(
f"Circuit breaker '{breaker_name}' is OPEN for model '{model_id}'. "
f"Recovery in {time_until_recovery:.1f}s"
)
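The SDK-level timeouts from the mind map (10s connect, 60s read, retry with backoff) sit one layer below the breaker and can be set when constructing the Bedrock client; a sketch using botocore's `Config`:

```python
import boto3
from botocore.config import Config

# Connection/read timeouts plus client-side adaptive retry with backoff.
bedrock = boto3.client(
    "bedrock-runtime",
    region_name="us-east-1",
    config=Config(
        connect_timeout=10,
        read_timeout=60,
        retries={"max_attempts": 3, "mode": "adaptive"},
    ),
)
```

Calls on this client can then be wrapped in `breaker.call(...)`, so the SDK's internal retries count as a single breaker attempt.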
TimeoutManager Class
import time
import logging
from dataclasses import dataclass
from typing import Optional, Callable, Any, Dict
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
logger = logging.getLogger("manga_assist.timeout_manager")
@dataclass
class TimeoutConfig:
"""Timeout configuration for a Lambda function or service call."""
hard_timeout_seconds: float # Lambda/system hard limit
soft_timeout_pct: float = 0.80 # Soft timeout as fraction of hard
grace_period_seconds: float = 2 # Time to write state + return fallback
name: str = "default"
@property
def soft_timeout_seconds(self) -> float:
return self.hard_timeout_seconds * self.soft_timeout_pct
@property
def effective_work_seconds(self) -> float:
return self.soft_timeout_seconds - self.grace_period_seconds
class TimeoutManager:
"""
Manages soft and hard timeouts for MangaAssist Lambda functions.
Provides graceful degradation by firing a soft timeout before the
Lambda hard limit, giving the function time to return a fallback
response rather than being killed mid-execution.
Usage:
tm = TimeoutManager(TimeoutConfig(
hard_timeout_seconds=15,
soft_timeout_pct=0.80,
name="bedrock-invoke"
))
result = tm.execute_with_timeout(
func=invoke_bedrock,
fallback_func=serve_cached_response,
prompt=user_query
)
"""
def __init__(self, config: TimeoutConfig):
self.config = config
self._executor = ThreadPoolExecutor(max_workers=2)
self._metrics = {
"total_calls": 0,
"soft_timeouts": 0,
"hard_timeouts": 0,
"successful_calls": 0,
"fallback_served": 0,
"avg_duration_ms": 0,
}
self._total_duration_ms: float = 0
logger.info(
f"TimeoutManager '{config.name}' initialized — "
f"hard={config.hard_timeout_seconds}s, "
f"soft={config.soft_timeout_seconds:.1f}s, "
f"effective_work={config.effective_work_seconds:.1f}s"
)
def execute_with_timeout(
self,
func: Callable,
fallback_func: Optional[Callable] = None,
*args,
**kwargs,
) -> Dict[str, Any]:
"""
Execute a function with soft timeout and fallback.
Returns:
Dict with 'result', 'source' ('primary'|'fallback'|'error'),
and 'duration_ms'.
"""
self._metrics["total_calls"] += 1
start_time = time.time()
try:
future = self._executor.submit(func, *args, **kwargs)
result = future.result(timeout=self.config.soft_timeout_seconds)
duration_ms = (time.time() - start_time) * 1000
self._record_success(duration_ms)
return {
"result": result,
"source": "primary",
"duration_ms": round(duration_ms, 2),
"timeout_budget_remaining_ms": round(
(self.config.hard_timeout_seconds * 1000) - duration_ms, 2
),
}
except FuturesTimeoutError:
self._metrics["soft_timeouts"] += 1
elapsed = time.time() - start_time
logger.warning(
f"TimeoutManager '{self.config.name}': Soft timeout at "
f"{elapsed:.2f}s — invoking fallback"
)
if fallback_func:
return self._execute_fallback(fallback_func, start_time, *args, **kwargs)
return {
"result": None,
"source": "error",
"error": "soft_timeout",
"duration_ms": round(elapsed * 1000, 2),
}
except Exception as e:
elapsed = time.time() - start_time
logger.error(
f"TimeoutManager '{self.config.name}': Error after "
f"{elapsed:.2f}s — {type(e).__name__}: {e}"
)
if fallback_func:
return self._execute_fallback(fallback_func, start_time, *args, **kwargs)
return {
"result": None,
"source": "error",
"error": str(e),
"duration_ms": round(elapsed * 1000, 2),
}
def _execute_fallback(
self,
fallback_func: Callable,
start_time: float,
*args,
**kwargs,
) -> Dict[str, Any]:
"""Execute the fallback function within the grace period."""
try:
grace_deadline = self.config.grace_period_seconds
future = self._executor.submit(fallback_func, *args, **kwargs)
result = future.result(timeout=grace_deadline)
duration_ms = (time.time() - start_time) * 1000
self._metrics["fallback_served"] += 1
return {
"result": result,
"source": "fallback",
"duration_ms": round(duration_ms, 2),
}
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(
f"TimeoutManager '{self.config.name}': Fallback also failed — {e}"
)
return {
"result": {"message": "Service temporarily unavailable. Please retry."},
"source": "error",
"error": f"fallback_failed: {e}",
"duration_ms": round(duration_ms, 2),
}
def _record_success(self, duration_ms: float) -> None:
self._metrics["successful_calls"] += 1
self._total_duration_ms += duration_ms
self._metrics["avg_duration_ms"] = round(
self._total_duration_ms / self._metrics["successful_calls"], 2
)
def get_metrics(self) -> Dict[str, Any]:
return {**self._metrics, "name": self.config.name}
class StreamingTimeoutManager:
"""
Timeout manager for Bedrock streaming responses.
Tracks three timeout types:
- First byte timeout: max time to receive first chunk
- Inter-chunk timeout: max gap between consecutive chunks
- Total stream timeout: max time for complete response
"""
def __init__(
self,
first_byte_timeout: float = 5.0,
inter_chunk_timeout: float = 2.0,
total_timeout: float = 30.0,
name: str = "streaming",
):
self.first_byte_timeout = first_byte_timeout
self.inter_chunk_timeout = inter_chunk_timeout
self.total_timeout = total_timeout
self.name = name
async def consume_stream(self, stream_iterator) -> Dict[str, Any]:
"""Consume a Bedrock streaming response with timeout enforcement."""
chunks = []
start_time = time.time()
last_chunk_time = start_time
first_chunk_received = False
try:
async for chunk in stream_iterator:
now = time.time()
total_elapsed = now - start_time
if not first_chunk_received:
if total_elapsed > self.first_byte_timeout:
raise TimeoutError(
f"First byte timeout: {self.first_byte_timeout}s exceeded"
)
first_chunk_received = True
else:
gap = now - last_chunk_time
if gap > self.inter_chunk_timeout:
raise TimeoutError(
f"Inter-chunk timeout: {gap:.2f}s > {self.inter_chunk_timeout}s"
)
if total_elapsed > self.total_timeout:
logger.warning(
f"StreamingTimeoutManager '{self.name}': "
f"Total timeout reached at {total_elapsed:.2f}s — "
f"returning partial ({len(chunks)} chunks)"
)
break
chunks.append(chunk)
last_chunk_time = now
            # Compute elapsed here so an empty stream does not leave
            # total_elapsed unbound.
            total_elapsed = time.time() - start_time
            return {
                "chunks": chunks,
                "complete": total_elapsed <= self.total_timeout,
                "total_chunks": len(chunks),
                "duration_ms": round(total_elapsed * 1000, 2),
            }
except TimeoutError as e:
return {
"chunks": chunks,
"complete": False,
"error": str(e),
"total_chunks": len(chunks),
"duration_ms": round((time.time() - start_time) * 1000, 2),
}
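A small self-contained demo of `StreamingTimeoutManager`, using a fake async chunk generator in place of a real Bedrock response stream:

```python
import asyncio
import random

async def fake_bedrock_stream(n_chunks: int = 10):
    """Stand-in for a Bedrock streaming response; emits chunks with jitter."""
    for i in range(n_chunks):
        await asyncio.sleep(random.uniform(0.05, 0.5))   # stays under 2s gaps
        yield f"chunk-{i}"

async def main():
    mgr = StreamingTimeoutManager(
        first_byte_timeout=5.0, inter_chunk_timeout=2.0, total_timeout=30.0
    )
    summary = await mgr.consume_stream(fake_bedrock_stream())
    print(summary["total_chunks"], summary["complete"], summary["duration_ms"])

if __name__ == "__main__":
    asyncio.run(main())
```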
SafeguardOrchestrator Class
import json
import time
import logging
import hashlib
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
import boto3
import redis
logger = logging.getLogger("manga_assist.safeguard_orchestrator")
@dataclass
class TokenBudget:
"""Token budget limits for various scopes."""
max_input_tokens: int = 2048
max_output_tokens: int = 1024
max_session_tokens: int = 20_000
max_user_daily_tokens: int = 100_000
max_system_hourly_tokens: int = 50_000_000
warn_threshold_pct: float = 0.80
@dataclass
class SafeguardConfig:
"""Complete safeguard configuration for MangaAssist."""
token_budget: TokenBudget = field(default_factory=TokenBudget)
primary_model: str = "anthropic.claude-3-sonnet-20240229-v1:0"
fallback_model: str = "anthropic.claude-3-haiku-20240307-v1:0"
redis_host: str = "manga-cache.abc123.ng.0001.use1.cache.amazonaws.com"
redis_port: int = 6379
cache_ttl_seconds: int = 300
region: str = "us-east-1"
class SafeguardOrchestrator:
"""
Orchestrates all safeguard mechanisms for MangaAssist FM calls.
Combines circuit breakers, timeout management, token budgets,
and IAM-aware invocation into a single entry point. Every FM
request flows through this orchestrator.
Flow:
1. Check token budget (user, session, system)
2. Check circuit breaker state
3. Truncate input if over token limit
4. Invoke FM with timeout management
5. Record token usage
6. Return result or fallback
Usage:
orchestrator = SafeguardOrchestrator(config)
result = orchestrator.process_request(
user_id="user-123",
session_id="sess-456",
prompt="Recommend manga like One Piece",
context_docs=retrieved_docs,
)
"""
def __init__(self, config: SafeguardConfig):
self.config = config
self._bedrock = boto3.client("bedrock-runtime", region_name=config.region)
self._redis = redis.Redis(
host=config.redis_host,
port=config.redis_port,
decode_responses=True,
)
# Circuit breakers — one per model
self._breakers = {
config.primary_model: CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5,
recovery_timeout_seconds=30,
name="sonnet-breaker",
model_id=config.primary_model,
)),
config.fallback_model: CircuitBreaker(CircuitBreakerConfig(
failure_threshold=8,
recovery_timeout_seconds=20,
name="haiku-breaker",
model_id=config.fallback_model,
)),
}
# Timeout manager for FM calls
self._timeout_mgr = TimeoutManager(TimeoutConfig(
hard_timeout_seconds=15,
soft_timeout_pct=0.80,
name="fm-invoke",
))
logger.info("SafeguardOrchestrator initialized with full safeguard stack")
def process_request(
self,
user_id: str,
session_id: str,
prompt: str,
context_docs: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Process a user request through all safeguard layers.
Returns:
Dict with 'response', 'model_used', 'tokens_used',
'safeguard_actions', and 'duration_ms'.
"""
start_time = time.time()
safeguard_actions = []
# --- Step 1: Token budget check ---
budget_status = self._check_token_budget(user_id, session_id)
if budget_status["blocked"]:
safeguard_actions.append(f"token_budget_exceeded: {budget_status['reason']}")
return self._serve_budget_exceeded_response(
user_id, budget_status, safeguard_actions, start_time
)
if budget_status.get("warning"):
safeguard_actions.append(f"token_budget_warning: {budget_status['warning']}")
# --- Step 2: Prepare input with token truncation ---
prepared_prompt, truncated = self._prepare_prompt(prompt, context_docs)
if truncated:
safeguard_actions.append("input_truncated_to_budget")
        # --- Step 3: Try primary model through circuit breaker ---
        model_used = self.config.primary_model
        try:
            breaker = self._breakers[model_used]
            result = self._timeout_mgr.execute_with_timeout(
                func=lambda: breaker.call(
                    self._invoke_bedrock, model_used, prepared_prompt
                ),
                # Route the timeout fallback through the Haiku breaker so its
                # failures are tracked; it returns the same {"text", "usage"}
                # shape as the primary call, keeping Step 4 parsing uniform.
                fallback_func=lambda: self._breakers[self.config.fallback_model].call(
                    self._invoke_bedrock, self.config.fallback_model, prepared_prompt
                ),
            )
            if result.get("source") == "fallback":
                model_used = self.config.fallback_model
        except CircuitOpenError as e:
            safeguard_actions.append(f"circuit_open: {e.breaker_name}")
            result = self._try_fallback_model(prepared_prompt)
            if result is None:
                return self._serve_cached_response(
                    prompt, safeguard_actions, start_time
                )
            model_used = self.config.fallback_model
# --- Step 4: Process result ---
if result.get("source") == "error":
safeguard_actions.append("all_models_failed")
return self._serve_cached_response(prompt, safeguard_actions, start_time)
response_text = result.get("result", {}).get("text", "")
tokens_used = result.get("result", {}).get("usage", {})
# --- Step 5: Record token usage ---
self._record_token_usage(user_id, session_id, tokens_used)
duration_ms = (time.time() - start_time) * 1000
return {
"response": response_text,
"model_used": model_used,
"tokens_used": tokens_used,
"safeguard_actions": safeguard_actions,
"source": result.get("source", "primary"),
"duration_ms": round(duration_ms, 2),
}
def _check_token_budget(self, user_id: str, session_id: str) -> Dict[str, Any]:
"""Check all token budget scopes and return status."""
budget = self.config.token_budget
# Check user daily budget
user_key = f"tokens:user:{user_id}:daily"
user_tokens = int(self._redis.get(user_key) or 0)
if user_tokens >= budget.max_user_daily_tokens:
return {"blocked": True, "reason": "user_daily_limit_exceeded",
"used": user_tokens, "limit": budget.max_user_daily_tokens}
# Check session budget
session_key = f"tokens:session:{session_id}"
session_tokens = int(self._redis.get(session_key) or 0)
if session_tokens >= budget.max_session_tokens:
return {"blocked": True, "reason": "session_limit_exceeded",
"used": session_tokens, "limit": budget.max_session_tokens}
# Check system hourly budget
system_key = f"tokens:system:hourly:{int(time.time() // 3600)}"
system_tokens = int(self._redis.get(system_key) or 0)
if system_tokens >= budget.max_system_hourly_tokens:
return {"blocked": True, "reason": "system_hourly_limit_exceeded",
"used": system_tokens, "limit": budget.max_system_hourly_tokens}
# Check for warnings
warning = None
user_pct = user_tokens / budget.max_user_daily_tokens
if user_pct >= budget.warn_threshold_pct:
warning = f"user_daily_at_{int(user_pct * 100)}%"
return {"blocked": False, "warning": warning}
def _prepare_prompt(
self, prompt: str, context_docs: Optional[List[str]]
) -> tuple:
"""Prepare and optionally truncate the prompt to fit token budget."""
max_tokens = self.config.token_budget.max_input_tokens
full_prompt = prompt
if context_docs:
context_text = "\n\n".join(context_docs)
full_prompt = (
f"Context:\n{context_text}\n\n"
f"User question: {prompt}"
)
estimated_tokens = len(full_prompt.split()) * 1.3
if estimated_tokens > max_tokens:
char_limit = int(max_tokens * 3.5)
full_prompt = full_prompt[:char_limit] + "\n[Context truncated for safety]"
return full_prompt, True
return full_prompt, False
def _invoke_bedrock(self, model_id: str, prompt: str) -> Dict[str, Any]:
"""Invoke Bedrock with the given model and prompt."""
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": self.config.token_budget.max_output_tokens,
"messages": [{"role": "user", "content": prompt}],
})
response = self._bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=body,
)
result = json.loads(response["body"].read())
return {
"text": result["content"][0]["text"],
"usage": result.get("usage", {}),
}
def _try_fallback_model(self, prompt: str) -> Optional[Dict[str, Any]]:
"""Attempt the fallback model through its own circuit breaker."""
fallback = self.config.fallback_model
try:
breaker = self._breakers[fallback]
result = breaker.call(self._invoke_bedrock, fallback, prompt)
return {"result": result, "source": "fallback"}
        except Exception as e:  # includes CircuitOpenError
logger.error(f"Fallback model also failed: {e}")
return None
def _serve_cached_response(
self, prompt: str, actions: List[str], start_time: float
) -> Dict[str, Any]:
"""Serve a cached response from Redis as last resort."""
cache_key = f"cache:response:{hashlib.md5(prompt.encode()).hexdigest()[:16]}"
cached = self._redis.get(cache_key)
duration_ms = (time.time() - start_time) * 1000
actions.append("served_cached_response")
if cached:
return {
"response": cached,
"model_used": "cache",
"tokens_used": {},
"safeguard_actions": actions,
"source": "cache",
"duration_ms": round(duration_ms, 2),
}
actions.append("no_cache_available")
return {
"response": "Our manga assistant is temporarily busy. Please try again shortly.",
"model_used": "static_fallback",
"tokens_used": {},
"safeguard_actions": actions,
"source": "static_fallback",
"duration_ms": round(duration_ms, 2),
}
def _serve_budget_exceeded_response(
self, user_id: str, budget_status: Dict, actions: List[str], start_time: float
) -> Dict[str, Any]:
duration_ms = (time.time() - start_time) * 1000
return {
"response": (
"You have reached your usage limit for this period. "
"Please try again later or start a new session."
),
"model_used": "none",
"tokens_used": {},
"safeguard_actions": actions,
"source": "budget_exceeded",
"budget_status": budget_status,
"duration_ms": round(duration_ms, 2),
}
def _record_token_usage(
self, user_id: str, session_id: str, usage: Dict[str, int]
) -> None:
"""Record token usage across all budget scopes in Redis."""
total = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
if total == 0:
return
pipe = self._redis.pipeline()
# User daily budget
user_key = f"tokens:user:{user_id}:daily"
pipe.incrby(user_key, total)
pipe.expire(user_key, 86400) # 24 hours
# Session budget
session_key = f"tokens:session:{session_id}"
pipe.incrby(session_key, total)
pipe.expire(session_key, 3600) # 1 hour
# System hourly budget
system_key = f"tokens:system:hourly:{int(time.time() // 3600)}"
pipe.incrby(system_key, total)
pipe.expire(system_key, 7200) # 2 hours (buffer)
pipe.execute()
def get_system_health(self) -> Dict[str, Any]:
"""Return health status of all safeguard components."""
return {
"circuit_breakers": {
name: breaker.get_metrics()
for name, breaker in self._breakers.items()
},
"timeout_manager": self._timeout_mgr.get_metrics(),
"timestamp": time.time(),
}
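A sketch of wiring the orchestrator into the API Gateway WebSocket Lambda handler; the event fields follow API Gateway's WebSocket contract, while the message body shape (`userId`, `sessionId`, `prompt`) is an assumption:

```python
import json
import boto3

# Module-level so breakers and metrics survive across warm invocations.
orchestrator = SafeguardOrchestrator(SafeguardConfig())

def handler(event, context):
    ctx = event["requestContext"]
    body = json.loads(event["body"])

    result = orchestrator.process_request(
        user_id=body["userId"],
        session_id=body["sessionId"],
        prompt=body["prompt"],
    )

    # Push the answer back over the caller's WebSocket connection.
    apigw = boto3.client(
        "apigatewaymanagementapi",
        endpoint_url=f"https://{ctx['domainName']}/{ctx['stage']}",
    )
    apigw.post_to_connection(
        ConnectionId=ctx["connectionId"],
        Data=json.dumps(result).encode(),
    )
    return {"statusCode": 200}
```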
IAMPolicyGenerator Class
import json
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
logger = logging.getLogger("manga_assist.iam_policy_generator")
@dataclass
class IAMPolicyRequest:
"""Request parameters for generating a scoped IAM policy."""
role_name: str
environment: str # "production", "staging", "development"
allowed_models: List[str] # Model IDs allowed
allowed_actions: List[str] # Bedrock actions allowed
allowed_regions: List[str] # AWS regions allowed
max_tokens: Optional[int] = None # MaxTokens condition
account_id: str = "123456789012"
require_tags: bool = True
tags: Dict[str, str] = field(default_factory=lambda: {
"Project": "MangaAssist",
"CostCenter": "ai-chatbot",
})
class IAMPolicyGenerator:
"""
Generates least-privilege IAM policies for MangaAssist Bedrock access.
Creates role policies, permission boundaries, and SCPs that enforce
model-level, action-level, and region-level restrictions on FM access.
Usage:
generator = IAMPolicyGenerator()
policy = generator.generate_role_policy(IAMPolicyRequest(
role_name="manga-prod-chatbot",
environment="production",
allowed_models=[
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
],
allowed_actions=["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
allowed_regions=["us-east-1"],
max_tokens=4096,
))
"""
# Models considered expensive and blocked by default
EXPENSIVE_MODELS = [
"anthropic.claude-3-opus*",
"anthropic.claude-3.5-sonnet*",
"amazon.titan-text-premier*",
"meta.llama3-1-405b*",
]
def generate_role_policy(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Generate a complete IAM role policy document."""
statements = []
# Allow statement for approved models
statements.append(self._allow_approved_models(request))
# Deny statement for expensive models
statements.append(self._deny_expensive_models(request))
# Allow guardrail access
statements.append(self._allow_guardrails(request))
# Allow CloudWatch logging
statements.append(self._allow_logging(request))
# Allow DynamoDB session access
if request.environment == "production":
statements.append(self._allow_dynamodb_sessions(request))
# Allow ElastiCache access
if request.environment == "production":
statements.append(self._allow_elasticache(request))
policy = {
"Version": "2012-10-17",
"Statement": statements,
}
logger.info(
f"Generated IAM policy for role '{request.role_name}' — "
f"{len(statements)} statements, "
f"{len(request.allowed_models)} models, "
f"{len(request.allowed_regions)} regions"
)
return policy
def _allow_approved_models(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Build Allow statement for approved Bedrock models."""
resources = []
for model_id in request.allowed_models:
for region in request.allowed_regions:
resources.append(
f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
)
statement = {
"Sid": f"AllowBedrockInvoke{request.environment.title()}",
"Effect": "Allow",
"Action": request.allowed_actions,
"Resource": resources,
"Condition": {
"StringEquals": {
"aws:RequestedRegion": request.allowed_regions,
},
},
}
if request.max_tokens:
statement["Condition"]["NumericLessThanEquals"] = {
"bedrock:MaxTokens": str(request.max_tokens)
}
return statement
def _deny_expensive_models(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Build Deny statement for expensive models."""
resources = [
f"arn:aws:bedrock:*::foundation-model/{model}"
for model in self.EXPENSIVE_MODELS
]
return {
"Sid": "DenyExpensiveModels",
"Effect": "Deny",
"Action": "bedrock:InvokeModel",
"Resource": resources,
}
def _allow_guardrails(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Allow access to Bedrock Guardrails."""
return {
"Sid": "AllowBedrockGuardrails",
"Effect": "Allow",
"Action": ["bedrock:ApplyGuardrail"],
"Resource": (
f"arn:aws:bedrock:{request.allowed_regions[0]}:"
f"{request.account_id}:guardrail/manga-*"
),
}
def _allow_logging(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Allow CloudWatch log access for Lambda functions."""
return {
"Sid": "AllowCloudWatchLogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
"Resource": (
f"arn:aws:logs:{request.allowed_regions[0]}:"
f"{request.account_id}:log-group:/aws/lambda/manga-*"
),
}
def _allow_dynamodb_sessions(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Allow DynamoDB access for session management."""
return {
"Sid": "AllowDynamoDBSessions",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:Query",
],
"Resource": (
f"arn:aws:dynamodb:{request.allowed_regions[0]}:"
f"{request.account_id}:table/manga-sessions*"
),
}
def _allow_elasticache(self, request: IAMPolicyRequest) -> Dict[str, Any]:
"""Allow ElastiCache access for caching responses."""
return {
"Sid": "AllowElastiCacheAccess",
"Effect": "Allow",
"Action": [
"elasticache:Connect",
],
"Resource": (
f"arn:aws:elasticache:{request.allowed_regions[0]}:"
f"{request.account_id}:replicationgroup:manga-cache-*"
),
}
def generate_permission_boundary(
self, allowed_regions: List[str]
) -> Dict[str, Any]:
"""Generate a permission boundary that caps maximum possible permissions."""
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BoundaryAllowBedrock",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream",
"bedrock:ApplyGuardrail",
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:RequestedRegion": allowed_regions,
}
},
},
{
"Sid": "BoundaryAllowSupporting",
"Effect": "Allow",
"Action": [
"logs:*",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:Query",
"elasticache:Connect",
],
"Resource": "*",
},
{
"Sid": "BoundaryDenyAll",
"Effect": "Deny",
"Action": [
"iam:*",
"organizations:*",
"account:*",
],
"Resource": "*",
},
],
}
def generate_scp(self, allowed_regions: List[str]) -> Dict[str, Any]:
"""Generate an Organization-level Service Control Policy."""
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyBedrockOutsideRegions",
"Effect": "Deny",
"Action": "bedrock:*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": allowed_regions,
}
},
},
{
"Sid": "DenyBedrockWithoutTags",
"Effect": "Deny",
"Action": "bedrock:InvokeModel",
"Resource": "*",
"Condition": {
"Null": {
"aws:RequestTag/CostCenter": "true",
}
},
},
],
}
def policy_to_json(self, policy: Dict[str, Any], indent: int = 2) -> str:
"""Serialize a policy to formatted JSON."""
return json.dumps(policy, indent=indent, default=str)
7. Comparison Tables
Circuit Breaker vs Retry vs Timeout — When to Use Each
| Pattern | Purpose | Protects Against | MangaAssist Use Case |
|---|---|---|---|
| Circuit Breaker | Stop calling a failing service | Cascading failures, resource exhaustion | Bedrock endpoint degradation |
| Retry with Backoff | Recover from transient errors | Throttling, network blips | ThrottlingException from Bedrock |
| Timeout | Bound execution time | Hanging requests, slow responses | Lambda + Bedrock latency control |
| Token Budget | Cap resource consumption | Cost overruns, abuse | Per-user and system-wide spend caps |
| IAM Boundary | Restrict access scope | Unauthorized model use, credential leak | Least-privilege FM access |
| Stopping Condition | Limit workflow execution | Runaway state machines, infinite loops | Step Functions timeout/heartbeat |
Safeguard Implementation Complexity vs Risk Reduction
| Safeguard | Implementation Effort | Risk Reduced | Cost to Implement | Monthly Savings |
|---|---|---|---|---|
| Circuit Breaker | Medium (2-3 days) | Cascading failure | ~$2K dev time | $10K-50K avoided downtime |
| Lambda Timeout | Low (0.5 day) | Hanging functions | ~$500 dev time | $1K-5K wasted compute |
| IAM Policies | Low (1 day) | Unauthorized access | ~$1K dev time | $5K-100K credential abuse |
| Step Functions Stops | Medium (1-2 days) | Runaway workflows | ~$1.5K dev time | $2K-20K execution costs |
| Token Budgets | Medium (2 days) | Cost overruns | ~$2K dev time | $50K-200K prompt abuse |
| Full Safeguard Stack | High (1-2 weeks) | All of the above | ~$10K dev time | $70K-375K combined |
Timeout Values Across the MangaAssist Stack
| Component | Timeout | Configurable Via | Impact if Too Low | Impact if Too High |
|---|---|---|---|---|
| API Gateway WebSocket | 10 min idle | Stage settings | User reconnects frequently | Zombie connections accumulate |
| API Gateway Integration | 29 sec | Integration settings | Complex queries fail | Users wait too long |
| Lambda hard timeout | 15 sec | Function config | Sonnet responses cut off | Costs increase, latency rises |
| Lambda soft timeout | 12 sec | Application code | Less time for fallback | Grace period too short |
| Bedrock SDK read | 60 sec | boto3 config | Long outputs fail | Holds connection too long |
| Bedrock SDK connect | 10 sec | boto3 config | Intermittent failures | Delayed error detection |
| Circuit breaker recovery | 30 sec | Application code | Probes too early | Extended outage for users |
| Redis cache TTL | 300 sec | Application code | Cache misses increase | Stale responses served |
| Step Functions state | 5-20 sec | ASL definition | Legitimate work killed | Dead states linger |
| Step Functions execution | 120 sec | ASL definition | Complex RAG fails | Runaway cost risk |
8. Cost and Risk Analysis
Cost of NOT Having Safeguards (Monthly at 1M msgs/day)
| Failure Scenario | Without Safeguard | With Safeguard | Monthly Savings |
|---|---|---|---|
| Bedrock outage cascades to all services | 4h downtime = $120K revenue loss | 30s failover to cache | ~$118K |
| Prompt injection generates max tokens | $2.2M token costs | $533K (capped) | ~$1.7M |
| Runaway Step Functions execution | $15K/day execution costs | Auto-terminates at 120s | ~$14.5K |
| Compromised credentials invoke Opus | $450K/day model costs | IAM denies access | ~$450K |
| Lambda functions hang indefinitely | $8K/day compute waste | 15s timeout kills | ~$7.5K |
Safeguard Monitoring Dashboard Metrics
| Metric | CloudWatch Alarm Threshold | Action |
|---|---|---|
| Circuit breaker OPEN events | > 3 in 5 minutes | Page on-call engineer |
| Soft timeout rate | > 10% of requests | Scale Bedrock provisioned throughput |
| Token budget warnings | > 100 users/hour at 80% | Investigate potential abuse |
| IAM deny events | > 0 in production | Security incident review |
| Step Functions timeout rate | > 5% of executions | Review state machine configuration |
| Fallback response rate | > 20% of responses | Investigate Bedrock health |
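Breaker state can be pushed as custom CloudWatch metrics to drive the alarms above; a sketch using `put_metric_data` with an illustrative namespace:

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_breaker_metrics(breaker: "CircuitBreaker") -> None:
    """Publish breaker state so the 'OPEN events' alarm can page on-call."""
    m = breaker.get_metrics()
    dims = [{"Name": "Breaker", "Value": m["breaker_name"]}]
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Safeguards",          # illustrative namespace
        MetricData=[
            {"MetricName": "CircuitBreakerOpen", "Dimensions": dims,
             "Value": 1.0 if m["current_state"] == "open" else 0.0},
            {"MetricName": "RecentFailures", "Dimensions": dims,
             "Value": float(m["recent_failures"]), "Unit": "Count"},
        ],
    )
```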
Key Takeaways
- Circuit breakers are non-negotiable for FM calls — Bedrock is an external dependency with variable availability. The closed/open/half-open pattern isolates failures and maintains MangaAssist's 3-second latency target by fast-failing to cached responses during outages.
- Soft timeouts beat hard timeouts for user experience — Setting application-level timeouts at 80% of the Lambda hard limit provides a grace period to write state, log context, and return a fallback response instead of letting Lambda kill the execution mid-stream.
- IAM policies enforce cost boundaries at infrastructure level — Least-privilege policies that restrict specific model ARNs, regions, and max tokens prevent both accidental expensive model usage and credential compromise scenarios that could cost hundreds of thousands monthly.
- Step Functions need both TimeoutSeconds and HeartbeatSeconds — TimeoutSeconds caps total state duration while HeartbeatSeconds detects tasks that are alive but stuck. Together they prevent runaway execution costs at $0.025 per 1,000 state transitions.
- Token budgets operate at multiple scopes — Per-request, per-session, per-user-daily, and system-hourly budgets create layered defense against cost overruns. Redis atomic counters enforce these in real-time with sub-millisecond overhead.
- The full safeguard stack costs ~$10K to implement but prevents $70K-375K monthly in potential losses — Circuit breakers, timeouts, IAM policies, stopping conditions, and token budgets together provide defense-in-depth that protects both system reliability and financial health at MangaAssist's 1M messages/day scale.