LOCAL PREVIEW View on GitHub

GenAI API Design Patterns

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

Field Value
Domain 2 — Implementation & Integration
Task 2.5 — Application Integration Patterns
Skill 2.5.1 — FM API Interfaces
Focus Conversation APIs, token budget enforcement, idempotent retries
MangaAssist Relevance Production API patterns for multi-turn manga recommendation conversations with cost controls

Mind Map

mindmap
  root((GenAI API Design Patterns))
    Conversation API Design
      Multi-Turn Protocol
        Turn-based message format
        Role alternation enforcement
        System prompt injection
      Context Management
        Sliding window
        Summarization fallback
        Priority-based retention
      Response Formatting
        Structured JSON responses
        Markdown rendering
        Bilingual JP/EN support
    Token Budget Enforcement
      Pre-Request Validation
        Input token estimation
        Context window math
        Rejection vs truncation
      In-Flight Monitoring
        Stream token counter
        Budget breach detection
        Graceful stop
      Post-Request Accounting
        Actual vs estimated reconciliation
        Per-user daily caps
        Alerting thresholds
    Idempotent Retry Strategies
      Request Fingerprinting
        Content hash generation
        Timestamp windowing
        Session-scoped dedup
      Retry Policies
        Exponential backoff with jitter
        Max retry ceiling
        Error classification
      Circuit Breaker
        Failure rate tracking
        Open/half-open/closed states
        Fallback responses
    Model Selection Router
      Query Classification
        Complexity scoring
        Intent detection
        Language detection
      Cost-Aware Routing
        Haiku for simple queries
        Sonnet for complex reasoning
        Fallback chains
      A/B Testing
        Traffic splitting
        Quality metric comparison
        Cost-quality tradeoff
    Error Handling
      Bedrock Errors
        ThrottlingException
        ModelTimeoutException
        ValidationException
      Graceful Degradation
        Cached response fallback
        Reduced quality mode
        Maintenance messages
      Client Communication
        Typed error codes
        Retry-After headers
        User-friendly messages

Conversation API Design

Multi-Turn Message Protocol

MangaAssist follows a strict message protocol that ensures Claude 3 receives well-formed conversation history while enforcing business rules.

sequenceDiagram
    participant User
    participant API as API Layer
    participant Validator as Request Validator
    participant Budget as Token Budget
    participant Context as Context Builder
    participant Router as Model Router
    participant Bedrock as Amazon Bedrock

    User->>API: {"message": "鬼滅の刃みたいなマンガは?", "sessionId": "abc123"}
    API->>Validator: Validate request schema
    Validator->>Validator: Check required fields, sanitize input
    Validator->>Budget: Estimate input tokens
    Budget->>Budget: Check user daily quota remaining
    Budget-->>Validator: OK (2,847 tokens remaining today)

    Validator->>Context: Build conversation context
    Context->>Context: Load session history (last 10 turns)
    Context->>Context: Apply sliding window (≤3000 tokens)
    Context->>Context: Inject system prompt + RAG results

    Context->>Router: Route to model
    Router->>Router: Classify query complexity
    Router-->>Bedrock: Haiku (simple recommendation)

    Bedrock-->>API: Streaming response
    API-->>User: Streamed chunks + metadata

Conversation API Handler

"""
MangaAssist Conversation API
Handles multi-turn chat with token budget enforcement and model routing.
"""

import json
import time
import hashlib
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class QueryComplexity(Enum):
    """Query complexity levels for model routing."""
    SIMPLE = "simple"      # Greetings, yes/no, simple lookups
    MODERATE = "moderate"  # Recommendations, comparisons
    COMPLEX = "complex"    # Multi-step reasoning, analysis, creative


class ModelChoice(Enum):
    HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
    SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"


# Cost per 1M tokens
MODEL_COSTS = {
    ModelChoice.HAIKU: {"input": 0.25, "output": 1.25},
    ModelChoice.SONNET: {"input": 3.00, "output": 15.00},
}


@dataclass
class ConversationRequest:
    """Validated conversation request."""
    session_id: str
    user_id: str
    message: str
    language: str = "ja"
    idempotency_key: Optional[str] = None
    max_tokens: Optional[int] = None
    stream: bool = True


@dataclass
class ConversationResponse:
    """Structured conversation response."""
    session_id: str
    message_id: str
    text: str
    model_used: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: int
    cached: bool = False


@dataclass
class UserQuota:
    """Per-user daily token quota."""
    daily_input_limit: int = 500_000
    daily_output_limit: int = 200_000
    daily_cost_limit_usd: float = 5.00
    used_input_tokens: int = 0
    used_output_tokens: int = 0
    used_cost_usd: float = 0.0

    @property
    def input_remaining(self) -> int:
        return max(0, self.daily_input_limit - self.used_input_tokens)

    @property
    def output_remaining(self) -> int:
        return max(0, self.daily_output_limit - self.used_output_tokens)

    @property
    def cost_remaining(self) -> float:
        return max(0.0, self.daily_cost_limit_usd - self.used_cost_usd)

    def can_proceed(self, estimated_input: int, estimated_output: int) -> bool:
        if self.used_input_tokens + estimated_input > self.daily_input_limit:
            return False
        if self.used_output_tokens + estimated_output > self.daily_output_limit:
            return False
        return True


