Batch Inference and Concurrency Deep-Dive
AWS AIP-C01 Task 4.2 — Skill 4.2.3: Optimize FM throughput — batch inference and concurrency patterns
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket. 1M messages/day, peak 20K concurrent users. Target: sustain 1,000+ requests/minute to Bedrock.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency & Optimization | Task 4.2 — Optimize FM performance | Skill 4.2.3 — Optimize FM throughput — batch inference and concurrency patterns |
Skill scope: Deep-dive into batch inference strategies, adaptive concurrency control, priority-based request queuing, and streaming vs synchronous throughput characteristics for Bedrock Claude 3 models.
Real-Time vs Batch — When to Use Each
The fundamental throughput decision for MangaAssist is: does this request need a response within seconds, or can it wait? Every request routed to batch processing frees real-time Bedrock capacity for user-facing chat.
Decision Matrix
flowchart TD
START[Incoming Request] --> Q1{User waiting<br/>for response?}
Q1 -->|Yes| Q2{Latency<br/>requirement?}
Q1 -->|No| BATCH[Batch Processing<br/>SQS Queue]
Q2 -->|< 2s| RT_HAIKU[Real-Time: Haiku<br/>Order status, FAQ]
Q2 -->|< 5s| RT_SONNET[Real-Time: Sonnet<br/>Recommendations, complex queries]
Q2 -->|< 10s| MICRO[Micro-Batch<br/>Group similar queries]
BATCH --> Q3{Deadline?}
Q3 -->|< 1 hour| BATCH_HIGH[Batch High Priority<br/>Catalog enrichment]
Q3 -->|< 6 hours| BATCH_MED[Batch Medium Priority<br/>Recommendation pre-compute]
Q3 -->|< 24 hours| BATCH_LOW[Batch Low Priority<br/>Reports, analytics]
style RT_HAIKU fill:#2ecc71,color:#000
style RT_SONNET fill:#3498db,color:#fff
style MICRO fill:#f39c12,color:#000
style BATCH_HIGH fill:#e74c3c,color:#fff
style BATCH_MED fill:#e67e22,color:#fff
style BATCH_LOW fill:#95a5a6,color:#000
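A minimal routing sketch of this decision matrix. The `Route` enum, `RequestContext` fields, and `route_request` helper are illustrative names for this example, not part of the MangaAssist codebase; the thresholds mirror the flowchart above.

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class Route(Enum):
    REALTIME_HAIKU = "realtime_haiku"
    REALTIME_SONNET = "realtime_sonnet"
    MICRO_BATCH = "micro_batch"
    BATCH_HIGH = "batch_high"
    BATCH_MEDIUM = "batch_medium"
    BATCH_LOW = "batch_low"

@dataclass
class RequestContext:
    user_waiting: bool
    latency_budget_sec: Optional[float] = None   # set for user-facing requests
    deadline_hours: Optional[float] = None       # set for background work

def route_request(ctx: RequestContext) -> Route:
    """Map a request to a processing route per the decision matrix above."""
    if ctx.user_waiting:
        budget = ctx.latency_budget_sec or 2.0
        if budget <= 2.0:
            return Route.REALTIME_HAIKU      # order status, FAQ
        if budget <= 5.0:
            return Route.REALTIME_SONNET     # recommendations, complex queries
        return Route.MICRO_BATCH             # group similar queries
    deadline = ctx.deadline_hours or 24.0
    if deadline <= 1.0:
        return Route.BATCH_HIGH              # catalog enrichment
    if deadline <= 6.0:
        return Route.BATCH_MEDIUM            # recommendation pre-compute
    return Route.BATCH_LOW                   # reports, analytics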
MangaAssist Workload Distribution
| Workload Category | % of Total | Processing Mode | Model | Concurrency Pool |
|---|---|---|---|---|
| User chat — simple queries | 40% | Real-time | Haiku | Haiku RT pool (50 slots) |
| User chat — complex queries | 25% | Real-time | Sonnet | Sonnet RT pool (30 slots) |
| User chat — duplicate queries during events | 10% | Micro-batch | Haiku | Haiku RT pool (shared) |
| Catalog enrichment (new manga arrivals) | 8% | Batch | Haiku | Batch pool (20 slots) |
| Recommendation pre-computation | 7% | Batch | Sonnet | Batch pool (10 slots) |
| Embedding regeneration | 5% | Batch | Haiku | Batch pool (20 slots) |
| Analytics and reporting | 3% | Batch (off-peak) | Sonnet | Batch pool (10 slots) |
| Customer sentiment analysis | 2% | Batch (off-peak) | Haiku | Batch pool (20 slots) |
By routing 25% of total volume to batch processing, MangaAssist frees roughly 250 requests/minute of real-time Bedrock capacity at the 1,000 requests/minute peak target.
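A quick back-of-the-envelope check of that figure, using the workload shares in the table and the peak target from the scenario:

# Workload shares routed to batch: catalog 8% + pre-compute 7% + embeddings 5% + analytics 3% + sentiment 2%
batch_share = 0.08 + 0.07 + 0.05 + 0.03 + 0.02        # 0.25
peak_rpm_target = 1000                                 # sustained requests/minute to Bedrock at peak

freed_rpm = batch_share * peak_rpm_target              # ~250 requests/minute of real-time capacity
print(f"batch share = {batch_share:.0%}, freed real-time capacity ~ {freed_rpm:.0f} req/min")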
Adaptive Concurrency Control
Static concurrency limits fail because Bedrock capacity fluctuates based on regional load, account-level quotas, and time of day. Adaptive concurrency discovers the optimal operating point in real time.
The Adaptive Concurrency Algorithm
Start with N=10 concurrent Bedrock calls
Every evaluation_window (30 seconds):
IF throttle_count == 0 AND p99_latency < target:
N = min(N + growth_step, max_limit) # Probe higher
ELIF throttle_count > 0:
N = max(N * backoff_factor, min_limit) # Back off
ELSE:
N = N # Hold steady
Why Start at N=10?
Starting conservatively at N=10 (not the account limit) is intentional:
- Cold start safety — New deployments should ramp up, not slam Bedrock at full capacity.
- Shared account limits — Other services in the AWS account may also be calling Bedrock.
- Time-of-day variation — Available capacity at 2 AM differs from noon during a manga sale.
- Fast convergence — With a growth step of 5 per 30-second window (the default used here), N=10 reaches N=50 in 8 windows, about 4 minutes; a step of 3 takes about 7 minutes (worked through below).
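A minimal check of that ramp-up arithmetic, using the default bounds from the text above:

import math

initial_n, max_n = 10, 50
growth_step = 5                    # added per evaluation window when healthy
window_sec = 30

windows = math.ceil((max_n - initial_n) / growth_step)   # 8 windows
print(f"{windows} windows x {window_sec}s = {windows * window_sec / 60:.0f} minutes to ramp 10 -> 50")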
Concurrency Adaptation Visualization
sequenceDiagram
participant AC as AdaptiveConcurrencyController
participant B as Bedrock
participant CW as CloudWatch
Note over AC: Start: N=10
loop Every 30s evaluation window
AC->>B: 10 concurrent requests
B-->>AC: All succeed, p99=800ms (target=1200ms)
AC->>AC: N = min(10 + 5, 80) = 15
AC->>CW: emit concurrency_limit=15
AC->>B: 15 concurrent requests
B-->>AC: All succeed, p99=900ms
AC->>AC: N = min(15 + 5, 80) = 20
AC->>CW: emit concurrency_limit=20
AC->>B: 20 concurrent requests
B-->>AC: All succeed, p99=1050ms
AC->>AC: N = min(20 + 5, 80) = 25
AC->>CW: emit concurrency_limit=25
AC->>B: 25 concurrent requests
B-->>AC: 2 throttled, p99=1800ms
AC->>AC: N = max(25 * 0.7, 10) = 17
AC->>CW: emit concurrency_limit=17, throttle_count=2
AC->>B: 17 concurrent requests
B-->>AC: All succeed, p99=950ms
AC->>AC: N = min(17 + 5, 80) = 22
Note over AC: Oscillates around optimal ~20-22
end
AdaptiveConcurrencyController Implementation
import asyncio
import time
import math
import logging
import statistics
from dataclasses import dataclass, field
from typing import List, Optional
import boto3
logger = logging.getLogger("mangaassist.concurrency")
@dataclass
class LatencyWindow:
"""Sliding window of latency measurements."""
samples: List[float] = field(default_factory=list)
max_samples: int = 200
def add(self, latency_ms: float):
self.samples.append(latency_ms)
if len(self.samples) > self.max_samples:
self.samples = self.samples[-self.max_samples:]
def p99(self) -> float:
if not self.samples:
return 0.0
sorted_samples = sorted(self.samples)
idx = int(math.ceil(0.99 * len(sorted_samples))) - 1
return sorted_samples[max(0, idx)]
def p50(self) -> float:
if not self.samples:
return 0.0
return statistics.median(self.samples)
def clear(self):
self.samples.clear()
class AdaptiveConcurrencyController:
"""
Adaptive concurrency controller for Bedrock model invocations.
Continuously adjusts the concurrency limit based on:
- Throttle signals from Bedrock (ThrottlingException)
- P99 latency relative to a target threshold
- Minimum and maximum bounds per model
MangaAssist uses separate controllers for each model:
- Sonnet: range [5, 50], target p99 = 1200ms
- Haiku: range [10, 80], target p99 = 600ms
"""
def __init__(
self,
model_id: str,
initial_concurrency: int = 10,
min_concurrency: int = 5,
max_concurrency: int = 50,
target_p99_ms: float = 1200.0,
growth_step: int = 5,
backoff_factor: float = 0.7,
evaluation_window_sec: float = 30.0,
cloudwatch_namespace: str = "MangaAssist/Throughput",
):
self.model_id = model_id
self.min_concurrency = min_concurrency
self.max_concurrency = max_concurrency
self.target_p99_ms = target_p99_ms
self.growth_step = growth_step
self.backoff_factor = backoff_factor
self.evaluation_window_sec = evaluation_window_sec
self.cloudwatch_namespace = cloudwatch_namespace
self._current_limit = initial_concurrency
self._semaphore = asyncio.Semaphore(initial_concurrency)
self._active = 0
self._throttle_count = 0
self._success_count = 0
self._latency = LatencyWindow()
self._evaluation_task: Optional[asyncio.Task] = None
self._cw = boto3.client("cloudwatch")
logger.info(
"AdaptiveConcurrencyController: model=%s init=%d range=[%d,%d] target_p99=%dms",
model_id, initial_concurrency, min_concurrency, max_concurrency,
int(target_p99_ms),
)
@property
def current_limit(self) -> int:
return self._current_limit
@property
def active_count(self) -> int:
return self._active
async def start(self):
"""Start the background evaluation loop."""
self._evaluation_task = asyncio.create_task(self._evaluation_loop())
async def stop(self):
"""Stop the evaluation loop."""
if self._evaluation_task:
self._evaluation_task.cancel()
try:
await self._evaluation_task
except asyncio.CancelledError:
pass
async def acquire(self) -> bool:
"""Acquire a concurrency slot. Blocks if at limit."""
await self._semaphore.acquire()
self._active += 1
return True
async def release(self, latency_ms: float, throttled: bool = False):
"""Release a slot and record the outcome."""
self._active -= 1
self._semaphore.release()
if throttled:
self._throttle_count += 1
else:
self._success_count += 1
self._latency.add(latency_ms)
async def _evaluation_loop(self):
"""Periodically evaluate and adjust concurrency limit."""
while True:
await asyncio.sleep(self.evaluation_window_sec)
p99 = self._latency.p99()
throttles = self._throttle_count
successes = self._success_count
total = throttles + successes
old_limit = self._current_limit
new_limit = old_limit
if total == 0:
# No traffic in this window — hold steady
pass
elif throttles > 0:
# Throttles detected — back off
new_limit = max(
self.min_concurrency,
int(old_limit * self.backoff_factor),
)
logger.warning(
"[%s] Throttles=%d in window, reducing %d -> %d",
self.model_id, throttles, old_limit, new_limit,
)
elif p99 < self.target_p99_ms:
# Under latency target with no throttles — grow
new_limit = min(
self.max_concurrency,
old_limit + self.growth_step,
)
logger.info(
"[%s] p99=%.0fms < target=%dms, growing %d -> %d",
self.model_id, p99, int(self.target_p99_ms), old_limit, new_limit,
)
elif p99 >= self.target_p99_ms * 1.5:
# Latency significantly above target — slight backoff
new_limit = max(
self.min_concurrency,
old_limit - self.growth_step,
)
logger.info(
"[%s] p99=%.0fms >> target=%dms, reducing %d -> %d",
self.model_id, p99, int(self.target_p99_ms), old_limit, new_limit,
)
else:
# p99 between target and 1.5x target — hold steady
logger.debug(
"[%s] p99=%.0fms near target=%dms, holding at %d",
self.model_id, p99, int(self.target_p99_ms), old_limit,
)
if new_limit != old_limit:
await self._resize(new_limit)
# Emit metrics
await self._emit_metrics(p99, throttles, successes)
# Reset window counters
self._throttle_count = 0
self._success_count = 0
self._latency.clear()
async def _resize(self, new_limit: int):
"""Resize the semaphore to the new concurrency limit."""
delta = new_limit - self._current_limit
if delta > 0:
for _ in range(delta):
self._semaphore.release()
elif delta < 0:
for _ in range(abs(delta)):
await self._semaphore.acquire()
self._current_limit = new_limit
async def _emit_metrics(self, p99: float, throttles: int, successes: int):
"""Emit concurrency metrics to CloudWatch."""
try:
self._cw.put_metric_data(
Namespace=self.cloudwatch_namespace,
MetricData=[
{
"MetricName": "ConcurrencyLimit",
"Value": self._current_limit,
"Unit": "Count",
"Dimensions": [
{"Name": "ModelId", "Value": self.model_id},
],
},
{
"MetricName": "ActiveConcurrency",
"Value": self._active,
"Unit": "Count",
"Dimensions": [
{"Name": "ModelId", "Value": self.model_id},
],
},
{
"MetricName": "ThrottleCount",
"Value": throttles,
"Unit": "Count",
"Dimensions": [
{"Name": "ModelId", "Value": self.model_id},
],
},
{
"MetricName": "P99Latency",
"Value": p99,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "ModelId", "Value": self.model_id},
],
},
],
)
except Exception:
logger.exception("Failed to emit CloudWatch metrics")
def get_status(self) -> dict:
return {
"model_id": self.model_id,
"current_limit": self._current_limit,
"active": self._active,
"p99_ms": round(self._latency.p99(), 1),
"p50_ms": round(self._latency.p50(), 1),
"window_throttles": self._throttle_count,
"window_successes": self._success_count,
}
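A sketch of how a worker coroutine might wrap a Bedrock invocation with this controller. It assumes the standard Claude 3 Haiku model ID and the Bedrock Messages API request shape; `invoke_with_controller` is an illustrative helper, and the blocking boto3 call is moved off the event loop with `asyncio.to_thread`.

import asyncio
import json
import time

import boto3
from botocore.exceptions import ClientError

bedrock = boto3.client("bedrock-runtime")

async def invoke_with_controller(controller: AdaptiveConcurrencyController, prompt: str) -> str:
    """Invoke Claude 3 Haiku under the controller's adaptive concurrency limit."""
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "messages": [{"role": "user", "content": prompt}],
    })
    await controller.acquire()
    start = time.monotonic()
    throttled = False
    try:
        # boto3 is blocking, so run the call off the event loop
        response = await asyncio.to_thread(
            bedrock.invoke_model,
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=body,
        )
        payload = json.loads(response["body"].read())
        return payload["content"][0]["text"]
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ThrottlingException":
            throttled = True
        raise
    finally:
        latency_ms = (time.monotonic() - start) * 1000
        await controller.release(latency_ms, throttled=throttled)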
Priority-Based Request Queue
During MangaAssist peak events (manga releases, flash sales), not all requests are equal. A customer with a payment problem must be served before a browsing query. The priority queue enforces this ordering.
Priority Levels
| Priority | Weight | Category | Intent Examples | Queue Behavior |
|---|---|---|---|---|
| P0 — Critical | 100 | Order/Payment | order_status, payment_failed, cancel_order | Preempts P1-P3, dedicated Haiku slot |
| P1 — High | 50 | Purchase Intent | recommend_manga, check_availability, add_to_cart | Standard real-time processing |
| P2 — Medium | 20 | Browsing | describe_series, genre_browse, author_info | Degradable to cached/template |
| P3 — Low | 1 | Background | embedding_update, catalog_enrich, report_gen | SQS batch queue, no SLA |
VIP Customer Priority Boost
MangaAssist identifies VIP customers (top 5% by purchase history) and boosts their priority by one level:
- VIP browsing (P2) becomes High (P1)
- VIP purchase intent (P1) becomes Critical (P0)
PriorityRequestQueue Implementation
import asyncio
import heapq
import time
import logging
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import IntEnum
logger = logging.getLogger("mangaassist.priority_queue")
class Priority(IntEnum):
"""Request priority levels — lower value = higher priority."""
CRITICAL = 0 # P0: Order/Payment issues
HIGH = 1 # P1: Purchase intent
MEDIUM = 2 # P2: Browsing assistance
LOW = 3 # P3: Background/batch
INTENT_PRIORITY_MAP = {
# P0 — Critical
"order_status": Priority.CRITICAL,
"payment_failed": Priority.CRITICAL,
"cancel_order": Priority.CRITICAL,
"refund_request": Priority.CRITICAL,
"shipping_issue": Priority.CRITICAL,
# P1 — High
"recommend_manga": Priority.HIGH,
"check_availability": Priority.HIGH,
"add_to_cart": Priority.HIGH,
"price_inquiry": Priority.HIGH,
# P2 — Medium
"describe_series": Priority.MEDIUM,
"genre_browse": Priority.MEDIUM,
"author_info": Priority.MEDIUM,
"general_question": Priority.MEDIUM,
# P3 — Low (batch)
"embedding_update": Priority.LOW,
"catalog_enrich": Priority.LOW,
"report_generation": Priority.LOW,
}
@dataclass(order=True)
class PrioritizedRequest:
"""A request with priority ordering for the heap queue."""
priority: int
timestamp: float = field(compare=True)
sequence: int = field(compare=True)
request: Any = field(compare=False)
future: asyncio.Future = field(compare=False, repr=False)
class PriorityRequestQueue:
"""
Priority queue for MangaAssist Bedrock requests.
Ensures high-priority requests (order issues, payments) are processed
before low-priority requests (browsing, batch) when Bedrock capacity
is constrained.
Features:
- Heap-based priority ordering (O(log n) enqueue/dequeue)
- VIP customer priority boost
- Starvation prevention via aging
- Queue depth metrics per priority level
- Configurable max queue size with overflow rejection
"""
def __init__(
self,
max_queue_size: int = 1000,
aging_interval_sec: float = 30.0,
aging_boost: int = 1,
vip_boost: int = 1,
):
self.max_queue_size = max_queue_size
self.aging_interval_sec = aging_interval_sec
self.aging_boost = aging_boost
self.vip_boost = vip_boost
self._heap: list[PrioritizedRequest] = []
self._sequence = 0
self._lock = asyncio.Lock()
self._not_empty = asyncio.Event()
self._aging_task: Optional[asyncio.Task] = None
# Metrics
self._enqueued_by_priority = {p: 0 for p in Priority}
self._dequeued_by_priority = {p: 0 for p in Priority}
self._rejected_overflow = 0
async def start(self):
"""Start the aging background task."""
self._aging_task = asyncio.create_task(self._aging_loop())
async def stop(self):
"""Stop the aging task."""
if self._aging_task:
self._aging_task.cancel()
def classify_priority(self, intent: str, is_vip: bool = False) -> Priority:
"""Determine request priority from intent and customer tier."""
base_priority = INTENT_PRIORITY_MAP.get(intent, Priority.MEDIUM)
if is_vip and base_priority > Priority.CRITICAL:
boosted = Priority(max(0, base_priority - self.vip_boost))
logger.debug("VIP boost: %s -> %s for intent=%s", base_priority.name, boosted.name, intent)
return boosted
return base_priority
async def enqueue(self, request: dict, intent: str, is_vip: bool = False) -> asyncio.Future:
"""
Add a request to the priority queue.
Returns a Future that resolves when the request is processed.
Raises OverflowError if queue is full.
"""
async with self._lock:
if len(self._heap) >= self.max_queue_size:
self._rejected_overflow += 1
raise OverflowError(
f"Priority queue full ({self.max_queue_size}). "
f"Request rejected for intent={intent}"
)
priority = self.classify_priority(intent, is_vip)
self._sequence += 1
            future = asyncio.get_running_loop().create_future()
entry = PrioritizedRequest(
priority=priority.value,
timestamp=time.time(),
sequence=self._sequence,
request=request,
future=future,
)
heapq.heappush(self._heap, entry)
self._not_empty.set()
self._enqueued_by_priority[priority] += 1
logger.debug(
"Enqueued: priority=%s, queue_depth=%d, intent=%s, vip=%s",
priority.name, len(self._heap), intent, is_vip,
)
return future
async def dequeue(self) -> PrioritizedRequest:
"""
Remove and return the highest-priority request.
Blocks until a request is available.
"""
while True:
await self._not_empty.wait()
async with self._lock:
if self._heap:
entry = heapq.heappop(self._heap)
if not self._heap:
self._not_empty.clear()
priority = Priority(entry.priority)
self._dequeued_by_priority[priority] += 1
return entry
async def _aging_loop(self):
"""
Periodically boost priority of long-waiting requests to prevent starvation.
Without aging, a sustained flood of P0 requests could starve P2 requests
indefinitely. Aging promotes old P2 requests to P1 after aging_interval.
"""
while True:
await asyncio.sleep(self.aging_interval_sec)
async with self._lock:
aged_count = 0
new_heap = []
now = time.time()
for entry in self._heap:
wait_time = now - entry.timestamp
age_intervals = int(wait_time / self.aging_interval_sec)
if age_intervals > 0 and entry.priority > Priority.CRITICAL:
                        # Promote by one step per sweep; multiplying by the total age would
                        # compound with promotions already applied in earlier sweeps.
                        new_priority = max(
                            Priority.CRITICAL,
                            entry.priority - self.aging_boost,
                        )
if new_priority != entry.priority:
entry.priority = new_priority
aged_count += 1
new_heap.append(entry)
if aged_count > 0:
heapq.heapify(new_heap)
self._heap = new_heap
logger.info("Aged %d requests in priority queue", aged_count)
def get_depth_by_priority(self) -> dict:
"""Return current queue depth broken down by priority level."""
counts = {p.name: 0 for p in Priority}
for entry in self._heap:
priority_name = Priority(entry.priority).name
counts[priority_name] += 1
return counts
def get_metrics(self) -> dict:
"""Return queue metrics for CloudWatch."""
return {
"total_depth": len(self._heap),
"depth_by_priority": self.get_depth_by_priority(),
"enqueued_by_priority": {p.name: v for p, v in self._enqueued_by_priority.items()},
"dequeued_by_priority": {p.name: v for p, v in self._dequeued_by_priority.items()},
"rejected_overflow": self._rejected_overflow,
}
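A sketch of the consumer side, assuming the request payload carries a `prompt` field and reusing the `invoke_with_controller` helper sketched after the concurrency controller; the worker resolves each request's Future so the WebSocket handler that enqueued it can reply.

import asyncio

async def queue_worker(queue: PriorityRequestQueue, controller: AdaptiveConcurrencyController):
    """Drain the priority queue and resolve each request's Future with the model output."""
    while True:
        entry = await queue.dequeue()
        try:
            result = await invoke_with_controller(controller, entry.request["prompt"])
            entry.future.set_result(result)
        except Exception as exc:
            entry.future.set_exception(exc)

async def handle_chat_message(queue: PriorityRequestQueue, message: dict) -> str:
    """WebSocket-handler side: enqueue with intent/VIP classification and await the result."""
    future = await queue.enqueue(
        request={"prompt": message["text"]},
        intent=message.get("intent", "general_question"),
        is_vip=message.get("is_vip", False),
    )
    return await asyncio.wait_for(future, timeout=10.0)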
invoke_model vs invoke_model_with_response_stream — Throughput Characteristics
Bedrock offers two invocation modes. The choice between them has direct throughput implications.
Comparison
| Characteristic | invoke_model (Synchronous) | invoke_model_with_response_stream (Streaming) |
|---|---|---|
| Connection held | Until full response generated | Until full response generated |
| Time to first byte | After full generation | After first token (~200ms) |
| Bedrock slot occupancy | Same duration | Same duration |
| Throughput impact | Neutral | Neutral (same slot duration) |
| Client perceived latency | High (waits for complete response) | Low (sees tokens incrementally) |
| Network efficiency | Single payload | Chunked transfer |
| Error handling | Simple (one response) | Complex (partial response possible) |
| Best for | Batch processing, short responses | User-facing chat, long responses |
Key Insight: Streaming Does Not Increase Throughput
A common misconception is that streaming responses free Bedrock capacity sooner. They do not. The Bedrock model slot is occupied for the same duration regardless of mode. Streaming improves perceived latency (time to first token) but not throughput (requests per minute).
MangaAssist strategy:
- User-facing chat: streaming (better UX, perceived responsiveness via WebSocket)
- Batch processing: synchronous (simpler error handling, no partial response risk)
- Order status lookups (Haiku, short responses): synchronous (response is < 100 tokens, streaming overhead not worth it)
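A minimal sketch of both invocation modes against the Bedrock Runtime API, using the standard Claude 3 model IDs and Messages API body; in production the streamed deltas would be pushed over the API Gateway WebSocket rather than printed.

import json
import boto3

bedrock = boto3.client("bedrock-runtime")

claude_body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    "messages": [{"role": "user", "content": "Recommend a shonen manga similar to One Piece."}],
})

# Synchronous: single payload back, simplest error handling — batch jobs and short lookups
response = bedrock.invoke_model(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    body=claude_body,
)
full_text = json.loads(response["body"].read())["content"][0]["text"]

# Streaming: deltas arrive as they are generated — user-facing chat over WebSocket
stream = bedrock.invoke_model_with_response_stream(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",
    body=claude_body,
)
for event in stream["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    if chunk.get("type") == "content_block_delta":
        # In production: push this delta to the client over the WebSocket connection
        print(chunk["delta"]["text"], end="", flush=True)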
Error Handling — Exponential Backoff and Dead-Letter Strategy
Backoff Strategy for Bedrock Throttling
flowchart TD
REQ[Bedrock Request] --> INVOKE[invoke_model]
INVOKE -->|200 OK| SUCCESS[Process Response]
INVOKE -->|ThrottlingException| R1{Attempt 1}
R1 -->|Wait 1s + jitter| RETRY1[Retry]
RETRY1 -->|200 OK| SUCCESS
RETRY1 -->|ThrottlingException| R2{Attempt 2}
R2 -->|Wait 2s + jitter| RETRY2[Retry]
RETRY2 -->|200 OK| SUCCESS
RETRY2 -->|ThrottlingException| R3{Attempt 3}
R3 -->|Wait 4s + jitter| RETRY3[Retry]
RETRY3 -->|200 OK| SUCCESS
RETRY3 -->|ThrottlingException| DEGRADE{Request Type?}
DEGRADE -->|Real-time| FALLBACK[Graceful Degradation<br/>Cache / Template / Model Downgrade]
DEGRADE -->|Batch| DLQ[Dead-Letter Queue]
style SUCCESS fill:#2ecc71,color:#000
style FALLBACK fill:#f39c12,color:#000
style DLQ fill:#e74c3c,color:#fff
Backoff Formula
import random
def calculate_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
"""
Exponential backoff with full jitter.
attempt 1: 0 to 1s
attempt 2: 0 to 2s
attempt 3: 0 to 4s
attempt 4: 0 to 8s
...
Capped at max_delay.
Full jitter (vs equal jitter) reduces thundering herd when
multiple MangaAssist ECS tasks retry simultaneously.
"""
exponential_delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
return random.uniform(0, exponential_delay)
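A sketch of the retry loop the flowchart above describes, built on `calculate_backoff`; it retries only ThrottlingException and re-raises everything else (or the final attempt) so the caller can degrade gracefully or dead-letter the job. The `invoke_with_backoff` helper name is illustrative.

import json
import logging
import time

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger("mangaassist.retry")
bedrock = boto3.client("bedrock-runtime")

def invoke_with_backoff(model_id: str, body: str, max_attempts: int = 4) -> dict:
    """Invoke Bedrock, retrying ThrottlingException with full-jitter exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            response = bedrock.invoke_model(modelId=model_id, body=body)
            return json.loads(response["body"].read())
        except ClientError as exc:
            code = exc.response["Error"]["Code"]
            if code != "ThrottlingException" or attempt == max_attempts:
                # Non-retryable, or retries exhausted: caller degrades or dead-letters
                raise
            delay = calculate_backoff(attempt)
            logger.warning(
                "Bedrock throttled (attempt %d/%d), backing off %.2fs",
                attempt, max_attempts, delay,
            )
            time.sleep(delay)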
Dead-Letter Queue Handling
Messages reach the DLQ for two reasons:
- Retryable errors exhausted — Throttling persisted beyond max retries. These should be reprocessed after the peak subsides.
- Non-retryable errors — Invalid prompt, model validation error, malformed request. These need investigation and will never succeed as-is.
import json
import logging
import boto3
logger = logging.getLogger("mangaassist.dlq")
class DLQProcessor:
"""
Processes messages from the dead-letter queue.
Runs as a scheduled Lambda (every 15 minutes) to:
1. Classify DLQ messages as retryable or non-retryable
2. Re-enqueue retryable messages to the primary queue
3. Log and alert on non-retryable messages
"""
RETRYABLE_ERRORS = {
"ThrottlingException",
"ServiceUnavailableException",
"InternalServerException",
}
NON_RETRYABLE_ERRORS = {
"ValidationException",
"AccessDeniedException",
"ModelNotReadyException",
"ResourceNotFoundException",
}
def __init__(self, dlq_url: str, primary_queue_url: str, sns_topic_arn: str):
self.dlq_url = dlq_url
self.primary_queue_url = primary_queue_url
self.sns_topic_arn = sns_topic_arn
self.sqs = boto3.client("sqs")
self.sns = boto3.client("sns")
def process_batch(self, max_messages: int = 10) -> dict:
"""Process a batch of DLQ messages."""
response = self.sqs.receive_message(
QueueUrl=self.dlq_url,
MaxNumberOfMessages=max_messages,
WaitTimeSeconds=5,
)
messages = response.get("Messages", [])
stats = {"retried": 0, "discarded": 0, "alerted": 0}
for msg in messages:
body = json.loads(msg["Body"])
error_type = body.get("error_type", "Unknown")
if error_type in self.RETRYABLE_ERRORS:
# Re-enqueue to primary queue
self.sqs.send_message(
QueueUrl=self.primary_queue_url,
MessageBody=json.dumps(body.get("original_request", body)),
)
stats["retried"] += 1
logger.info("Re-enqueued retryable DLQ message: %s", msg["MessageId"])
elif error_type in self.NON_RETRYABLE_ERRORS:
# Alert and discard
self.sns.publish(
TopicArn=self.sns_topic_arn,
Subject=f"MangaAssist DLQ: Non-Retryable Error ({error_type})",
Message=json.dumps(body, indent=2),
)
stats["alerted"] += 1
stats["discarded"] += 1
logger.error("Non-retryable DLQ message discarded: %s", msg["MessageId"])
else:
# Unknown error — alert for investigation
self.sns.publish(
TopicArn=self.sns_topic_arn,
Subject=f"MangaAssist DLQ: Unknown Error ({error_type})",
Message=json.dumps(body, indent=2),
)
stats["alerted"] += 1
logger.warning("Unknown DLQ error type: %s", error_type)
# Delete from DLQ after processing
self.sqs.delete_message(
QueueUrl=self.dlq_url,
ReceiptHandle=msg["ReceiptHandle"],
)
return stats
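A sketch of the scheduled Lambda entry point that drives DLQProcessor; the environment variable names are illustrative.

import logging
import os

logger = logging.getLogger("mangaassist.dlq")

# Illustrative environment variable names, set on the scheduled Lambda
processor = DLQProcessor(
    dlq_url=os.environ["MANGAASSIST_DLQ_URL"],
    primary_queue_url=os.environ["MANGAASSIST_PRIMARY_QUEUE_URL"],
    sns_topic_arn=os.environ["MANGAASSIST_ALERT_TOPIC_ARN"],
)

def lambda_handler(event, context):
    """EventBridge-scheduled entry point (every 15 minutes)."""
    stats = processor.process_batch(max_messages=10)
    logger.info("DLQ sweep complete: %s", stats)
    return stats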
Batch Processing Pipeline — End-to-End Sequence
sequenceDiagram
participant CS as Catalog Service
participant SQS as SQS FIFO Queue
participant W as Batch Worker (ECS)
participant AC as AdaptiveConcurrency<br/>Controller
participant B as Bedrock Haiku
participant DB as DynamoDB
participant DLQ as Dead-Letter Queue
participant CW as CloudWatch
CS->>SQS: SendMessage (new manga title batch)
Note over SQS: 50 titles queued<br/>MessageGroupId = "catalog"
loop For each message
W->>SQS: ReceiveMessage (long poll 20s)
SQS-->>W: Message (title metadata)
W->>AC: acquire()
AC-->>W: slot granted (17/20 active)
W->>B: invoke_model (catalog enrichment prompt)
alt Success
B-->>W: 200 OK (description + tags)
W->>AC: release(latency=450ms, throttled=false)
W->>DB: PutItem (enriched catalog entry)
W->>SQS: DeleteMessage
W->>CW: PutMetricData (batch_job_success)
else ThrottlingException
B-->>W: 429 ThrottlingException
W->>AC: release(throttled=true)
Note over AC: Reduces limit 20 -> 14
W->>W: Exponential backoff (1s + jitter)
W->>B: Retry invoke_model
else Max Retries Exceeded
W->>DLQ: SendMessage (failed job + error)
W->>SQS: DeleteMessage
W->>CW: PutMetricData (batch_job_dlq)
DLQ-->>CW: DLQ Alarm (depth > 0)
end
end
Note over AC: Evaluation window (30s)
AC->>AC: p99=420ms < target=600ms<br/>throttles=0 -> grow 14 -> 19
AC->>CW: PutMetricData (concurrency_limit=19)
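A compressed sketch of the batch worker in that sequence, assuming illustrative queue URLs, table name, and prompt, and reusing the `invoke_with_controller` helper sketched earlier; in production the backoff wrapper from the error-handling section would sit between the worker and Bedrock before anything is dead-lettered.

import asyncio
import json

import boto3

sqs = boto3.client("sqs")
catalog_table = boto3.resource("dynamodb").Table("mangaassist-catalog")  # illustrative table name
BATCH_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/mangaassist-catalog.fifo"  # placeholder
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/mangaassist-catalog-dlq"           # placeholder

async def batch_worker(controller: AdaptiveConcurrencyController):
    """Long-poll SQS and enrich new catalog entries under the adaptive concurrency limit."""
    while True:
        resp = await asyncio.to_thread(
            sqs.receive_message,
            QueueUrl=BATCH_QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        for msg in resp.get("Messages", []):
            title = json.loads(msg["Body"])
            try:
                prompt = f"Write a short description and genre tags for the manga: {title['name']}"
                enriched = await invoke_with_controller(controller, prompt)
                await asyncio.to_thread(
                    catalog_table.put_item,
                    Item={"title_id": title["id"], "enriched": enriched},
                )
            except Exception as exc:
                # Failed after the worker's retries — park it in the DLQ with its error type
                await asyncio.to_thread(
                    sqs.send_message,
                    QueueUrl=DLQ_URL,
                    MessageBody=json.dumps({"error_type": type(exc).__name__, "original_request": title}),
                )
            finally:
                await asyncio.to_thread(
                    sqs.delete_message,
                    QueueUrl=BATCH_QUEUE_URL,
                    ReceiptHandle=msg["ReceiptHandle"],
                )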
Key Takeaways
- Route 25% of traffic to batch — Catalog enrichment, embeddings, reports, and pre-computation do not need real-time responses. Moving them to SQS frees Bedrock capacity for user-facing chat.
- Start concurrency at N=10 and let it adapt — Starting conservatively and growing based on real signals is safer than guessing the right limit. The adaptive algorithm converges within 2-4 minutes.
- Priority queuing prevents revenue loss — During peak events, a customer with a payment problem must not wait behind 500 browsing queries. Priority ordering with VIP boost ensures the highest-value requests are served first.
- Streaming does not increase throughput — It improves perceived latency for users but does not free Bedrock model slots any faster. Use it for UX, not for throughput.
- Dead-letter queues need active processing — A DLQ that nobody reads is a silent failure. Scheduled processing that classifies messages, re-enqueues retryable errors, and alerts on non-retryable ones turns the DLQ into an operational tool.
- Aging prevents starvation — Without aging, a sustained flood of P0 requests starves all P2 requests indefinitely. Periodic priority promotion ensures every request eventually gets processed.