
Synchronous and Asynchronous Processing Patterns

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
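To ground the model-routing decisions later in this file, here is a minimal cost sketch using the per-1M-token prices above. The 500-input/200-output token profile is an assumed illustrative average, not a measured figure.

```python
def cost_per_message(tokens_in: int, tokens_out: int,
                     price_in: float, price_out: float) -> float:
    """USD cost of one model call, given per-1M-token prices."""
    return tokens_in * price_in / 1e6 + tokens_out * price_out / 1e6

# Assumed traffic profile: 500 input / 200 output tokens per message
haiku_msg = cost_per_message(500, 200, 0.25, 1.25)   # $0.000375
sonnet_msg = cost_per_message(500, 200, 3.00, 15.00)  # $0.0045

# At 1M messages/day: ~$375/day on Haiku vs ~$4,500/day on Sonnet,
# a ~12x spread that motivates routing short queries to Haiku.
```

This gap is why the synchronous handler below defaults short messages to Haiku and reserves Sonnet for complex queries.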


Skill Mapping

Dimension Value
Certification AWS Certified AI Practitioner (AIF-C01)
Task 2.4 — Select and implement FM API integration patterns
Skill 2.4.1 — Flexible Model Interaction
This File 02 — Sync/Async Processing Patterns (SQS producer/consumer, batch processing, request transformation)

Skill Scope

This file covers the implementation details of synchronous and asynchronous processing patterns for MangaAssist. Synchronous patterns handle real-time chat through direct Bedrock InvokeModel calls. Asynchronous patterns use SQS queues for deferred workloads like catalog enrichment, batch synopsis generation, and content moderation. Request transformation bridges the gap between API Gateway's inbound format and what Bedrock expects, handling Japanese text encoding, prompt assembly, and response shaping.


Mind Map

mindmap
  root((Sync/Async<br/>Processing<br/>Patterns))
    Sync Processing
      Direct InvokeModel
      ECS request handler
      Redis cache layer
      DynamoDB session read
      Timeout enforcement
    Async Processing
      SQS Standard queues
      SQS FIFO queues
      Dead-letter queues (DLQ)
      Lambda consumers
      ECS long-poll consumers
    Batch Processing
      Batch window configuration
      Partial batch failure handling
      Concurrency control
      Cost optimization via batching
      Progress tracking
    Request Transformation
      API GW to internal format
      Internal to Bedrock format
      Bedrock response to client format
      Japanese text normalization
      Token counting pre-flight
    Producer/Consumer
      SQS message format design
      Visibility timeout tuning
      Message deduplication
      Callback/notification on complete
      Poison message handling

1. Synchronous Processing Pattern

1.1 ECS Request Handler

The synchronous handler in the ECS Fargate orchestrator receives requests from API Gateway, assembles the prompt with session context, calls Bedrock, and returns the response within the 3-second budget.

"""
Synchronous request handler for MangaAssist chat.
Runs inside ECS Fargate containers behind an ALB.
"""
import json
import time
import logging
import uuid
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict

import redis
import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger(__name__)


@dataclass
class ChatRequest:
    """Parsed chat request from the client."""
    session_id: str
    message: str
    preferred_language: str = "ja"
    model_preference: str = "auto"  # auto | fast | quality
    request_id: str = ""

    def __post_init__(self):
        if not self.request_id:
            self.request_id = str(uuid.uuid4())


@dataclass
class ChatResponse:
    """Structured response back to the client."""
    request_id: str
    session_id: str
    response_text: str
    model_used: str
    latency_ms: int
    cached: bool = False
    tokens_input: int = 0
    tokens_output: int = 0

    def to_dict(self) -> dict:
        return asdict(self)


class SyncChatHandler:
    """
    Handles synchronous chat requests with caching, session management,
    and Bedrock invocation within a 3-second SLA.
    """

    def __init__(
        self,
        bedrock_manager,  # BedrockClientManager from file 01
        redis_client: redis.Redis,
        dynamodb_table_name: str = "manga-assist-sessions",
        cache_ttl: int = 300,
    ):
        self.bedrock = bedrock_manager
        self.redis = redis_client
        self.ddb = boto3.resource("dynamodb").Table(dynamodb_table_name)
        self.cache_ttl = cache_ttl

    def handle(self, raw_request: Dict[str, Any]) -> ChatResponse:
        """
        Process a synchronous chat request end-to-end.

        Timeline budget (3000ms):
          - Parse + validate:    ~10ms
          - Cache check:         ~5ms
          - Session load:        ~50ms
          - Prompt build:        ~5ms
          - Bedrock invocation:  ~2500ms (Sonnet) / ~800ms (Haiku)
          - Cache store + save:  ~100ms
          - Serialize response:  ~5ms
        """
        start = time.time()

        # 1. Parse request
        request = self._parse_request(raw_request)
        logger.info("Chat request | session=%s | lang=%s", request.session_id, request.preferred_language)

        # 2. Check response cache
        cache_key = self._build_cache_key(request)
        cached = self._check_cache(cache_key)
        if cached:
            elapsed = int((time.time() - start) * 1000)
            logger.info("Cache hit | session=%s | latency=%dms", request.session_id, elapsed)
            return ChatResponse(
                request_id=request.request_id,
                session_id=request.session_id,
                response_text=cached["response_text"],
                model_used=cached["model_used"],
                latency_ms=elapsed,
                cached=True,
            )

        # 3. Load conversation history
        history = self._load_session(request.session_id)

        # 4. Build prompt body
        model_key = self._select_model(request)
        body = self._build_body(request.message, history, model_key)

        # 5. Invoke Bedrock
        result = self.bedrock.invoke_model(model_key, body)

        # 6. Extract response text
        response_text = result["content"][0]["text"]
        tokens_in = result.get("usage", {}).get("input_tokens", 0)
        tokens_out = result.get("usage", {}).get("output_tokens", 0)

        # 7. Update session and cache (synchronous here; offload to a
        #    background thread if these writes threaten the latency budget)
        self._update_session(request.session_id, request.message, response_text)
        self._store_cache(cache_key, {"response_text": response_text, "model_used": model_key})

        elapsed = int((time.time() - start) * 1000)
        logger.info(
            "Chat complete | session=%s | model=%s | tokens=%d/%d | latency=%dms",
            request.session_id, model_key, tokens_in, tokens_out, elapsed,
        )

        return ChatResponse(
            request_id=request.request_id,
            session_id=request.session_id,
            response_text=response_text,
            model_used=model_key,
            latency_ms=elapsed,
            tokens_input=tokens_in,
            tokens_output=tokens_out,
        )

    def _parse_request(self, raw: dict) -> ChatRequest:
        """Extract and validate fields from the raw request."""
        data = raw.get("data", {})
        message = (data.get("message") or "").strip()
        if not message:
            raise ValueError("Request is missing a non-empty 'message' field")
        return ChatRequest(
            session_id=data.get("sessionId", str(uuid.uuid4())),
            message=message,
            preferred_language=data.get("preferredLanguage", "ja"),
            model_preference=data.get("modelPreference", "auto"),
        )

    def _select_model(self, request: ChatRequest) -> str:
        """Choose a model based on request preference and message complexity."""
        if request.model_preference == "fast":
            return "haiku"
        if request.model_preference == "quality":
            return "sonnet"
        # "auto" — use Haiku for short messages, Sonnet for complex ones
        if len(request.message) < 50:
            return "haiku"
        return "sonnet"

    def _build_cache_key(self, request: ChatRequest) -> str:
        """Build a Redis cache key from the request."""
        import hashlib
        content = f"{request.session_id}:{request.message}"
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
        return f"manga:chat:{digest}"

    def _check_cache(self, key: str) -> Optional[dict]:
        """Check Redis for a cached response."""
        try:
            data = self.redis.get(key)
            if data:
                return json.loads(data)
        except redis.RedisError as exc:
            logger.warning("Redis cache check failed: %s", exc)
        return None

    def _store_cache(self, key: str, value: dict) -> None:
        """Store a response in Redis with TTL."""
        try:
            self.redis.setex(key, self.cache_ttl, json.dumps(value, ensure_ascii=False))
        except redis.RedisError as exc:
            logger.warning("Redis cache store failed: %s", exc)

    def _load_session(self, session_id: str) -> list:
        """Load the most recent conversation turns from DynamoDB."""
        try:
            response = self.ddb.query(
                KeyConditionExpression=Key("sessionId").eq(session_id),
                ScanIndexForward=False,  # newest first, so Limit keeps recent turns
                Limit=20,
            )
            # Restore chronological order for prompt assembly
            return list(reversed(response.get("Items", [])))
        except Exception as exc:
            logger.warning("Session load failed: %s", exc)
            return []

    def _update_session(self, session_id: str, user_msg: str, assistant_msg: str) -> None:
        """Append the latest turn to the session in DynamoDB."""
        try:
            timestamp = int(time.time() * 1000)
            self.ddb.put_item(Item={
                "sessionId": session_id,
                "timestamp": timestamp,
                "role": "user",
                "content": user_msg,
            })
            self.ddb.put_item(Item={
                "sessionId": session_id,
                "timestamp": timestamp + 1,
                "role": "assistant",
                "content": assistant_msg,
            })
        except Exception as exc:
            logger.error("Session update failed: %s", exc)

    def _build_body(self, message: str, history: list, model_key: str) -> dict:
        """Construct the Bedrock request body."""
        messages = []
        for item in history[-10:]:
            messages.append({"role": item["role"], "content": item["content"]})
        messages.append({"role": "user", "content": message})

        return {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,  # required by the Anthropic messages API on Bedrock
            "messages": messages,
            "system": "You are MangaAssist, a helpful JP manga store chatbot. Use polite keigo (敬語).",
        }

2. Asynchronous Processing with SQS

2.1 Queue Architecture

flowchart TB
    subgraph "Producers"
        direction TB
        P1[Admin API<br/>Catalog Enrichment]
        P2[ECS Orchestrator<br/>Deferred Tasks]
        P3[EventBridge<br/>Scheduled Jobs]
    end

    subgraph "SQS Queues"
        direction TB
        Q1[manga-enrichment-queue.fifo<br/>FIFO | Dedup 5min | Vis 300s]
        Q2[manga-deferred-queue<br/>Standard | Vis 180s]
        Q3[manga-moderation-queue<br/>Standard | Vis 120s]

        DLQ1[manga-enrichment-dlq.fifo<br/>maxReceiveCount: 3]
        DLQ2[manga-deferred-dlq<br/>maxReceiveCount: 5]
        DLQ3[manga-moderation-dlq<br/>maxReceiveCount: 3]

        Q1 -.->|after 3 failures| DLQ1
        Q2 -.->|after 5 failures| DLQ2
        Q3 -.->|after 3 failures| DLQ3
    end

    subgraph "Consumers"
        direction TB
        C1[Lambda<br/>Batch=10, Concurrency=5]
        C2[ECS Task<br/>Long Poll 20s]
        C3[Lambda<br/>Batch=1, Concurrency=10]
    end

    P1 --> Q1
    P2 --> Q2
    P3 --> Q1
    P3 --> Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

    C1 --> Bedrock[Amazon Bedrock]
    C2 --> Bedrock
    C3 --> Bedrock

    C1 --> DDB[(DynamoDB<br/>Catalog)]
    C2 --> SNS[SNS<br/>Callbacks]
    C3 --> S3[S3<br/>Moderation Results]
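The diagram's queue/DLQ pairing can be expressed as SQS attributes. The sketch below builds the attribute map for `sqs.create_queue` for the FIFO enrichment queue; the DLQ ARN and queue names are hypothetical, and the attribute names follow the SQS CreateQueue API.

```python
import json

def fifo_queue_attributes(dlq_arn: str, max_receive: int = 3,
                          visibility_s: int = 300) -> dict:
    """Attribute map matching manga-enrichment-queue.fifo in the diagram."""
    return {
        "FifoQueue": "true",
        "VisibilityTimeout": str(visibility_s),
        # Redrive to the DLQ after max_receive failed receives
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive,
        }),
    }

# Usage sketch (hypothetical account/region):
# sqs.create_queue(QueueName="manga-enrichment-queue.fifo",
#                  Attributes=fifo_queue_attributes(dlq_arn))
```

Note that a FIFO queue's DLQ must itself be FIFO, which is why the diagram pairs `manga-enrichment-queue.fifo` with `manga-enrichment-dlq.fifo`.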

2.2 SQS Producer — Catalog Enrichment

"""
SQS producer for manga catalog enrichment.
Sends enrichment jobs to a FIFO queue for ordered, deduplicated processing.
"""
import json
import time
import hashlib
import logging
from typing import List, Dict, Optional

import boto3

logger = logging.getLogger(__name__)


class CatalogEnrichmentProducer:
    """
    Produces enrichment jobs for manga catalog items.
    Uses FIFO queue to ensure ordered processing per manga series.
    """

    def __init__(self, queue_url: str, region: str = "us-east-1"):
        self.sqs = boto3.client("sqs", region_name=region)
        self.queue_url = queue_url

    def enrich_single(
        self,
        manga_id: str,
        title: str,
        title_jp: str,
        enrichment_type: str = "synopsis",
        priority: str = "normal",
    ) -> str:
        """
        Send a single enrichment job to the queue.

        Args:
            manga_id: Unique manga identifier
            title: English title
            title_jp: Japanese title (原題)
            enrichment_type: Type of enrichment (synopsis|tags|recommendations)
            priority: Job priority (high|normal|low)

        Returns:
            SQS MessageId
        """
        message_body = {
            "jobType": "catalog_enrichment",
            "enrichmentType": enrichment_type,
            "manga": {
                "id": manga_id,
                "title": title,
                "titleJp": title_jp,
            },
            "priority": priority,
            "requestedAt": int(time.time()),
        }

        # FIFO: one message group per manga so jobs for the same title
        # process in order. The dedup ID buckets requests into 5-minute
        # windows (the SQS FIFO deduplication interval) to suppress duplicates.
        dedup_id = hashlib.sha256(
            f"{manga_id}:{enrichment_type}:{int(time.time() // 300)}".encode()
        ).hexdigest()  # 64 hex chars, within the 128-char SQS limit

        response = self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(message_body, ensure_ascii=False),
            MessageGroupId=f"manga-{manga_id[:8]}",
            MessageDeduplicationId=dedup_id,
            MessageAttributes={
                "enrichmentType": {
                    "DataType": "String",
                    "StringValue": enrichment_type,
                },
                "priority": {
                    "DataType": "String",
                    "StringValue": priority,
                },
            },
        )

        msg_id = response["MessageId"]
        logger.info(
            "Enrichment job queued | manga=%s | type=%s | msgId=%s",
            manga_id, enrichment_type, msg_id,
        )
        return msg_id

    def enrich_batch(self, items: List[Dict], enrichment_type: str = "synopsis") -> Dict:
        """
        Send a batch of enrichment jobs (up to 10 per SQS batch).

        Returns:
            Dict with 'successful' and 'failed' counts
        """
        results = {"successful": 0, "failed": 0, "message_ids": []}

        # SQS SendMessageBatch supports up to 10 messages
        for batch_start in range(0, len(items), 10):
            batch = items[batch_start:batch_start + 10]
            entries = []

            for idx, item in enumerate(batch):
                dedup_id = hashlib.sha256(
                    f"{item['id']}:{enrichment_type}:{int(time.time() // 300)}".encode()
                ).hexdigest()

                entries.append({
                    "Id": str(idx),
                    "MessageBody": json.dumps({
                        "jobType": "catalog_enrichment",
                        "enrichmentType": enrichment_type,
                        "manga": item,
                        "requestedAt": int(time.time()),
                    }, ensure_ascii=False),
                    "MessageGroupId": f"manga-{item['id'][:8]}",
                    "MessageDeduplicationId": dedup_id,
                })

            response = self.sqs.send_message_batch(
                QueueUrl=self.queue_url,
                Entries=entries,
            )

            results["successful"] += len(response.get("Successful", []))
            results["failed"] += len(response.get("Failed", []))
            results["message_ids"].extend(
                [m["MessageId"] for m in response.get("Successful", [])]
            )

        logger.info(
            "Batch enrichment queued | total=%d | success=%d | failed=%d",
            len(items), results["successful"], results["failed"],
        )
        return results
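The time-bucketed deduplication ID used above can be isolated for clarity. This sketch shows why retries inside one 5-minute window collapse to a single job while a later request goes through; the function name is illustrative.

```python
import hashlib

def enrichment_dedup_id(manga_id: str, enrichment_type: str,
                        now: float, window_s: int = 300) -> str:
    """Bucket requests into fixed windows aligned with the SQS FIFO
    5-minute deduplication interval."""
    bucket = int(now // window_s)
    return hashlib.sha256(
        f"{manga_id}:{enrichment_type}:{bucket}".encode()
    ).hexdigest()

# Two sends in the same window share an ID, so SQS silently drops the
# duplicate; the next window produces a fresh ID and a fresh job.
```

One caveat of this scheme: two requests that straddle a bucket boundary (e.g. at t=1199s and t=1201s) get different IDs even though they are 2 seconds apart, so it bounds duplicates rather than eliminating them.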

2.3 SQS Consumer — Lambda Batch Processor

"""
Lambda consumer for manga catalog enrichment queue.
Processes batches of enrichment jobs with partial failure reporting.
"""
import json
import time
import logging
from typing import Dict, Any, List

import boto3

logger = logging.getLogger(__name__)

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
dynamodb = boto3.resource("dynamodb").Table("manga-catalog")


ENRICHMENT_PROMPTS = {
    "synopsis": (
        "Generate a compelling 2-paragraph synopsis for this manga. "
        "Write in both Japanese and English.\n\n"
        "Title: {title}\nJapanese Title: {titleJp}\n\n"
        "Provide the response as JSON with keys 'synopsis_en' and 'synopsis_jp'."
    ),
    "tags": (
        "Generate relevant tags/genres for this manga title. "
        "Provide 5-10 tags in both English and Japanese.\n\n"
        "Title: {title}\nJapanese Title: {titleJp}\n\n"
        "Respond as JSON with keys 'tags_en' (list) and 'tags_jp' (list)."
    ),
    "recommendations": (
        "Based on this manga title, suggest 5 similar manga that fans would enjoy. "
        "Include titles in both English and Japanese.\n\n"
        "Title: {title}\nJapanese Title: {titleJp}\n\n"
        "Respond as JSON array with objects having 'title_en' and 'title_jp' keys."
    ),
}


def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """
    Process SQS batch of enrichment jobs.

    Uses partial batch failure reporting — returns only failed message IDs
    so SQS can retry them while acknowledging successful ones.

    Event source mapping config:
      - BatchSize: 10
      - MaximumBatchingWindowInSeconds: 30
      - FunctionResponseTypes: ["ReportBatchItemFailures"]
    """
    failed_items = []
    batch_size = len(event["Records"])
    logger.info("Processing enrichment batch | size=%d", batch_size)

    for record in event["Records"]:
        message_id = record["messageId"]
        try:
            body = json.loads(record["body"])
            manga = body["manga"]
            enrichment_type = body.get("enrichmentType", "synopsis")

            # Build prompt from template
            prompt_template = ENRICHMENT_PROMPTS.get(enrichment_type, ENRICHMENT_PROMPTS["synopsis"])
            prompt = prompt_template.format(
                title=manga.get("title", "Unknown"),
                titleJp=manga.get("titleJp", "不明"),
            )

            # Invoke Bedrock (use Haiku for cost efficiency on batch jobs)
            start = time.time()
            response = bedrock.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 1024,
                    "temperature": 0.5,
                    "messages": [{"role": "user", "content": prompt}],
                }),
            )
            latency_ms = int((time.time() - start) * 1000)

            result = json.loads(response["body"].read())
            enrichment_data = result["content"][0]["text"]

            # Parse the model's JSON response
            try:
                parsed_enrichment = json.loads(enrichment_data)
            except json.JSONDecodeError:
                parsed_enrichment = {"raw_text": enrichment_data}

            # Store enrichment in DynamoDB catalog. Note: a SET on the nested
            # path enrichment.<type> assumes the item already carries an
            # 'enrichment' map; on a fresh item this raises ValidationException,
            # so seed an empty map when the catalog entry is created.
            dynamodb.update_item(
                Key={"mangaId": manga["id"]},
                UpdateExpression="SET #enrichment.#etype = :data, #enrichment.#updated = :ts",
                ExpressionAttributeNames={
                    "#enrichment": "enrichment",
                    "#etype": enrichment_type,
                    "#updated": "lastUpdated",
                },
                ExpressionAttributeValues={
                    ":data": parsed_enrichment,
                    ":ts": int(time.time()),
                },
            )

            logger.info(
                "Enrichment complete | manga=%s | type=%s | latency=%dms",
                manga["id"], enrichment_type, latency_ms,
            )

        except Exception as exc:
            logger.error(
                "Enrichment failed | msgId=%s | error=%s",
                message_id, str(exc),
            )
            failed_items.append({"itemIdentifier": message_id})

    logger.info(
        "Batch complete | processed=%d | failed=%d",
        batch_size, len(failed_items),
    )

    # Return failed items for SQS retry
    return {"batchItemFailures": failed_items}
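The event source mapping config listed in the handler's docstring can be wired up with `create_event_source_mapping`. This is a sketch of the parameter dict; the queue ARN and function name are hypothetical placeholders.

```python
def enrichment_mapping_params(queue_arn: str, function_name: str) -> dict:
    """Parameters for lambda_client.create_event_source_mapping, mirroring
    the config in the handler docstring above."""
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": 10,
        "MaximumBatchingWindowInSeconds": 30,
        # Without this, one bad record forces a retry of the whole batch
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        # Matches "Concurrency=5" on consumer C1 in the architecture diagram
        "ScalingConfig": {"MaximumConcurrency": 5},
    }

# Usage sketch:
# boto3.client("lambda").create_event_source_mapping(
#     **enrichment_mapping_params(queue_arn, "manga-enrichment-consumer"))
```

`ReportBatchItemFailures` is what makes the `batchItemFailures` return value meaningful: without it, Lambda treats any raised exception as a failure of the entire batch and every message is retried.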

3. Request Transformation Pipeline

3.1 Transformation Flow

flowchart LR
    subgraph "Inbound Transform"
        A[Client JSON] --> B[Normalize Unicode<br/>NFC for JP]
        B --> C[Validate Fields]
        C --> D[Sanitize Input]
        D --> E[Internal Format]
    end

    subgraph "Model Transform"
        E --> F[Load Session<br/>Context]
        F --> G[Build Messages<br/>Array]
        G --> H[Add System<br/>Prompt]
        H --> I[Bedrock Request<br/>Body]
    end

    subgraph "Outbound Transform"
        J[Bedrock Response] --> K[Extract Text]
        K --> L[Post-process<br/>Japanese]
        L --> M[Build Client<br/>Response]
        M --> N[Client JSON]
    end

    I --> J

3.2 Request Transformer Implementation

"""
Request transformation pipeline for MangaAssist.
Handles the full lifecycle: client → internal → Bedrock → client.
"""
import json
import re
import unicodedata
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class TransformContext:
    """Carries state through the transformation pipeline."""
    raw_input: Dict[str, Any]
    session_id: str = ""
    user_message: str = ""
    language: str = "ja"
    conversation_history: Optional[List[Dict]] = None
    bedrock_body: Optional[Dict] = None
    bedrock_response: Optional[Dict] = None
    client_response: Optional[Dict] = None
    metadata: Optional[Dict] = None

    def __post_init__(self):
        self.conversation_history = self.conversation_history or []
        self.metadata = self.metadata or {}


class InboundTransformer:
    """Transforms client input into internal format."""

    def transform(self, ctx: TransformContext) -> TransformContext:
        """Apply all inbound transformations."""
        raw = ctx.raw_input
        data = raw.get("data", {})

        # Extract and normalize the user message
        message = data.get("message", "")
        message = self._normalize_unicode(message)
        message = self._sanitize(message)

        ctx.user_message = message
        ctx.session_id = data.get("sessionId", "")
        ctx.language = self._detect_language(message, data.get("preferredLanguage", "ja"))

        ctx.metadata = {
            "original_length": len(data.get("message", "")),
            "normalized_length": len(message),
            "detected_language": ctx.language,
            "client_version": raw.get("metadata", {}).get("clientVersion", "unknown"),
        }

        logger.info(
            "Inbound transform | session=%s | lang=%s | chars=%d",
            ctx.session_id, ctx.language, len(message),
        )
        return ctx

    def _normalize_unicode(self, text: str) -> str:
        """Normalize to NFC (required for consistent Japanese text)."""
        return unicodedata.normalize("NFC", text)

    def _sanitize(self, text: str) -> str:
        """Remove zero-width chars and excessive whitespace."""
        text = re.sub(r"[\u200b-\u200f\u2028-\u202f\ufeff]", "", text)
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text.strip()

    def _detect_language(self, text: str, preferred: str) -> str:
        """Simple language detection based on character ranges."""
        jp_chars = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff\u4e00-\u9fff]", text))
        total = max(len(text), 1)
        if jp_chars / total > 0.3:
            return "ja"
        return preferred


class ModelTransformer:
    """Transforms internal format into Bedrock request body."""

    SYSTEM_PROMPTS = {
        "ja": (
            "あなたはMangaAssistです。日本のマンガ書店のチャットボットです。"
            "お客様のマンガ探し、シリーズや作者に関する質問への回答、"
            "おすすめの提案、注文のサポートを行います。丁寧な敬語を使用してください。"
        ),
        "en": (
            "You are MangaAssist, a chatbot for a Japanese manga store. "
            "Help customers find manga, answer questions about series and authors, "
            "provide recommendations, and assist with orders."
        ),
    }

    def transform(self, ctx: TransformContext) -> TransformContext:
        """Build the Bedrock request body from internal format."""
        messages = []

        # Add conversation history
        for turn in ctx.conversation_history[-10:]:
            messages.append({
                "role": turn["role"],
                "content": turn["content"],
            })

        # Add current message
        messages.append({
            "role": "user",
            "content": ctx.user_message,
        })

        system_prompt = self.SYSTEM_PROMPTS.get(ctx.language, self.SYSTEM_PROMPTS["en"])

        ctx.bedrock_body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.3,
            "system": system_prompt,
            "messages": messages,
        }

        ctx.metadata["prompt_messages"] = len(messages)
        ctx.metadata["system_language"] = ctx.language

        return ctx


class OutboundTransformer:
    """Transforms Bedrock response into client format."""

    def transform(self, ctx: TransformContext) -> TransformContext:
        """Build the client response from Bedrock output."""
        if not ctx.bedrock_response:
            ctx.client_response = {
                "error": "No response from model",
                "code": "MODEL_NO_RESPONSE",
            }
            return ctx

        content = ctx.bedrock_response.get("content", [{}])
        response_text = content[0].get("text", "") if content else ""

        # Post-process Japanese text
        response_text = self._post_process_japanese(response_text, ctx.language)

        usage = ctx.bedrock_response.get("usage", {})

        ctx.client_response = {
            "status": "success",
            "data": {
                "message": response_text,
                "sessionId": ctx.session_id,
                "language": ctx.language,
            },
            "usage": {
                "inputTokens": usage.get("input_tokens", 0),
                "outputTokens": usage.get("output_tokens", 0),
            },
            "metadata": {
                "model": ctx.bedrock_response.get("model", "unknown"),
                "stopReason": ctx.bedrock_response.get("stop_reason", "unknown"),
            },
        }
        return ctx

    def _post_process_japanese(self, text: str, language: str) -> str:
        """Apply Japanese-specific post-processing."""
        if language != "ja":
            return text

        # Ensure proper spacing between Japanese and ASCII text
        text = re.sub(r"([\u3040-\u9fff])([a-zA-Z0-9])", r"\1 \2", text)
        text = re.sub(r"([a-zA-Z0-9])([\u3040-\u9fff])", r"\1 \2", text)

        return text


class TransformPipeline:
    """Orchestrates the full transformation pipeline."""

    def __init__(self):
        self.inbound = InboundTransformer()
        self.model = ModelTransformer()
        self.outbound = OutboundTransformer()

    def process_inbound(self, raw_input: Dict) -> TransformContext:
        """Transform client input through inbound + model stages."""
        ctx = TransformContext(raw_input=raw_input)
        ctx = self.inbound.transform(ctx)
        ctx = self.model.transform(ctx)
        return ctx

    def process_outbound(self, ctx: TransformContext) -> Dict:
        """Transform Bedrock response into client format."""
        ctx = self.outbound.transform(ctx)
        return ctx.client_response
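The inbound stage's Japanese-text handling is worth seeing in isolation. This standalone sketch distills the normalization and language-detection logic from `InboundTransformer` into two free functions (the names are illustrative, not part of the pipeline API):

```python
import re
import unicodedata

def normalize_message(text: str) -> str:
    # NFC composes decomposed kana: "か" + combining dakuten (U+3099) -> "が"
    text = unicodedata.normalize("NFC", text)
    # Strip zero-width characters, then collapse runs of blank lines
    text = re.sub(r"[\u200b-\u200f\u2028-\u202f\ufeff]", "", text)
    return re.sub(r"\n{3,}", "\n\n", text).strip()

def detect_language(text: str, preferred: str = "en") -> str:
    # >30% hiragana/katakana/kanji characters => treat as Japanese
    jp = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff\u4e00-\u9fff]", text))
    return "ja" if jp / max(len(text), 1) > 0.3 else preferred

print(normalize_message("か\u3099\u200b"))              # が
print(detect_language("ワンピースの最新刊はありますか"))   # ja
```

NFC matters because decomposed input (common from some IMEs and macOS filenames) would otherwise produce different embeddings and cache keys than the visually identical composed form.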

4. Batch Processing Patterns

4.1 Batch Window Strategy

flowchart TD
    subgraph "Batch Window Configuration"
        A[SQS Event Source Mapping] --> B{Check Conditions}
        B -->|BatchSize reached<br/>10 messages| C[Invoke Lambda]
        B -->|Window timeout<br/>30 seconds| C
        B -->|Neither| D[Continue collecting]
    end

    subgraph "Lambda Batch Processing"
        C --> E[Receive 1-10 records]
        E --> F{Process each record}
        F -->|Success| G[Acknowledge]
        F -->|Failure| H[Report in<br/>batchItemFailures]
        G --> I[Return response]
        H --> I
    end

    I --> J{SQS evaluates<br/>failures}
    J -->|Has failures| K[Retry failed messages<br/>after visibility timeout]
    J -->|No failures| L[Delete all messages]
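The retry path above depends on a sane visibility timeout. AWS guidance for Lambda SQS event sources is to set the queue's visibility timeout to at least six times the function timeout; a minimal sketch of that rule of thumb (function name is illustrative):

```python
def recommended_visibility_timeout(function_timeout_s: int, factor: int = 6) -> int:
    """AWS guidance for Lambda SQS sources: visibility timeout >= 6x the
    function timeout, so retried batches don't overlap in-flight work."""
    return factor * function_timeout_s

# A 30s enrichment Lambda suggests a 180s visibility timeout on its queue
```

Set it too low and messages reappear while the first invocation is still running, producing duplicate Bedrock calls and duplicate cost; set it too high and a genuinely failed message waits the full timeout before its retry.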

4.2 ECS Long-Poll Consumer

For workloads that need more control than Lambda provides (long-running enrichments, stateful processing), the ECS consumer uses SQS long polling.

"""
ECS-based SQS long-poll consumer for deferred inference tasks.
Runs as a persistent process in an ECS Fargate task.
"""
import json
import time
import signal
import logging
import threading
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3

logger = logging.getLogger(__name__)


class ECSQueueConsumer:
    """
    Long-polling SQS consumer running in ECS Fargate.
    Supports concurrent message processing and graceful shutdown.
    """

    def __init__(
        self,
        queue_url: str,
        max_workers: int = 5,
        visibility_timeout: int = 180,
        wait_time_seconds: int = 20,
        region: str = "us-east-1",
    ):
        self.sqs = boto3.client("sqs", region_name=region)
        self.bedrock = boto3.client("bedrock-runtime", region_name=region)
        self.queue_url = queue_url
        self.max_workers = max_workers
        self.visibility_timeout = visibility_timeout
        self.wait_time = wait_time_seconds
        self._running = True
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

        # Graceful shutdown handling
        signal.signal(signal.SIGTERM, self._shutdown_handler)
        signal.signal(signal.SIGINT, self._shutdown_handler)

    def _shutdown_handler(self, signum, frame):
        """Handle graceful shutdown signal from ECS."""
        logger.info("Shutdown signal received (sig=%d), draining...", signum)
        self._running = False

    def run(self):
        """Main consumer loop — runs until shutdown signal."""
        logger.info(
            "Consumer started | queue=%s | workers=%d",
            self.queue_url, self.max_workers,
        )

        while self._running:
            try:
                response = self.sqs.receive_message(
                    QueueUrl=self.queue_url,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=self.wait_time,
                    VisibilityTimeout=self.visibility_timeout,
                    MessageAttributeNames=["All"],
                )

                messages = response.get("Messages", [])
                if not messages:
                    continue

                logger.info("Received %d messages", len(messages))

                # Process messages concurrently
                futures = {
                    self._executor.submit(self._process_message, msg): msg
                    for msg in messages
                }

                # On timeout, as_completed raises and the outer handler logs it;
                # unfinished messages simply reappear after the visibility timeout
                for future in as_completed(futures, timeout=self.visibility_timeout):
                    msg = futures[future]
                    try:
                        future.result()
                        # Delete successfully processed message
                        self.sqs.delete_message(
                            QueueUrl=self.queue_url,
                            ReceiptHandle=msg["ReceiptHandle"],
                        )
                    except Exception as exc:
                        logger.error(
                            "Message processing failed | msgId=%s | error=%s",
                            msg["MessageId"], str(exc),
                        )
                        # Message will return to queue after visibility timeout

            except Exception as exc:
                logger.error("Consumer loop error: %s", exc)
                time.sleep(5)

        logger.info("Consumer shutdown complete")
        self._executor.shutdown(wait=True)

    def _process_message(self, message: Dict[str, Any]) -> None:
        """Process a single SQS message."""
        body = json.loads(message["Body"])
        job_type = body.get("jobType", "unknown")

        logger.info(
            "Processing message | msgId=%s | jobType=%s",
            message["MessageId"], job_type,
        )

        if job_type == "deferred_inference":
            self._handle_deferred_inference(body)
        elif job_type == "batch_recommendation":
            self._handle_batch_recommendation(body)
        else:
            logger.warning("Unknown job type: %s", job_type)

    def _handle_deferred_inference(self, body: Dict) -> None:
        """Handle a deferred inference job."""
        prompt = body.get("prompt", "")
        model_id = body.get("modelId", "anthropic.claude-3-haiku-20240307-v1:0")
        callback_url = body.get("callbackUrl")

        response = self.bedrock.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 2048,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )

        result = json.loads(response["body"].read())

        # If callback URL provided, notify the requester
        if callback_url:
            self._send_callback(callback_url, {
                "status": "completed",
                "jobId": body.get("jobId"),
                "result": result["content"][0]["text"],
            })

    def _handle_batch_recommendation(self, body: Dict) -> None:
        """Handle batch recommendation generation."""
        manga_ids = body.get("mangaIds", [])
        for manga_id in manga_ids:
            # Process each manga recommendation
            logger.info("Generating recommendations for manga=%s", manga_id)
            # Implementation would invoke Bedrock and store results

    def _send_callback(self, url: str, payload: Dict) -> None:
        """POST the completion payload to the caller-supplied HTTP callback URL."""
        import urllib.request  # local import: only needed when a callback is set
        try:
            req = urllib.request.Request(
                url,
                data=json.dumps(payload).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=10):
                pass  # close the response; we only care that the POST succeeded
        except Exception as exc:
            logger.error("Callback failed | url=%s | error=%s", url, str(exc))


if __name__ == "__main__":
    import os
    consumer = ECSQueueConsumer(
        queue_url=os.environ["SQS_QUEUE_URL"],
        max_workers=int(os.environ.get("MAX_WORKERS", "5")),
    )
    consumer.run()
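The consumer above dispatches on a specific job envelope (`jobType`, `jobId`, `prompt`, `modelId`, `callbackUrl`). A minimal producer sketch that builds a matching `deferred_inference` message; the helper names are illustrative, not part of MangaAssist's actual codebase:

```python
import json
import uuid
from typing import Dict, Optional


def build_deferred_inference_job(prompt: str,
                                 model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
                                 callback_url: Optional[str] = None) -> Dict:
    """Assemble the message body that ECSQueueConsumer._process_message expects."""
    body = {
        "jobType": "deferred_inference",
        "jobId": str(uuid.uuid4()),
        "prompt": prompt,
        "modelId": model_id,
    }
    if callback_url:
        body["callbackUrl"] = callback_url
    return body


def enqueue_job(queue_url: str, body: Dict) -> str:
    """Send the job to SQS; returns the SQS MessageId."""
    import boto3  # deferred import so the pure helper above has no AWS dependency
    sqs = boto3.client("sqs")
    resp = sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(body))
    return resp["MessageId"]
```

Because the body builder is pure, it can be unit-tested without touching SQS, and the JSON round-trips cleanly into the consumer's `json.loads(message["Body"])`.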

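A deferred inference job whose Bedrock call outlasts the queue's visibility timeout gets redelivered while the first worker is still running it. One common countermeasure is a visibility heartbeat: a background thread that periodically calls `ChangeMessageVisibility` until the worker finishes. A stdlib sketch — the helper name is mine; pass in a real boto3 SQS client:

```python
import threading


def start_visibility_heartbeat(sqs_client, queue_url: str, receipt_handle: str,
                               visibility_timeout: int = 120,
                               interval: float = None) -> threading.Event:
    """Extend the message's visibility every `interval` seconds until the
    returned Event is set. Call stop.set() just before deleting the message."""
    stop = threading.Event()
    if interval is None:
        interval = max(1.0, visibility_timeout / 2)  # renew at half-life

    def _beat():
        # Event.wait returns False on timeout (keep beating) and True once set
        while not stop.wait(interval):
            sqs_client.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=visibility_timeout,
            )

    threading.Thread(target=_beat, daemon=True).start()
    return stop
```

In `_process_message`, you would start the heartbeat before the `invoke_model` call and set the event once the message has been deleted.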
5. Dead-Letter Queue Processing

5.1 DLQ Reprocessor

"""
DLQ reprocessor for MangaAssist.
Inspects failed messages, classifies errors, and either retries or archives.
"""
import json
import logging
from typing import Dict
from enum import Enum

import boto3

logger = logging.getLogger(__name__)


class FailureClass(Enum):
    TRANSIENT = "transient"        # Retry will likely succeed
    PERMANENT = "permanent"        # Message is malformed, won't ever succeed
    THROTTLE = "throttle"          # Rate limited, retry after delay
    UNKNOWN = "unknown"


class DLQReprocessor:
    """Inspect and reprocess messages from dead-letter queues."""

    def __init__(self, dlq_url: str, source_queue_url: str, region: str = "us-east-1"):
        self.sqs = boto3.client("sqs", region_name=region)
        self.dlq_url = dlq_url
        self.source_queue_url = source_queue_url

    def classify_failure(self, message: Dict) -> FailureClass:
        """Classify the failure type based on message attributes and body."""
        body = json.loads(message.get("Body", "{}"))

        # Check approximate receive count
        receive_count = int(
            message.get("Attributes", {}).get("ApproximateReceiveCount", "0")
        )

        # Messages that failed many times are likely permanent failures
        if receive_count > 10:
            return FailureClass.PERMANENT

        # Check for known transient error markers
        error_msg = body.get("_error", "").lower()
        if "throttl" in error_msg or "rate" in error_msg:
            return FailureClass.THROTTLE
        if "timeout" in error_msg or "connection" in error_msg:
            return FailureClass.TRANSIENT
        if "validation" in error_msg or "malformed" in error_msg:
            return FailureClass.PERMANENT

        return FailureClass.UNKNOWN

    def reprocess(self, max_messages: int = 100) -> Dict:
        """
        Process messages from the DLQ.
        - Transient/throttle: re-enqueue to source queue
        - Permanent: archive to S3
        - Unknown: flag for manual review
        """
        stats = {"requeued": 0, "archived": 0, "flagged": 0}

        processed = 0
        while processed < max_messages:
            response = self.sqs.receive_message(
                QueueUrl=self.dlq_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                AttributeNames=["All"],
            )

            messages = response.get("Messages", [])
            if not messages:
                break

            for msg in messages:
                failure_class = self.classify_failure(msg)

                if failure_class in (FailureClass.TRANSIENT, FailureClass.THROTTLE):
                    # Re-enqueue to source queue; rate-limited messages get a
                    # delay so the retry doesn't hit the same throttle again
                    # (SQS DelaySeconds accepts 0-900)
                    delay = 300 if failure_class == FailureClass.THROTTLE else 0
                    self.sqs.send_message(
                        QueueUrl=self.source_queue_url,
                        MessageBody=msg["Body"],
                        DelaySeconds=delay,
                    )
                    stats["requeued"] += 1

                elif failure_class == FailureClass.PERMANENT:
                    # Archive (would store to S3 in production)
                    logger.info("Archiving permanent failure | msgId=%s", msg["MessageId"])
                    stats["archived"] += 1

                else:
                    logger.warning("Unknown failure | msgId=%s", msg["MessageId"])
                    stats["flagged"] += 1

                # Delete from DLQ after handling
                self.sqs.delete_message(
                    QueueUrl=self.dlq_url,
                    ReceiptHandle=msg["ReceiptHandle"],
                )

                processed += 1

        logger.info("DLQ reprocessing complete | stats=%s", stats)
        return stats
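For `THROTTLE` failures, an immediate re-enqueue tends to hit the same rate limit again. One option is to scale SQS's `DelaySeconds` by how many times the message has already been received. A small helper — the function name is mine, not part of the reprocessor above — with exponential backoff capped at the 900-second maximum SQS allows:

```python
def retry_delay_seconds(receive_count: int, base: int = 5, cap: int = 900) -> int:
    """Exponential backoff for DLQ redrive: 5s, 10s, 20s, ... capped at 15 min
    (900 is the largest DelaySeconds value SQS accepts)."""
    return min(cap, base * (2 ** max(0, receive_count - 1)))
```

In `reprocess`, the throttle branch could then pass `DelaySeconds=retry_delay_seconds(receive_count)` on the `send_message` call, using the `ApproximateReceiveCount` already read in `classify_failure`.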

6. CloudWatch Metrics and Monitoring

6.1 Custom Metrics Architecture

flowchart LR
    subgraph "Sources"
        ECS[ECS Handler] -->|put_metric_data| CW
        Lambda[Lambda Consumer] -->|put_metric_data| CW
        APIGW[API Gateway] -->|Built-in| CW
    end

    subgraph "CloudWatch"
        CW[CloudWatch Metrics]
        CW --> D1[Dashboard:<br/>MangaAssist-Ops]
        CW --> A1[Alarm:<br/>SyncLatency > 3s]
        CW --> A2[Alarm:<br/>DLQ depth > 100]
        CW --> A3[Alarm:<br/>Error rate > 5%]
    end

    A1 --> SNS[SNS Topic]
    A2 --> SNS
    A3 --> SNS
    SNS --> OPS[Ops Team<br/>PagerDuty]
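The DLQ-depth alarm in the diagram maps to SQS's built-in `ApproximateNumberOfMessagesVisible` metric in the `AWS/SQS` namespace. A sketch of the `put_metric_alarm` parameters — queue, alarm, and topic names are placeholders, not MangaAssist's real resources:

```python
from typing import Dict


def dlq_depth_alarm(queue_name: str, sns_topic_arn: str,
                    threshold: int = 100) -> Dict:
    """Build kwargs for cloudwatch.put_metric_alarm: fire when the DLQ holds
    more than `threshold` visible messages for two consecutive periods."""
    return {
        "AlarmName": f"{queue_name}-dlq-depth",
        "Namespace": "AWS/SQS",
        "MetricName": "ApproximateNumberOfMessagesVisible",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Maximum",
        "Period": 300,              # evaluate over 5-minute windows
        "EvaluationPeriods": 2,     # require two breaching windows in a row
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [sns_topic_arn],
    }


# cloudwatch = boto3.client("cloudwatch")
# cloudwatch.put_metric_alarm(**dlq_depth_alarm("mangaassist-jobs-dlq", topic_arn))
```

Building the kwargs in a pure function keeps the alarm definition unit-testable and diffable without calling AWS.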

6.2 Metrics Publisher

"""
CloudWatch custom metrics publisher for MangaAssist.
Publishes latency, error rates, and queue depth metrics.
"""
import logging
from typing import List, Dict
from datetime import datetime

import boto3

logger = logging.getLogger(__name__)


class MangaMetricsPublisher:
    """Publishes custom CloudWatch metrics for MangaAssist operations."""

    NAMESPACE = "MangaAssist/Operations"

    def __init__(self, environment: str = "prod"):
        self.cw = boto3.client("cloudwatch")
        self.environment = environment
        self._buffer: List[Dict] = []
        self._buffer_limit = 20  # conservative; PutMetricData accepts up to 1,000 datums (1 MB) per call

    def record_sync_latency(self, model: str, latency_ms: float, cached: bool = False):
        """Record synchronous invocation latency."""
        self._buffer.append({
            "MetricName": "SyncLatency",
            "Dimensions": [
                {"Name": "Environment", "Value": self.environment},
                {"Name": "Model", "Value": model},
                {"Name": "Cached", "Value": str(cached)},
            ],
            "Value": latency_ms,
            "Unit": "Milliseconds",
            "Timestamp": datetime.utcnow(),
        })
        self._flush_if_full()

    def record_async_job(self, job_type: str, success: bool, processing_ms: float):
        """Record async job completion."""
        self._buffer.append({
            "MetricName": "AsyncJobDuration",
            "Dimensions": [
                {"Name": "Environment", "Value": self.environment},
                {"Name": "JobType", "Value": job_type},
                {"Name": "Status", "Value": "success" if success else "failure"},
            ],
            "Value": processing_ms,
            "Unit": "Milliseconds",
            "Timestamp": datetime.utcnow(),
        })
        self._flush_if_full()

    def record_error(self, operation: str, error_type: str):
        """Record an error occurrence."""
        self._buffer.append({
            "MetricName": "ErrorCount",
            "Dimensions": [
                {"Name": "Environment", "Value": self.environment},
                {"Name": "Operation", "Value": operation},
                {"Name": "ErrorType", "Value": error_type},
            ],
            "Value": 1,
            "Unit": "Count",
            "Timestamp": datetime.utcnow(),
        })
        self._flush_if_full()

    def _flush_if_full(self):
        """Flush buffer when it reaches the CloudWatch batch limit."""
        if len(self._buffer) >= self._buffer_limit:
            self.flush()

    def flush(self):
        """Send buffered metrics to CloudWatch."""
        if not self._buffer:
            return

        try:
            self.cw.put_metric_data(
                Namespace=self.NAMESPACE,
                MetricData=self._buffer,
            )
            logger.debug("Flushed %d metrics to CloudWatch", len(self._buffer))
        except Exception as exc:
            logger.error("Metrics flush failed: %s", exc)
        finally:
            self._buffer.clear()
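The module docstring promises queue-depth metrics, but the class above only covers latency, job duration, and errors. A sketch of how depth could be sampled and shaped into a datum that follows the publisher's dimension conventions — the function names are mine, though `get_queue_attributes` is the standard SQS API for this:

```python
from datetime import datetime
from typing import Dict


def queue_depth_datum(queue_name: str, depth: int, environment: str = "prod") -> Dict:
    """Shape a queue-depth sample into a PutMetricData datum."""
    return {
        "MetricName": "QueueDepth",
        "Dimensions": [
            {"Name": "Environment", "Value": environment},
            {"Name": "Queue", "Value": queue_name},
        ],
        "Value": float(depth),
        "Unit": "Count",
        "Timestamp": datetime.utcnow(),
    }


def sample_queue_depth(queue_url: str) -> int:
    """Read the approximate visible-message count from SQS."""
    import boto3  # deferred: keeps the datum builder importable without AWS deps
    sqs = boto3.client("sqs")
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    return int(attrs["Attributes"]["ApproximateNumberOfMessages"])
```

A periodic task (EventBridge rule or a loop in the consumer) could call `sample_queue_depth`, append the datum to the publisher's buffer, and let the existing `flush` path ship it.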

Key Takeaways

| # | Takeaway |
|---|----------|
| 1 | Sync path budget — allocate ~2500 ms for Bedrock InvokeModel out of the 3000 ms SLA; everything else (DynamoDB, Redis, network) must fit in ~500 ms. |
| 2 | SQS FIFO for ordered workloads — catalog enrichment uses FIFO with a MessageGroupId per manga series to prevent out-of-order updates. |
| 3 | Partial batch failure — Lambda returns batchItemFailures so SQS retries only the specific messages that failed, not the entire batch. |
| 4 | Haiku for batch jobs — async enrichment uses Haiku ($0.25/$1.25 per 1M tokens) instead of Sonnet ($3/$15) — 12x cheaper for non-interactive workloads. |
| 5 | DLQ classification — separate transient (retry), permanent (archive), and throttle (delayed retry) failures to avoid wasting compute on unrecoverable messages. |
| 6 | ECS long-poll consumer — graceful SIGTERM handling ensures zero message loss during ECS task replacement or scaling events. |
| 7 | Request transformation pipeline — keeping concerns separated (inbound/model/outbound) makes each stage independently testable. |
| 8 | Japanese text normalization — NFC normalization must happen at the inbound stage, before cache-key computation or prompt construction. |