class ConversationAPI:
    """
    Production conversation API with token budgets and model routing.
    """

    SYSTEM_PROMPT_JA = """あなたはMangaAssistです。日本のマンガストアのAIアシスタントとして、
お客様のマンガ選びをお手伝いします。

ルール:
- 丁寧な日本語で回答してください
- マンガのジャンル、作者、ストーリーに基づいた推薦を行います
- 在庫情報や価格は正確な情報のみ提供してください
- 不明な場合は正直にお伝えし、カスタマーサポートへの問い合わせを促してください
- 回答は簡潔に、300文字以内を目安にしてください"""

    SYSTEM_PROMPT_EN = """You are MangaAssist, an AI assistant for a Japanese manga store.
You help customers discover and choose manga.

Rules:
- Provide recommendations based on genres, authors, and story preferences
- Only share accurate inventory and pricing information
- If uncertain, be honest and suggest contacting customer support
- Keep responses concise, around 150 words"""

    # Keywords that indicate complex queries requiring Sonnet
    COMPLEX_INDICATORS = [
        "比較", "分析", "違い", "なぜ", "理由", "おすすめ理由",
        "compare", "analyze", "difference", "explain why",
        "ランキング", "トップ", "ベスト",
        "似ている", "みたいな", "のような",
    ]

    SIMPLE_INDICATORS = [
        "こんにちは", "ありがとう", "はい", "いいえ",
        "hello", "thanks", "yes", "no",
        "在庫", "価格", "値段", "stock", "price",
    ]

    def __init__(self, redis_client, session_manager):
        self.redis = redis_client
        self.session_manager = session_manager
        self.bedrock = boto3.client(
            "bedrock-runtime",
            region_name="ap-northeast-1",
        )

    def validate_request(self, raw_body: dict) -> ConversationRequest:
        """
        Validate and normalize incoming request.

        Raises ValueError for invalid requests.
        """
        message = raw_body.get("message", "").strip()
        if not message:
            raise ValueError("Message cannot be empty")

        if len(message) > 5000:
            raise ValueError("Message exceeds 5000 character limit")

        session_id = raw_body.get("sessionId")
        if not session_id:
            raise ValueError("sessionId is required")

        # Generate idempotency key if not provided
        idempotency_key = raw_body.get("idempotencyKey")
        if not idempotency_key:
            ts_window = int(time.time()) // 5
            idempotency_key = hashlib.sha256(
                f"{session_id}:{message}:{ts_window}".encode()
            ).hexdigest()[:16]

        # Detect language
        jp_chars = sum(1 for c in message if ord(c) > 0x3000)
        language = "ja" if jp_chars > len(message) * 0.2 else "en"

        return ConversationRequest(
            session_id=session_id,
            user_id=raw_body.get("userId", "anonymous"),
            message=message,
            language=language,
            idempotency_key=idempotency_key,
            max_tokens=raw_body.get("maxTokens"),
            stream=raw_body.get("stream", True),
        )

    def classify_complexity(self, message: str) -> QueryComplexity:
        """
        Classify query complexity to determine model routing.
        Simple queries go to Haiku (cheap), complex to Sonnet (capable).
        """
        message_lower = message.lower()

        # Check for simple patterns
        for indicator in self.SIMPLE_INDICATORS:
            if indicator in message_lower and len(message) < 50:
                return QueryComplexity.SIMPLE

        # Check for complex patterns
        complex_score = sum(
            1 for ind in self.COMPLEX_INDICATORS if ind in message_lower
        )

        if complex_score >= 2 or len(message) > 500:
            return QueryComplexity.COMPLEX

        if complex_score >= 1 or len(message) > 200:
            return QueryComplexity.MODERATE

        return QueryComplexity.SIMPLE

    def select_model(self, complexity: QueryComplexity) -> ModelChoice:
        """Map complexity to model choice."""
        routing = {
            QueryComplexity.SIMPLE: ModelChoice.HAIKU,
            QueryComplexity.MODERATE: ModelChoice.HAIKU,  # Haiku handles moderate well
            QueryComplexity.COMPLEX: ModelChoice.SONNET,
        }
        return routing[complexity]

    def check_idempotency(self, key: str) -> Optional[ConversationResponse]:
        """Check if this request was already processed."""
        cached = self.redis.get(f"idemp_resp:{key}")
        if cached:
            data = json.loads(cached)
            return ConversationResponse(
                session_id=data["session_id"],
                message_id=data["message_id"],
                text=data["text"],
                model_used=data["model_used"],
                input_tokens=data["input_tokens"],
                output_tokens=data["output_tokens"],
                cost_usd=data["cost_usd"],
                latency_ms=data["latency_ms"],
                cached=True,
            )
        return None

    def get_user_quota(self, user_id: str) -> UserQuota:
        """Load user's daily quota from Redis."""
        today = time.strftime("%Y-%m-%d")
        key = f"quota:{user_id}:{today}"
        data = self.redis.hgetall(key)

        if not data:
            return UserQuota()

        return UserQuota(
            used_input_tokens=int(data.get("input_tokens", 0)),
            used_output_tokens=int(data.get("output_tokens", 0)),
            used_cost_usd=float(data.get("cost_usd", 0.0)),
        )

    def update_user_quota(
        self, user_id: str, input_tokens: int, output_tokens: int, cost_usd: float
    ) -> None:
        """Update user's daily usage counters."""
        today = time.strftime("%Y-%m-%d")
        key = f"quota:{user_id}:{today}"
        pipe = self.redis.pipeline()
        pipe.hincrby(key, "input_tokens", input_tokens)
        pipe.hincrby(key, "output_tokens", output_tokens)
        pipe.hincrbyfloat(key, "cost_usd", cost_usd)
        pipe.expire(key, 86400)  # Auto-expire at end of day
        pipe.execute()

    def build_prompt(
        self,
        request: ConversationRequest,
        context_history: list[dict],
        rag_results: list[str],
    ) -> tuple[str, list[dict]]:
        """
        Build the full prompt with system instructions, RAG context, and history.

        Returns (system_prompt, messages).
        """
        system = (
            self.SYSTEM_PROMPT_JA if request.language == "ja"
            else self.SYSTEM_PROMPT_EN
        )

        # Inject RAG results into system prompt if available
        if rag_results:
            rag_context = "\n\n".join(rag_results[:3])  # Top 3 results
            system += f"\n\n参考情報:\n{rag_context}"

        # Build messages from history + current message
        messages = list(context_history)
        messages.append({
            "role": "user",
            "content": request.message,
        })

        return system, messages

    def calculate_cost(
        self, model: ModelChoice, input_tokens: int, output_tokens: int
    ) -> float:
        """Calculate request cost in USD."""
        rates = MODEL_COSTS[model]
        input_cost = (input_tokens / 1_000_000) * rates["input"]
        output_cost = (output_tokens / 1_000_000) * rates["output"]
        return input_cost + output_cost

    async def handle_chat(self, raw_body: dict) -> ConversationResponse:
        """
        Main entry point for chat requests.
        Validates, budgets, routes, invokes, and tracks.
        """
        start = time.time()

        # 1. Validate request
        request = self.validate_request(raw_body)

        # 2. Check idempotency
        cached_response = self.check_idempotency(request.idempotency_key)
        if cached_response:
            logger.info(f"Idempotent cache hit: {request.idempotency_key}")
            return cached_response

        # 3. Check user quota
        quota = self.get_user_quota(request.user_id)
        estimated_output = 300  # Conservative estimate
        if not quota.can_proceed(500, estimated_output):
            raise QuotaExceededException(
                f"Daily quota exceeded. Remaining input: {quota.input_remaining}, "
                f"output: {quota.output_remaining}"
            )

        # 4. Load session and build context
        session = self.session_manager.get_or_create_session(
            request.session_id, request.user_id
        )
        context_history = self.session_manager.get_context_for_prompt(session)

        # 5. Classify and route
        complexity = self.classify_complexity(request.message)
        model = self.select_model(complexity)

        # 6. Build prompt
        rag_results = await self._retrieve_rag_context(request.message)
        system_prompt, messages = self.build_prompt(
            request, context_history, rag_results
        )

        # 7. Invoke Bedrock
        response_text, input_tokens, output_tokens = await self._invoke_bedrock(
            model=model,
            system_prompt=system_prompt,
            messages=messages,
            max_tokens=request.max_tokens or 1024,
        )

        # 8. Calculate cost and update quota
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        self.update_user_quota(request.user_id, input_tokens, output_tokens, cost)

        # 9. Update session
        self.session_manager.add_turn(
            session, "user", request.message, token_count=input_tokens
        )
        self.session_manager.add_turn(
            session, "assistant", response_text,
            token_count=output_tokens, model_id=model.value,
        )

        latency_ms = int((time.time() - start) * 1000)

        # 10. Build response
        response = ConversationResponse(
            session_id=request.session_id,
            message_id=hashlib.sha256(
                f"{request.session_id}:{time.time()}".encode()
            ).hexdigest()[:12],
            text=response_text,
            model_used=model.value,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
        )

        # 11. Cache for idempotency
        self._cache_response(request.idempotency_key, response)

        logger.info(
            "Chat completed",
            extra={
                "model": model.name,
                "complexity": complexity.value,
                "latency_ms": latency_ms,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost_usd": f"{cost:.6f}",
            },
        )

        return response

    async def _retrieve_rag_context(self, query: str) -> list[str]:
        """Retrieve relevant manga information from OpenSearch."""
        # Implementation calls OpenSearch Serverless vector search
        # Returns top-k relevant product descriptions and reviews
        pass

    async def _invoke_bedrock(
        self,
        model: ModelChoice,
        system_prompt: str,
        messages: list[dict],
        max_tokens: int,
    ) -> tuple[str, int, int]:
        """Invoke Bedrock and return (text, input_tokens, output_tokens)."""
        # Implementation calls Bedrock InvokeModel or InvokeModelWithResponseStream
        pass

    def _cache_response(self, idempotency_key: str, response: ConversationResponse):
        """Cache response for idempotency."""
        data = {
            "session_id": response.session_id,
            "message_id": response.message_id,
            "text": response.text,
            "model_used": response.model_used,
            "input_tokens": response.input_tokens,
            "output_tokens": response.output_tokens,
            "cost_usd": response.cost_usd,
            "latency_ms": response.latency_ms,
        }
        self.redis.setex(
            f"idemp_resp:{idempotency_key}",
            30,  # 30-second dedup window
            json.dumps(data, ensure_ascii=False),
        )


