Circuit Breaker and Timeout Patterns for Safeguarded AI Workflows

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Detail |
| --- | --- |
| Certification | AWS AIP-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.3 — Define safeguards within AI workflows (for example, stopping conditions, timeout mechanisms, IAM boundaries, circuit breakers, Guardrails) |
| This File | Deep-dive into circuit breaker patterns (closed/open/half-open), Lambda timeout design, Step Functions error handling, and graceful degradation strategies |

Skill Scope Statement

This file extends the safeguard architecture (covered in File 01) with detailed implementation patterns for circuit breakers protecting FM calls, Lambda timeout design with graceful degradation, and Step Functions error handling strategies. It covers: the three circuit breaker states with Redis-backed distributed state, progressive half-open recovery, per-service failure thresholds, Lambda cold-start-aware timeout configuration, nested timeout hierarchies (SDK < Lambda < Step Functions < API Gateway), and Step Functions retry/catch patterns with fallback state machines.


Mind Map — Circuit Breaker and Timeout Patterns

mindmap
  root((Circuit Breaker<br/>& Timeout Patterns))
    Circuit Breaker States
      Closed State
        Normal operation
        Failure counter tracking
        Rolling window metrics
      Open State
        Requests blocked
        Fallback response served
        Timer for recovery
      Half-Open State
        Limited probe requests
        Progressive recovery
        Success threshold for close
    Lambda Timeout Design
      Cold Start Awareness
        Init phase budget
        Provisioned Concurrency
        Keep-alive pings
      Timeout Hierarchy
        SDK timeout < Lambda timeout
        Lambda timeout < Step Functions state timeout
        Step Functions < API Gateway timeout
      Graceful Shutdown
        Signal handling
        Partial result saving
        Cleanup on timeout
    Step Functions Error Handling
      Retry Strategies
        ErrorEquals matching
        IntervalSeconds tuning
        BackoffRate optimization
        MaxAttempts per error type
      Catch Patterns
        Error-specific catch blocks
        ResultPath for error context
        Fallback state chains
      Timeout Configuration
        TimeoutSeconds per state
        HeartbeatSeconds for long tasks
        Workflow-level timeout
    Fallback Strategies
      Cached Responses
        Redis popular results cache
        FAQ response matching
        Static response library
      Degraded Service Modes
        Read-only mode
        Reduced feature set
        Queue for later processing
      User Communication
        Transparent error messaging
        Retry guidance
        Alternative channel routing

Architecture — Timeout Hierarchy in MangaAssist

flowchart LR
    subgraph Client["Client Layer"]
        CL["Browser<br/>3s UX timeout"]
    end

    subgraph APIGW["API Gateway"]
        GW["WebSocket<br/>29s integration timeout"]
    end

    subgraph SF["Step Functions"]
        WF["Workflow<br/>60s timeout"]
        ST["State<br/>30s timeout"]
    end

    subgraph Lambda["Lambda"]
        FN["Function<br/>30s timeout"]
        SDK["Bedrock SDK<br/>25s read timeout<br/>5s connect timeout"]
    end

    subgraph Bedrock["Bedrock"]
        FM["Claude 3<br/>Model inference"]
    end

    CL -->|"3s"| GW
    GW -->|"29s"| WF
    WF -->|"60s"| ST
    ST -->|"30s"| FN
    FN -->|"25s"| SDK
    SDK -->|"variable"| FM

    style CL fill:#ffcdd2
    style GW fill:#fff9c4
    style WF fill:#c8e6c9
    style ST fill:#c8e6c9
    style FN fill:#bbdefb
    style SDK fill:#bbdefb
    style FM fill:#e1bee7

1. Circuit Breaker Deep Dive

1.1 State Machine for FM Calls

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failure_count >= threshold\nOR failure_rate >= 50%
    Open --> HalfOpen: timeout_seconds elapsed\n(default: 60s)
    HalfOpen --> Closed: all recovery stages passed\n(progressive 1 → 3 → 10)
    HalfOpen --> Open: any failure in probe
    Closed --> Closed: success (reset failure counter)
    Open --> Open: requests blocked\n(serve fallback)

    note right of Closed
        Normal operation.
        All requests pass through.
        Track failures in rolling window.
    end note

    note right of Open
        All requests return fallback.
        No calls to downstream service.
        Timer counts down to half-open.
    end note

    note right of HalfOpen
        Allow limited probe requests.
        Progressive: 1 → 3 → 10 requests.
        Any failure reverts to Open.
    end note

1.2 Production Circuit Breaker with Progressive Recovery

"""
MangaAssist Circuit Breaker with progressive half-open recovery.
Distributed state via ElastiCache Redis for ECS Fargate fleet coordination.
"""

import time
import json
import math
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from functools import wraps

import redis
import boto3

logger = logging.getLogger("manga_circuit_breaker")


class CircuitState(str, Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


@dataclass
class CircuitConfig:
    """Per-service circuit breaker configuration."""
    name: str
    failure_threshold: int = 5
    failure_rate_threshold: float = 0.50
    window_seconds: int = 60
    open_timeout_seconds: int = 60
    half_open_max_probes: int = 3
    success_threshold_for_close: int = 3
    progressive_recovery_stages: list[int] = field(
        default_factory=lambda: [1, 3, 10]
    )


# Pre-configured circuits for each MangaAssist downstream service
CIRCUIT_CONFIGS = {
    "bedrock_sonnet": CircuitConfig(
        name="bedrock_sonnet",
        failure_threshold=5,
        failure_rate_threshold=0.50,
        window_seconds=60,
        open_timeout_seconds=60,
    ),
    "bedrock_haiku": CircuitConfig(
        name="bedrock_haiku",
        failure_threshold=8,
        failure_rate_threshold=0.50,
        window_seconds=60,
        open_timeout_seconds=30,
    ),
    "opensearch": CircuitConfig(
        name="opensearch",
        failure_threshold=5,
        failure_rate_threshold=0.30,
        window_seconds=30,
        open_timeout_seconds=45,
    ),
    "dynamodb": CircuitConfig(
        name="dynamodb",
        failure_threshold=10,
        failure_rate_threshold=0.20,
        window_seconds=30,
        open_timeout_seconds=30,
    ),
    "redis": CircuitConfig(
        name="redis",
        failure_threshold=15,
        failure_rate_threshold=0.40,
        window_seconds=30,
        open_timeout_seconds=15,
    ),
}


class ProgressiveCircuitBreaker:
    """
    Circuit breaker with progressive half-open recovery.

    Recovery stages:
      Stage 1: Allow 1 probe request
      Stage 2: Allow 3 concurrent probes (if stage 1 passed)
      Stage 3: Allow 10 concurrent probes (if stage 2 passed)
      If all stages pass → transition to CLOSED

    This prevents thundering herd when a service recovers — instead of
    all ECS tasks sending requests simultaneously, they gradually ramp up.
    """

    REDIS_PREFIX = "manga:cb:"

    def __init__(self, redis_client: redis.Redis, config: CircuitConfig):
        self._redis = redis_client
        self._config = config
        self._key = f"{self.REDIS_PREFIX}{config.name}"
        self._cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

    def _get_state(self) -> dict:
        """Load circuit state from Redis."""
        raw = self._redis.hgetall(self._key)
        if not raw:
            return {
                "state": CircuitState.CLOSED.value,
                "failure_count": 0,
                "success_count": 0,
                "total_requests": 0,
                "last_failure": 0.0,
                "last_state_change": time.time(),
                "recovery_stage": 0,
                "probes_in_flight": 0,
            }

        return {
            k.decode() if isinstance(k, bytes) else k:
            v.decode() if isinstance(v, bytes) else v
            for k, v in raw.items()
        }

    def _save_state(self, state: dict) -> None:
        """Persist state to Redis."""
        self._redis.hset(self._key, mapping={
            str(k): str(v) for k, v in state.items()
        })
        self._redis.expire(self._key, 86400)

    def can_execute(self) -> tuple[bool, str]:
        """
        Check if a request can proceed.
        Returns (allowed, reason).
        """
        state = self._get_state()
        current_state = state.get("state", CircuitState.CLOSED.value)
        now = time.time()

        if current_state == CircuitState.CLOSED.value:
            return True, "circuit_closed"

        if current_state == CircuitState.OPEN.value:
            last_change = float(state.get("last_state_change", 0))
            elapsed = now - last_change
            if elapsed >= self._config.open_timeout_seconds:
                # Transition to half-open
                state["state"] = CircuitState.HALF_OPEN.value
                state["last_state_change"] = now
                state["recovery_stage"] = 0
                state["success_count"] = 0
                state["probes_in_flight"] = 0
                self._save_state(state)
                self._emit_state_metric(CircuitState.HALF_OPEN)
                logger.info(
                    "Circuit [%s] OPEN → HALF_OPEN after %ds",
                    self._config.name, int(elapsed),
                )
                return True, "half_open_probe"
            return False, "circuit_open"

        if current_state == CircuitState.HALF_OPEN.value:
            stage = int(state.get("recovery_stage", 0))
            probes = int(state.get("probes_in_flight", 0))
            stages = self._config.progressive_recovery_stages
            max_probes = stages[stage] if stage < len(stages) else stages[-1]

            if probes < max_probes:
                state["probes_in_flight"] = probes + 1
                self._save_state(state)
                return True, f"half_open_stage_{stage}"
            return False, f"half_open_stage_{stage}_full"

        return False, "unknown_state"

    def record_success(self) -> None:
        """Record a successful request."""
        state = self._get_state()
        current = state.get("state", CircuitState.CLOSED.value)
        state["total_requests"] = int(state.get("total_requests", 0)) + 1

        if current == CircuitState.HALF_OPEN.value:
            state["success_count"] = int(state.get("success_count", 0)) + 1
            state["probes_in_flight"] = max(
                0, int(state.get("probes_in_flight", 0)) - 1
            )

            success_count = int(state["success_count"])
            stage = int(state.get("recovery_stage", 0))
            stages = self._config.progressive_recovery_stages
            required = stages[stage] if stage < len(stages) else stages[-1]

            if success_count >= required:
                if stage + 1 >= len(stages):
                    # All stages passed → close circuit
                    state["state"] = CircuitState.CLOSED.value
                    state["failure_count"] = 0
                    state["success_count"] = 0
                    state["recovery_stage"] = 0
                    state["probes_in_flight"] = 0
                    state["last_state_change"] = time.time()
                    self._emit_state_metric(CircuitState.CLOSED)
                    logger.info(
                        "Circuit [%s] HALF_OPEN → CLOSED (all recovery stages passed)",
                        self._config.name,
                    )
                else:
                    # Advance to next recovery stage
                    state["recovery_stage"] = stage + 1
                    state["success_count"] = 0
                    state["probes_in_flight"] = 0
                    logger.info(
                        "Circuit [%s] recovery stage %d%d",
                        self._config.name, stage, stage + 1,
                    )

        elif current == CircuitState.CLOSED.value:
            # In closed state, periodically reset failure counter
            window_start = time.time() - self._config.window_seconds
            if float(state.get("last_failure", 0)) < window_start:
                state["failure_count"] = 0

        self._save_state(state)

    def record_failure(self) -> None:
        """Record a failed request."""
        state = self._get_state()
        current = state.get("state", CircuitState.CLOSED.value)
        state["failure_count"] = int(state.get("failure_count", 0)) + 1
        state["total_requests"] = int(state.get("total_requests", 0)) + 1
        state["last_failure"] = time.time()

        if current == CircuitState.HALF_OPEN.value:
            # Any failure in half-open → back to open
            state["state"] = CircuitState.OPEN.value
            state["last_state_change"] = time.time()
            state["probes_in_flight"] = 0
            self._emit_state_metric(CircuitState.OPEN)
            logger.warning(
                "Circuit [%s] HALF_OPEN → OPEN (probe failed)",
                self._config.name,
            )

        elif current == CircuitState.CLOSED.value:
            total = int(state.get("total_requests", 1))
            failures = int(state["failure_count"])
            failure_rate = failures / total if total > 0 else 0

            if (failures >= self._config.failure_threshold or
                    failure_rate >= self._config.failure_rate_threshold):
                state["state"] = CircuitState.OPEN.value
                state["last_state_change"] = time.time()
                self._emit_state_metric(CircuitState.OPEN)
                logger.warning(
                    "Circuit [%s] CLOSED → OPEN (failures=%d, rate=%.2f)",
                    self._config.name, failures, failure_rate,
                )

        self._save_state(state)

    def _emit_state_metric(self, new_state: CircuitState) -> None:
        """Emit CloudWatch metric for circuit state transition."""
        state_value = {"CLOSED": 0, "HALF_OPEN": 1, "OPEN": 2}
        try:
            self._cloudwatch.put_metric_data(
                Namespace="MangaAssist/CircuitBreaker",
                MetricData=[{
                    "MetricName": "CircuitState",
                    "Value": state_value[new_state.value],
                    "Unit": "None",
                    "Dimensions": [
                        {"Name": "Service", "Value": self._config.name},
                    ],
                }],
            )
        except Exception as e:
            logger.error("Failed to emit metric: %s", e)

2. Lambda Timeout Patterns

2.1 Nested Timeout Configuration

"""
MangaAssist Lambda timeout configuration — ensures SDK timeout < Lambda timeout
< Step Functions timeout to prevent orphaned operations and unhandled timeouts.
"""

import os
import time
import signal
import logging
from typing import Any
from contextlib import contextmanager

import boto3
from botocore.config import Config

logger = logging.getLogger("manga_timeout")


# ---------------------------------------------------------------------------
# Timeout Hierarchy Configuration
# ---------------------------------------------------------------------------
# Rule: Each layer's timeout must be LESS than its parent's timeout.
#
# API Gateway integration:  29s (hard limit, not configurable)
# Step Functions state:     30s (configurable per state)
# Lambda function:          30s (configurable, must not exceed the SF state timeout)
# Bedrock SDK read:         25s (must be < Lambda timeout)
# Bedrock SDK connect:       5s (fast fail on connection issues)
# Redis socket:              2s (fast fail on cache issues)
# DynamoDB SDK:              5s (fast fail on database issues)
# ---------------------------------------------------------------------------

def create_bedrock_client() -> boto3.client:
    """Create Bedrock client with timeouts that fit within Lambda's 30s budget."""
    return boto3.client(
        "bedrock-runtime",
        region_name=os.environ.get("AWS_REGION", "us-east-1"),
        config=Config(
            read_timeout=25,          # Must be < Lambda 30s timeout
            connect_timeout=5,         # Fast fail on connection issues
            retries={
                "max_attempts": 1,     # No SDK retries — Step Functions handles retries
                "mode": "standard",
            },
        ),
    )


def create_dynamodb_resource() -> boto3.resource:
    """Create DynamoDB resource with tight timeouts."""
    return boto3.resource(
        "dynamodb",
        region_name=os.environ.get("AWS_REGION", "us-east-1"),
        config=Config(
            read_timeout=5,
            connect_timeout=3,
            retries={"max_attempts": 1},
        ),
    )


# ---------------------------------------------------------------------------
# Graceful Timeout Handler
# ---------------------------------------------------------------------------

class GracefulTimeoutHandler:
    """
    Handles Lambda timeout gracefully by saving partial results
    and returning a structured timeout response.

    Lambda does not signal the handler before a function timeout, so this
    class polls context.get_remaining_time_in_millis() and reserves a
    2-3 second window near the deadline to save state and return a
    partial (or structured timeout) result.
    """

    def __init__(self, context):
        self._context = context
        self._partial_result = None
        self._cleanup_fns = []

    def register_partial_result(self, result: Any) -> None:
        """Register the current partial result for timeout recovery."""
        self._partial_result = result

    def register_cleanup(self, fn) -> None:
        """Register a cleanup function to run on timeout."""
        self._cleanup_fns.append(fn)

    def get_remaining_ms(self) -> int:
        """Get remaining execution time in milliseconds."""
        return self._context.get_remaining_time_in_millis()

    def has_sufficient_time(self, required_ms: int = 2000) -> bool:
        """Check if there is enough time for another operation."""
        return self.get_remaining_ms() > required_ms

    def create_timeout_response(self, operation: str) -> dict:
        """Create a structured timeout response with partial results."""
        return {
            "statusCode": 408,
            "timeout": True,
            "operation": operation,
            "partial_result": self._partial_result,
            "remaining_ms": self.get_remaining_ms(),
            "message": (
                "Operation timed out. Partial results may be available."
            ),
        }


@contextmanager
def time_budget(budget_ms: float, operation: str):
    """
    Context manager that tracks the time budget for an operation
    and logs a warning when the budget is exceeded.
    """
    start = time.monotonic()
    yield
    elapsed_ms = (time.monotonic() - start) * 1000
    if elapsed_ms > budget_ms:
        logger.warning(
            "Operation '%s' exceeded budget: %.0fms / %.0fms",
            operation, elapsed_ms, budget_ms,
        )

2.2 Lambda Handler with Timeout Awareness

"""
MangaAssist Bedrock invocation Lambda — demonstrates timeout-aware
handler that saves partial results on approaching timeout.
"""

import json
import time
import logging

logger = logging.getLogger("manga_bedrock_invoke")

bedrock = create_bedrock_client()
dynamodb = create_dynamodb_resource()


def lambda_handler(event: dict, context) -> dict:
    """
    Invoke Bedrock with timeout awareness.

    Phases:
    1. Load session context (budget: 2s)
    2. Build prompt (budget: 0.5s)
    3. Invoke Bedrock (budget: remaining - 3s reserve)
    4. Save result and update session (budget: 2s)
    """
    timeout_handler = GracefulTimeoutHandler(context)
    start = time.monotonic()

    try:
        # Phase 1: Load context
        if not timeout_handler.has_sufficient_time(25000):
            return timeout_handler.create_timeout_response("init")

        session_context = _load_session_context(
            event.get("sessionId", ""),
            event.get("conversationHistory", []),
        )

        # Phase 2: Build prompt
        prompt = _build_prompt(
            user_message=event.get("userMessage", ""),
            rag_context=event.get("ragContext", []),
            conversation_history=session_context,
        )

        # Phase 3: Invoke Bedrock
        remaining_ms = timeout_handler.get_remaining_ms()
        bedrock_budget_ms = remaining_ms - 3000  # Reserve 3s for post-processing

        if bedrock_budget_ms < 5000:
            logger.warning("Insufficient budget for Bedrock: %dms", bedrock_budget_ms)
            return _fallback_response(event, "INSUFFICIENT_BUDGET")

        response = bedrock.invoke_model(
            modelId=event.get("modelId", "anthropic.claude-3-sonnet-20240229-v1:0"),
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": event.get("maxTokens", 1024),
                "temperature": 0.3,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )

        result = json.loads(response["body"].read())
        answer = result["content"][0]["text"]
        usage = result.get("usage", {})

        # Register partial result in case Phase 4 times out
        timeout_handler.register_partial_result({
            "answer": answer,
            "usage": usage,
        })

        # Phase 4: Save result
        if timeout_handler.has_sufficient_time(2000):
            _save_session_update(event.get("sessionId", ""), answer, usage)

        elapsed_ms = (time.monotonic() - start) * 1000
        return {
            "statusCode": 200,
            "answer": answer,
            "usage": usage,
            "latency_ms": round(elapsed_ms),
            "actionRequired": _check_for_tool_use(result),
        }

    except bedrock.exceptions.ThrottlingException as e:
        logger.warning("Bedrock throttled: %s", str(e))
        raise  # Let Step Functions retry

    except bedrock.exceptions.ModelTimeoutException as e:
        logger.error("Bedrock model timeout: %s", str(e))
        return _fallback_response(event, "MODEL_TIMEOUT")

    except Exception as e:
        logger.error("Unexpected error: %s", str(e), exc_info=True)
        raise  # Let Step Functions catch


def _load_session_context(session_id: str, history: list) -> list:
    """Load session context with a tight timeout."""
    # Implementation uses DynamoDB with 5s timeout config
    return history


def _build_prompt(user_message: str, rag_context: list, conversation_history: list) -> str:
    """Build the prompt with RAG context and conversation history."""
    parts = []
    if rag_context:
        parts.append("Reference context:\n" + "\n".join(
            f"- {ctx.get('text', '')[:200]}" for ctx in rag_context[:5]
        ))
    parts.append(f"User message: {user_message}")
    return "\n\n".join(parts)


def _fallback_response(event: dict, reason: str) -> dict:
    """Generate a fallback response when Bedrock is unavailable."""
    return {
        "statusCode": 200,
        "answer": (
            "I'm experiencing a brief delay. Let me try a simpler approach. "
            "Could you rephrase your question or try again in a moment?"
        ),
        "fallback": True,
        "fallbackReason": reason,
    }


def _save_session_update(session_id: str, answer: str, usage: dict) -> None:
    """Save the response to session state."""
    pass  # Implementation writes to DynamoDB


def _check_for_tool_use(result: dict) -> bool:
    """Check if the model response indicates a tool call is needed."""
    return result.get("stop_reason") == "tool_use"
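
The handler docstring assigns each phase a time budget but never enforces one; here is a minimal sketch (the _run_early_phases helper is illustrative, with budgets taken from the docstring) showing how the time_budget context manager from section 2.1 could wrap the first two phases so overruns show up in the logs:

def _run_early_phases(event: dict) -> tuple[list, str]:
    """Illustrative only: wrap the context-load and prompt-build phases in budgets."""
    with time_budget(2000, "load_session_context"):   # Phase 1 budget: 2s
        session_context = _load_session_context(
            event.get("sessionId", ""),
            event.get("conversationHistory", []),
        )
    with time_budget(500, "build_prompt"):            # Phase 2 budget: 0.5s
        prompt = _build_prompt(
            user_message=event.get("userMessage", ""),
            rag_context=event.get("ragContext", []),
            conversation_history=session_context,
        )
    return session_context, prompt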

3. Step Functions Error Handling Patterns

3.1 Retry and Catch Strategy Matrix

"""
MangaAssist Step Functions retry and catch strategy definitions.
Each error type has a specific retry configuration and fallback path.
"""


STEP_FUNCTIONS_ERROR_STRATEGIES = {
    # Bedrock throttling: aggressive retry with backoff
    "ThrottlingException": {
        "retry": {
            "IntervalSeconds": 2,
            "MaxAttempts": 3,
            "BackoffRate": 3.0,
            # Total wait: 2 + 6 + 18 = 26s worst case
        },
        "catch_next_state": "FallbackResponse",
        "alert": False,
    },

    # Bedrock model timeout: minimal retry (model is overloaded)
    "ModelTimeoutException": {
        "retry": {
            "IntervalSeconds": 1,
            "MaxAttempts": 1,
            "BackoffRate": 1.0,
        },
        "catch_next_state": "TimeoutFallback",
        "alert": True,
    },

    # Lambda service errors: standard retry
    "Lambda.ServiceException": {
        "retry": {
            "IntervalSeconds": 1,
            "MaxAttempts": 2,
            "BackoffRate": 2.0,
        },
        "catch_next_state": "FallbackResponse",
        "alert": True,
    },

    # Lambda too many requests: immediate retry (burst capacity)
    "Lambda.TooManyRequestsException": {
        "retry": {
            "IntervalSeconds": 1,
            "MaxAttempts": 3,
            "BackoffRate": 1.5,
        },
        "catch_next_state": "FallbackResponse",
        "alert": False,
    },

    # DynamoDB throttling: fast retry with moderate backoff
    "DynamoDB.ProvisionedThroughputExceededException": {
        "retry": {
            "IntervalSeconds": 1,
            "MaxAttempts": 3,
            "BackoffRate": 2.0,
        },
        "catch_next_state": "FallbackResponse",
        "alert": True,
    },

    # Guardrail intervention: no retry (content is blocked)
    "GuardrailInterventionException": {
        "retry": None,
        "catch_next_state": "GuardrailBlockedResponse",
        "alert": False,
    },

    # Step Functions timeout: no retry (budget exhausted)
    "States.Timeout": {
        "retry": None,
        "catch_next_state": "TimeoutFallback",
        "alert": True,
    },

    # Catch-all: no retry for unknown errors
    "States.ALL": {
        "retry": None,
        "catch_next_state": "FallbackResponse",
        "alert": True,
    },
}
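
A sketch of how these strategies might be rendered as an ASL Task state for the Bedrock invocation Lambda (the FormatResponse next state and the function ARN are illustrative; retry values and catch targets come from the strategy table above):

INVOKE_BEDROCK_STATE = {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-bedrock-invoke",
    "TimeoutSeconds": 30,
    "Retry": [
        {
            "ErrorEquals": ["ThrottlingException"],
            "IntervalSeconds": 2,
            "MaxAttempts": 3,
            "BackoffRate": 3.0,
        },
        {
            "ErrorEquals": ["Lambda.ServiceException"],
            "IntervalSeconds": 1,
            "MaxAttempts": 2,
            "BackoffRate": 2.0,
        },
        {
            "ErrorEquals": ["Lambda.TooManyRequestsException"],
            "IntervalSeconds": 1,
            "MaxAttempts": 3,
            "BackoffRate": 1.5,
        },
    ],
    "Catch": [
        {
            "ErrorEquals": ["States.Timeout"],
            "ResultPath": "$.error",
            "Next": "TimeoutFallback",
        },
        {
            "ErrorEquals": ["States.ALL"],
            "ResultPath": "$.error",
            "Next": "FallbackResponse",
        },
    ],
    "Next": "FormatResponse",
}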

3.2 Fallback State Machine

"""
MangaAssist fallback state machine — a lightweight Step Functions
workflow that handles all fallback paths.
"""

FALLBACK_STATE_MACHINE = {
    "Comment": "MangaAssist Fallback Handler",
    "StartAt": "ClassifyFallbackType",
    "States": {
        "ClassifyFallbackType": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.fallbackType",
                    "StringEquals": "CIRCUIT_OPEN",
                    "Next": "ServeCachedResponse",
                },
                {
                    "Variable": "$.fallbackType",
                    "StringEquals": "MODEL_TIMEOUT",
                    "Next": "ServeSimplifiedResponse",
                },
                {
                    "Variable": "$.fallbackType",
                    "StringEquals": "GUARDRAIL_BLOCK",
                    "Next": "ServeRedirectResponse",
                },
                {
                    "Variable": "$.fallbackType",
                    "StringEquals": "THROTTLED",
                    "Next": "ServeQueuedResponse",
                },
            ],
            "Default": "ServeGenericFallback",
        },

        "ServeCachedResponse": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cached-response",
            "TimeoutSeconds": 3,
            "End": True,
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "ServeGenericFallback"}],
        },

        "ServeSimplifiedResponse": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-simplified",
            "TimeoutSeconds": 5,
            "End": True,
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "ServeGenericFallback"}],
        },

        "ServeRedirectResponse": {
            "Type": "Pass",
            "Result": {
                "message": "I cannot help with that request. Try searching manga, checking orders, or asking for recommendations.",
                "messageJa": "そのリクエストにはお応えできません。漫画の検索、注文確認、またはおすすめをお試しください。",
            },
            "End": True,
        },

        "ServeQueuedResponse": {
            "Type": "Pass",
            "Result": {
                "message": "We're experiencing high demand. Your question has been queued and we'll respond shortly.",
                "messageJa": "現在アクセスが集中しています。ご質問は受け付けましたので、まもなく回答いたします。",
                "queued": True,
            },
            "End": True,
        },

        "ServeGenericFallback": {
            "Type": "Pass",
            "Result": {
                "message": "Something went wrong. Please try again or browse our popular manga!",
                "messageJa": "エラーが発生しました。もう一度お試しいただくか、人気の漫画をご覧ください!",
                "showPopularManga": True,
            },
            "End": True,
        },
    },
}

4. Circuit Breaker Manager for Multiple Services

4.1 Unified Circuit Management

"""
MangaAssist Circuit Breaker Manager — manages circuit breakers for all
downstream services from a single interface. Used by the ECS Fargate
orchestrator to check service health before routing requests.
"""

import logging
from typing import Optional
import redis

logger = logging.getLogger("manga_cb_manager")


class CircuitBreakerManager:
    """
    Manages all circuit breakers for the MangaAssist system.
    Provides a unified health check and routing decision interface.
    """

    def __init__(self, redis_client: redis.Redis):
        self._redis = redis_client
        self._breakers: dict[str, ProgressiveCircuitBreaker] = {}

        # Initialize breakers for all services
        for name, config in CIRCUIT_CONFIGS.items():
            self._breakers[name] = ProgressiveCircuitBreaker(
                redis_client=redis_client,
                config=config,
            )

    def get_system_health(self) -> dict:
        """
        Get health status of all downstream services.
        Used by the orchestrator to make routing decisions.
        """
        health = {}
        for name, breaker in self._breakers.items():
            allowed, reason = breaker.can_execute()
            state = breaker._get_state()
            health[name] = {
                "available": allowed,
                "state": state.get("state", "UNKNOWN"),
                "reason": reason,
                "failure_count": int(state.get("failure_count", 0)),
                "recovery_stage": int(state.get("recovery_stage", 0)),
            }
        return health

    def can_use_service(self, service_name: str) -> bool:
        """Quick check: is a specific service available?"""
        breaker = self._breakers.get(service_name)
        if not breaker:
            return True  # Unknown service: fail open
        allowed, _ = breaker.can_execute()
        return allowed

    def get_model_fallback(self) -> Optional[str]:
        """
        Determine which Bedrock model to use based on circuit states.
        If Sonnet is open, fall back to Haiku. If both open, return None.
        """
        if self.can_use_service("bedrock_sonnet"):
            return "anthropic.claude-3-sonnet-20240229-v1:0"
        if self.can_use_service("bedrock_haiku"):
            logger.info("Sonnet circuit open — falling back to Haiku")
            return "anthropic.claude-3-haiku-20240307-v1:0"
        logger.warning("All Bedrock circuits open — no model available")
        return None

    def record_outcome(self, service_name: str, success: bool) -> None:
        """Record a success or failure for a service."""
        breaker = self._breakers.get(service_name)
        if not breaker:
            return
        if success:
            breaker.record_success()
        else:
            breaker.record_failure()
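
A short sketch of how the orchestrator might consume the manager when routing a chat turn (the route_chat_turn helper and invoke_model callable are illustrative): pick a model with the circuit-aware fallback, then feed the outcome back into the matching breaker.

def route_chat_turn(manager: CircuitBreakerManager, prompt: str, invoke_model) -> dict:
    """Illustrative routing: choose Sonnet or Haiku based on circuit state."""
    model_id = manager.get_model_fallback()
    if model_id is None:
        # Both Bedrock circuits are open: hand off to the cached/FAQ fallback path.
        return {"fallback": True, "fallbackType": "CIRCUIT_OPEN"}

    service = "bedrock_sonnet" if "sonnet" in model_id else "bedrock_haiku"
    try:
        answer = invoke_model(model_id, prompt)
        manager.record_outcome(service, success=True)
        return {"answer": answer, "modelId": model_id}
    except Exception:
        manager.record_outcome(service, success=False)
        raise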

5. Comparison — Timeout and Error Handling Strategies

| Dimension | Circuit Breaker | Step Functions Retry | Lambda Timeout | SDK Timeout |
| --- | --- | --- | --- | --- |
| Scope | Cross-request, cross-task (distributed) | Per-execution, per-state | Per-invocation | Per-API-call |
| State | Persistent in Redis (shared across fleet) | Ephemeral (within execution) | None (single invocation) | None (single call) |
| Decision | Block all requests when threshold exceeded | Retry same operation N times | Kill function after N seconds | Kill HTTP call after N seconds |
| Cost of Failure | Serves fallback instantly (no token cost) | Burns tokens on each retry attempt | Wastes partial Lambda duration cost | Wastes partial API call duration |
| Recovery | Progressive (1 → 3 → 10 probes) | Automatic on next retry | N/A (new invocation) | N/A (new call) |
| Best For | Sustained service degradation (minutes) | Transient errors (seconds) | Runaway operations | Network issues |
| MangaAssist Config | Bedrock: 5 failures / 60s window → open | ThrottlingException: 3 retries, 3x backoff | 30s for Bedrock Lambda | 25s read, 5s connect |
| Alert Trigger | State transition to OPEN | MaxAttempts exhausted | Timeout event | Connection timeout |

6. Key Takeaways

  1. The timeout hierarchy must nest from the inside out — SDK timeout (25s) < Lambda timeout (30s) ≤ Step Functions state timeout (30s) < workflow timeout (60s). API Gateway's 29s integration timeout is a hard limit, so any work that outlives it has to deliver its result asynchronously (for example, pushed back over the WebSocket connection) instead of inside the integration response. If an inner timeout exceeds its caller's timeout, you get orphaned operations and unpredictable behavior.

  2. Progressive circuit breaker recovery prevents thundering herd — when Bedrock recovers after an outage, a standard circuit breaker sends all traffic immediately. Progressive recovery (1 → 3 → 10 probes) ramps up gradually, preventing the recovered service from being overwhelmed.

  3. Circuit breakers need distributed state — with multiple ECS Fargate tasks, each task must see the same circuit state. Redis is the natural choice since MangaAssist already uses ElastiCache. The state is a simple hash map with 86400s TTL.

  4. Lambda graceful timeout handling uses the remaining-time API — context.get_remaining_time_in_millis() lets the handler save partial results before the timeout hits. Always reserve 2-3 seconds for cleanup.

  5. Step Functions retry configs vary by error type — throttling errors deserve aggressive retries (3 attempts, 3x backoff), model timeouts deserve minimal retries (1 attempt), and guardrail blocks deserve no retries (content is blocked, retrying will just burn tokens).

  6. Model fallback is a first-class circuit breaker feature — when the Sonnet circuit opens, automatically fall back to Haiku. When both open, serve cached responses. This is transparent to the user and the agent.

  7. Disable SDK-level retries when Step Functions handles retries — having both SDK retries (boto3) and Step Functions retries creates multiplicative retry behavior (3 SDK retries x 3 SF retries = 9 total attempts), which can exhaust the timeout budget.


Next file: 03-scenarios-and-runbooks.md — Five production scenarios for safeguard and circuit breaker failures.