
Batch Inference and Concurrency Deep-Dive

AWS AIP-C01 Task 4.2 — Skill 4.2.3: Optimize FM throughput — batch inference and concurrency patterns

Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket. 1M messages/day, peak 20K concurrent users. Target: sustain 1,000+ requests/minute to Bedrock.


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency & Optimization | Task 4.2 — Optimize FM performance | Skill 4.2.3 — Optimize FM throughput — batch inference and concurrency patterns |

Skill scope: Deep-dive into batch inference strategies, adaptive concurrency control, priority-based request queuing, and streaming vs synchronous throughput characteristics for Bedrock Claude 3 models.


Real-Time vs Batch — When to Use Each

The fundamental throughput decision for MangaAssist is: does this request need a response within seconds, or can it wait? Every request routed to batch processing frees real-time Bedrock capacity for user-facing chat.

Decision Matrix

flowchart TD
    START[Incoming Request] --> Q1{User waiting<br/>for response?}
    Q1 -->|Yes| Q2{Latency<br/>requirement?}
    Q1 -->|No| BATCH[Batch Processing<br/>SQS Queue]

    Q2 -->|< 2s| RT_HAIKU[Real-Time: Haiku<br/>Order status, FAQ]
    Q2 -->|< 5s| RT_SONNET[Real-Time: Sonnet<br/>Recommendations, complex queries]
    Q2 -->|< 10s| MICRO[Micro-Batch<br/>Group similar queries]

    BATCH --> Q3{Deadline?}
    Q3 -->|< 1 hour| BATCH_HIGH[Batch High Priority<br/>Catalog enrichment]
    Q3 -->|< 6 hours| BATCH_MED[Batch Medium Priority<br/>Recommendation pre-compute]
    Q3 -->|< 24 hours| BATCH_LOW[Batch Low Priority<br/>Reports, analytics]

    style RT_HAIKU fill:#2ecc71,color:#000
    style RT_SONNET fill:#3498db,color:#fff
    style MICRO fill:#f39c12,color:#000
    style BATCH_HIGH fill:#e74c3c,color:#fff
    style BATCH_MED fill:#e67e22,color:#fff
    style BATCH_LOW fill:#95a5a6,color:#000
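
In code, the matrix above reduces to a small router. This is a sketch under assumed names: the pool and queue identifiers (`haiku-rt-pool`, `batch-high`, and the rest) and the SLA thresholds are illustrative labels, not real MangaAssist resources.

```python
from dataclasses import dataclass

# Illustrative thresholds mirroring the decision matrix above.
REALTIME_HAIKU_SLA_S = 2.0
REALTIME_SONNET_SLA_S = 5.0


@dataclass
class Route:
    mode: str    # "realtime", "micro-batch", or "batch"
    target: str  # model pool or SQS queue label


def route_request(user_waiting: bool, latency_sla_s: float, deadline_h: float = 24.0) -> Route:
    """Map a request onto the real-time/batch decision matrix."""
    if user_waiting:
        if latency_sla_s <= REALTIME_HAIKU_SLA_S:
            return Route("realtime", "haiku-rt-pool")
        if latency_sla_s <= REALTIME_SONNET_SLA_S:
            return Route("realtime", "sonnet-rt-pool")
        # Tolerates ~10s: group with similar queries instead.
        return Route("micro-batch", "haiku-rt-pool")
    # Nobody is waiting: pick a batch queue by deadline.
    if deadline_h <= 1:
        return Route("batch", "batch-high")
    if deadline_h <= 6:
        return Route("batch", "batch-medium")
    return Route("batch", "batch-low")
```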

MangaAssist Workload Distribution

| Workload Category | % of Total | Processing Mode | Model | Concurrency Pool |
|---|---|---|---|---|
| User chat — simple queries | 40% | Real-time | Haiku | Haiku RT pool (50 slots) |
| User chat — complex queries | 25% | Real-time | Sonnet | Sonnet RT pool (30 slots) |
| User chat — duplicate queries during events | 10% | Micro-batch | Haiku | Haiku RT pool (shared) |
| Catalog enrichment (new manga arrivals) | 8% | Batch | Haiku | Batch pool (20 slots) |
| Recommendation pre-computation | 7% | Batch | Sonnet | Batch pool (10 slots) |
| Embedding regeneration | 5% | Batch | Haiku | Batch pool (20 slots) |
| Analytics and reporting | 3% | Batch (off-peak) | Sonnet | Batch pool (10 slots) |
| Customer sentiment analysis | 2% | Batch (off-peak) | Haiku | Batch pool (20 slots) |

By routing 25% of total volume to batch processing, MangaAssist frees roughly 250 requests/minute of real-time Bedrock capacity, a quarter of the 1,000 requests/minute target.
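
The 10% micro-batch slice (duplicate queries during events) can be handled by a small coalescer: identical normalized queries arriving within a short window share a single model call. This is a sketch only; the window length, the naive normalization, and the injected `invoke` callable are assumptions, not MangaAssist internals.

```python
import asyncio
from collections import defaultdict


class MicroBatcher:
    """Coalesce duplicate queries that arrive within a short window so
    that one Bedrock call serves all of them (illustrative sketch)."""

    def __init__(self, window_s: float = 0.2):
        self.window_s = window_s
        self._pending: dict[str, list[asyncio.Future]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def submit(self, query: str, invoke) -> str:
        # Normalize so trivially different duplicates share a key.
        key = " ".join(query.lower().split())
        async with self._lock:
            waiters = self._pending[key]
            fut = asyncio.get_running_loop().create_future()
            waiters.append(fut)
            leader = len(waiters) == 1
        if leader:
            # First arrival waits out the window, then answers everyone.
            await asyncio.sleep(self.window_s)
            async with self._lock:
                waiters = self._pending.pop(key)
            answer = await invoke(key)  # single model call for the group
            for w in waiters:
                w.set_result(answer)
        return await fut
```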


Adaptive Concurrency Control

Static concurrency limits fail because Bedrock capacity fluctuates based on regional load, account-level quotas, and time of day. Adaptive concurrency discovers the optimal operating point in real-time.

The Adaptive Concurrency Algorithm

Start with N=10 concurrent Bedrock calls
Every evaluation_window (30 seconds):
    IF throttle_count == 0 AND p99_latency < target:
        N = min(N + growth_step, max_limit)       # Probe higher
    ELIF throttle_count > 0:
        N = max(N * backoff_factor, min_limit)     # Back off
    ELSE:
        N = N                                       # Hold steady

Why Start at N=10?

Starting conservatively at N=10 (not the account limit) is intentional:

  1. Cold start safety — New deployments should ramp up, not slam Bedrock at full capacity.
  2. Shared account limits — Other services in the AWS account may also be calling Bedrock.
  3. Time-of-day variation — Available capacity at 2 AM differs from noon during a manga sale.
  4. Fast convergence — With a growth step of 5 per 30-second window, N=10 reaches N=50 in 8 windows, about 4 minutes.
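
The ramp math in point 4 is easy to verify, assuming a fixed growth step per 30-second window and no throttles:

```python
def windows_to_reach(start: int, target: int, growth_step: int) -> int:
    """Evaluation windows needed for the limit to ramp from `start` to
    `target` when every window grows it by `growth_step`."""
    windows = 0
    n = start
    while n < target:
        n += growth_step
        windows += 1
    return windows


# 10 -> 50 with step 5 takes 8 windows; at 30s each, that is 4 minutes.
```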

Concurrency Adaptation Visualization

sequenceDiagram
    participant AC as AdaptiveConcurrencyController
    participant B as Bedrock
    participant CW as CloudWatch

    Note over AC: Start: N=10

    loop Every 30s evaluation window
        AC->>B: 10 concurrent requests
        B-->>AC: All succeed, p99=800ms (target=1200ms)
        AC->>AC: N = min(10 + 5, 80) = 15
        AC->>CW: emit concurrency_limit=15

        AC->>B: 15 concurrent requests
        B-->>AC: All succeed, p99=900ms
        AC->>AC: N = min(15 + 5, 80) = 20
        AC->>CW: emit concurrency_limit=20

        AC->>B: 20 concurrent requests
        B-->>AC: All succeed, p99=1050ms
        AC->>AC: N = min(20 + 5, 80) = 25
        AC->>CW: emit concurrency_limit=25

        AC->>B: 25 concurrent requests
        B-->>AC: 2 throttled, p99=1800ms
        AC->>AC: N = max(25 * 0.7, 10) = 17
        AC->>CW: emit concurrency_limit=17, throttle_count=2

        AC->>B: 17 concurrent requests
        B-->>AC: All succeed, p99=950ms
        AC->>AC: N = min(17 + 5, 80) = 22
        Note over AC: Oscillates around optimal ~20-22
    end

AdaptiveConcurrencyController Implementation

import asyncio
import time
import math
import logging
import statistics
from dataclasses import dataclass, field
from typing import List, Optional

import boto3

logger = logging.getLogger("mangaassist.concurrency")


@dataclass
class LatencyWindow:
    """Sliding window of latency measurements."""
    samples: List[float] = field(default_factory=list)
    max_samples: int = 200

    def add(self, latency_ms: float):
        self.samples.append(latency_ms)
        if len(self.samples) > self.max_samples:
            self.samples = self.samples[-self.max_samples:]

    def p99(self) -> float:
        if not self.samples:
            return 0.0
        sorted_samples = sorted(self.samples)
        idx = int(math.ceil(0.99 * len(sorted_samples))) - 1
        return sorted_samples[max(0, idx)]

    def p50(self) -> float:
        if not self.samples:
            return 0.0
        return statistics.median(self.samples)

    def clear(self):
        self.samples.clear()


class AdaptiveConcurrencyController:
    """
    Adaptive concurrency controller for Bedrock model invocations.

    Continuously adjusts the concurrency limit based on:
    - Throttle signals from Bedrock (ThrottlingException)
    - P99 latency relative to a target threshold
    - Minimum and maximum bounds per model

    MangaAssist uses separate controllers for each model:
    - Sonnet: range [5, 50], target p99 = 1200ms
    - Haiku: range [10, 80], target p99 = 600ms
    """

    def __init__(
        self,
        model_id: str,
        initial_concurrency: int = 10,
        min_concurrency: int = 5,
        max_concurrency: int = 50,
        target_p99_ms: float = 1200.0,
        growth_step: int = 5,
        backoff_factor: float = 0.7,
        evaluation_window_sec: float = 30.0,
        cloudwatch_namespace: str = "MangaAssist/Throughput",
    ):
        self.model_id = model_id
        self.min_concurrency = min_concurrency
        self.max_concurrency = max_concurrency
        self.target_p99_ms = target_p99_ms
        self.growth_step = growth_step
        self.backoff_factor = backoff_factor
        self.evaluation_window_sec = evaluation_window_sec
        self.cloudwatch_namespace = cloudwatch_namespace

        self._current_limit = initial_concurrency
        self._semaphore = asyncio.Semaphore(initial_concurrency)
        self._active = 0
        self._throttle_count = 0
        self._success_count = 0
        self._latency = LatencyWindow()
        self._evaluation_task: Optional[asyncio.Task] = None
        self._cw = boto3.client("cloudwatch")

        logger.info(
            "AdaptiveConcurrencyController: model=%s init=%d range=[%d,%d] target_p99=%dms",
            model_id, initial_concurrency, min_concurrency, max_concurrency,
            int(target_p99_ms),
        )

    @property
    def current_limit(self) -> int:
        return self._current_limit

    @property
    def active_count(self) -> int:
        return self._active

    async def start(self):
        """Start the background evaluation loop."""
        self._evaluation_task = asyncio.create_task(self._evaluation_loop())

    async def stop(self):
        """Stop the evaluation loop."""
        if self._evaluation_task:
            self._evaluation_task.cancel()
            try:
                await self._evaluation_task
            except asyncio.CancelledError:
                pass

    async def acquire(self) -> bool:
        """Acquire a concurrency slot. Blocks if at limit."""
        await self._semaphore.acquire()
        self._active += 1
        return True

    async def release(self, latency_ms: float, throttled: bool = False):
        """Release a slot and record the outcome."""
        self._active -= 1
        self._semaphore.release()

        if throttled:
            self._throttle_count += 1
        else:
            self._success_count += 1
            self._latency.add(latency_ms)

    async def _evaluation_loop(self):
        """Periodically evaluate and adjust concurrency limit."""
        while True:
            await asyncio.sleep(self.evaluation_window_sec)

            p99 = self._latency.p99()
            throttles = self._throttle_count
            successes = self._success_count
            total = throttles + successes

            old_limit = self._current_limit
            new_limit = old_limit

            if total == 0:
                # No traffic in this window — hold steady
                pass
            elif throttles > 0:
                # Throttles detected — back off
                new_limit = max(
                    self.min_concurrency,
                    int(old_limit * self.backoff_factor),
                )
                logger.warning(
                    "[%s] Throttles=%d in window, reducing %d -> %d",
                    self.model_id, throttles, old_limit, new_limit,
                )
            elif p99 < self.target_p99_ms:
                # Under latency target with no throttles — grow
                new_limit = min(
                    self.max_concurrency,
                    old_limit + self.growth_step,
                )
                logger.info(
                    "[%s] p99=%.0fms < target=%dms, growing %d -> %d",
                    self.model_id, p99, int(self.target_p99_ms), old_limit, new_limit,
                )
            elif p99 >= self.target_p99_ms * 1.5:
                # Latency significantly above target — slight backoff
                new_limit = max(
                    self.min_concurrency,
                    old_limit - self.growth_step,
                )
                logger.info(
                    "[%s] p99=%.0fms >> target=%dms, reducing %d -> %d",
                    self.model_id, p99, int(self.target_p99_ms), old_limit, new_limit,
                )
            else:
                # p99 between target and 1.5x target — hold steady
                logger.debug(
                    "[%s] p99=%.0fms near target=%dms, holding at %d",
                    self.model_id, p99, int(self.target_p99_ms), old_limit,
                )

            if new_limit != old_limit:
                await self._resize(new_limit)

            # Emit metrics
            await self._emit_metrics(p99, throttles, successes)

            # Reset window counters
            self._throttle_count = 0
            self._success_count = 0
            self._latency.clear()

    async def _resize(self, new_limit: int):
        """Resize the semaphore to the new concurrency limit."""
        delta = new_limit - self._current_limit
        if delta > 0:
            for _ in range(delta):
                self._semaphore.release()
        elif delta < 0:
            # Shrinking waits for in-flight requests to release slots,
            # so the evaluation loop may pause briefly under heavy load.
            for _ in range(abs(delta)):
                await self._semaphore.acquire()
        self._current_limit = new_limit

    async def _emit_metrics(self, p99: float, throttles: int, successes: int):
        """Emit concurrency metrics to CloudWatch."""
        try:
            self._cw.put_metric_data(
                Namespace=self.cloudwatch_namespace,
                MetricData=[
                    {
                        "MetricName": "ConcurrencyLimit",
                        "Value": self._current_limit,
                        "Unit": "Count",
                        "Dimensions": [
                            {"Name": "ModelId", "Value": self.model_id},
                        ],
                    },
                    {
                        "MetricName": "ActiveConcurrency",
                        "Value": self._active,
                        "Unit": "Count",
                        "Dimensions": [
                            {"Name": "ModelId", "Value": self.model_id},
                        ],
                    },
                    {
                        "MetricName": "ThrottleCount",
                        "Value": throttles,
                        "Unit": "Count",
                        "Dimensions": [
                            {"Name": "ModelId", "Value": self.model_id},
                        ],
                    },
                    {
                        "MetricName": "P99Latency",
                        "Value": p99,
                        "Unit": "Milliseconds",
                        "Dimensions": [
                            {"Name": "ModelId", "Value": self.model_id},
                        ],
                    },
                ],
            )
        except Exception:
            logger.exception("Failed to emit CloudWatch metrics")

    def get_status(self) -> dict:
        return {
            "model_id": self.model_id,
            "current_limit": self._current_limit,
            "active": self._active,
            "p99_ms": round(self._latency.p99(), 1),
            "p50_ms": round(self._latency.p50(), 1),
            "window_throttles": self._throttle_count,
            "window_successes": self._success_count,
        }
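
A worker might wrap each Bedrock call in acquire()/release() so the controller observes latency and throttle signals. This is a sketch under assumptions: `bedrock_runtime` stands for a boto3 bedrock-runtime client, and throttles are detected through the error code that botocore's ClientError carries in its `response` attribute.

```python
import asyncio
import json
import time


async def invoke_with_controller(controller, bedrock_runtime, body: dict):
    """Run one invoke_model call under the adaptive controller, reporting
    latency on success and the throttle signal on a 429 (sketch)."""
    await controller.acquire()
    start = time.monotonic()
    throttled = False
    try:
        response = bedrock_runtime.invoke_model(
            modelId=controller.model_id,
            body=json.dumps(body),
        )
        return json.loads(response["body"].read())
    except Exception as exc:
        # botocore's ClientError exposes the error code in exc.response.
        code = getattr(exc, "response", {}).get("Error", {}).get("Code", "")
        if code == "ThrottlingException":
            throttled = True
            return None
        raise
    finally:
        await controller.release(
            latency_ms=(time.monotonic() - start) * 1000.0,
            throttled=throttled,
        )
```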

Priority-Based Request Queue

During MangaAssist peak events (manga releases, flash sales), not all requests are equal. A customer with a payment problem must be served before a browsing query. The priority queue enforces this ordering.

Priority Levels

| Priority | Weight | Category | Intent Examples | Queue Behavior |
|---|---|---|---|---|
| P0 — Critical | 100 | Order/Payment | order_status, payment_failed, cancel_order | Preempts P1-P3, dedicated Haiku slot |
| P1 — High | 50 | Purchase Intent | recommend_manga, check_availability, add_to_cart | Standard real-time processing |
| P2 — Medium | 20 | Browsing | describe_series, genre_browse, author_info | Degradable to cached/template |
| P3 — Low | 1 | Background | embedding_update, catalog_enrich, report_gen | SQS batch queue, no SLA |

VIP Customer Priority Boost

MangaAssist identifies VIP customers (top 5% by purchase history) and boosts their priority by one level:

  - VIP browsing (P2) becomes High (P1)
  - VIP purchase intent (P1) becomes Critical (P0)

PriorityRequestQueue Implementation

import asyncio
import heapq
import time
import logging
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import IntEnum

logger = logging.getLogger("mangaassist.priority_queue")


class Priority(IntEnum):
    """Request priority levels — lower value = higher priority."""
    CRITICAL = 0   # P0: Order/Payment issues
    HIGH = 1       # P1: Purchase intent
    MEDIUM = 2     # P2: Browsing assistance
    LOW = 3        # P3: Background/batch


INTENT_PRIORITY_MAP = {
    # P0 — Critical
    "order_status": Priority.CRITICAL,
    "payment_failed": Priority.CRITICAL,
    "cancel_order": Priority.CRITICAL,
    "refund_request": Priority.CRITICAL,
    "shipping_issue": Priority.CRITICAL,

    # P1 — High
    "recommend_manga": Priority.HIGH,
    "check_availability": Priority.HIGH,
    "add_to_cart": Priority.HIGH,
    "price_inquiry": Priority.HIGH,

    # P2 — Medium
    "describe_series": Priority.MEDIUM,
    "genre_browse": Priority.MEDIUM,
    "author_info": Priority.MEDIUM,
    "general_question": Priority.MEDIUM,

    # P3 — Low (batch)
    "embedding_update": Priority.LOW,
    "catalog_enrich": Priority.LOW,
    "report_generation": Priority.LOW,
}


@dataclass(order=True)
class PrioritizedRequest:
    """A request with priority ordering for the heap queue."""
    priority: int
    timestamp: float = field(compare=True)
    sequence: int = field(compare=True)
    request: Any = field(compare=False)
    future: asyncio.Future = field(compare=False, repr=False)


class PriorityRequestQueue:
    """
    Priority queue for MangaAssist Bedrock requests.

    Ensures high-priority requests (order issues, payments) are processed
    before low-priority requests (browsing, batch) when Bedrock capacity
    is constrained.

    Features:
    - Heap-based priority ordering (O(log n) enqueue/dequeue)
    - VIP customer priority boost
    - Starvation prevention via aging
    - Queue depth metrics per priority level
    - Configurable max queue size with overflow rejection
    """

    def __init__(
        self,
        max_queue_size: int = 1000,
        aging_interval_sec: float = 30.0,
        aging_boost: int = 1,
        vip_boost: int = 1,
    ):
        self.max_queue_size = max_queue_size
        self.aging_interval_sec = aging_interval_sec
        self.aging_boost = aging_boost
        self.vip_boost = vip_boost

        self._heap: list[PrioritizedRequest] = []
        self._sequence = 0
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        self._aging_task: Optional[asyncio.Task] = None

        # Metrics
        self._enqueued_by_priority = {p: 0 for p in Priority}
        self._dequeued_by_priority = {p: 0 for p in Priority}
        self._rejected_overflow = 0

    async def start(self):
        """Start the aging background task."""
        self._aging_task = asyncio.create_task(self._aging_loop())

    async def stop(self):
        """Stop the aging task."""
        if self._aging_task:
            self._aging_task.cancel()
            try:
                await self._aging_task
            except asyncio.CancelledError:
                pass

    def classify_priority(self, intent: str, is_vip: bool = False) -> Priority:
        """Determine request priority from intent and customer tier."""
        base_priority = INTENT_PRIORITY_MAP.get(intent, Priority.MEDIUM)

        if is_vip and base_priority > Priority.CRITICAL:
            boosted = Priority(max(0, base_priority - self.vip_boost))
            logger.debug("VIP boost: %s -> %s for intent=%s", base_priority.name, boosted.name, intent)
            return boosted

        return base_priority

    async def enqueue(self, request: dict, intent: str, is_vip: bool = False) -> asyncio.Future:
        """
        Add a request to the priority queue.

        Returns a Future that resolves when the request is processed.
        Raises OverflowError if queue is full.
        """
        async with self._lock:
            if len(self._heap) >= self.max_queue_size:
                self._rejected_overflow += 1
                raise OverflowError(
                    f"Priority queue full ({self.max_queue_size}). "
                    f"Request rejected for intent={intent}"
                )

            priority = self.classify_priority(intent, is_vip)
            self._sequence += 1
            future = asyncio.get_running_loop().create_future()

            entry = PrioritizedRequest(
                priority=priority.value,
                timestamp=time.time(),
                sequence=self._sequence,
                request=request,
                future=future,
            )

            heapq.heappush(self._heap, entry)
            self._not_empty.set()
            self._enqueued_by_priority[priority] += 1

            logger.debug(
                "Enqueued: priority=%s, queue_depth=%d, intent=%s, vip=%s",
                priority.name, len(self._heap), intent, is_vip,
            )
            return future

    async def dequeue(self) -> PrioritizedRequest:
        """
        Remove and return the highest-priority request.
        Blocks until a request is available.
        """
        while True:
            await self._not_empty.wait()
            async with self._lock:
                if self._heap:
                    entry = heapq.heappop(self._heap)
                    if not self._heap:
                        self._not_empty.clear()

                    priority = Priority(entry.priority)
                    self._dequeued_by_priority[priority] += 1
                    return entry

    async def _aging_loop(self):
        """
        Periodically boost priority of long-waiting requests to prevent starvation.

        Without aging, a sustained flood of P0 requests could starve P2 requests
        indefinitely. Aging promotes old P2 requests to P1 after aging_interval.
        """
        while True:
            await asyncio.sleep(self.aging_interval_sec)
            async with self._lock:
                aged_count = 0
                new_heap = []
                now = time.time()

                for entry in self._heap:
                    wait_time = now - entry.timestamp
                    age_intervals = int(wait_time / self.aging_interval_sec)

                    if age_intervals > 0 and entry.priority > Priority.CRITICAL:
                        new_priority = max(
                            Priority.CRITICAL,
                            entry.priority - (age_intervals * self.aging_boost),
                        )
                        if new_priority != entry.priority:
                            entry.priority = new_priority
                            aged_count += 1

                    new_heap.append(entry)

                if aged_count > 0:
                    heapq.heapify(new_heap)
                    self._heap = new_heap
                    logger.info("Aged %d requests in priority queue", aged_count)

    def get_depth_by_priority(self) -> dict:
        """Return current queue depth broken down by priority level."""
        counts = {p.name: 0 for p in Priority}
        for entry in self._heap:
            priority_name = Priority(entry.priority).name
            counts[priority_name] += 1
        return counts

    def get_metrics(self) -> dict:
        """Return queue metrics for CloudWatch."""
        return {
            "total_depth": len(self._heap),
            "depth_by_priority": self.get_depth_by_priority(),
            "enqueued_by_priority": {p.name: v for p, v in self._enqueued_by_priority.items()},
            "dequeued_by_priority": {p.name: v for p, v in self._dequeued_by_priority.items()},
            "rejected_overflow": self._rejected_overflow,
        }
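
A consumer loop ties dequeue() to the actual model call. In this sketch, `queue` is any object exposing the dequeue() contract above and `handler` stands in for the real Bedrock invocation path.

```python
import asyncio


async def queue_worker(queue, handler):
    """Drain entries in priority order, resolving each request's Future
    with the handler's result (or its exception). Illustrative sketch."""
    while True:
        entry = await queue.dequeue()
        try:
            result = await handler(entry.request)
            if not entry.future.done():
                entry.future.set_result(result)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            if not entry.future.done():
                entry.future.set_exception(exc)
```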

invoke_model vs invoke_model_with_response_stream — Throughput Characteristics

Bedrock offers two invocation modes. The choice between them has direct implications for perceived latency and error handling, and a commonly misunderstood (neutral) effect on throughput.

Comparison

| Characteristic | invoke_model (Synchronous) | invoke_model_with_response_stream (Streaming) |
|---|---|---|
| Connection held | Until full response generated | Until full response generated |
| Time to first byte | After full generation | After first token (~200ms) |
| Bedrock slot occupancy | Same duration | Same duration |
| Throughput impact | Neutral | Neutral (same slot duration) |
| Client perceived latency | High (waits for complete response) | Low (sees tokens incrementally) |
| Network efficiency | Single payload | Chunked transfer |
| Error handling | Simple (one response) | Complex (partial response possible) |
| Best for | Batch processing, short responses | User-facing chat, long responses |

Key Insight: Streaming Does Not Increase Throughput

A common misconception is that streaming responses frees Bedrock capacity sooner. It does not. The Bedrock model slot is occupied for the same duration regardless of mode. Streaming improves perceived latency (time to first token) but not throughput (requests per minute).

MangaAssist strategy:

  - User-facing chat: streaming (better UX, perceived responsiveness via WebSocket)
  - Batch processing: synchronous (simpler error handling, no partial response risk)
  - Order status lookups (Haiku, short responses): synchronous (response is < 100 tokens, streaming overhead not worth it)
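
For the streaming path, a generator like the following can relay text deltas to the WebSocket as they arrive. A sketch: the event shapes assume the Claude 3 Messages API chunk format (`content_block_delta` events carrying a text delta), and `bedrock_runtime` stands for a boto3 bedrock-runtime client.

```python
import json


def stream_tokens(bedrock_runtime, model_id: str, body: dict):
    """Yield text deltas from a streaming invocation; the first yielded
    delta is what drives the time-to-first-token figure above (sketch)."""
    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId=model_id,
        body=json.dumps(body),
    )
    for event in response["body"]:
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk.get("type") == "content_block_delta":
            yield chunk["delta"].get("text", "")
```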


Error Handling — Exponential Backoff and Dead-Letter Strategy

Backoff Strategy for Bedrock Throttling

flowchart TD
    REQ[Bedrock Request] --> INVOKE[invoke_model]
    INVOKE -->|200 OK| SUCCESS[Process Response]
    INVOKE -->|ThrottlingException| R1{Attempt 1}

    R1 -->|Wait 1s + jitter| RETRY1[Retry]
    RETRY1 -->|200 OK| SUCCESS
    RETRY1 -->|ThrottlingException| R2{Attempt 2}

    R2 -->|Wait 2s + jitter| RETRY2[Retry]
    RETRY2 -->|200 OK| SUCCESS
    RETRY2 -->|ThrottlingException| R3{Attempt 3}

    R3 -->|Wait 4s + jitter| RETRY3[Retry]
    RETRY3 -->|200 OK| SUCCESS
    RETRY3 -->|ThrottlingException| DEGRADE{Request Type?}

    DEGRADE -->|Real-time| FALLBACK[Graceful Degradation<br/>Cache / Template / Model Downgrade]
    DEGRADE -->|Batch| DLQ[Dead-Letter Queue]

    style SUCCESS fill:#2ecc71,color:#000
    style FALLBACK fill:#f39c12,color:#000
    style DLQ fill:#e74c3c,color:#fff

Backoff Formula

import random

def calculate_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    """
    Exponential backoff with full jitter.

    attempt 1: 0 to 1s
    attempt 2: 0 to 2s
    attempt 3: 0 to 4s
    attempt 4: 0 to 8s
    ...
    Capped at max_delay.

    Full jitter (vs equal jitter) reduces thundering herd when
    multiple MangaAssist ECS tasks retry simultaneously.
    """
    exponential_delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
    return random.uniform(0, exponential_delay)
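
A retry loop composes with this formula. In this sketch both callables are injected, so any Bedrock invocation can be wrapped and `calculate_backoff` can be passed in as the `backoff` argument; after the final attempt the error propagates to the degradation or dead-letter path described below.

```python
import time


def invoke_with_retries(invoke, backoff, max_attempts: int = 4, retryable=(Exception,)):
    """Retry `invoke` with jittered delays from `backoff(attempt)`,
    re-raising once max_attempts is exhausted (illustrative sketch)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return invoke()
        except retryable:
            if attempt == max_attempts:
                raise
            time.sleep(backoff(attempt))
```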

Dead-Letter Queue Handling

Messages reach the DLQ for two reasons:

  1. Retryable errors exhausted — Throttling persisted beyond max retries. These should be reprocessed after the peak subsides.
  2. Non-retryable errors — Invalid prompt, model validation error, malformed request. These need investigation and will never succeed as-is.

import json
import logging

import boto3

logger = logging.getLogger("mangaassist.dlq")


class DLQProcessor:
    """
    Processes messages from the dead-letter queue.

    Runs as a scheduled Lambda (every 15 minutes) to:
    1. Classify DLQ messages as retryable or non-retryable
    2. Re-enqueue retryable messages to the primary queue
    3. Log and alert on non-retryable messages
    """

    RETRYABLE_ERRORS = {
        "ThrottlingException",
        "ServiceUnavailableException",
        "InternalServerException",
        # Returned while a model warms up; safe to retry later.
        "ModelNotReadyException",
    }

    NON_RETRYABLE_ERRORS = {
        "ValidationException",
        "AccessDeniedException",
        "ResourceNotFoundException",
    }

    def __init__(self, dlq_url: str, primary_queue_url: str, sns_topic_arn: str):
        self.dlq_url = dlq_url
        self.primary_queue_url = primary_queue_url
        self.sns_topic_arn = sns_topic_arn
        self.sqs = boto3.client("sqs")
        self.sns = boto3.client("sns")

    def process_batch(self, max_messages: int = 10) -> dict:
        """Process a batch of DLQ messages."""
        response = self.sqs.receive_message(
            QueueUrl=self.dlq_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=5,
        )

        messages = response.get("Messages", [])
        stats = {"retried": 0, "discarded": 0, "alerted": 0}

        for msg in messages:
            body = json.loads(msg["Body"])
            error_type = body.get("error_type", "Unknown")

            if error_type in self.RETRYABLE_ERRORS:
                # Re-enqueue to primary queue
                self.sqs.send_message(
                    QueueUrl=self.primary_queue_url,
                    MessageBody=json.dumps(body.get("original_request", body)),
                )
                stats["retried"] += 1
                logger.info("Re-enqueued retryable DLQ message: %s", msg["MessageId"])

            elif error_type in self.NON_RETRYABLE_ERRORS:
                # Alert and discard
                self.sns.publish(
                    TopicArn=self.sns_topic_arn,
                    Subject=f"MangaAssist DLQ: Non-Retryable Error ({error_type})",
                    Message=json.dumps(body, indent=2),
                )
                stats["alerted"] += 1
                stats["discarded"] += 1
                logger.error("Non-retryable DLQ message discarded: %s", msg["MessageId"])

            else:
                # Unknown error — alert for investigation
                self.sns.publish(
                    TopicArn=self.sns_topic_arn,
                    Subject=f"MangaAssist DLQ: Unknown Error ({error_type})",
                    Message=json.dumps(body, indent=2),
                )
                stats["alerted"] += 1
                logger.warning("Unknown DLQ error type: %s", error_type)

            # Delete from DLQ after processing
            self.sqs.delete_message(
                QueueUrl=self.dlq_url,
                ReceiptHandle=msg["ReceiptHandle"],
            )

        return stats

Batch Processing Pipeline — End-to-End Sequence

sequenceDiagram
    participant CS as Catalog Service
    participant SQS as SQS FIFO Queue
    participant W as Batch Worker (ECS)
    participant AC as AdaptiveConcurrency<br/>Controller
    participant B as Bedrock Haiku
    participant DB as DynamoDB
    participant DLQ as Dead-Letter Queue
    participant CW as CloudWatch

    CS->>SQS: SendMessage (new manga title batch)
    Note over SQS: 50 titles queued<br/>MessageGroupId = "catalog"

    loop For each message
        W->>SQS: ReceiveMessage (long poll 20s)
        SQS-->>W: Message (title metadata)

        W->>AC: acquire()
        AC-->>W: slot granted (17/20 active)

        W->>B: invoke_model (catalog enrichment prompt)

        alt Success
            B-->>W: 200 OK (description + tags)
            W->>AC: release(latency=450ms, throttled=false)
            W->>DB: PutItem (enriched catalog entry)
            W->>SQS: DeleteMessage
            W->>CW: PutMetricData (batch_job_success)

        else ThrottlingException
            B-->>W: 429 ThrottlingException
            W->>AC: release(throttled=true)
            Note over AC: Reduces limit 20 -> 14
            W->>W: Exponential backoff (1s + jitter)
            W->>B: Retry invoke_model

        else Max Retries Exceeded
            W->>DLQ: SendMessage (failed job + error)
            W->>SQS: DeleteMessage
            W->>CW: PutMetricData (batch_job_dlq)
            DLQ-->>CW: DLQ Alarm (depth > 0)
        end
    end

    Note over AC: Evaluation window (30s)
    AC->>AC: p99=420ms < target=600ms<br/>throttles=0 -> grow 14 -> 19
    AC->>CW: PutMetricData (concurrency_limit=19)

Key Takeaways

  1. Route 25% of traffic to batch — Catalog enrichment, embeddings, reports, and pre-computation do not need real-time response. Moving them to SQS frees Bedrock capacity for user-facing chat.

  2. Start concurrency at N=10 and let it adapt — Starting conservatively and growing based on real signals is safer than guessing the right limit. The adaptive algorithm converges within 2-4 minutes.

  3. Priority queuing prevents revenue loss — During peak events, a customer with a payment problem must not wait behind 500 browsing queries. Priority ordering with VIP boost ensures the highest-value requests are served first.

  4. Streaming does not increase throughput — It improves perceived latency for users but does not free Bedrock model slots any faster. Use it for UX, not for throughput.

  5. Dead-letter queues need active processing — A DLQ that nobody reads is a silent failure. Scheduled processing that classifies, re-enqueues retryable errors, and alerts on non-retryable errors turns the DLQ into an operational tool.

  6. Aging prevents starvation — Without aging, a sustained flood of P0 requests starves all P2 requests indefinitely. Periodic priority promotion ensures every request eventually gets processed.