class QuotaExceededException(Exception):
    """Raised when user exceeds daily token quota."""
    pass

Token Budget Enforcement Architecture

Token budgets are the cost firewall of MangaAssist. Without strict enforcement, a single misbehaving client or prompt injection could generate thousands of dollars in Bedrock costs within minutes.

Token Budget Flow

graph TB
    subgraph Pre_Request["Pre-Request Phase"]
        ESTIMATE[Estimate Input Tokens<br/>JP: ~1 token/1.5 chars<br/>EN: ~1 token/4 chars]
        CHECK_QUOTA[Check User Daily Quota<br/>Redis counter]
        CHECK_CONTEXT[Check Context Window<br/>History + RAG + Prompt ≤ 180K]
        TRUNCATE[Truncate if Needed<br/>Drop oldest turns first]
    end

    subgraph In_Flight["In-Flight Phase"]
        STREAM_COUNT[Count Output Tokens<br/>From stream events]
        BUDGET_CHECK[Check Against Cap<br/>max_tokens parameter]
        FORCE_STOP[Force Stop if Budget<br/>Exceeded 110% of cap]
    end

    subgraph Post_Request["Post-Request Phase"]
        ACTUAL[Get Actual Counts<br/>From message_delta event]
        RECONCILE[Reconcile Estimate<br/>vs Actual]
        UPDATE_QUOTA[Update User Quota<br/>Redis HINCRBY]
        EMIT_METRIC[Emit CloudWatch<br/>Cost Metric]
    end

    ESTIMATE --> CHECK_QUOTA
    CHECK_QUOTA -->|Quota OK| CHECK_CONTEXT
    CHECK_QUOTA -->|Exceeded| REJECT[Reject Request<br/>429 Quota Exceeded]
    CHECK_CONTEXT --> TRUNCATE
    TRUNCATE --> STREAM_COUNT
    STREAM_COUNT --> BUDGET_CHECK
    BUDGET_CHECK --> FORCE_STOP
    FORCE_STOP --> ACTUAL
    ACTUAL --> RECONCILE
    RECONCILE --> UPDATE_QUOTA
    UPDATE_QUOTA --> EMIT_METRIC

    style REJECT fill:#dc3545,color:#fff
    style FORCE_STOP fill:#ffc107,color:#000

