
Asynchronous Processing and Request Validation Patterns

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIP-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model interaction systems for generative AI applications |
| Skill | 2.4.1 — Create flexible model interaction systems (Bedrock APIs for synchronous requests, language-specific SDKs and SQS for asynchronous processing, API Gateway for custom API clients with request validation) |

1. SQS Queue Design for FM Requests

1.1 Queue Architecture Overview

flowchart TB
    subgraph Producers["Request Producers"]
        API["API Gateway<br/>REST Endpoints"]
        WS["WebSocket API<br/>Batch Submissions"]
        CRON["EventBridge<br/>Scheduled Jobs"]
        STEP["Step Functions<br/>Workflow Steps"]
    end

    subgraph Queues["SQS Queue Topology"]
        direction TB
        subgraph Standard["Standard Queues (At-Least-Once)"]
            HIGH["manga-fm-high-priority<br/>Visibility: 60s<br/>Retention: 4 days"]
            NORMAL["manga-fm-normal<br/>Visibility: 120s<br/>Retention: 7 days"]
            LOW["manga-fm-low-priority<br/>Visibility: 300s<br/>Retention: 14 days"]
        end
        subgraph FIFO["FIFO Queues (Exactly-Once, Ordered)"]
            ORDERED["manga-fm-ordered.fifo<br/>MessageGroupId: per-user<br/>Dedup: 5 min window"]
            SESSION["manga-fm-session.fifo<br/>MessageGroupId: session_id<br/>Ordered per session"]
        end
        subgraph DLQs["Dead Letter Queues"]
            DLQ_STD["manga-fm-dlq<br/>Max receives: 3<br/>Retention: 14 days"]
            DLQ_FIFO["manga-fm-dlq.fifo<br/>Max receives: 3<br/>Retention: 14 days"]
        end
    end

    subgraph Consumers["Request Consumers"]
        LAMBDA["Lambda Workers<br/>(short tasks, < 15 min)"]
        ECS["ECS Fargate Tasks<br/>(long-running analysis)"]
        EC2["EC2 Workers<br/>(GPU-adjacent, bulk)"]
    end

    subgraph Results["Result Delivery"]
        DDB["DynamoDB<br/>Status Tracking"]
        S3["S3 Bucket<br/>Result Storage"]
        SNS["SNS Topic<br/>Completion Notify"]
        CALLBACK["Webhook<br/>Callback URL"]
    end

    API --> HIGH
    API --> NORMAL
    WS --> LOW
    CRON --> LOW
    STEP --> ORDERED

    HIGH --> LAMBDA
    NORMAL --> LAMBDA
    NORMAL --> ECS
    LOW --> ECS
    LOW --> EC2
    ORDERED --> LAMBDA
    SESSION --> LAMBDA

    HIGH -->|"3 failures"| DLQ_STD
    NORMAL -->|"3 failures"| DLQ_STD
    LOW -->|"3 failures"| DLQ_STD
    ORDERED -->|"3 failures"| DLQ_FIFO

    LAMBDA --> DDB
    LAMBDA --> S3
    ECS --> DDB
    ECS --> S3
    LAMBDA --> SNS
    ECS --> SNS
    SNS --> CALLBACK

    style Standard fill:#e8f5e9,stroke:#388e3c
    style FIFO fill:#e3f2fd,stroke:#1565c0
    style DLQs fill:#fce4ec,stroke:#c62828

1.2 FIFO vs Standard Queue Selection

| Factor | Standard Queue | FIFO Queue | MangaAssist Decision |
|---|---|---|---|
| Throughput | Nearly unlimited | 300 msg/s per API action (3,000 msg/s with batching); higher in high-throughput mode | Standard for bulk; FIFO for session-ordered |
| Ordering | Best-effort | Strict per MessageGroupId | FIFO needed for multi-turn context preservation |
| Deduplication | Not built-in | 5-minute dedup window | FIFO avoids duplicate FM calls on retries |
| Delivery | At-least-once | Exactly-once processing | FIFO avoids double billing from duplicate invocations |
| Cost | $0.40/1M requests | $0.50/1M requests | Standard for non-ordered batch work |
| Max message size | 256 KB | 256 KB | Both sufficient for FM request payloads |
| Retention | 1 min to 14 days | 1 min to 14 days | 7 days standard, 14 days DLQ for investigation |
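
The FIFO deduplication row deserves a concrete sketch: instead of relying solely on the queue's 5-minute content-based window, the producer can supply an explicit MessageDeduplicationId derived from the payload. The request shape below is illustrative, not from the source schemas.

```python
import hashlib
import json


def dedup_id(fm_request: dict) -> str:
    """Derive a stable MessageDeduplicationId from request content.

    Identical payloads re-sent inside SQS's 5-minute dedup window collapse
    to a single delivery, so a client retry cannot trigger a second
    (billed) Bedrock invocation.
    """
    canonical = json.dumps(fm_request, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Identical retries map to the same id regardless of key order
rid = dedup_id({"prompt": "ワンピースに似た漫画は?", "model": "haiku"})
```

Queue-side `ContentBasedDeduplication: true` achieves a similar effect; an explicit ID helps when retries carry differing metadata such as timestamps.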

MangaAssist queue assignment rules:

  1. Standard queue — Batch catalog enrichment, content moderation, genre classification (order irrelevant).
  2. FIFO queue — Multi-turn conversation continuations, ordered per-user request sequences where context must be preserved.
  3. High-priority standard — Partner API real-time requests that narrowly miss the sync SLA but need fast turnaround.
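
The three assignment rules can be sketched as a small dispatcher. The request fields (`session_id`, `source`, `batch`) are illustrative, not part of the source schemas; queue names follow the topology in 1.1.

```python
def choose_queue(request: dict) -> dict:
    """Map an FM request to a queue per the assignment rules above.

    Rule 2: session-bound work must preserve per-session order (FIFO).
    Rule 3: partner real-time traffic gets the high-priority queue.
    Rule 1: unordered batch work (enrichment, moderation, classification)
            goes low priority; everything else defaults to normal.
    """
    if request.get("session_id"):
        return {
            "queue": "manga-fm-session.fifo",
            # Per-session MessageGroupId keeps multi-turn context ordered
            "message_group_id": request["session_id"],
        }
    if request.get("source") == "partner_api":
        return {"queue": "manga-fm-high-priority"}
    if request.get("batch"):
        return {"queue": "manga-fm-low-priority"}
    return {"queue": "manga-fm-normal"}
```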

1.3 Dead Letter Queue (DLQ) Configuration

"""
DLQ configuration and monitoring for MangaAssist FM request queues.
"""
import json
import logging
from datetime import datetime

import boto3

logger = logging.getLogger(__name__)


class DLQManager:
    """
    Manages dead letter queues for failed FM requests.
    Provides monitoring, redriving, and alerting capabilities.

    DLQ triggers:
    - Bedrock ThrottlingException after all retries exhausted
    - Bedrock ModelTimeoutException (model overloaded)
    - Malformed request body that fails deserialization
    - Lambda/ECS consumer timeout before processing completes
    """

    def __init__(self, region: str = "us-east-1"):
        self._sqs = boto3.client("sqs", region_name=region)
        self._cloudwatch = boto3.client("cloudwatch", region_name=region)
        self._sns = boto3.client("sns", region_name=region)

        self._dlq_urls = {
            "standard": self._get_queue_url("manga-fm-dlq"),
            "fifo": self._get_queue_url("manga-fm-dlq.fifo")
        }
        self._alert_topic_arn = "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"

    def _get_queue_url(self, queue_name: str) -> str:
        """Resolve queue URL from name."""
        response = self._sqs.get_queue_url(QueueName=queue_name)
        return response["QueueUrl"]

    def get_dlq_depth(self, queue_type: str = "standard") -> int:
        """
        Get the current approximate message count in a DLQ.

        Args:
            queue_type: 'standard' or 'fifo'

        Returns:
            Approximate number of messages in the DLQ
        """
        url = self._dlq_urls[queue_type]
        attrs = self._sqs.get_queue_attributes(
            QueueUrl=url,
            AttributeNames=["ApproximateNumberOfMessages"]
        )
        return int(attrs["Attributes"]["ApproximateNumberOfMessages"])

    def sample_dlq_messages(
        self, queue_type: str = "standard", max_messages: int = 5
    ) -> list:
        """
        Sample messages from the DLQ without deleting them.
        Visibility timeout set to 0 so messages remain available.
        """
        url = self._dlq_urls[queue_type]
        response = self._sqs.receive_message(
            QueueUrl=url,
            MaxNumberOfMessages=min(max_messages, 10),
            VisibilityTimeout=0,        # Peek — do not hide message
            MessageAttributeNames=["All"],
            AttributeNames=["All"]
        )
        messages = response.get("Messages", [])

        sampled = []
        for msg in messages:
            try:
                body = json.loads(msg["Body"])
            except json.JSONDecodeError:
                # Malformed bodies are a documented DLQ cause; still surface metadata
                body = {}
            sampled.append({
                "message_id": msg["MessageId"],
                "request_id": body.get("request_id", "unknown"),
                "prompt_preview": body.get("prompt", "")[:100],
                "model_id": body.get("model_id", "unknown"),
                "sent_timestamp": msg["Attributes"].get("SentTimestamp"),
                "receive_count": int(msg["Attributes"].get("ApproximateReceiveCount", 0)),
                "first_receive": msg["Attributes"].get("ApproximateFirstReceiveTimestamp")
            })

        return sampled

    def redrive_messages(
        self,
        queue_type: str = "standard",
        target_queue_name: str = "manga-fm-normal",
        max_messages: int = 100,
        filter_fn: callable = None
    ) -> dict:
        """
        Redrive messages from DLQ back to a source queue for reprocessing.

        Args:
            queue_type: Which DLQ to drain
            target_queue_name: Destination queue for reprocessing
            max_messages: Maximum messages to redrive
            filter_fn: Optional filter — only redrive if filter_fn(body) returns True

        Returns:
            Summary with counts of redriven, filtered, and failed messages
        """
        dlq_url = self._dlq_urls[queue_type]
        target_url = self._get_queue_url(target_queue_name)

        redriven = 0
        filtered = 0
        failed = 0
        processed = 0

        while processed < max_messages:
            batch_size = min(10, max_messages - processed)
            response = self._sqs.receive_message(
                QueueUrl=dlq_url,
                MaxNumberOfMessages=batch_size,
                VisibilityTimeout=30,
                MessageAttributeNames=["All"],
                WaitTimeSeconds=1
            )

            messages = response.get("Messages", [])
            if not messages:
                break  # DLQ is empty

            for msg in messages:
                processed += 1
                try:
                    body = json.loads(msg["Body"])
                except json.JSONDecodeError:
                    logger.error(f"Unparseable body for {msg['MessageId']}; leaving in DLQ")
                    failed += 1
                    continue

                # Apply optional filter
                if filter_fn and not filter_fn(body):
                    filtered += 1
                    # Release visibility so message stays in DLQ
                    self._sqs.change_message_visibility(
                        QueueUrl=dlq_url,
                        ReceiptHandle=msg["ReceiptHandle"],
                        VisibilityTimeout=0
                    )
                    continue

                try:
                    # Send to target queue
                    send_kwargs = {
                        "QueueUrl": target_url,
                        "MessageBody": msg["Body"],
                        "MessageAttributes": {
                            k: v for k, v in msg.get("MessageAttributes", {}).items()
                        }
                    }
                    # Add FIFO attributes if target is FIFO
                    if target_queue_name.endswith(".fifo"):
                        send_kwargs["MessageGroupId"] = body.get(
                            "message_group_id", "redrive"
                        )
                        send_kwargs["MessageDeduplicationId"] = (
                            f"redrive-{msg['MessageId']}"
                        )

                    self._sqs.send_message(**send_kwargs)

                    # Delete from DLQ after successful send
                    self._sqs.delete_message(
                        QueueUrl=dlq_url,
                        ReceiptHandle=msg["ReceiptHandle"]
                    )
                    redriven += 1

                except Exception as e:
                    logger.error(f"Redrive failed for {msg['MessageId']}: {e}")
                    failed += 1

        summary = {
            "redriven": redriven,
            "filtered": filtered,
            "failed": failed,
            "total_processed": processed
        }
        logger.info(f"DLQ redrive complete: {summary}")
        return summary

    def check_and_alert(self, threshold: int = 50):
        """
        Check DLQ depth and send SNS alert if threshold exceeded.
        Called periodically by EventBridge scheduled rule.
        """
        for queue_type in ["standard", "fifo"]:
            depth = self.get_dlq_depth(queue_type)

            # Emit CloudWatch metric
            self._cloudwatch.put_metric_data(
                Namespace="MangaAssist/SQS",
                MetricData=[{
                    "MetricName": "DLQDepth",
                    "Value": depth,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "QueueType", "Value": queue_type},
                        {"Name": "Service", "Value": "MangaAssist"}
                    ]
                }]
            )

            if depth > threshold:
                self._sns.publish(
                    TopicArn=self._alert_topic_arn,
                    Subject=f"[MangaAssist] DLQ Alert: {queue_type} depth = {depth}",
                    Message=json.dumps({
                        "alert": "DLQ depth threshold exceeded",
                        "queue_type": queue_type,
                        "depth": depth,
                        "threshold": threshold,
                        "timestamp": datetime.utcnow().isoformat(),
                        "action_required": (
                            "Investigate failed FM requests. Common causes: "
                            "Bedrock throttling, malformed requests, consumer timeouts. "
                            "Sample messages with DLQManager.sample_dlq_messages() "
                            "and redrive after root cause is resolved."
                        )
                    }, indent=2)
                )
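
As a usage sketch, redrive_messages accepts a filter_fn predicate over the parsed body. One plausible policy is to redrive only transient Bedrock failures and keep everything else for manual triage; the failure_reason field is an assumption about what the consumer records on failure, not defined in the source.

```python
# failure_reason is a hypothetical field the consumer writes before a
# message exhausts its receives and lands in the DLQ.
RETRYABLE_REASONS = {"ThrottlingException", "ModelTimeoutException"}


def only_retryable(body: dict) -> bool:
    """Redrive transient Bedrock failures; keep everything else for triage."""
    return body.get("failure_reason") in RETRYABLE_REASONS


# manager.redrive_messages(queue_type="standard", filter_fn=only_retryable)
```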

1.4 Visibility Timeout Strategy

Visibility Timeout Guidelines for FM Request Queues:

Queue: manga-fm-high-priority
  Model: Haiku (fast)
  Expected processing: 2-5s
  Visibility timeout: 60s
  Rationale: 12x headroom for cold starts and transient delays.
             If Lambda times out at 30s, message reappears quickly.

Queue: manga-fm-normal
  Model: Haiku or Sonnet
  Expected processing: 5-30s
  Visibility timeout: 120s
  Rationale: Sonnet can take 10-15s; add buffer for retries within
             the consumer. 2 min prevents premature re-delivery.

Queue: manga-fm-low-priority
  Model: Sonnet (complex analysis)
  Expected processing: 30s-5min
  Visibility timeout: 300s
  Rationale: Long-running enrichment tasks. If ECS task takes longer,
             it extends visibility via ChangeMessageVisibility API.

Queue: manga-fm-ordered.fifo
  Model: Varies
  Expected processing: 2-30s
  Visibility timeout: 60s
  Rationale: Per-session ordering means a stuck message blocks its whole
             group. Keep the timeout tight so the next message in the
             group is processed promptly after a failure.

DLQ: manga-fm-dlq / manga-fm-dlq.fifo
  Visibility timeout: 30s (only for sampling/investigation)
  Retention: 14 days (maximum for forensic analysis)
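
The ChangeMessageVisibility extension mentioned for manga-fm-low-priority can be sketched as a heartbeat that renews visibility while work is in flight. The SQS client is injected so the loop is testable; renewing at half the timeout is a common safety margin, not an AWS requirement.

```python
import threading


class VisibilityHeartbeat:
    """Periodically extend a message's visibility while work is in flight."""

    def __init__(self, sqs_client, queue_url: str, receipt_handle: str,
                 timeout: int = 300):
        self._sqs = sqs_client
        self._queue_url = queue_url
        self._receipt_handle = receipt_handle
        self._timeout = timeout
        self._stop = threading.Event()
        # Renew at half the timeout so one missed beat is survivable
        self._interval = timeout / 2
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait returns False on timeout (keep renewing), True when stopped
        while not self._stop.wait(self._interval):
            self._sqs.change_message_visibility(
                QueueUrl=self._queue_url,
                ReceiptHandle=self._receipt_handle,
                VisibilityTimeout=self._timeout,
            )

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
```

Intended use in an ECS worker: `with VisibilityHeartbeat(sqs, queue_url, msg["ReceiptHandle"]): run_enrichment(msg)`.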

2. Request Validation Schemas and Mapping Templates

2.1 RequestSchema — Comprehensive Validation Framework

"""
Comprehensive request validation framework for MangaAssist FM API.
Handles JSON Schema validation, Unicode-aware sanitization,
content-length checks, and request normalization.
"""
import re
import logging
from typing import Optional
from dataclasses import dataclass, field

import jsonschema
from jsonschema import Draft7Validator, FormatChecker

logger = logging.getLogger(__name__)


@dataclass
class ValidationResult:
    """Result of a request validation operation."""
    valid: bool
    errors: list = field(default_factory=list)
    warnings: list = field(default_factory=list)
    normalized_body: Optional[dict] = None
    estimated_tokens: int = 0


class RequestSchema:
    """
    Request validation engine for MangaAssist FM API endpoints.

    Supports:
    - JSON Schema Draft-7 validation
    - Unicode-aware string validation (CJK characters)
    - Token estimation for cost prediction
    - Request normalization and default injection
    - Custom format validators (manga_id, isbn, etc.)

    Usage:
        schema = RequestSchema()
        result = schema.validate("manga_query", request_body)
        if result.valid:
            # Use result.normalized_body for FM invocation
            pass
        else:
            # Return result.errors to client
            pass
    """

    # Approximate token-to-character ratios
    CHARS_PER_TOKEN = {
        "en": 4.0,     # English: ~4 chars per token
        "ja": 1.5,     # Japanese: ~1.5 chars per token (CJK is dense)
        "mixed": 2.5   # Mixed content estimate
    }

    # Maximum input tokens per model to prevent excessive cost
    MAX_INPUT_TOKENS = {
        "anthropic.claude-3-sonnet-20240229-v1:0": 200000,
        "anthropic.claude-3-haiku-20240307-v1:0": 200000,
    }

    # Practical per-request token limits for cost control
    COST_GUARD_TOKENS = {
        "anthropic.claude-3-sonnet-20240229-v1:0": 8000,   # ~$0.024 input
        "anthropic.claude-3-haiku-20240307-v1:0": 16000,    # ~$0.004 input
    }

    SCHEMAS = {
        "manga_query": {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": "MangaAssist Query Request",
            "type": "object",
            "required": ["query"],
            "properties": {
                "query": {
                    "type": "string",
                    "minLength": 1,
                    "maxLength": 8000,
                    "description": "User query text. Supports Japanese Unicode (Hiragana, Katakana, Kanji)."
                },
                "model": {
                    "type": "string",
                    "enum": ["sonnet", "haiku"],
                    "default": "haiku",
                    "description": "Model selection. Haiku for quick lookups, Sonnet for detailed analysis."
                },
                "max_tokens": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 4096,
                    "default": 512
                },
                "temperature": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "conversation_id": {
                    "type": "string",
                    "format": "uuid"
                },
                "language": {
                    "type": "string",
                    "enum": ["en", "ja", "auto"],
                    "default": "auto"
                },
                "context": {
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string"},
                        "session_id": {"type": "string"},
                        "previous_manga": {
                            "type": "array",
                            "items": {"type": "string"},
                            "maxItems": 10
                        }
                    }
                }
            },
            "additionalProperties": False
        },

        "batch_analysis": {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": "MangaAssist Batch Analysis Request",
            "type": "object",
            "required": ["items"],
            "properties": {
                "items": {
                    "type": "array",
                    "minItems": 1,
                    "maxItems": 100,
                    "items": {
                        "type": "object",
                        "required": ["manga_id", "analysis_type"],
                        "properties": {
                            "manga_id": {
                                "type": "string",
                                "pattern": "^MNG-[A-Z0-9]{8}$"
                            },
                            "analysis_type": {
                                "type": "string",
                                "enum": [
                                    "genre_classification",
                                    "content_summary",
                                    "similar_series",
                                    "reading_level",
                                    "full_enrichment"
                                ]
                            },
                            "additional_context": {
                                "type": "string",
                                "maxLength": 2000
                            }
                        },
                        "additionalProperties": False
                    }
                },
                "callback_url": {
                    "type": "string",
                    "format": "uri",
                    "pattern": "^https://"
                },
                "priority": {
                    "type": "string",
                    "enum": ["high", "normal", "low"],
                    "default": "normal"
                },
                "notification": {
                    "type": "object",
                    "properties": {
                        "sns_topic_arn": {"type": "string"},
                        "email": {"type": "string", "format": "email"}
                    }
                }
            },
            "additionalProperties": False
        },

        "streaming_chat": {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": "MangaAssist Streaming Chat Request",
            "type": "object",
            "required": ["messages"],
            "properties": {
                "messages": {
                    "type": "array",
                    "minItems": 1,
                    "maxItems": 50,
                    "items": {
                        "type": "object",
                        "required": ["role", "content"],
                        "properties": {
                            "role": {
                                "type": "string",
                                "enum": ["user", "assistant"]
                            },
                            "content": {
                                "type": "string",
                                "minLength": 1,
                                "maxLength": 16000
                            }
                        }
                    }
                },
                "model": {
                    "type": "string",
                    "enum": ["sonnet", "haiku"],
                    "default": "sonnet"
                },
                "max_tokens": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 4096,
                    "default": 2048
                },
                "stream": {
                    "type": "boolean",
                    "default": True
                },
                "session_id": {
                    "type": "string",
                    "format": "uuid"
                }
            },
            "additionalProperties": False
        }
    }

    def __init__(self):
        """Initialize validators with custom format checkers."""
        self._format_checker = FormatChecker()
        self._register_custom_formats()

        self._validators = {}
        for name, schema in self.SCHEMAS.items():
            self._validators[name] = Draft7Validator(
                schema,
                format_checker=self._format_checker
            )

    def _register_custom_formats(self):
        """Register custom format validators for manga-specific data."""

        @self._format_checker.checks("uuid", raises=jsonschema.FormatError)
        def check_uuid(value):
            pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
            if not re.match(pattern, value):
                raise jsonschema.FormatError(f"'{value}' is not a valid UUID")
            return True

        @self._format_checker.checks("manga_id", raises=jsonschema.FormatError)
        def check_manga_id(value):
            if not re.match(r'^MNG-[A-Z0-9]{8}$', value):
                raise jsonschema.FormatError(
                    f"'{value}' is not a valid manga ID (expected MNG-XXXXXXXX)"
                )
            return True

        @self._format_checker.checks("isbn", raises=jsonschema.FormatError)
        def check_isbn(value):
            clean = value.replace("-", "")
            if len(clean) not in (10, 13) or not clean.replace("X", "").isdigit():
                raise jsonschema.FormatError(f"'{value}' is not a valid ISBN")
            return True

    def validate(self, schema_name: str, body: dict) -> ValidationResult:
        """
        Validate a request body against a named schema.

        Performs:
        1. JSON Schema structural validation
        2. Unicode content checks
        3. Token estimation for cost guard
        4. Request normalization (default injection)

        Args:
            schema_name: Schema key from SCHEMAS dict
            body: Parsed JSON request body

        Returns:
            ValidationResult with errors, warnings, and normalized body
        """
        validator = self._validators.get(schema_name)
        if not validator:
            return ValidationResult(
                valid=False,
                errors=[{"path": "$", "message": f"Unknown schema: {schema_name}"}]
            )

        # Step 1: JSON Schema validation
        errors = []
        for error in validator.iter_errors(body):
            errors.append({
                "path": ".".join(str(p) for p in error.absolute_path) or "$",
                "message": error.message,
                "validator": error.validator,
                "schema_path": list(error.schema_path)
            })

        if errors:
            return ValidationResult(valid=False, errors=errors)

        # Step 2: Unicode and content checks
        warnings = []
        content_warnings = self._check_unicode_content(body, schema_name)
        warnings.extend(content_warnings)

        # Step 3: Token estimation
        estimated_tokens = self._estimate_input_tokens(body, schema_name)

        model_key = body.get("model", "haiku")
        model_id_map = {
            "sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
            "haiku": "anthropic.claude-3-haiku-20240307-v1:0"
        }
        model_id = model_id_map.get(model_key, model_id_map["haiku"])
        cost_guard = self.COST_GUARD_TOKENS.get(model_id, 8000)

        if estimated_tokens > cost_guard:
            warnings.append({
                "code": "TOKEN_LIMIT_WARNING",
                "message": (
                    f"Estimated input tokens ({estimated_tokens}) exceeds "
                    f"cost guard limit ({cost_guard}) for {model_key}. "
                    f"Consider shortening the query or using a more cost-effective model."
                )
            })

        # Step 4: Normalize — inject defaults
        normalized = self._normalize_request(body, schema_name)

        return ValidationResult(
            valid=True,
            errors=[],
            warnings=warnings,
            normalized_body=normalized,
            estimated_tokens=estimated_tokens
        )

    def _check_unicode_content(self, body: dict, schema_name: str) -> list:
        """Check for Unicode-related issues in request content."""
        warnings = []

        # Extract text fields to check
        text_fields = []
        if schema_name == "manga_query":
            text_fields.append(("query", body.get("query", "")))
        elif schema_name == "streaming_chat":
            for i, msg in enumerate(body.get("messages", [])):
                text_fields.append((f"messages[{i}].content", msg.get("content", "")))

        for field_name, text in text_fields:
            if not text:
                continue

            # Check for mixed scripts that might indicate injection
            has_cjk = bool(re.search(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', text))
            has_cyrillic = bool(re.search(r'[\u0400-\u04ff]', text))
            has_arabic = bool(re.search(r'[\u0600-\u06ff]', text))

            unusual_scripts = sum([has_cyrillic, has_arabic])
            if has_cjk and unusual_scripts > 0:
                warnings.append({
                    "code": "MIXED_SCRIPT_WARNING",
                    "field": field_name,
                    "message": "Unusual script mixing detected. Content contains CJK with other non-Latin scripts."
                })

            # Check for control characters (except common whitespace)
            control_chars = re.findall(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', text)
            if control_chars:
                warnings.append({
                    "code": "CONTROL_CHAR_WARNING",
                    "field": field_name,
                    "message": f"Content contains {len(control_chars)} control character(s)."
                })

        return warnings

    def _estimate_input_tokens(self, body: dict, schema_name: str) -> int:
        """Estimate the number of input tokens for cost prediction."""
        total_chars = 0

        if schema_name == "manga_query":
            query = body.get("query", "")
            total_chars = len(query)
        elif schema_name == "streaming_chat":
            for msg in body.get("messages", []):
                total_chars += len(msg.get("content", ""))
        elif schema_name == "batch_analysis":
            for item in body.get("items", []):
                total_chars += len(item.get("additional_context", ""))
                total_chars += 200  # Approximate overhead per item prompt

        # Detect primary language for token estimation
        language = body.get("language", "auto")
        if language == "auto":
            # Fall back to chat content when no top-level query is present
            sample = body.get("query", "")
            if not sample:
                sample = " ".join(
                    m.get("content", "") for m in body.get("messages", [])
                )
            cjk_count = len(re.findall(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', sample))
            if sample and cjk_count > len(sample) * 0.3:
                language = "ja"
            else:
                language = "en"

        chars_per_token = self.CHARS_PER_TOKEN.get(language, self.CHARS_PER_TOKEN["mixed"])
        estimated_tokens = int(total_chars / chars_per_token)

        # Add system prompt overhead (~200 tokens for MangaAssist)
        estimated_tokens += 200

        return estimated_tokens

    def _normalize_request(self, body: dict, schema_name: str) -> dict:
        """
        Inject default values and normalize the request body.
        Returns a new dict without modifying the original.
        """
        schema = self.SCHEMAS.get(schema_name, {})
        properties = schema.get("properties", {})
        normalized = dict(body)

        for prop_name, prop_schema in properties.items():
            if prop_name not in normalized and "default" in prop_schema:
                normalized[prop_name] = prop_schema["default"]

        # Normalize nested item defaults for batch requests
        if schema_name == "batch_analysis" and "items" in normalized:
            item_schema = properties.get("items", {}).get("items", {}).get("properties", {})
            for item in normalized["items"]:
                for prop_name, prop_def in item_schema.items():
                    if prop_name not in item and "default" in prop_def:
                        item[prop_name] = prop_def["default"]

        return normalized
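
The token estimate in _estimate_input_tokens reduces to character count divided by a per-language ratio, plus ~200 tokens of system-prompt overhead. A standalone worked example using the same ratios:

```python
CHARS_PER_TOKEN = {"en": 4.0, "ja": 1.5, "mixed": 2.5}
SYSTEM_PROMPT_OVERHEAD = 200  # MangaAssist system prompt allowance


def estimate_tokens(text: str, language: str) -> int:
    """Character-count heuristic: CJK packs ~1.5 chars per token vs ~4 for English."""
    return int(len(text) / CHARS_PER_TOKEN[language]) + SYSTEM_PROMPT_OVERHEAD


# A 300-character Japanese query: 300 / 1.5 = 200 content tokens, +200 overhead
japanese_estimate = estimate_tokens("あ" * 300, "ja")   # 400
english_estimate = estimate_tokens("a" * 300, "en")    # 275
```

The density gap matters for cost guards: the same character budget costs roughly 2.7× more tokens in Japanese than in English.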

2.2 API Gateway Model Definitions (CloudFormation)

# CloudFormation resource definitions for API Gateway request models.
# These models enable API Gateway-level validation before Lambda is invoked.
# Note: API Gateway request models support JSON Schema draft-4, so the
# $schema below differs from the application-level draft-7 schemas in 2.1.

Resources:
  MangaQueryModel:
    Type: AWS::ApiGateway::Model
    Properties:
      RestApiId: !Ref MangaFMApi
      ContentType: "application/json"
      Name: "MangaQueryModel"
      Description: "Validates manga query request structure"
      Schema:
        $schema: "http://json-schema.org/draft-04/schema#"
        title: "MangaQueryRequest"
        type: object
        required:
          - query
        properties:
          query:
            type: string
            minLength: 1
            maxLength: 4000
          model:
            type: string
            enum:
              - sonnet
              - haiku
          max_tokens:
            type: integer
            minimum: 1
            maximum: 4096
          temperature:
            type: number
            minimum: 0.0
            maximum: 1.0
          conversation_id:
            type: string
            pattern: "^[a-f0-9-]{36}$"
          language:
            type: string
            enum:
              - en
              - ja
              - auto

  BatchAnalysisModel:
    Type: AWS::ApiGateway::Model
    Properties:
      RestApiId: !Ref MangaFMApi
      ContentType: "application/json"
      Name: "BatchAnalysisModel"
      Description: "Validates batch manga analysis request"
      Schema:
        $schema: "http://json-schema.org/draft-04/schema#"
        title: "BatchAnalysisRequest"
        type: object
        required:
          - items
        properties:
          items:
            type: array
            minItems: 1
            maxItems: 100
            items:
              type: object
              required:
                - manga_id
                - analysis_type
              properties:
                manga_id:
                  type: string
                  pattern: "^MNG-[A-Z0-9]{8}$"
                analysis_type:
                  type: string
                  enum:
                    - genre_classification
                    - content_summary
                    - similar_series
                    - reading_level
                    - full_enrichment
          callback_url:
            type: string
            pattern: "^https://"
          priority:
            type: string
            enum:
              - high
              - normal
              - low

  RequestValidator:
    Type: AWS::ApiGateway::RequestValidator
    Properties:
      RestApiId: !Ref MangaFMApi
      Name: "MangaRequestValidator"
      ValidateRequestBody: true
      ValidateRequestParameters: true
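
Defining the models and the validator is only half the wiring: each method must reference them, or API Gateway skips validation entirely. A hedged sketch of the corresponding `put_method` parameters via boto3 (`build_put_method_params` is a hypothetical helper and all IDs are placeholders; in practice this is usually declared alongside the resources above as an `AWS::ApiGateway::Method`):

```python
def build_put_method_params(rest_api_id: str, resource_id: str,
                            validator_id: str) -> dict:
    """Kwargs for apigateway put_method that wire the validator and
    MangaQueryModel to the POST /v1/manga/query method."""
    return {
        "restApiId": rest_api_id,
        "resourceId": resource_id,
        "httpMethod": "POST",
        "authorizationType": "NONE",
        "apiKeyRequired": True,
        "requestValidatorId": validator_id,
        # Content type -> model name, matching MangaQueryModel above
        "requestModels": {"application/json": "MangaQueryModel"},
    }


params = build_put_method_params("a1b2c3", "r1s2t3", "v1w2x3")
# boto3.client("apigateway").put_method(**params)  # actual call; needs AWS creds
```

With this in place, malformed bodies are rejected with a 400 before Lambda (and Bedrock spend) is ever invoked.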

3. MappingTemplateEngine — VTL Template Processing

"""
Mapping template engine for API Gateway VTL (Velocity Template Language)
request/response transformation. Handles the logic for building and
managing VTL templates used in API Gateway integration mappings.
"""
import json
import re
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class MappingTemplateEngine:
    """
    Manages VTL mapping templates for API Gateway integrations.
    Generates request and response mapping templates that transform
    API Gateway payloads to/from Lambda and Bedrock formats.

    MangaAssist uses mapping templates to:
    1. Enrich incoming requests with context (request ID, timestamp, source IP)
    2. Transform client-facing API shapes into internal Lambda event formats
    3. Filter and reshape Lambda responses for client consumption
    4. Handle error responses with appropriate status codes

    Usage:
        engine = MappingTemplateEngine()

        # Get the request template for manga query endpoint
        template = engine.get_request_template("manga_query")

        # Get the response template with status code mapping
        resp_template = engine.get_response_template("manga_query", status_code=200)
    """

    # Request mapping templates
    REQUEST_TEMPLATES = {
        "manga_query": '''## MangaAssist Query - Request Mapping Template
## Transforms API Gateway request into Lambda event format
## Preserves Unicode content for Japanese manga queries
#set($inputRoot = $input.path('$'))
{
    "httpMethod": "POST",
    "resource": "/v1/manga/query",
    "body": $input.json('$'),
    "headers": {
        "Content-Type": "$input.params().header.get('Content-Type')",
        "Accept-Language": "$input.params().header.get('Accept-Language')",
        "X-Api-Key": "$input.params().header.get('X-Api-Key')",
        "X-Request-Id": "$context.requestId"
    },
    "requestContext": {
        "requestId": "$context.requestId",
        "stage": "$context.stage",
        "identity": {
            "sourceIp": "$context.identity.sourceIp",
            "userAgent": "$context.identity.userAgent"
        },
        "requestTime": "$context.requestTime",
        "requestTimeEpoch": $context.requestTimeEpoch,
        "apiId": "$context.apiId"
    },
    "stageVariables": {
#if($stageVariables)
        "environment": "$stageVariables.get('environment')",
        "bedrock_region": "$stageVariables.get('bedrock_region')"
#else
        "environment": "prod",
        "bedrock_region": "us-east-1"
#end
    }
}''',

        "batch_analysis": '''## MangaAssist Batch Analysis - Request Mapping Template
## Transforms batch submission into Lambda event with SQS routing hints
#set($inputRoot = $input.path('$'))
{
    "httpMethod": "POST",
    "resource": "/v1/manga/batch",
    "body": $input.json('$'),
    "headers": {
        "Content-Type": "$input.params().header.get('Content-Type')",
        "X-Api-Key": "$input.params().header.get('X-Api-Key')",
        "X-Request-Id": "$context.requestId",
        "X-Batch-Size": "$inputRoot.items.size()"
    },
    "requestContext": {
        "requestId": "$context.requestId",
        "stage": "$context.stage",
        "requestTimeEpoch": $context.requestTimeEpoch
    },
    "routingHints": {
        "itemCount": $inputRoot.items.size(),
        "priority": #if($inputRoot.priority)"$inputRoot.priority"#else"normal"#end,
        "hasCallback": #if($inputRoot.callback_url)true#else false#end
    }
}''',

        "status_check": '''## MangaAssist Status Check - Request Mapping Template
## Passes path parameter for async request status lookup
{
    "httpMethod": "GET",
    "resource": "/v1/manga/status/{request_id}",
    "pathParameters": {
        "request_id": "$input.params('request_id')"
    },
    "requestContext": {
        "requestId": "$context.requestId",
        "stage": "$context.stage"
    }
}'''
    }

    # Response mapping templates
    RESPONSE_TEMPLATES = {
        "manga_query_200": '''## MangaAssist Query - 200 Response Template
## Reshapes Lambda response for client consumption
#set($inputRoot = $input.path('$'))
#set($body = $input.path('$.body'))
{
    "success": true,
    "data": {
        "response": $input.json('$.body.response'),
        "model": "$body.model",
        "usage": {
            "input_tokens": $body.usage.input_tokens,
            "output_tokens": $body.usage.output_tokens
        }
    },
    "metadata": {
        "request_id": "$context.requestId",
        "timestamp": "$context.requestTime",
        "latency_ms": #if($body.latency_ms)$body.latency_ms#else 0#end
    }
}''',

        "manga_query_400": '''## MangaAssist Query - 400 Error Response Template
#set($inputRoot = $input.path('$'))
{
    "success": false,
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "$util.escapeJavaScript($inputRoot.body.error)",
        "details": $input.json('$.body.details')
    },
    "metadata": {
        "request_id": "$context.requestId",
        "timestamp": "$context.requestTime"
    }
}''',

        "manga_query_429": '''## MangaAssist Query - 429 Throttle Response Template
{
    "success": false,
    "error": {
        "code": "RATE_LIMITED",
        "message": "Request rate exceeded. The foundation model is currently throttled.",
        "retry_after_seconds": 5
    },
    "metadata": {
        "request_id": "$context.requestId"
    }
}''',

        "manga_query_500": '''## MangaAssist Query - 500 Error Response Template
{
    "success": false,
    "error": {
        "code": "INTERNAL_ERROR",
        "message": "An unexpected error occurred processing your request."
    },
    "metadata": {
        "request_id": "$context.requestId"
    }
}''',

        "batch_analysis_202": '''## MangaAssist Batch - 202 Accepted Response Template
#set($inputRoot = $input.path('$'))
{
    "success": true,
    "data": {
        "message": "Batch submitted for processing",
        "item_count": $inputRoot.body.item_count,
        "request_ids": $input.json('$.body.request_ids'),
        "estimated_completion": "Results will be available within 5-30 minutes"
    },
    "metadata": {
        "request_id": "$context.requestId",
        "timestamp": "$context.requestTime"
    }
}'''
    }

    def __init__(self):
        """Initialize the mapping template engine."""
        self._request_templates = dict(self.REQUEST_TEMPLATES)
        self._response_templates = dict(self.RESPONSE_TEMPLATES)

    def get_request_template(self, endpoint: str) -> Optional[str]:
        """
        Get the request mapping template for an endpoint.

        Args:
            endpoint: Endpoint key (e.g., 'manga_query', 'batch_analysis')

        Returns:
            VTL template string or None if not found
        """
        return self._request_templates.get(endpoint)

    def get_response_template(
        self, endpoint: str, status_code: int = 200
    ) -> Optional[str]:
        """
        Get the response mapping template for an endpoint + status code.

        Args:
            endpoint: Endpoint key
            status_code: HTTP status code

        Returns:
            VTL template string or None if not found
        """
        key = f"{endpoint}_{status_code}"
        return self._response_templates.get(key)

    def register_template(
        self, name: str, template: str, template_type: str = "request"
    ):
        """Register a custom mapping template."""
        if template_type == "request":
            self._request_templates[name] = template
        elif template_type == "response":
            self._response_templates[name] = template
        else:
            raise ValueError(f"Invalid template_type: {template_type}")

    def validate_template_syntax(self, template: str) -> dict:
        """
        Basic syntax validation of a VTL template.
        Checks for common VTL errors without full parsing.

        Returns:
            Dict with 'valid' bool and optional 'issues' list
        """
        issues = []

        # Check balanced #if/#end blocks
        if_count = len(re.findall(r'#if\s*\(', template))
        end_count = len(re.findall(r'#end\b', template))
        foreach_count = len(re.findall(r'#foreach\s*\(', template))

        expected_ends = if_count + foreach_count
        if end_count != expected_ends:
            issues.append({
                "type": "UNBALANCED_BLOCKS",
                "message": (
                    f"Found {if_count} #if + {foreach_count} #foreach "
                    f"but only {end_count} #end directives"
                )
            })

        # Check for unclosed $input references
        unclosed = re.findall(r'\$input\.[^,}\s\)]+(?=[^"\'}\s])', template)
        if unclosed:
            issues.append({
                "type": "POTENTIAL_UNCLOSED_REFERENCE",
                "message": f"Potentially unclosed references: {unclosed[:5]}"
            })

        # Check for JSON syntax issues in the static parts.
        # Heuristic only: strip VTL comment lines first (they would otherwise
        # always fail the JSON parse), then replace directives and $references
        # with placeholder strings. Templates with inline #if/#else branches
        # can still yield false positives.
        stripped = re.sub(r'(?m)^\s*##.*\n?', '', template)
        static_parts = re.sub(
            r'#[a-z]+\([^)]*\)|#else|#end|\$[a-zA-Z0-9._$\'"\[\]()]+',
            '""',
            stripped
        )
        try:
            json.loads(static_parts)
        except json.JSONDecodeError as e:
            issues.append({
                "type": "JSON_SYNTAX",
                "message": f"Static JSON structure may have issues: {str(e)[:100]}"
            })

        return {
            "valid": len(issues) == 0,
            "issues": issues
        }

    def generate_integration_config(self, endpoint: str) -> dict:
        """
        Generate the complete API Gateway integration configuration
        for an endpoint, including all mapping templates and status codes.

        Returns:
            Dict ready for CloudFormation/CDK integration configuration
        """
        request_template = self.get_request_template(endpoint)

        # Collect all response templates for this endpoint
        response_templates = {}
        status_codes = [200, 202, 400, 429, 500]
        for code in status_codes:
            template = self.get_response_template(endpoint, code)
            if template:
                response_templates[str(code)] = template

        return {
            "type": "AWS",  # non-proxy; AWS_PROXY ignores mapping templates
            "requestTemplates": {
                "application/json": request_template
            } if request_template else {},
            "integrationResponses": [
                {
                    "statusCode": code,
                    "responseTemplates": {
                        "application/json": response_templates.get(str(code), "")
                    },
                    "selectionPattern": self._status_pattern(code)
                }
                for code in status_codes
                if str(code) in response_templates
            ]
        }

    def _status_pattern(self, code: int) -> str:
        """Get the regex selection pattern for a status code."""
        patterns = {
            200: "",                # Default (no pattern = default)
            400: ".*4\\d{2}.*",
            429: ".*429.*",
            500: ".*5\\d{2}.*"
        }
        return patterns.get(code, "")
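
In non-proxy Lambda integrations, API Gateway evaluates these selection patterns against the Lambda error message to pick an integration response. A standalone sketch of that matching (the error strings are illustrative; note the explicit ordering, since `.*4\d{2}.*` also matches a 429 string and overlapping patterns are best avoided in real API Gateway configurations):

```python
import re

# Selection patterns as in _status_pattern; an empty pattern is the default (200)
PATTERNS = [
    ("429", r".*429.*"),
    ("400", r".*4\d{2}.*"),
    ("500", r".*5\d{2}.*"),
]


def select_status(error_message: str) -> str:
    """Return the first status whose pattern matches, else the 200 default."""
    for status, pattern in PATTERNS:
        if re.match(pattern, error_message):
            return status
    return "200"
```

Checking 429 before the generic 4xx pattern ensures throttling errors reach the retry-aware 429 template instead of the validation-error one.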

4. Batch Processing Patterns for Bulk Manga Analysis

4.1 SQSFMBatchProcessor

"""
SQS-driven batch processor for bulk manga analysis using Bedrock.
Handles queue consumption, concurrent FM invocation, result collection,
and callback delivery.
"""
import json
import time
import logging
import traceback
from typing import Optional
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class BatchResult:
    """Result of processing a batch of FM requests."""
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    throttled: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost_usd: float = 0.0
    processing_time_seconds: float = 0.0
    results: list = field(default_factory=list)
    errors: list = field(default_factory=list)


class SQSFMBatchProcessor:
    """
    Processes FM requests from SQS queues in configurable batches.
    Designed to run on ECS Fargate or EC2 as a long-running worker.

    Features:
    - Concurrent FM invocations with configurable parallelism
    - Automatic visibility timeout extension for long batches
    - Result storage to S3 with DynamoDB status tracking
    - Callback notification via SNS or webhook
    - Graceful shutdown with in-flight request completion
    - Cost tracking per batch

    Architecture:
        SQS Queue --> SQSFMBatchProcessor --> Bedrock InvokeModel
                                          --> S3 (results)
                                          --> DynamoDB (status)
                                          --> SNS/Webhook (notification)
    """

    PRICING = {
        "anthropic.claude-3-sonnet-20240229-v1:0": (3.00, 15.00),
        "anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
    }

    def __init__(
        self,
        queue_url: str,
        region: str = "us-east-1",
        max_concurrent: int = 5,
        batch_size: int = 10,
        visibility_timeout: int = 120
    ):
        self._queue_url = queue_url
        self._batch_size = min(batch_size, 10)  # SQS max per receive
        self._visibility_timeout = visibility_timeout
        self._max_concurrent = max_concurrent
        self._running = True

        # AWS clients
        self._sqs = boto3.client("sqs", region_name=region)
        bedrock_config = Config(
            region_name=region,
            retries={"max_attempts": 3, "mode": "adaptive"},
            read_timeout=60,
            max_pool_connections=max_concurrent
        )
        self._bedrock = boto3.client("bedrock-runtime", config=bedrock_config)
        self._s3 = boto3.client("s3", region_name=region)
        self._dynamodb = boto3.resource("dynamodb", region_name=region)
        self._sns = boto3.client("sns", region_name=region)
        self._status_table = self._dynamodb.Table("manga-fm-request-status")
        self._executor = ThreadPoolExecutor(max_workers=max_concurrent)

    def process_loop(self, max_iterations: Optional[int] = None):
        """
        Main processing loop. Continuously polls SQS and processes batches.

        Args:
            max_iterations: Maximum polling iterations (None for infinite)
        """
        iteration = 0
        logger.info(
            f"Starting batch processor: queue={self._queue_url}, "
            f"concurrency={self._max_concurrent}, batch_size={self._batch_size}"
        )

        while self._running:
            if max_iterations and iteration >= max_iterations:
                break
            iteration += 1

            try:
                messages = self._receive_batch()
                if not messages:
                    time.sleep(1)  # Brief pause on empty queue
                    continue

                batch_result = self._process_batch(messages)
                logger.info(
                    f"Batch complete: {batch_result.succeeded}/{batch_result.total} "
                    f"succeeded, {batch_result.failed} failed, "
                    f"cost=${batch_result.total_cost_usd:.4f}"
                )

            except Exception as e:
                logger.error(f"Batch processing error: {e}\n{traceback.format_exc()}")
                time.sleep(5)  # Back off on unexpected errors

    def _receive_batch(self) -> list:
        """Receive a batch of messages from SQS with long polling."""
        response = self._sqs.receive_message(
            QueueUrl=self._queue_url,
            MaxNumberOfMessages=self._batch_size,
            VisibilityTimeout=self._visibility_timeout,
            WaitTimeSeconds=20,              # Long polling
            MessageAttributeNames=["All"],
            AttributeNames=["All"]
        )
        return response.get("Messages", [])

    def _process_batch(self, messages: list) -> BatchResult:
        """Process a batch of SQS messages concurrently."""
        start_time = time.time()
        batch_result = BatchResult(total=len(messages))

        # Submit all messages for concurrent processing
        futures = {}
        for msg in messages:
            future = self._executor.submit(self._process_single, msg)
            futures[future] = msg

        # Collect results. The timeout keeps collection inside the visibility
        # window; messages not deleted in time reappear (at-least-once delivery)
        for future in as_completed(futures, timeout=self._visibility_timeout - 10):
            msg = futures[future]
            try:
                result = future.result()
                batch_result.succeeded += 1
                batch_result.total_input_tokens += result.get("input_tokens", 0)
                batch_result.total_output_tokens += result.get("output_tokens", 0)
                batch_result.total_cost_usd += result.get("cost_usd", 0)
                batch_result.results.append(result)

                # Delete message from queue on success
                self._sqs.delete_message(
                    QueueUrl=self._queue_url,
                    ReceiptHandle=msg["ReceiptHandle"]
                )

            except ClientError as e:
                error_code = e.response["Error"]["Code"]
                if error_code == "ThrottlingException":
                    batch_result.throttled += 1
                    # Do NOT delete — let visibility timeout expire for retry
                    logger.warning(f"Throttled: {msg['MessageId']}")
                else:
                    batch_result.failed += 1
                    batch_result.errors.append({
                        "message_id": msg["MessageId"],
                        "error": str(e)
                    })
            except Exception as e:
                batch_result.failed += 1
                batch_result.errors.append({
                    "message_id": msg["MessageId"],
                    "error": str(e)
                })

        batch_result.processing_time_seconds = time.time() - start_time
        return batch_result

    def _process_single(self, message: dict) -> dict:
        """Process a single SQS message through Bedrock."""
        body = json.loads(message["Body"])
        request_id = body.get("request_id", message["MessageId"])

        # Update status to processing
        self._update_status(request_id, "processing")

        prompt = body["prompt"]
        model_id = body.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
        max_tokens = body.get("max_tokens", 1024)

        # Build Bedrock request
        request_body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": prompt}],
            "system": (
                "You are a manga analysis system. Provide structured, "
                "accurate analysis of manga content. Respond in JSON format "
                "when analysis_type requires structured output."
            ),
            "temperature": 0.2
        })

        response = self._bedrock.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        response_body = json.loads(response["body"].read())
        text = response_body["content"][0]["text"]
        input_tokens = response_body["usage"]["input_tokens"]
        output_tokens = response_body["usage"]["output_tokens"]

        # Calculate cost
        pricing = self.PRICING.get(model_id, (0, 0))
        cost_usd = (
            (input_tokens / 1_000_000) * pricing[0] +
            (output_tokens / 1_000_000) * pricing[1]
        )

        # Store result in S3
        result_key = f"fm-results/{request_id}.json"
        result_data = {
            "request_id": request_id,
            "response": text,
            "model_id": model_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost_usd,
            "metadata": body.get("metadata", {}),
            "processed_at": time.time()
        }

        self._s3.put_object(
            Bucket="manga-fm-results",
            Key=result_key,
            Body=json.dumps(result_data),
            ContentType="application/json"
        )

        # Update status to completed
        self._update_status(request_id, "completed", result_key=result_key)

        # Send callback notification if requested
        callback_url = body.get("callback_url")
        if callback_url:
            self._send_callback(callback_url, result_data)

        return result_data

    def _update_status(
        self, request_id: str, status: str, result_key: Optional[str] = None
    ):
        """Update request status in DynamoDB."""
        try:
            update_expr = "SET #s = :status, updated_at = :ts"
            expr_values = {
                ":status": status,
                ":ts": int(time.time())
            }
            expr_names = {"#s": "status"}

            if result_key:
                update_expr += ", result_key = :rk"
                expr_values[":rk"] = result_key

            self._status_table.update_item(
                Key={"request_id": request_id},
                UpdateExpression=update_expr,
                ExpressionAttributeNames=expr_names,
                ExpressionAttributeValues=expr_values
            )
        except Exception as e:
            logger.warning(f"Status update failed for {request_id}: {e}")

    def _send_callback(self, url: str, result: dict):
        """Send completion callback to the requester."""
        import urllib.request
        try:
            req = urllib.request.Request(
                url,
                data=json.dumps(result).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method="POST"
            )
            urllib.request.urlopen(req, timeout=10)
        except Exception as e:
            logger.warning(f"Callback failed to {url}: {e}")

    def shutdown(self):
        """Gracefully stop the processor."""
        logger.info("Shutting down batch processor...")
        self._running = False
        self._executor.shutdown(wait=True, cancel_futures=False)
        logger.info("Batch processor stopped")
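
The per-request cost arithmetic inside `_process_single` is worth isolating, since the same calculation backs the batch totals and any downstream budget checks. A standalone version using the same per-1M-token prices as the `PRICING` table above:

```python
# USD per 1M tokens (input, output), matching the class PRICING table
PRICING = {
    "anthropic.claude-3-sonnet-20240229-v1:0": (3.00, 15.00),
    "anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
}


def estimate_cost_usd(model_id: str, input_tokens: int, output_tokens: int) -> float:
    """Blended request cost; unknown models price at 0 (tracking only)."""
    in_price, out_price = PRICING.get(model_id, (0.0, 0.0))
    return ((input_tokens / 1_000_000) * in_price
            + (output_tokens / 1_000_000) * out_price)


# A typical Haiku enrichment call: 1,000 input + 500 output tokens
cost = estimate_cost_usd("anthropic.claude-3-haiku-20240307-v1:0", 1000, 500)
```

A 1,000-in/500-out Haiku call lands under a tenth of a cent, which is why the batch processor defaults to Haiku and reserves Sonnet for requests that ask for it.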

4.2 Step Functions Orchestration for Complex Batch Workflows

{
  "Comment": "MangaAssist Batch Analysis Workflow - Orchestrates multi-step manga analysis via Bedrock",
  "StartAt": "ValidateBatchInput",
  "States": {
    "ValidateBatchInput": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-validate-batch",
      "ResultPath": "$.validation",
      "Next": "CheckValidation",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "CheckValidation": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.validation.valid",
          "BooleanEquals": true,
          "Next": "SplitIntoBatches"
        }
      ],
      "Default": "ValidationFailed"
    },
    "SplitIntoBatches": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-split-batch",
      "ResultPath": "$.batches",
      "Next": "ProcessBatchesMap"
    },
    "ProcessBatchesMap": {
      "Type": "Map",
      "ItemsPath": "$.batches.chunks",
      "MaxConcurrency": 5,
      "Iterator": {
        "StartAt": "InvokeBedrock",
        "States": {
          "InvokeBedrock": {
            "Type": "Task",
            "Resource": "arn:aws:states:::bedrock:invokeModel",
            "Parameters": {
              "ModelId.$": "$.model_id",
              "ContentType": "application/json",
              "Accept": "application/json",
              "Body": {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens.$": "$.max_tokens",
                "messages": [
                  {
                    "role": "user",
                    "content.$": "$.prompt"
                  }
                ],
                "temperature": 0.2
              }
            },
            "ResultPath": "$.fm_response",
            "Next": "StoreResult",
            "Retry": [
              {
                "ErrorEquals": ["Bedrock.ThrottlingException"],
                "IntervalSeconds": 5,
                "MaxAttempts": 5,
                "BackoffRate": 2.0
              },
              {
                "ErrorEquals": ["Bedrock.ModelTimeoutException"],
                "IntervalSeconds": 10,
                "MaxAttempts": 3,
                "BackoffRate": 1.5
              }
            ],
            "Catch": [
              {
                "ErrorEquals": ["States.ALL"],
                "Next": "RecordFailure",
                "ResultPath": "$.error"
              }
            ]
          },
          "StoreResult": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-store-result",
            "End": true
          },
          "RecordFailure": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-record-failure",
            "End": true
          }
        }
      },
      "ResultPath": "$.batch_results",
      "Next": "AggregateResults"
    },
    "AggregateResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-aggregate-results",
      "ResultPath": "$.summary",
      "Next": "NotifyCompletion"
    },
    "NotifyCompletion": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:manga-fm-results",
        "Subject": "MangaAssist Batch Analysis Complete",
        "Message.$": "States.JsonToString($.summary)"
      },
      "End": true
    },
    "ValidationFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts",
        "Subject": "MangaAssist Batch Validation Failed",
        "Message.$": "States.JsonToString($.validation.errors)"
      },
      "End": true
    }
  }
}
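
Kicking off this state machine from the orchestrator is a single `start_execution` call. A hedged sketch building the request (`build_execution_request` is a hypothetical helper; the input shape is an assumption inferred from the paths the states read, such as `$.model_id` and `$.max_tokens` inside the Map iterator):

```python
import json


def build_execution_request(state_machine_arn: str, items: list) -> dict:
    """Kwargs for stepfunctions start_execution. Payload shape mirrors what
    ValidateBatchInput / SplitIntoBatches expect (an assumption)."""
    return {
        "stateMachineArn": state_machine_arn,
        # Execution names must be unique; a UUID suffix is typical in practice
        "name": f"manga-batch-{len(items)}-items",
        "input": json.dumps({
            "items": items,
            "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
            "max_tokens": 1024,
        }),
    }


req = build_execution_request(
    "arn:aws:states:us-east-1:123456789012:stateMachine:manga-batch",
    [{"manga_id": "MNG-AAAA0001", "analysis_type": "genre_classification"}],
)
# boto3.client("stepfunctions").start_execution(**req)  # actual call
```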

5. Visibility Timeout Extension Pattern

"""
Heartbeat-based visibility timeout extension for long-running FM requests.
Prevents SQS from re-delivering messages while processing is still active.
"""
import threading
import logging

logger = logging.getLogger(__name__)


class VisibilityExtender:
    """
    Automatically extends SQS message visibility timeout while processing.
    Runs a background thread that periodically extends visibility before
    the current timeout expires.

    This is critical for Bedrock requests that may take longer than the
    initial visibility timeout (e.g., Sonnet with large prompts).

    Usage:
        with VisibilityExtender(sqs_client, queue_url, receipt_handle, 120) as ext:
            # Process message — visibility auto-extends every 90 seconds
            result = invoke_bedrock(prompt)
            # On context exit, extension stops
    """

    def __init__(
        self,
        sqs_client,
        queue_url: str,
        receipt_handle: str,
        visibility_timeout: int = 120,
        extension_margin_seconds: int = 30
    ):
        self._sqs = sqs_client
        self._queue_url = queue_url
        self._receipt_handle = receipt_handle
        self._visibility_timeout = visibility_timeout
        self._margin = extension_margin_seconds
        self._stop_event = threading.Event()
        self._thread = None

    def __enter__(self):
        """Start the visibility extension thread."""
        self._thread = threading.Thread(
            target=self._extend_loop,
            daemon=True,
            name="visibility-extender"
        )
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the visibility extension thread."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return False

    def _extend_loop(self):
        """Periodically extend visibility timeout until stopped."""
        interval = self._visibility_timeout - self._margin
        if interval <= 0:
            interval = self._visibility_timeout // 2

        while not self._stop_event.wait(timeout=interval):
            try:
                self._sqs.change_message_visibility(
                    QueueUrl=self._queue_url,
                    ReceiptHandle=self._receipt_handle,
                    VisibilityTimeout=self._visibility_timeout
                )
                logger.debug(
                    f"Extended visibility by {self._visibility_timeout}s "
                    f"for {self._receipt_handle[:20]}..."
                )
            except Exception as e:
                logger.error(f"Visibility extension failed: {e}")
                break  # Stop extending — message may be re-delivered
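
The interval arithmetic in `_extend_loop` determines how aggressively the extender heartbeats. Isolated for clarity (`heartbeat_interval` is an illustrative extraction of that logic, not part of the class):

```python
def heartbeat_interval(visibility_timeout: int, margin: int) -> int:
    """Seconds between extensions: fire `margin` seconds before expiry,
    falling back to half the timeout if the margin is too large."""
    interval = visibility_timeout - margin
    if interval <= 0:
        interval = visibility_timeout // 2
    return interval


heartbeat_interval(120, 30)  # a 120s timeout is extended every 90s
heartbeat_interval(20, 30)   # margin exceeds timeout: fall back to every 10s
```

The margin absorbs clock skew and the latency of the `ChangeMessageVisibility` call itself, so the extension always lands before SQS re-delivers the message.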

6. Request/Response Transformation Pipeline

flowchart LR
    subgraph Input["Client Request"]
        RAW["Raw HTTP Request<br/>(JSON body + headers)"]
    end

    subgraph Validation["Validation Layer"]
        SCHEMA["JSON Schema<br/>Validation"]
        UNICODE["Unicode<br/>Sanitization"]
        TOKEN["Token<br/>Estimation"]
        COST["Cost Guard<br/>Check"]
    end

    subgraph Transform["Transformation Layer"]
        VTL_REQ["VTL Request<br/>Mapping"]
        ENRICH["Context<br/>Enrichment"]
        ROUTE["Model<br/>Routing"]
    end

    subgraph FM["Foundation Model"]
        BEDROCK["Bedrock<br/>InvokeModel"]
    end

    subgraph Output["Response"]
        VTL_RESP["VTL Response<br/>Mapping"]
        FILTER["Response<br/>Filtering"]
        CLIENT["Client<br/>Response"]
    end

    RAW --> SCHEMA
    SCHEMA -->|"valid"| UNICODE
    SCHEMA -->|"invalid"| CLIENT
    UNICODE --> TOKEN
    TOKEN --> COST
    COST -->|"within budget"| VTL_REQ
    COST -->|"over budget"| CLIENT
    VTL_REQ --> ENRICH
    ENRICH --> ROUTE
    ROUTE --> BEDROCK
    BEDROCK --> VTL_RESP
    VTL_RESP --> FILTER
    FILTER --> CLIENT

    style Validation fill:#fff3e0,stroke:#f57c00
    style Transform fill:#e8f5e9,stroke:#388e3c
    style FM fill:#e3f2fd,stroke:#1565c0
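
The Token Estimation and Cost Guard stages in the diagram have no dedicated code elsewhere in this section. A hedged heuristic sketch (the chars-per-token ratios are rough rules of thumb for pre-flight checks, not output of Anthropic's tokenizer; the authoritative count comes back in the Bedrock `usage` block):

```python
def estimate_tokens(text: str) -> int:
    """Rough pre-flight estimate: ~4 chars/token for ASCII-like text,
    ~1 token per CJK character (relevant for Japanese manga queries)."""
    cjk = sum(1 for ch in text if '\u3040' <= ch <= '\u9fff')
    ascii_like = len(text) - cjk
    return max(1, cjk + ascii_like // 4)


def within_budget(input_tokens: int, max_output_tokens: int,
                  price_in: float, price_out: float,
                  budget_usd: float) -> bool:
    """Cost guard: reject before invoking if worst-case cost exceeds budget.
    Prices are USD per 1M tokens, as in the PRICING table."""
    worst_case = ((input_tokens / 1_000_000) * price_in
                  + (max_output_tokens / 1_000_000) * price_out)
    return worst_case <= budget_usd
```

Because output length is unknown up front, the guard prices the full `max_tokens` allowance; a request that passes can never exceed the budget, at the cost of occasionally rejecting requests that would have finished cheaply.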

7. Queue Monitoring and Auto-Scaling

"""
CloudWatch-based monitoring and auto-scaling configuration for
SQS-backed FM processing queues.
"""
import json
import boto3


class QueueMonitorConfig:
    """
    Generates CloudWatch alarms and auto-scaling policies
    for MangaAssist FM request queues.
    """

    @staticmethod
    def generate_cloudwatch_alarms() -> list:
        """Generate CloudWatch alarm definitions for queue monitoring."""
        return [
            {
                "AlarmName": "MangaFM-HighPriority-QueueDepth",
                "MetricName": "ApproximateNumberOfMessagesVisible",
                "Namespace": "AWS/SQS",
                "Dimensions": [
                    {"Name": "QueueName", "Value": "manga-fm-high-priority"}
                ],
                "Statistic": "Average",
                "Period": 60,
                "EvaluationPeriods": 2,
                "Threshold": 100,
                "ComparisonOperator": "GreaterThanThreshold",
                "AlarmActions": [
                    "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
                ],
                "AlarmDescription": (
                    "High priority FM queue depth exceeds 100. "
                    "Indicates possible Bedrock throttling or consumer failure."
                )
            },
            {
                "AlarmName": "MangaFM-Normal-AgeOfOldestMessage",
                "MetricName": "ApproximateAgeOfOldestMessage",
                "Namespace": "AWS/SQS",
                "Dimensions": [
                    {"Name": "QueueName", "Value": "manga-fm-normal"}
                ],
                "Statistic": "Maximum",
                "Period": 300,
                "EvaluationPeriods": 2,
                "Threshold": 600,  # 10 minutes
                "ComparisonOperator": "GreaterThanThreshold",
                "AlarmActions": [
                    "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
                ],
                "AlarmDescription": (
                    "Oldest message in normal queue exceeds 10 minutes. "
                    "Processing is falling behind."
                )
            },
            {
                "AlarmName": "MangaFM-DLQ-NotEmpty",
                "MetricName": "ApproximateNumberOfMessagesVisible",
                "Namespace": "AWS/SQS",
                "Dimensions": [
                    {"Name": "QueueName", "Value": "manga-fm-dlq"}
                ],
                "Statistic": "Sum",
                "Period": 300,
                "EvaluationPeriods": 1,
                "Threshold": 0,
                "ComparisonOperator": "GreaterThanThreshold",
                "AlarmActions": [
                    "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
                ],
                "AlarmDescription": (
                    "Dead letter queue has messages. "
                    "Investigate failed FM requests immediately."
                )
            }
        ]

    @staticmethod
    def generate_autoscaling_policy() -> dict:
        """
        Generate ECS auto-scaling policy based on SQS queue depth.
        Scales FM processor tasks up/down based on backlog.
        """
        return {
            "TargetTrackingScaling": {
                "ServiceNamespace": "ecs",
                "ScalableDimension": "ecs:service:DesiredCount",
                "ResourceId": "service/manga-cluster/manga-fm-processor",
                "MinCapacity": 1,
                "MaxCapacity": 20,
                "TargetTrackingPolicies": [
                    {
                        "PolicyName": "MangaFM-QueueDepthScaling",
                        "TargetValue": 10.0,
                        "CustomizedMetricSpecification": {
                            "MetricName": "BacklogPerTask",
                            "Namespace": "MangaAssist/SQS",
                            "Statistic": "Average",
                            "Unit": "Count"
                        },
                        "ScaleInCooldown": 300,
                        "ScaleOutCooldown": 60
                    }
                ]
            },
            "BacklogPerTaskMetric": {
                "Description": (
                    "Custom metric: total queue messages / running task count. "
                    "Published by a Lambda function every 60 seconds."
                ),
                "Formula": "ApproximateNumberOfMessagesVisible / RunningTaskCount",
                "TargetValue": 10,
                "Rationale": (
                    "Each ECS task processes ~10 messages per minute with Haiku. "
                    "Target of 10 backlog per task means ~1 minute of work queued."
                )
            }
        }
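The `BacklogPerTaskMetric` entry above says the metric is published by a Lambda function every 60 seconds. A minimal sketch of that publisher follows, assuming the queue, cluster, and service names used in this section and the placeholder account ID from the alarm ARNs; the boto3 calls (`get_queue_attributes`, `describe_services`, `put_metric_data`) are real APIs, but the handler itself is illustrative.

```python
"""Hypothetical Lambda that publishes BacklogPerTask for the
target-tracking policy above (EventBridge rate(1 minute) trigger)."""

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/manga-fm-normal"
CLUSTER, SERVICE = "manga-cluster", "manga-fm-processor"


def backlog_per_task(visible_messages: int, running_tasks: int) -> float:
    """Scaling metric: visible queue messages per running ECS task.
    Guards against division by zero if the service is briefly at 0 tasks."""
    return visible_messages / max(running_tasks, 1)


def handler(event, context):
    import boto3  # imported lazily so the metric math above has no AWS dependency

    sqs = boto3.client("sqs")
    ecs = boto3.client("ecs")
    cloudwatch = boto3.client("cloudwatch")

    # Messages currently visible (i.e., waiting) in the queue.
    attrs = sqs.get_queue_attributes(
        QueueUrl=QUEUE_URL,
        AttributeNames=["ApproximateNumberOfMessagesVisible"],
    )
    visible = int(attrs["Attributes"]["ApproximateNumberOfMessagesVisible"])

    # Running FM-processor task count.
    running = ecs.describe_services(
        cluster=CLUSTER, services=[SERVICE]
    )["services"][0]["runningCount"]

    # Publish to the custom namespace the scaling policy reads from.
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/SQS",
        MetricData=[{
            "MetricName": "BacklogPerTask",
            "Value": backlog_per_task(visible, running),
            "Unit": "Count",
        }],
    )
    return {"visible": visible, "running": running}
```

With the target value of 10 and Haiku throughput of ~10 messages per task per minute, a published value above 10 means more than a minute of queued work per task, which is exactly when the policy adds capacity.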

8. Quick Reference — Queue Configuration Matrix

| Queue | Type | Visibility Timeout | Retention | Max Receives (DLQ) | Consumer | Use Case |
|---|---|---|---|---|---|---|
| manga-fm-high-priority | Standard | 60s | 4 days | 3 | Lambda (concurrency 50) | Partner API overflow, urgent requests |
| manga-fm-normal | Standard | 120s | 7 days | 3 | Lambda / ECS | General async analysis |
| manga-fm-low-priority | Standard | 300s | 14 days | 3 | ECS / EC2 | Batch enrichment, nightly jobs |
| manga-fm-ordered.fifo | FIFO | 60s | 7 days | 3 | Lambda | Session-ordered multi-turn |
| manga-fm-session.fifo | FIFO | 60s | 4 days | 3 | Lambda | Per-session context preservation |
| manga-fm-dlq | Standard | 30s | 14 days | N/A | Manual / Redrive | Failed request investigation |
| manga-fm-dlq.fifo | FIFO | 30s | 14 days | N/A | Manual / Redrive | Failed ordered request investigation |
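Each row of this matrix maps directly onto SQS queue attributes. The sketch below provisions the manga-fm-normal row; `queue_attributes` is an illustrative helper and the DLQ ARN reuses the placeholder account ID from the alarm examples, but the attribute keys (`VisibilityTimeout`, `MessageRetentionPeriod`, `RedrivePolicy` with `maxReceiveCount`) are the real SQS attribute names.

```python
"""Hedged provisioning sketch for one matrix row: manga-fm-normal
(Standard, 120s visibility, 7-day retention, DLQ after 3 receives)."""
import json


def queue_attributes(visibility_s: int, retention_days: int,
                     dlq_arn: str, max_receives: int = 3) -> dict:
    """Build the attribute map accepted by SQS create_queue /
    set_queue_attributes. All values must be strings; RedrivePolicy
    is a JSON-encoded string, not a nested object."""
    return {
        "VisibilityTimeout": str(visibility_s),
        "MessageRetentionPeriod": str(retention_days * 86400),
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receives,
        }),
    }


attrs = queue_attributes(
    visibility_s=120,
    retention_days=7,
    dlq_arn="arn:aws:sqs:us-east-1:123456789012:manga-fm-dlq",
)
# boto3.client("sqs").create_queue(QueueName="manga-fm-normal", Attributes=attrs)
print(attrs["MessageRetentionPeriod"])  # → 604800
```

Note the DLQ must exist before the source queue references it in `RedrivePolicy`, which is why the DLQ rows at the bottom of the matrix are created first in practice.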

End of File 2 — Asynchronous Processing and Request Validation Patterns