
FM Throughput Optimization Architecture

AWS AIP-C01 Task 4.2 — Skill 4.2.3: Optimize FM throughput to sustain high request volumes

Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket. 1M messages/day, peak 20K concurrent users. Target: sustain 1,000+ requests/minute to Bedrock.


Skill Mapping

| Certification | Domain | Task | Skill |
| --- | --- | --- | --- |
| AWS AIP-C01 | Domain 4 — Operational Efficiency & Optimization | Task 4.2 — Optimize FM performance | Skill 4.2.3 — Optimize FM throughput to sustain high request volumes |

Skill scope: Design and implement throughput optimization systems that maximize the number of FM invocations per unit time while maintaining acceptable latency — covering token efficiency, batch inference, concurrency management, queue-based request handling, and backpressure strategies.


Mind Map — FM Throughput Optimization Dimensions

mindmap
  root((FM Throughput<br/>Optimization))
    Token Processing
      Input Token Minimization
        Prompt Compression
        Context Window Pruning
        Template Parameterization
      Output Token Control
        max_tokens Tuning
        Stop Sequence Optimization
        Structured Output Schemas
      Throughput Formula
        Fewer Tokens = Faster Processing
        Faster Processing = More Requests/Second
        Smaller Prompts = Higher Concurrency
    Batch Inference
      SQS-Based Batch Queue
        Embedding Updates
        Report Generation
        Catalog Enrichment
      Micro-Batching
        Similar Intent Grouping
        Window-Based Collection
        Shared Context Prefix
      Scheduling
        Off-Peak Batch Windows
        Priority-Based Dispatch
        Deadline-Aware Processing
    Concurrency Management
      Semaphore-Based Limiter
        Token Bucket Algorithm
        Fixed Window Counter
        Sliding Window Log
      Adaptive Concurrency
        Throttle Signal Detection
        P99 Latency Feedback
        Capacity Probing
      Model-Specific Limits
        Sonnet Concurrency Pool
        Haiku Concurrency Pool
        Cross-Model Balancing
    Queue Management
      Priority Queues
        Order Issues — P0
        Recommendations — P1
        Browsing — P2
      Dead-Letter Queues
        Failed Invocations
        Retry Exhaustion
        Poison Message Isolation
      Rate Shaping
        Token Bucket Admission
        Leaky Bucket Smoothing
        Burst Allowance
    Backpressure Handling
      Circuit Breaker
        Throttle Detection
        Open/Half-Open/Closed
        Recovery Probing
      Graceful Degradation
        Template Responses
        Cached Answer Fallback
        Model Downgrade Sonnet to Haiku
      Client Signaling
        WebSocket Backpressure
        Retry-After Headers
        Queue Position Updates
    Throughput Metrics
      tokens-processed-per-second
      invocations-per-minute
      throttle-rate
      queue-depth
      batch-utilization-ratio
      effective-concurrency

Why Throughput Optimization Matters for MangaAssist

MangaAssist processes 1 million messages per day with peaks of 20,000 concurrent users during manga release events and flash sales. Each user message triggers at least one Bedrock invocation, and complex flows (order lookup + recommendation) trigger two or three. Without throughput optimization, Bedrock throttling cascades into WebSocket timeouts, abandoned carts, and revenue loss.

The throughput equation for FM systems:

Effective Throughput = min(
    Bedrock Account Limit,
    Concurrency × (1 / Avg Latency per Request),
    Queue Drain Rate
)

Every optimization in this document targets one of these three bottleneck dimensions.
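As a worked example with illustrative numbers (not measured MangaAssist values): 30 concurrent slots at an average latency of 2 seconds allow 30 × 0.5 = 15 requests/second, about 900 requests/minute from the concurrency term; whichever of the three terms is lowest caps effective throughput. A minimal sketch of the calculation:

def effective_throughput_rpm(
    account_limit_rpm: float,
    concurrency: int,
    avg_latency_sec: float,
    queue_drain_rpm: float,
) -> float:
    """Effective throughput is capped by the slowest of the three dimensions."""
    concurrency_rpm = concurrency * (1 / avg_latency_sec) * 60
    return min(account_limit_rpm, concurrency_rpm, queue_drain_rpm)


# Illustrative values: quota 1,200 req/min, 30 slots, 2 s average latency, drain 1,500 req/min
print(effective_throughput_rpm(1200, 30, 2.0, 1500))  # -> 900.0 (concurrency is the bottleneck)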


Token Processing Optimization — Fewer Tokens, More Throughput

The relationship between token count and throughput is direct: fewer input tokens mean faster time-to-first-token and shorter total inference time, which means the Bedrock slot is freed sooner for the next request.

Token Reduction Strategies for MangaAssist

| Strategy | Before (tokens) | After (tokens) | Reduction | Throughput Impact |
| --- | --- | --- | --- | --- |
| Prompt template parameterization | 850 | 320 | 62% | +165% requests/min |
| Context window pruning (top-3 instead of top-5 RAG chunks) | 1,200 | 680 | 43% | +75% requests/min |
| Structured output schema (JSON vs prose) | N/A (output) | -40% output tokens | 40% | +55% requests/min |
| System prompt compression | 480 | 210 | 56% | +130% requests/min |
| Conversation history summarization (beyond 5 turns) | 2,000 | 400 | 80% | +400% requests/min |

Token-Efficient Prompt Design

# BEFORE: Verbose prompt — ~850 input tokens
VERBOSE_PROMPT = """
You are a helpful customer service assistant for a Japanese manga e-commerce store
called MangaAssist. You help customers with their orders, recommend manga titles,
and answer questions about shipping, returns, and product availability. Please be
polite, helpful, and knowledgeable about manga. When a customer asks about an order,
look up the order details and provide a comprehensive summary including the order
status, shipping information, estimated delivery date, and any relevant tracking
information. Format your response in a friendly conversational tone.

Customer conversation history:
{full_conversation_history}

Retrieved context from knowledge base:
{all_retrieved_chunks}

Customer's current question: {question}
"""

# AFTER: Compressed prompt — ~320 input tokens
OPTIMIZED_PROMPT = """<role>MangaAssist support agent</role>
<task>{intent_classification}</task>
<context>{top_3_chunks_only}</context>
<history>{last_3_turns_summary}</history>
<query>{question}</query>
<format>JSON: {{"response": str, "action": str|null}}</format>"""

The optimized prompt produces equivalent quality responses while consuming 62% fewer input tokens — directly translating to higher throughput because each Bedrock invocation completes faster.


Batch Inference Strategies

Not every MangaAssist workload requires a real-time response. Batch inference shifts non-interactive work to queues, freeing real-time Bedrock capacity for user-facing requests.

Real-Time vs Batch Classification

| Workload | Type | SLA | Strategy |
| --- | --- | --- | --- |
| User chat response | Real-time | < 3 seconds | Direct invoke_model |
| Order status lookup | Real-time | < 2 seconds | Direct invoke_model (Haiku) |
| Manga recommendation generation | Near-real-time | < 10 seconds | Micro-batch |
| Product description enrichment | Batch | < 1 hour | SQS batch queue |
| Weekly recommendation emails | Batch | < 6 hours | SQS batch queue (off-peak) |
| Embedding updates for new titles | Batch | < 2 hours | SQS batch queue |
| Customer sentiment reports | Batch | < 24 hours | SQS batch queue (lowest priority) |
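A minimal routing sketch matching this classification (the intent names, queue URL, and priority value are assumptions for illustration; FIFO queues require a message group ID):

import json
import boto3

# Hypothetical set of non-interactive intents, derived from the table above
BATCH_INTENTS = {
    "catalog_enrichment",
    "embedding_update",
    "report_generation",
    "sentiment_report",
}

sqs = boto3.client("sqs")
BATCH_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/batch-inference-queue.fifo"  # placeholder


def route_request(intent: str, payload: dict) -> str:
    """Enqueue non-interactive work; everything else stays on the real-time invoke path."""
    if intent in BATCH_INTENTS:
        sqs.send_message(
            QueueUrl=BATCH_QUEUE_URL,
            MessageBody=json.dumps({"job_type": intent, "payload": payload, "priority": 3}),
            MessageGroupId=intent,  # required for FIFO queues
            MessageDeduplicationId=str(payload.get("id", intent)),
        )
        return "queued"
    return "real_time"  # handled by direct invoke_model (Sonnet or Haiku)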

SQS-Based Batch Queue Architecture

flowchart LR
    subgraph Producers
        A[Catalog Service] -->|New titles| Q1
        B[Analytics Service] -->|Report requests| Q1
        C[Recommendation Engine] -->|Embedding jobs| Q1
    end

    subgraph "SQS Batch Queues"
        Q1[batch-inference-queue<br/>FIFO] --> DLQ1[batch-dlq<br/>Dead Letter Queue]
    end

    subgraph "Batch Processor — ECS Fargate"
        W1[Worker 1] -->|poll| Q1
        W2[Worker 2] -->|poll| Q1
        W3[Worker 3] -->|poll| Q1
    end

    subgraph "Bedrock — Batch Pool"
        W1 -->|invoke_model| BH[Claude 3 Haiku<br/>Batch Concurrency: 20]
        W2 -->|invoke_model| BH
        W3 -->|invoke_model| BH
    end

    subgraph "Results"
        BH --> D1[(DynamoDB<br/>Enriched Catalog)]
        BH --> D2[(OpenSearch<br/>Updated Embeddings)]
        BH --> S3[S3<br/>Generated Reports]
    end

    style Q1 fill:#ff9900,color:#000
    style DLQ1 fill:#d13212,color:#fff
    style BH fill:#1a73e8,color:#fff

Micro-Batching for Similar Intent Queries

When multiple users ask similar questions within a short window (e.g., "when does One Piece volume 108 release?"), micro-batching groups them and makes a single Bedrock invocation, distributing the response to all waiters.

import asyncio
import time
import hashlib
import json
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PendingRequest:
    """A request waiting for batch processing."""
    query: str
    intent: str
    future: asyncio.Future
    timestamp: float = field(default_factory=time.time)


class MicroBatchProcessor:
    """
    Groups similar-intent queries within a time window and processes them
    as a single Bedrock invocation. Effective for MangaAssist during events
    when many users ask the same questions.
    """

    def __init__(
        self,
        bedrock_client,
        batch_window_ms: int = 100,
        max_batch_size: int = 10,
        similarity_threshold: float = 0.85,
    ):
        self.bedrock = bedrock_client
        self.batch_window_ms = batch_window_ms
        self.max_batch_size = max_batch_size
        self.similarity_threshold = similarity_threshold
        self.pending_batches: Dict[str, List[PendingRequest]] = {}
        self._flush_task: asyncio.Task | None = None

    def _compute_batch_key(self, intent: str, query: str) -> str:
        """Create a grouping key from intent + normalized query prefix."""
        normalized = query.lower().strip()[:50]
        return hashlib.md5(f"{intent}:{normalized}".encode()).hexdigest()[:12]

    async def submit(self, query: str, intent: str, context: str) -> str:
        """Submit a request for micro-batch processing."""
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        request = PendingRequest(query=query, intent=intent, future=future)
        batch_key = self._compute_batch_key(intent, query)

        if batch_key not in self.pending_batches:
            self.pending_batches[batch_key] = []
            # Schedule flush after batch window
            asyncio.create_task(self._schedule_flush(batch_key))

        self.pending_batches[batch_key].append(request)

        # Flush immediately if batch is full
        if len(self.pending_batches[batch_key]) >= self.max_batch_size:
            await self._flush_batch(batch_key)

        return await future

    async def _schedule_flush(self, batch_key: str):
        """Wait for batch window then flush."""
        await asyncio.sleep(self.batch_window_ms / 1000)
        if batch_key in self.pending_batches:
            await self._flush_batch(batch_key)

    async def _flush_batch(self, batch_key: str):
        """Process all pending requests in a batch with a single invocation."""
        requests = self.pending_batches.pop(batch_key, [])
        if not requests:
            return

        # Use the first query as representative for the shared invocation
        representative = requests[0]
        try:
            response = await self.bedrock.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "messages": [{"role": "user", "content": representative.query}],
                    "max_tokens": 300,
                }),
            )
            # Parse the streamed response body once, then share it
            result = json.loads(await response["body"].read())
            # Distribute the same response to all waiters
            for req in requests:
                if not req.future.done():
                    req.future.set_result(result)
        except Exception as e:
            for req in requests:
                if not req.future.done():
                    req.future.set_exception(e)
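A minimal usage sketch, assuming bedrock_async_client is the asynchronous Bedrock Runtime client described in the class docstring (for example, an aioboto3 client):

async def answer_release_date_queries(bedrock_async_client):
    """Ten near-identical questions collapse into at most one Bedrock invocation."""
    processor = MicroBatchProcessor(bedrock_async_client, batch_window_ms=100, max_batch_size=10)
    queries = ["when does One Piece volume 108 release?"] * 10
    # All submits share one batch key, so they resolve with the same response
    return await asyncio.gather(
        *(processor.submit(q, intent="release_date", context="") for q in queries)
    )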

Concurrent Model Invocation Management

Bedrock enforces per-account, per-model, per-Region quotas (requests per minute and tokens per minute for on-demand throughput). Exceeding them produces ThrottlingException. MangaAssist uses a semaphore-based concurrency limiter with adaptive capacity that responds to real-time throttle signals.

Architecture — Concurrency Management System

flowchart TB
    subgraph "API Gateway WebSocket"
        WS[Incoming Messages<br/>~700/min average<br/>~1200/min peak]
    end

    subgraph "ECS Fargate — Orchestrator"
        WS --> RC[Request Classifier<br/>Intent + Priority]
        RC -->|Real-time P0| CM
        RC -->|Real-time P1| CM
        RC -->|Batch P2| SQS[SQS Batch Queue]

        CM[ConcurrencyManager<br/>Adaptive Semaphore]
        CM -->|acquire| POOL

        subgraph POOL["Bedrock Concurrency Pool"]
            S1[Sonnet Pool<br/>max=30, current=24]
            H1[Haiku Pool<br/>max=50, current=38]
        end

        POOL -->|throttle signal| AC[Adaptive Controller]
        POOL -->|success signal| AC
        AC -->|adjust limits| CM
    end

    subgraph "Bedrock"
        S1 --> BS[Claude 3 Sonnet]
        H1 --> BH[Claude 3 Haiku]
    end

    subgraph "Fallback"
        CM -->|circuit open| FB[Template Response<br/>+ Cache Lookup]
    end

    style CM fill:#ff9900,color:#000
    style AC fill:#1a73e8,color:#fff
    style FB fill:#d13212,color:#fff

ConcurrencyManager with Adaptive Semaphore

import asyncio
import time
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

logger = logging.getLogger("mangaassist.throughput")


class CircuitState(Enum):
    CLOSED = "closed"          # Normal operation
    OPEN = "open"              # Rejecting requests (Bedrock overloaded)
    HALF_OPEN = "half_open"    # Probing recovery


@dataclass
class ThroughputMetrics:
    """Rolling metrics for throughput monitoring."""
    invocations_total: int = 0
    throttles_total: int = 0
    successes_total: int = 0
    tokens_processed: int = 0
    window_start: float = field(default_factory=time.time)
    window_duration_sec: float = 60.0

    @property
    def invocations_per_minute(self) -> float:
        elapsed = max(time.time() - self.window_start, 1.0)
        return (self.invocations_total / elapsed) * 60

    @property
    def throttle_rate(self) -> float:
        if self.invocations_total == 0:
            return 0.0
        return self.throttles_total / self.invocations_total

    @property
    def tokens_per_second(self) -> float:
        elapsed = max(time.time() - self.window_start, 1.0)
        return self.tokens_processed / elapsed

    def reset_if_window_expired(self):
        if time.time() - self.window_start >= self.window_duration_sec:
            self.invocations_total = 0
            self.throttles_total = 0
            self.successes_total = 0
            self.tokens_processed = 0
            self.window_start = time.time()


class ConcurrencyManager:
    """
    Adaptive semaphore-based concurrency limiter for Bedrock invocations.

    Adjusts concurrency limits dynamically based on:
    - ThrottlingException rate from Bedrock
    - P99 latency feedback
    - Circuit breaker state

    MangaAssist configuration:
    - Sonnet pool: starts at 30 concurrent, range [10, 50]
    - Haiku pool: starts at 50 concurrent, range [20, 80]
    """

    def __init__(
        self,
        model_id: str,
        initial_concurrency: int = 30,
        min_concurrency: int = 10,
        max_concurrency: int = 50,
        throttle_backoff_factor: float = 0.7,
        success_growth_factor: float = 1.1,
        circuit_open_duration_sec: float = 30.0,
        throttle_threshold: float = 0.10,
    ):
        self.model_id = model_id
        self.current_limit = initial_concurrency
        self.min_concurrency = min_concurrency
        self.max_concurrency = max_concurrency
        self.throttle_backoff_factor = throttle_backoff_factor
        self.success_growth_factor = success_growth_factor
        self.circuit_open_duration_sec = circuit_open_duration_sec
        self.throttle_threshold = throttle_threshold

        self._semaphore = asyncio.Semaphore(initial_concurrency)
        self._active_count = 0
        self._circuit_state = CircuitState.CLOSED
        self._circuit_opened_at: Optional[float] = None
        self.metrics = ThroughputMetrics()

        logger.info(
            "ConcurrencyManager initialized: model=%s, concurrency=%d",
            model_id, initial_concurrency,
        )

    @property
    def circuit_state(self) -> CircuitState:
        return self._circuit_state

    @property
    def active_count(self) -> int:
        return self._active_count

    async def acquire(self) -> bool:
        """
        Acquire a concurrency slot. Returns False if circuit is open.
        """
        self.metrics.reset_if_window_expired()

        # Circuit breaker check
        if self._circuit_state == CircuitState.OPEN:
            if time.time() - self._circuit_opened_at >= self.circuit_open_duration_sec:
                self._circuit_state = CircuitState.HALF_OPEN
                logger.info("Circuit half-open: probing Bedrock for %s", self.model_id)
            else:
                return False

        if self._circuit_state == CircuitState.HALF_OPEN:
            # Only allow 1 probe request
            if self._active_count >= 1:
                return False

        await self._semaphore.acquire()
        self._active_count += 1
        self.metrics.invocations_total += 1
        return True

    async def release(self, success: bool, tokens_used: int = 0, throttled: bool = False):
        """
        Release a concurrency slot and feed back result for adaptation.
        """
        self._active_count -= 1
        self._semaphore.release()

        if throttled:
            self.metrics.throttles_total += 1
            await self._handle_throttle()
        elif success:
            self.metrics.successes_total += 1
            self.metrics.tokens_processed += tokens_used
            await self._handle_success()

    async def _handle_throttle(self):
        """Reduce concurrency on throttle signal."""
        if self._circuit_state == CircuitState.HALF_OPEN:
            # Probe failed — reopen circuit
            self._circuit_state = CircuitState.OPEN
            self._circuit_opened_at = time.time()
            logger.warning("Circuit reopened for %s after probe throttle", self.model_id)
            return

        new_limit = max(
            self.min_concurrency,
            int(self.current_limit * self.throttle_backoff_factor),
        )
        if new_limit != self.current_limit:
            logger.warning(
                "Throttle detected: reducing concurrency %d -> %d for %s",
                self.current_limit, new_limit, self.model_id,
            )
            await self._resize_semaphore(new_limit)

        # Open circuit if throttle rate exceeds threshold
        if self.metrics.throttle_rate >= self.throttle_threshold:
            self._circuit_state = CircuitState.OPEN
            self._circuit_opened_at = time.time()
            logger.error(
                "Circuit OPEN for %s: throttle_rate=%.2f >= %.2f",
                self.model_id, self.metrics.throttle_rate, self.throttle_threshold,
            )

    async def _handle_success(self):
        """Gradually increase concurrency on sustained success."""
        if self._circuit_state == CircuitState.HALF_OPEN:
            self._circuit_state = CircuitState.CLOSED
            logger.info("Circuit closed for %s after successful probe", self.model_id)
            return

        # Only grow if throttle rate is near zero and we have enough samples
        if (
            self.metrics.throttle_rate < 0.01
            and self.metrics.invocations_total >= 20
            and self.current_limit < self.max_concurrency
        ):
            new_limit = min(
                self.max_concurrency,
                int(self.current_limit * self.success_growth_factor),
            )
            if new_limit != self.current_limit:
                logger.info(
                    "Growing concurrency %d -> %d for %s",
                    self.current_limit, new_limit, self.model_id,
                )
                await self._resize_semaphore(new_limit)

    async def _resize_semaphore(self, new_limit: int):
        """Resize the semaphore to a new limit."""
        delta = new_limit - self.current_limit
        if delta > 0:
            for _ in range(delta):
                self._semaphore.release()
        elif delta < 0:
            for _ in range(abs(delta)):
                await self._semaphore.acquire()
        self.current_limit = new_limit

    def get_status(self) -> dict:
        """Return current status for CloudWatch metrics emission."""
        return {
            "model_id": self.model_id,
            "current_limit": self.current_limit,
            "active_count": self._active_count,
            "circuit_state": self._circuit_state.value,
            "invocations_per_minute": round(self.metrics.invocations_per_minute, 1),
            "throttle_rate": round(self.metrics.throttle_rate, 4),
            "tokens_per_second": round(self.metrics.tokens_per_second, 1),
        }
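A minimal usage sketch showing the acquire/release contract around a Bedrock call (the pool settings mirror the Haiku pool described above; the synchronous boto3 call is run in a thread so the event loop stays free, and token counts are read from the response usage block):

import asyncio
import json
import boto3

bedrock = boto3.client("bedrock-runtime")
haiku_pool = ConcurrencyManager(
    "anthropic.claude-3-haiku-20240307-v1:0",
    initial_concurrency=50, min_concurrency=20, max_concurrency=80,
)


async def invoke_with_limits(prompt: str) -> str | None:
    """Return model text, or None when the circuit is open (caller degrades gracefully)."""
    if not await haiku_pool.acquire():
        return None
    try:
        response = await asyncio.to_thread(
            bedrock.invoke_model,
            modelId=haiku_pool.model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 300,
            }),
        )
        body = json.loads(response["body"].read())
        usage = body.get("usage", {})
        await haiku_pool.release(
            success=True,
            tokens_used=usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
        )
        return body["content"][0]["text"]
    except bedrock.exceptions.ThrottlingException:
        await haiku_pool.release(success=False, throttled=True)
        return None
    except Exception:
        await haiku_pool.release(success=False)
        raise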

Queue-Based Request Management

MangaAssist uses a priority queue system to ensure high-value requests (order issues, payment problems) are processed before low-priority requests (browsing recommendations) when Bedrock capacity is constrained.

Priority Queue Design

| Priority | Category | Examples | Target SLA | Queue |
| --- | --- | --- | --- | --- |
| P0 — Critical | Order/Payment issues | "Where is my order?", "Payment failed" | < 2s | Real-time, dedicated Haiku pool |
| P1 — High | Purchase intent | "Recommend manga like...", "Is this in stock?" | < 3s | Real-time, shared Sonnet pool |
| P2 — Medium | Browsing assistance | "Tell me about this series", "What genres?" | < 5s | Real-time with degradation allowed |
| P3 — Low | Batch workloads | Embedding updates, report generation | < 1 hour | SQS batch queue |

Dead-Letter Queue Strategy

flowchart LR
    subgraph "Primary Processing"
        Q[Priority Queue] --> P[Processor]
        P -->|invoke_model| B[Bedrock]
    end

    subgraph "Retry Logic"
        B -->|ThrottlingException| R{Retry<br/>count < 3?}
        R -->|Yes| EXP[Exponential Backoff<br/>1s, 2s, 4s + jitter]
        EXP --> P
        R -->|No| DLQ
    end

    subgraph "Dead-Letter Handling"
        DLQ[Dead-Letter Queue<br/>MaxReceiveCount=3] --> ALARM[CloudWatch Alarm<br/>DLQ depth > 0]
        ALARM --> SNS[SNS Notification]
        DLQ --> PROC[DLQ Processor<br/>Scheduled Lambda]
        PROC -->|Retryable?| Q
        PROC -->|Non-retryable| LOG[Error Log +<br/>Customer Notification]
    end

    style DLQ fill:#d13212,color:#fff
    style ALARM fill:#ff9900,color:#000
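A minimal provisioning sketch for the redrive policy in the diagram (queue names and the 14-day retention are assumptions; maxReceiveCount=3 matches the diagram, and a FIFO queue must use a FIFO dead-letter queue):

import json
import boto3

sqs = boto3.client("sqs")

# Create the DLQ first so its ARN can be referenced by the primary queue
dlq_url = sqs.create_queue(
    QueueName="batch-dlq.fifo",
    Attributes={"FifoQueue": "true", "MessageRetentionPeriod": "1209600"},  # 14 days
)["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"],
)["Attributes"]["QueueArn"]

# Primary queue: after 3 failed receives, SQS moves the message to the DLQ automatically
sqs.create_queue(
    QueueName="batch-inference-queue.fifo",
    Attributes={
        "FifoQueue": "true",
        "VisibilityTimeout": "300",
        "RedrivePolicy": json.dumps({"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"}),
    },
)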

Backpressure Handling

When Bedrock capacity is saturated, MangaAssist must avoid cascading failures. The backpressure system uses three layers of defense.

Layer 1 — Circuit Breaker

The ConcurrencyManager above includes circuit breaker logic. When the throttle rate exceeds 10%, the circuit opens for 30 seconds, rejecting new requests immediately instead of queuing them behind failing invocations.

Layer 2 — Graceful Degradation

When the circuit is open, MangaAssist does not return errors to users. Instead, it falls back through a degradation chain (a code sketch follows the list):

1. Try Sonnet → THROTTLED
2. Try Haiku (simpler model, higher limits) → THROTTLED
3. Try cached response (DynamoDB, exact or fuzzy match) → MISS
4. Return template response based on detected intent
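A minimal sketch of the chain, assuming hypothetical helpers invoke_sonnet, invoke_haiku, lookup_cached_answer, and template_response (none of these are defined elsewhere in this document):

async def degraded_response(question: str, intent: str) -> str:
    """Walk the fallback chain so the user always receives some answer."""
    for invoke in (invoke_sonnet, invoke_haiku):  # steps 1-2: try models in cost order
        try:
            answer = await invoke(question)
            if answer:
                return answer
        except Exception:  # hypothetical: treat throttles/timeouts as a signal to degrade
            continue
    cached = await lookup_cached_answer(question)  # step 3: DynamoDB exact/fuzzy cache
    if cached:
        return cached
    return template_response(intent)  # step 4: intent-based template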

Layer 3 — Client Signaling via WebSocket

# WebSocket backpressure message to client
{
    "type": "system",
    "status": "high_load",
    "message": "We're experiencing high demand. Your response may take a moment.",
    "estimated_wait_sec": 8,
    "queue_position": 42
}

Token Throughput Metrics Dashboard

Key Metrics for CloudWatch

| Metric | Unit | Target | Alarm Threshold | Description |
| --- | --- | --- | --- | --- |
| TokensProcessedPerSecond | Count/sec | > 5,000 | < 2,000 | Total input+output tokens processed across all models |
| InvocationsPerMinute | Count/min | > 1,000 | < 500 | Successful Bedrock invocations per minute |
| ThrottleRate | Ratio | < 0.02 | > 0.05 | ThrottlingExceptions / Total invocations |
| EffectiveConcurrency | Count | 60-80% of limit | < 40% of limit | Active concurrent invocations / Configured limit |
| QueueDepth | Count | < 100 | > 500 | Messages waiting in priority queue |
| BatchQueueAge | Seconds | < 600 | > 3,600 | Age of oldest message in batch queue |
| DLQDepth | Count | 0 | > 0 | Messages in dead-letter queue |
| DegradationRate | Ratio | < 0.01 | > 0.05 | Requests served by fallback / Total requests |
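A minimal emission sketch that publishes the ConcurrencyManager status to these metrics (the namespace and dimension name are assumptions):

import boto3

cloudwatch = boto3.client("cloudwatch")


def emit_throughput_metrics(manager: ConcurrencyManager) -> None:
    """Publish per-model throughput metrics for the dashboard and alarms above."""
    status = manager.get_status()
    dimensions = [{"Name": "ModelId", "Value": status["model_id"]}]
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Throughput",  # assumed namespace
        MetricData=[
            {"MetricName": "InvocationsPerMinute", "Value": status["invocations_per_minute"],
             "Unit": "Count", "Dimensions": dimensions},
            {"MetricName": "ThrottleRate", "Value": status["throttle_rate"],
             "Unit": "None", "Dimensions": dimensions},
            {"MetricName": "TokensProcessedPerSecond", "Value": status["tokens_per_second"],
             "Unit": "Count/Second", "Dimensions": dimensions},
            {"MetricName": "EffectiveConcurrency", "Value": status["active_count"],
             "Unit": "Count", "Dimensions": dimensions},
        ],
    )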

Batch Inference Processor with SQS

import json
import asyncio
import logging
import boto3
from botocore.exceptions import ClientError
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("mangaassist.batch")


@dataclass
class BatchJob:
    """Represents a batch inference job from SQS."""
    message_id: str
    receipt_handle: str
    job_type: str          # "embedding_update" | "report_generation" | "catalog_enrichment"
    payload: dict
    priority: int          # 0 = highest
    retry_count: int = 0


class BatchInferenceProcessor:
    """
    SQS-based batch inference processor for non-real-time MangaAssist workloads.

    Processes jobs from an SQS FIFO queue using a dedicated Haiku concurrency
    pool, separate from real-time user-facing traffic. Jobs that fail after
    max retries are sent to a dead-letter queue.

    Designed for:
    - Product description enrichment (new manga titles)
    - Embedding regeneration (catalog updates)
    - Weekly recommendation batch generation
    - Customer sentiment report generation
    """

    def __init__(
        self,
        queue_url: str,
        dlq_url: str,
        concurrency_manager: "ConcurrencyManager",
        max_workers: int = 5,
        visibility_timeout_sec: int = 300,
        max_retries: int = 3,
    ):
        self.queue_url = queue_url
        self.dlq_url = dlq_url
        self.concurrency_manager = concurrency_manager
        self.max_workers = max_workers
        self.visibility_timeout_sec = visibility_timeout_sec
        self.max_retries = max_retries

        self.sqs = boto3.client("sqs")
        self.bedrock = boto3.client("bedrock-runtime")
        self._running = False
        self._workers: list[asyncio.Task] = []

    async def start(self):
        """Start batch processing workers."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker_loop(i))
            for i in range(self.max_workers)
        ]
        logger.info(
            "BatchInferenceProcessor started: workers=%d, queue=%s",
            self.max_workers, self.queue_url,
        )

    async def stop(self):
        """Gracefully stop all workers."""
        self._running = False
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        logger.info("BatchInferenceProcessor stopped")

    async def _worker_loop(self, worker_id: int):
        """Main worker loop: poll SQS, process, acknowledge."""
        while self._running:
            try:
                # Long-poll SQS off the event loop so concurrent workers do not block each other
                response = await asyncio.to_thread(
                    self.sqs.receive_message,
                    QueueUrl=self.queue_url,
                    MaxNumberOfMessages=1,
                    WaitTimeSeconds=20,
                    VisibilityTimeout=self.visibility_timeout_sec,
                    AttributeNames=["ApproximateReceiveCount"],
                )

                messages = response.get("Messages", [])
                if not messages:
                    continue

                for msg in messages:
                    job = self._parse_message(msg)
                    if job:
                        await self._process_job(worker_id, job)

            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Worker %d encountered error", worker_id)
                await asyncio.sleep(5)

    def _parse_message(self, message: dict) -> Optional[BatchJob]:
        """Parse SQS message into a BatchJob."""
        try:
            body = json.loads(message["Body"])
            receive_count = int(
                message.get("Attributes", {}).get("ApproximateReceiveCount", "1")
            )
            return BatchJob(
                message_id=message["MessageId"],
                receipt_handle=message["ReceiptHandle"],
                job_type=body["job_type"],
                payload=body["payload"],
                priority=body.get("priority", 2),
                retry_count=receive_count - 1,
            )
        except (KeyError, json.JSONDecodeError):
            logger.error("Failed to parse message: %s", message["MessageId"])
            return None

    async def _process_job(self, worker_id: int, job: BatchJob):
        """Process a single batch job through Bedrock."""
        acquired = await self.concurrency_manager.acquire()
        if not acquired:
            logger.warning(
                "Worker %d: circuit open, returning job %s to queue",
                worker_id, job.message_id,
            )
            return  # Message becomes visible again after visibility timeout

        try:
            prompt = self._build_prompt(job)
            # Run the synchronous boto3 invoke off the event loop so other workers keep polling
            response = await asyncio.to_thread(
                self.bedrock.invoke_model,
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 500,
                }),
            )

            result = json.loads(response["body"].read())
            await self._store_result(job, result)

            # Delete message on success
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=job.receipt_handle,
            )
            # Feed actual token usage back so throughput metrics stay accurate
            usage = result.get("usage", {})
            await self.concurrency_manager.release(
                success=True,
                tokens_used=usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
            )
            logger.info("Job %s completed by worker %d", job.message_id, worker_id)

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            throttled = error_code == "ThrottlingException"
            await self.concurrency_manager.release(
                success=False, throttled=throttled,
            )
            if job.retry_count >= self.max_retries:
                await self._send_to_dlq(job, str(e))
            else:
                logger.warning(
                    "Job %s failed (attempt %d/%d): %s",
                    job.message_id, job.retry_count + 1, self.max_retries, error_code,
                )
        except Exception as e:
            await self.concurrency_manager.release(success=False)
            logger.exception("Unexpected error processing job %s", job.message_id)
            if job.retry_count >= self.max_retries:
                await self._send_to_dlq(job, str(e))

    def _build_prompt(self, job: BatchJob) -> str:
        """Build prompt based on job type."""
        if job.job_type == "catalog_enrichment":
            return (
                f"Generate a concise product description for this manga title:\n"
                f"Title: {job.payload['title']}\n"
                f"Author: {job.payload['author']}\n"
                f"Genre: {job.payload['genre']}\n"
                f"Format: JSON with keys: description, tags, audience"
            )
        elif job.job_type == "embedding_update":
            return (
                f"Summarize this manga for embedding generation:\n"
                f"{job.payload['content']}\n"
                f"Output: single paragraph, max 100 words"
            )
        elif job.job_type == "report_generation":
            return (
                f"Analyze these customer interaction metrics:\n"
                f"{json.dumps(job.payload['metrics'])}\n"
                f"Output: JSON with keys: summary, trends, recommendations"
            )
        return job.payload.get("prompt", "")

    async def _store_result(self, job: BatchJob, result: dict):
        """Store batch job result to appropriate destination."""
        # Implementation varies by job type — DynamoDB, OpenSearch, S3
        logger.info("Stored result for job %s (type=%s)", job.message_id, job.job_type)

    async def _send_to_dlq(self, job: BatchJob, error: str):
        """Send failed job to dead-letter queue for investigation."""
        self.sqs.send_message(
            QueueUrl=self.dlq_url,
            MessageBody=json.dumps({
                "original_job": {
                    "message_id": job.message_id,
                    "job_type": job.job_type,
                    "payload": job.payload,
                },
                "error": error,
                "retry_count": job.retry_count,
            }),
        )
        # Delete from primary queue
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=job.receipt_handle,
        )
        logger.error(
            "Job %s sent to DLQ after %d retries: %s",
            job.message_id, job.retry_count, error,
        )

Throughput Strategy Comparison

| Strategy | Requests/Min Capacity | Latency Impact | Complexity | Best For |
| --- | --- | --- | --- | --- |
| Token minimization | +60-165% over baseline | -40-60% per request | Low | All workloads — do this first |
| Micro-batching | +30-50% for similar queries | +50-100ms (batch window) | Medium | High-traffic identical queries |
| SQS batch queue | Offloads 20-30% of traffic | N/A (async) | Medium | Non-real-time workloads |
| Adaptive concurrency | +20-40% utilization | Neutral | High | Maximizing Bedrock capacity usage |
| Priority queuing | Neutral (redistribution) | -30% for P0 requests | Medium | Mixed-priority workloads |
| Circuit breaker | Prevents cascading failure | +0ms (fast reject) | Medium | Overload protection |
| Model downgrade (Sonnet to Haiku) | +200-300% | +0ms (Haiku is faster) | Low | Degradation fallback |
| Conversation summarization | +100-400% | -50-80% per request | Medium | Multi-turn conversations |

Combined Throughput Impact for MangaAssist

Baseline:                          ~400 requests/min
+ Token minimization:              ~700 requests/min  (+75%)
+ Batch offload (non-real-time):   ~850 requests/min  (+21%, real-time freed)
+ Adaptive concurrency:            ~1,020 requests/min (+20%)
+ Priority queuing + degradation:  ~1,100 requests/min (sustained with graceful fallback)

Target achieved: 1,000+ requests/min sustained

Key Takeaways

  1. Token reduction is the highest-ROI throughput optimization — fewer tokens per request means faster processing, which means more requests per second. Always optimize prompts before adding infrastructure complexity.

  2. Separate real-time from batch traffic — Batch workloads (embeddings, reports, enrichment) should never compete with user-facing chat for Bedrock capacity. SQS queues with dedicated concurrency pools enforce this isolation.

  3. Adaptive concurrency beats static limits — Static concurrency settings either waste capacity (set too low) or cause throttle storms (set too high). Adaptive systems continuously probe the optimal operating point.

  4. Backpressure must be end-to-end — Circuit breakers at the Bedrock layer are necessary but not sufficient. Graceful degradation (cached responses, template fallbacks, model downgrades) ensures users always get a response.

  5. Dead-letter queues are not optional — Every queue needs a DLQ with monitoring. A growing DLQ is an early warning of misconfigured retries, prompt errors, or Bedrock service issues.