Token Budget Enforcer

"""
MangaAssist Token Budget Enforcer
Multi-layer token budget enforcement: pre-request, in-flight, and post-request.
"""

import time
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class BudgetConfig:
    """Token budget configuration."""
    # Per-request limits
    max_input_tokens: int = 4000
    max_output_tokens: int = 1024
    max_context_window: int = 180_000  # Claude 3 Sonnet

    # Per-user daily limits
    daily_input_limit: int = 500_000
    daily_output_limit: int = 200_000
    daily_cost_limit_usd: float = 5.00

    # System overhead
    system_prompt_tokens: int = 300
    rag_context_tokens: int = 800
    safety_margin_tokens: int = 500

    # In-flight controls
    output_overshoot_tolerance: float = 1.10  # Allow 10% overshoot

    @property
    def available_for_history(self) -> int:
        """Tokens available for conversation history."""
        return (
            self.max_input_tokens
            - self.system_prompt_tokens
            - self.rag_context_tokens
            - self.safety_margin_tokens
        )


class TokenBudgetEnforcer:
    """Enforces token budgets at multiple levels."""

    def __init__(self, redis_client, config: Optional[BudgetConfig] = None):
        self.redis = redis_client
        self.config = config or BudgetConfig()

    def pre_request_check(
        self, user_id: str, estimated_input_tokens: int
    ) -> dict:
        """
        Pre-request validation. Returns budget allocation or rejection.
        """
        # Check user daily quota
        today = time.strftime("%Y-%m-%d")
        quota_key = f"quota:{user_id}:{today}"
        used = self.redis.hgetall(quota_key)

        used_input = int(used.get("input_tokens", 0))
        used_output = int(used.get("output_tokens", 0))
        used_cost = float(used.get("cost_usd", 0.0))

        if used_input + estimated_input_tokens > self.config.daily_input_limit:
            return {
                "allowed": False,
                "reason": "daily_input_limit_exceeded",
                "remaining_input": self.config.daily_input_limit - used_input,
            }

        if used_cost >= self.config.daily_cost_limit_usd:
            return {
                "allowed": False,
                "reason": "daily_cost_limit_exceeded",
                "remaining_cost_usd": self.config.daily_cost_limit_usd - used_cost,
            }

        # Check input doesn't exceed context window
        if estimated_input_tokens > self.config.max_context_window:
            return {
                "allowed": False,
                "reason": "context_window_exceeded",
                "max_allowed": self.config.max_context_window,
            }

        # Calculate allowed output tokens
        allowed_output = min(
            self.config.max_output_tokens,
            self.config.daily_output_limit - used_output,
        )

        return {
            "allowed": True,
            "allocated_output_tokens": max(allowed_output, 100),
            "remaining_daily_input": self.config.daily_input_limit - used_input,
            "remaining_daily_output": self.config.daily_output_limit - used_output,
        }

    def truncate_history(
        self, history: list[dict], max_tokens: Optional[int] = None
    ) -> list[dict]:
        """
        Truncate conversation history to fit within token budget.
        Keeps most recent turns, drops oldest first.
        Always preserves at least the last user-assistant pair.
        """
        max_tokens = max_tokens or self.config.available_for_history

        # Estimate tokens per turn
        turns_with_tokens = []
        for turn in history:
            text = turn.get("content", "")
            # Japanese: ~1 token per 1.5 chars; English: ~1 token per 4 chars
            jp_chars = sum(1 for c in text if ord(c) > 0x3000)
            en_chars = len(text) - jp_chars
            estimated = int(jp_chars / 1.5) + (en_chars // 4) + 5  # +5 overhead
            turns_with_tokens.append((turn, estimated))

        # Walk backward, accumulate tokens
        result = []
        total = 0
        for turn, tokens in reversed(turns_with_tokens):
            if total + tokens > max_tokens and len(result) >= 2:
                break
            result.insert(0, turn)
            total += tokens

        logger.info(
            f"History truncated: {len(history)} -> {len(result)} turns, "
            f"~{total} tokens"
        )
        return result

    def in_flight_check(
        self, output_tokens_so_far: int, allocated_output: int
    ) -> bool:
        """
        Check during streaming if we should force-stop.
        Returns True if streaming should continue.
        """
        max_allowed = int(allocated_output * self.config.output_overshoot_tolerance)
        return output_tokens_so_far < max_allowed

    def post_request_reconcile(
        self,
        user_id: str,
        estimated_input: int,
        actual_input: int,
        actual_output: int,
        model_id: str,
        cost_usd: float,
    ) -> dict:
        """
        Post-request: reconcile estimates with actuals and update quotas.
        """
        drift = abs(estimated_input - actual_input) / max(actual_input, 1)

        if drift > 0.20:
            logger.warning(
                f"Token estimation drift: {drift:.1%} "
                f"(estimated={estimated_input}, actual={actual_input})"
            )

        # Update daily quota
        today = time.strftime("%Y-%m-%d")
        quota_key = f"quota:{user_id}:{today}"
        pipe = self.redis.pipeline()
        pipe.hincrby(quota_key, "input_tokens", actual_input)
        pipe.hincrby(quota_key, "output_tokens", actual_output)
        pipe.hincrbyfloat(quota_key, "cost_usd", cost_usd)
        pipe.hincrby(quota_key, "request_count", 1)
        pipe.expire(quota_key, 86400)
        pipe.execute()

        return {
            "estimation_drift": drift,
            "actual_input": actual_input,
            "actual_output": actual_output,
            "cost_usd": cost_usd,
        }

Idempotent Retry Strategy

Retry Decision Tree

graph TB
    ERROR[Error Received]
    CLASSIFY{Classify Error}

    RETRYABLE[Retryable Errors]
    NON_RETRYABLE[Non-Retryable]

    THROTTLE[ThrottlingException]
    TIMEOUT[ModelTimeoutException]
    SERVER[ServiceUnavailable 503]
    NETWORK[ConnectionError]

    VALIDATION[ValidationException]
    ACCESS[AccessDeniedException]
    BADREQ[BadRequestException]
    QUOTA[QuotaExceeded]

    CHECK_RETRY{Retry Count<br/>< Max?}
    CALC_DELAY[Calculate Delay<br/>base × 2^attempt + jitter]
    CHECK_CB{Circuit Breaker<br/>Status?}
    WAIT[Wait with Backoff]
    RETRY[Retry Request]
    FAIL[Return Error<br/>to Client]
    FALLBACK[Try Fallback<br/>Haiku if Sonnet failed]

    ERROR --> CLASSIFY
    CLASSIFY --> RETRYABLE
    CLASSIFY --> NON_RETRYABLE

    RETRYABLE --> THROTTLE
    RETRYABLE --> TIMEOUT
    RETRYABLE --> SERVER
    RETRYABLE --> NETWORK

    NON_RETRYABLE --> VALIDATION
    NON_RETRYABLE --> ACCESS
    NON_RETRYABLE --> BADREQ
    NON_RETRYABLE --> QUOTA
    NON_RETRYABLE --> FAIL

    THROTTLE --> CHECK_RETRY
    TIMEOUT --> CHECK_RETRY
    SERVER --> CHECK_RETRY
    NETWORK --> CHECK_RETRY

    CHECK_RETRY -->|Yes| CALC_DELAY
    CHECK_RETRY -->|No| CHECK_CB
    CALC_DELAY --> WAIT
    WAIT --> RETRY
    CHECK_CB -->|Open| FALLBACK
    CHECK_CB -->|Closed| FAIL

    style RETRYABLE fill:#ffc107,color:#000
    style NON_RETRYABLE fill:#dc3545,color:#fff
    style FALLBACK fill:#28a745,color:#fff

Retry Controller with Circuit Breaker

"""
MangaAssist Retry Controller
Exponential backoff with jitter, circuit breaker, and model fallback.
"""

import time
import random
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional, Any

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class RetryConfig:
    """Retry behavior configuration."""
    max_retries: int = 3
    base_delay_seconds: float = 0.5
    max_delay_seconds: float = 8.0
    jitter_range: float = 0.5  # ±50% jitter
    timeout_seconds: float = 25.0

    # Circuit breaker settings
    failure_threshold: int = 5  # Failures before opening circuit
    recovery_timeout_seconds: float = 30.0  # Time before half-open
    success_threshold: int = 2  # Successes to close circuit


RETRYABLE_ERRORS = {
    "ThrottlingException",
    "ModelTimeoutException",
    "ServiceUnavailableException",
    "InternalServerException",
    "ModelStreamErrorException",
}

NON_RETRYABLE_ERRORS = {
    "ValidationException",
    "AccessDeniedException",
    "ResourceNotFoundException",
    "ModelNotReadyException",
}


class CircuitBreaker:
    """
    Circuit breaker for Bedrock API calls.
    Tracks failures per model and opens circuit when threshold is exceeded.
    """

    def __init__(self, redis_client, config: RetryConfig):
        self.redis = redis_client
        self.config = config

    def get_state(self, model_id: str) -> CircuitState:
        """Get current circuit state for a model."""
        key = f"circuit:{model_id}"
        data = self.redis.hgetall(key)

        if not data:
            return CircuitState.CLOSED

        state = data.get("state", "closed")
        failures = int(data.get("failures", 0))
        last_failure = float(data.get("last_failure", 0))

        if state == "open":
            # Check if recovery timeout has elapsed
            elapsed = time.time() - last_failure
            if elapsed >= self.config.recovery_timeout_seconds:
                self._set_state(model_id, CircuitState.HALF_OPEN)
                return CircuitState.HALF_OPEN
            return CircuitState.OPEN

        if failures >= self.config.failure_threshold:
            self._set_state(model_id, CircuitState.OPEN)
            return CircuitState.OPEN

        return CircuitState(state)

    def record_success(self, model_id: str) -> None:
        """Record a successful call."""
        key = f"circuit:{model_id}"
        state = self.get_state(model_id)

        if state == CircuitState.HALF_OPEN:
            successes = self.redis.hincrby(key, "half_open_successes", 1)
            if successes >= self.config.success_threshold:
                self._set_state(model_id, CircuitState.CLOSED)
                self.redis.delete(key)
        elif state == CircuitState.CLOSED:
            # Decay failure counter on success
            self.redis.hincrby(key, "failures", -1)
            failures = int(self.redis.hget(key, "failures") or 0)
            if failures <= 0:
                self.redis.delete(key)

    def record_failure(self, model_id: str) -> None:
        """Record a failed call."""
        key = f"circuit:{model_id}"
        pipe = self.redis.pipeline()
        pipe.hincrby(key, "failures", 1)
        pipe.hset(key, "last_failure", str(time.time()))
        pipe.expire(key, 300)  # Auto-cleanup after 5 min
        pipe.execute()

        state = self.get_state(model_id)
        if state == CircuitState.HALF_OPEN:
            self._set_state(model_id, CircuitState.OPEN)

    def _set_state(self, model_id: str, state: CircuitState) -> None:
        key = f"circuit:{model_id}"
        self.redis.hset(key, "state", state.value)
        self.redis.expire(key, 300)


class RetryController:
    """
    Manages retries with exponential backoff, jitter, and circuit breaker.
    Supports model fallback (Sonnet -> Haiku).
    """

    MODEL_FALLBACK = {
        "anthropic.claude-3-sonnet-20240229-v1:0": "anthropic.claude-3-haiku-20240307-v1:0",
    }

    def __init__(
        self,
        redis_client,
        config: Optional[RetryConfig] = None,
    ):
        self.config = config or RetryConfig()
        self.circuit_breaker = CircuitBreaker(redis_client, self.config)

    def calculate_delay(self, attempt: int) -> float:
        """Calculate backoff delay with jitter."""
        base = self.config.base_delay_seconds * (2 ** attempt)
        capped = min(base, self.config.max_delay_seconds)

        # Add jitter: ±jitter_range of the delay
        jitter = capped * self.config.jitter_range
        delay = capped + random.uniform(-jitter, jitter)

        return max(0.1, delay)  # Minimum 100ms

    def execute_with_retry(
        self,
        func: Callable,
        model_id: str,
        *args,
        **kwargs,
    ) -> Any:
        """
        Execute a function with retry logic and circuit breaker.

        Args:
            func: The function to call (e.g., bedrock invoke)
            model_id: The model being called
            *args, **kwargs: Passed to func

        Returns:
            The function's return value

        Raises:
            Last exception if all retries exhausted
        """
        # Check circuit breaker
        state = self.circuit_breaker.get_state(model_id)
        if state == CircuitState.OPEN:
            fallback = self.MODEL_FALLBACK.get(model_id)
            if fallback:
                logger.warning(
                    f"Circuit open for {model_id}, falling back to {fallback}"
                )
                return self._try_fallback(func, fallback, *args, **kwargs)
            raise CircuitOpenException(
                f"Circuit breaker open for {model_id}"
            )

        last_exception = None

        for attempt in range(self.config.max_retries + 1):
            try:
                result = func(model_id=model_id, *args, **kwargs)
                self.circuit_breaker.record_success(model_id)
                if attempt > 0:
                    logger.info(
                        f"Succeeded on retry {attempt} for {model_id}"
                    )
                return result

            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                last_exception = e

                if error_code in NON_RETRYABLE_ERRORS:
                    logger.error(
                        f"Non-retryable error {error_code}: {e}"
                    )
                    raise

                if error_code in RETRYABLE_ERRORS:
                    self.circuit_breaker.record_failure(model_id)

                    if attempt < self.config.max_retries:
                        delay = self.calculate_delay(attempt)
                        logger.warning(
                            f"Retryable error {error_code} on attempt "
                            f"{attempt + 1}/{self.config.max_retries + 1}. "
                            f"Retrying in {delay:.2f}s"
                        )
                        time.sleep(delay)
                    else:
                        # All retries exhausted, try fallback
                        fallback = self.MODEL_FALLBACK.get(model_id)
                        if fallback:
                            return self._try_fallback(
                                func, fallback, *args, **kwargs
                            )
                else:
                    logger.error(f"Unknown error code {error_code}: {e}")
                    raise

            except Exception as e:
                last_exception = e
                self.circuit_breaker.record_failure(model_id)

                if attempt < self.config.max_retries:
                    delay = self.calculate_delay(attempt)
                    logger.warning(
                        f"Unexpected error on attempt "
                        f"{attempt + 1}: {e}. Retrying in {delay:.2f}s"
                    )
                    time.sleep(delay)

        raise last_exception

    def _try_fallback(
        self, func: Callable, fallback_model: str, *args, **kwargs
    ) -> Any:
        """Attempt request with fallback model."""
        logger.info(f"Attempting fallback to {fallback_model}")
        try:
            result = func(model_id=fallback_model, *args, **kwargs)
            self.circuit_breaker.record_success(fallback_model)
            return result
        except Exception as e:
            self.circuit_breaker.record_failure(fallback_model)
            raise


class CircuitOpenException(Exception):
    """Raised when circuit breaker is open and no fallback available."""
    pass

Model Selection Router

graph LR
    subgraph Input["Incoming Query"]
        MSG[User Message]
    end

    subgraph Classification["Query Classification"]
        INTENT[Intent Detection]
        COMPLEXITY[Complexity Scoring<br/>Keywords + Length]
        LANG[Language Detection<br/>JP / EN]
    end

    subgraph Routing["Model Router"]
        CACHE_CHECK{Cache Hit?}
        SIMPLE_Q{Simple?}
        COMPLEX_Q{Complex?}
    end

    subgraph Models["Bedrock Models"]
        REDIS_CACHE[Redis Cache<br/>$0.00/request]
        HAIKU[Claude 3 Haiku<br/>$0.25/$1.25 per 1M]
        SONNET[Claude 3 Sonnet<br/>$3/$15 per 1M]
    end

    MSG --> INTENT
    MSG --> COMPLEXITY
    MSG --> LANG

    INTENT --> CACHE_CHECK
    COMPLEXITY --> CACHE_CHECK

    CACHE_CHECK -->|Yes| REDIS_CACHE
    CACHE_CHECK -->|No| SIMPLE_Q
    SIMPLE_Q -->|Yes| HAIKU
    SIMPLE_Q -->|No| COMPLEX_Q
    COMPLEX_Q -->|Yes| SONNET
    COMPLEX_Q -->|No| HAIKU

    style REDIS_CACHE fill:#28a745,color:#fff
    style HAIKU fill:#17a2b8,color:#fff
    style SONNET fill:#ff9900,color:#000

API Response Format Standards

"""
MangaAssist API Response Format
Standardized response envelopes for all API endpoints.
"""

import json
import time
from dataclasses import dataclass, asdict
from typing import Optional, Any


@dataclass
class APIResponse:
    """Standard API response envelope."""
    success: bool
    data: Optional[Any] = None
    error: Optional[dict] = None
    metadata: Optional[dict] = None

    def to_dict(self) -> dict:
        result = {"success": self.success}
        if self.data is not None:
            result["data"] = self.data
        if self.error is not None:
            result["error"] = self.error
        if self.metadata is not None:
            result["metadata"] = self.metadata
        return result

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), ensure_ascii=False)


@dataclass
class StreamChunk:
    """Individual streaming chunk sent via WebSocket."""
    type: str  # "chunk", "done", "error", "metadata"
    text: Optional[str] = None
    tokens_used: Optional[dict] = None
    error_code: Optional[str] = None
    message: Optional[str] = None
    sequence: int = 0

    def to_json(self) -> str:
        data = {"type": self.type}
        if self.text is not None:
            data["text"] = self.text
        if self.tokens_used is not None:
            data["tokensUsed"] = self.tokens_used
        if self.error_code is not None:
            data["errorCode"] = self.error_code
        if self.message is not None:
            data["message"] = self.message
        data["seq"] = self.sequence
        return json.dumps(data, ensure_ascii=False)


# Error code catalog
ERROR_CODES = {
    "QUOTA_EXCEEDED": {
        "status": 429,
        "message_ja": "本日のご利用上限に達しました。明日またお試しください。",
        "message_en": "Daily usage limit reached. Please try again tomorrow.",
        "retry_after": 86400,
    },
    "RATE_LIMITED": {
        "status": 429,
        "message_ja": "リクエストが多すぎます。少々お待ちください。",
        "message_en": "Too many requests. Please wait a moment.",
        "retry_after": 5,
    },
    "MODEL_TIMEOUT": {
        "status": 504,
        "message_ja": "応答に時間がかかっています。もう一度お試しください。",
        "message_en": "Response timed out. Please try again.",
        "retry_after": 2,
    },
    "INVALID_REQUEST": {
        "status": 400,
        "message_ja": "リクエストが不正です。",
        "message_en": "Invalid request format.",
        "retry_after": 0,
    },
    "SESSION_EXPIRED": {
        "status": 410,
        "message_ja": "セッションが期限切れです。新しいチャットを開始してください。",
        "message_en": "Session expired. Please start a new chat.",
        "retry_after": 0,
    },
    "INTERNAL_ERROR": {
        "status": 500,
        "message_ja": "内部エラーが発生しました。しばらくしてからお試しください。",
        "message_en": "An internal error occurred. Please try again later.",
        "retry_after": 10,
    },
}


def build_error_response(
    error_code: str, language: str = "ja", details: str = ""
) -> APIResponse:
    """Build a standardized error response."""
    error_info = ERROR_CODES.get(error_code, ERROR_CODES["INTERNAL_ERROR"])
    message_key = f"message_{language}"
    message = error_info.get(message_key, error_info["message_en"])

    return APIResponse(
        success=False,
        error={
            "code": error_code,
            "message": message,
            "details": details,
            "retryAfter": error_info["retry_after"],
        },
        metadata={
            "timestamp": int(time.time()),
            "statusCode": error_info["status"],
        },
    )


def build_chat_response(
    text: str,
    session_id: str,
    input_tokens: int,
    output_tokens: int,
    model_used: str,
    latency_ms: int,
) -> APIResponse:
    """Build a standardized chat response."""
    return APIResponse(
        success=True,
        data={
            "sessionId": session_id,
            "text": text,
        },
        metadata={
            "model": model_used,
            "tokensUsed": {
                "input": input_tokens,
                "output": output_tokens,
            },
            "latencyMs": latency_ms,
            "timestamp": int(time.time()),
        },
    )

Key Takeaways

# Takeaway MangaAssist Application
1 Classify queries to route models — Simple greetings and lookups go to Haiku ($0.25/1M input); complex multi-step reasoning goes to Sonnet ($3/1M input). This 70/30 split saves ~67% vs all-Sonnet. MangaAssist uses keyword + length heuristics to classify incoming Japanese text and route to the cheapest capable model.
2 Token budgets enforce at three layers — Pre-request estimation rejects over-budget inputs; in-flight monitoring stops runaway outputs; post-request reconciliation catches estimation drift. Each MangaAssist user gets 500K input tokens/day (~$1.50 on Sonnet); exceeding triggers a friendly Japanese-language quota message.
3 Idempotency keys use content hashing — SHA-256 of session ID + message + 5-second time window creates natural dedup that handles retries without false positives. WebSocket reconnects and double-taps do not generate duplicate Bedrock invocations or double-count token usage.
4 Circuit breakers with model fallback — When Sonnet fails 5 times in 30 seconds, the circuit opens and routes to Haiku automatically, then tests recovery via half-open state. MangaAssist degrades gracefully: users get slightly less nuanced recommendations instead of errors.
5 Exponential backoff with jitter prevents thundering herds — Random jitter (±50%) on retry delays prevents synchronized retries from hundreds of concurrent requests. At 50 RPS peak, a Bedrock throttling event does not cascade into a retry storm that compounds the throttling.
6 Conversation context uses a sliding token window — Rather than fixed turn counts, the session manager walks backward from the most recent turn, summing tokens until the budget is reached. A conversation with short turns keeps more history; one with long manga descriptions naturally trims further back.
7 Bilingual error messages improve UX — Error codes map to both Japanese and English user-facing messages with appropriate retry-after guidance. Japanese users see "応答に時間がかかっています" instead of cryptic HTTP error codes.