FM Throughput Optimization Architecture
AWS AIP-C01 Task 4.2 — Skill 4.2.3: Optimize FM throughput to sustain high request volumes
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket. 1M messages/day, peak 20K concurrent users. Target: sustain 1,000+ requests/minute to Bedrock.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency & Optimization | Task 4.2 — Optimize FM performance | Skill 4.2.3 — Optimize FM throughput to sustain high request volumes |
Skill scope: Design and implement throughput optimization systems that maximize the number of FM invocations per unit time while maintaining acceptable latency — covering token efficiency, batch inference, concurrency management, queue-based request handling, and backpressure strategies.
Mind Map — FM Throughput Optimization Dimensions
mindmap
root((FM Throughput<br/>Optimization))
Token Processing
Input Token Minimization
Prompt Compression
Context Window Pruning
Template Parameterization
Output Token Control
max_tokens Tuning
Stop Sequence Optimization
Structured Output Schemas
Throughput Formula
Fewer Tokens = Faster Processing
Faster Processing = More Requests/Second
Smaller Prompts = Higher Concurrency
Batch Inference
SQS-Based Batch Queue
Embedding Updates
Report Generation
Catalog Enrichment
Micro-Batching
Similar Intent Grouping
Window-Based Collection
Shared Context Prefix
Scheduling
Off-Peak Batch Windows
Priority-Based Dispatch
Deadline-Aware Processing
Concurrency Management
Semaphore-Based Limiter
Token Bucket Algorithm
Fixed Window Counter
Sliding Window Log
Adaptive Concurrency
Throttle Signal Detection
P99 Latency Feedback
Capacity Probing
Model-Specific Limits
Sonnet Concurrency Pool
Haiku Concurrency Pool
Cross-Model Balancing
Queue Management
Priority Queues
Order Issues — P0
Recommendations — P1
Browsing — P2
Dead-Letter Queues
Failed Invocations
Retry Exhaustion
Poison Message Isolation
Rate Shaping
Token Bucket Admission
Leaky Bucket Smoothing
Burst Allowance
Backpressure Handling
Circuit Breaker
Throttle Detection
Open/Half-Open/Closed
Recovery Probing
Graceful Degradation
Template Responses
Cached Answer Fallback
Model Downgrade Sonnet to Haiku
Client Signaling
WebSocket Backpressure
Retry-After Headers
Queue Position Updates
Throughput Metrics
tokens-processed-per-second
invocations-per-minute
throttle-rate
queue-depth
batch-utilization-ratio
effective-concurrency
Why Throughput Optimization Matters for MangaAssist
MangaAssist processes 1 million messages per day with peaks of 20,000 concurrent users during manga release events and flash sales. Each user message triggers at least one Bedrock invocation, and complex flows (order lookup + recommendation) trigger two or three. Without throughput optimization, Bedrock throttling cascades into WebSocket timeouts, abandoned carts, and revenue loss.
The throughput equation for FM systems:
Effective Throughput = min(
Bedrock Account Limit,
Concurrency × (1 / Avg Latency per Request),
Queue Drain Rate
)
Every optimization in this document targets one of these three bottleneck dimensions.
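As a quick illustration of this formula, the sketch below plugs in hypothetical numbers (a 1,200-invocations/minute account quota, 30 concurrent slots, a 2-second average latency, and a 1,500-messages/minute queue drain rate) and reports which term binds. Everything is expressed per minute; all figures are illustrative, not measured MangaAssist values.
def effective_throughput_per_min(
    account_limit_per_min: float,
    concurrency: int,
    avg_latency_sec: float,
    queue_drain_per_min: float,
) -> float:
    """Effective throughput is capped by the tightest of the three limits."""
    concurrency_bound = concurrency * (60.0 / avg_latency_sec)
    return min(account_limit_per_min, concurrency_bound, queue_drain_per_min)

# 30 slots x (60s / 2s) = 900 invocations/min, the binding constraint in this example.
print(effective_throughput_per_min(1200, 30, 2.0, 1500))  # -> 900.0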
Token Processing Optimization — Fewer Tokens, More Throughput
The relationship between token count and throughput is direct: fewer input tokens mean faster time-to-first-token and shorter total inference time, which means the Bedrock slot is freed sooner for the next request.
Token Reduction Strategies for MangaAssist
| Strategy | Before (tokens) | After (tokens) | Reduction | Throughput Impact |
|---|---|---|---|---|
| Prompt template parameterization | 850 | 320 | 62% | +165% requests/min |
| Context window pruning (top-3 instead of top-5 RAG chunks) | 1,200 | 680 | 43% | +75% requests/min |
| Structured output schema (JSON vs prose) | N/A (output) | -40% output tokens | 40% | +55% requests/min |
| System prompt compression | 480 | 210 | 56% | +130% requests/min |
| Conversation history summarization (beyond 5 turns) | 2,000 | 400 | 80% | +400% requests/min |
Token-Efficient Prompt Design
# BEFORE: Verbose prompt — ~850 input tokens
VERBOSE_PROMPT = """
You are a helpful customer service assistant for a Japanese manga e-commerce store
called MangaAssist. You help customers with their orders, recommend manga titles,
and answer questions about shipping, returns, and product availability. Please be
polite, helpful, and knowledgeable about manga. When a customer asks about an order,
look up the order details and provide a comprehensive summary including the order
status, shipping information, estimated delivery date, and any relevant tracking
information. Format your response in a friendly conversational tone.
Customer conversation history:
{full_conversation_history}
Retrieved context from knowledge base:
{all_retrieved_chunks}
Customer's current question: {question}
"""
# AFTER: Compressed prompt — ~320 input tokens
OPTIMIZED_PROMPT = """<role>MangaAssist support agent</role>
<task>{intent_classification}</task>
<context>{top_3_chunks_only}</context>
<history>{last_3_turns_summary}</history>
<query>{question}</query>
<format>JSON: {{"response": str, "action": str|null}}</format>"""
The optimized prompt produces equivalent quality responses while consuming 62% fewer input tokens — directly translating to higher throughput because each Bedrock invocation completes faster.
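One way to verify that a prompt change actually reduces tokens is to read the usage block that Claude models return in the Bedrock response body. The sketch below assumes a boto3 bedrock-runtime client and Haiku as the probe model; the helper name and the idea of probing with a small max_tokens are illustrative, not part of the MangaAssist codebase.
import json
import boto3

bedrock = boto3.client("bedrock-runtime")

def measure_prompt_tokens(
    prompt: str,
    model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
) -> dict:
    """Invoke the model once and return the token usage Bedrock reports."""
    response = bedrock.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 50,
        }),
    )
    body = json.loads(response["body"].read())
    usage = body.get("usage", {})
    return {
        "input_tokens": usage.get("input_tokens"),
        "output_tokens": usage.get("output_tokens"),
    }

# Compare the verbose and optimized templates with the same variables filled in:
# print(measure_prompt_tokens(rendered_verbose_prompt))
# print(measure_prompt_tokens(rendered_optimized_prompt))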
Batch Inference Strategies
Not every MangaAssist workload requires real-time response. Batch inference shifts non-interactive work to queues, freeing real-time Bedrock capacity for user-facing requests.
Real-Time vs Batch Classification
| Workload | Type | SLA | Strategy |
|---|---|---|---|
| User chat response | Real-time | < 3 seconds | Direct invoke_model |
| Order status lookup | Real-time | < 2 seconds | Direct invoke_model (Haiku) |
| Manga recommendation generation | Near-real-time | < 10 seconds | Micro-batch |
| Product description enrichment | Batch | < 1 hour | SQS batch queue |
| Weekly recommendation emails | Batch | < 6 hours | SQS batch queue (off-peak) |
| Embedding updates for new titles | Batch | < 2 hours | SQS batch queue |
| Customer sentiment reports | Batch | < 24 hours | SQS batch queue (lowest priority) |
SQS-Based Batch Queue Architecture
flowchart LR
subgraph Producers
A[Catalog Service] -->|New titles| Q1
B[Analytics Service] -->|Report requests| Q1
C[Recommendation Engine] -->|Embedding jobs| Q1
end
subgraph "SQS Batch Queues"
Q1[batch-inference-queue<br/>FIFO] --> DLQ1[batch-dlq<br/>Dead Letter Queue]
end
subgraph "Batch Processor — ECS Fargate"
W1[Worker 1] -->|poll| Q1
W2[Worker 2] -->|poll| Q1
W3[Worker 3] -->|poll| Q1
end
subgraph "Bedrock — Batch Pool"
W1 -->|invoke_model| BH[Claude 3 Haiku<br/>Batch Concurrency: 20]
W2 -->|invoke_model| BH
W3 -->|invoke_model| BH
end
subgraph "Results"
BH --> D1[(DynamoDB<br/>Enriched Catalog)]
BH --> D2[(OpenSearch<br/>Updated Embeddings)]
BH --> S3[S3<br/>Generated Reports]
end
style Q1 fill:#ff9900,color:#000
style DLQ1 fill:#d13212,color:#fff
style BH fill:#1a73e8,color:#fff
Micro-Batching for Similar Intent Queries
When multiple users ask similar questions within a short window (e.g., "when does One Piece volume 108 release?"), micro-batching groups them and makes a single Bedrock invocation, distributing the response to all waiters.
import asyncio
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class PendingRequest:
"""A request waiting for batch processing."""
query: str
intent: str
future: asyncio.Future
timestamp: float = field(default_factory=time.time)
class MicroBatchProcessor:
"""
Groups similar-intent queries within a time window and processes them
as a single Bedrock invocation. Effective for MangaAssist during events
when many users ask the same questions.
"""
def __init__(
self,
bedrock_client,
batch_window_ms: int = 100,
max_batch_size: int = 10,
similarity_threshold: float = 0.85,
):
self.bedrock = bedrock_client
self.batch_window_ms = batch_window_ms
self.max_batch_size = max_batch_size
self.similarity_threshold = similarity_threshold
self.pending_batches: Dict[str, List[PendingRequest]] = {}
self._flush_task: asyncio.Task | None = None
def _compute_batch_key(self, intent: str, query: str) -> str:
"""Create a grouping key from intent + normalized query prefix."""
normalized = query.lower().strip()[:50]
return hashlib.md5(f"{intent}:{normalized}".encode()).hexdigest()[:12]
async def submit(self, query: str, intent: str, context: str) -> str:
"""Submit a request for micro-batch processing."""
        loop = asyncio.get_running_loop()
future = loop.create_future()
request = PendingRequest(query=query, intent=intent, future=future)
batch_key = self._compute_batch_key(intent, query)
if batch_key not in self.pending_batches:
self.pending_batches[batch_key] = []
# Schedule flush after batch window
asyncio.create_task(self._schedule_flush(batch_key))
self.pending_batches[batch_key].append(request)
# Flush immediately if batch is full
if len(self.pending_batches[batch_key]) >= self.max_batch_size:
await self._flush_batch(batch_key)
return await future
async def _schedule_flush(self, batch_key: str):
"""Wait for batch window then flush."""
await asyncio.sleep(self.batch_window_ms / 1000)
if batch_key in self.pending_batches:
await self._flush_batch(batch_key)
async def _flush_batch(self, batch_key: str):
"""Process all pending requests in a batch with a single invocation."""
requests = self.pending_batches.pop(batch_key, [])
if not requests:
return
# Use the first query as representative
representative = requests[0]
try:
            # Assumes an async Bedrock runtime client (e.g., an aioboto3-style
            # wrapper). With plain boto3, serialize `body` via json.dumps and run
            # invoke_model in a thread executor instead.
            response = await self.bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
body={
"anthropic_version": "bedrock-2023-05-31",
"messages": [{"role": "user", "content": representative.query}],
"max_tokens": 300,
},
)
result = response["body"]
# Distribute same response to all waiters
for req in requests:
if not req.future.done():
req.future.set_result(result)
except Exception as e:
for req in requests:
if not req.future.done():
req.future.set_exception(e)
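A minimal usage sketch for the processor above: several coroutines submit near-identical release-date questions within the batch window and all of them receive the single batched response. The async Bedrock client is assumed to exist (for example, an aioboto3-style wrapper); the intent label is illustrative.
async def demo_micro_batching(bedrock_async_client):
    processor = MicroBatchProcessor(
        bedrock_async_client, batch_window_ms=100, max_batch_size=10
    )
    queries = ["When does One Piece volume 108 release?"] * 5
    # Five users asking the same question share one Bedrock invocation.
    return await asyncio.gather(
        *(processor.submit(q, intent="release_date", context="") for q in queries)
    )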
Concurrent Model Invocation Management
Bedrock enforces per-account, per-model throughput quotas (requests and tokens per minute); exceeding them raises ThrottlingException. MangaAssist uses a semaphore-based concurrency limiter with adaptive capacity that responds to real-time throttle signals.
Architecture — Concurrency Management System
flowchart TB
subgraph "API Gateway WebSocket"
WS[Incoming Messages<br/>~700/min average<br/>~1200/min peak]
end
subgraph "ECS Fargate — Orchestrator"
WS --> RC[Request Classifier<br/>Intent + Priority]
RC -->|Real-time P0| CM
RC -->|Real-time P1| CM
RC -->|Batch P2| SQS[SQS Batch Queue]
CM[ConcurrencyManager<br/>Adaptive Semaphore]
CM -->|acquire| POOL
subgraph POOL["Bedrock Concurrency Pool"]
S1[Sonnet Pool<br/>max=30, current=24]
H1[Haiku Pool<br/>max=50, current=38]
end
POOL -->|throttle signal| AC[Adaptive Controller]
POOL -->|success signal| AC
AC -->|adjust limits| CM
end
subgraph "Bedrock"
S1 --> BS[Claude 3 Sonnet]
H1 --> BH[Claude 3 Haiku]
end
subgraph "Fallback"
CM -->|circuit open| FB[Template Response<br/>+ Cache Lookup]
end
style CM fill:#ff9900,color:#000
style AC fill:#1a73e8,color:#fff
style FB fill:#d13212,color:#fff
ConcurrencyManager with Adaptive Semaphore
import asyncio
import time
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
logger = logging.getLogger("mangaassist.throughput")
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Rejecting requests (Bedrock overloaded)
HALF_OPEN = "half_open" # Probing recovery
@dataclass
class ThroughputMetrics:
"""Rolling metrics for throughput monitoring."""
invocations_total: int = 0
throttles_total: int = 0
successes_total: int = 0
tokens_processed: int = 0
window_start: float = field(default_factory=time.time)
window_duration_sec: float = 60.0
@property
def invocations_per_minute(self) -> float:
elapsed = max(time.time() - self.window_start, 1.0)
return (self.invocations_total / elapsed) * 60
@property
def throttle_rate(self) -> float:
if self.invocations_total == 0:
return 0.0
return self.throttles_total / self.invocations_total
@property
def tokens_per_second(self) -> float:
elapsed = max(time.time() - self.window_start, 1.0)
return self.tokens_processed / elapsed
def reset_if_window_expired(self):
if time.time() - self.window_start >= self.window_duration_sec:
self.invocations_total = 0
self.throttles_total = 0
self.successes_total = 0
self.tokens_processed = 0
self.window_start = time.time()
class ConcurrencyManager:
"""
Adaptive semaphore-based concurrency limiter for Bedrock invocations.
Adjusts concurrency limits dynamically based on:
- ThrottlingException rate from Bedrock
- P99 latency feedback
- Circuit breaker state
MangaAssist configuration:
- Sonnet pool: starts at 30 concurrent, range [10, 50]
- Haiku pool: starts at 50 concurrent, range [20, 80]
"""
def __init__(
self,
model_id: str,
initial_concurrency: int = 30,
min_concurrency: int = 10,
max_concurrency: int = 50,
throttle_backoff_factor: float = 0.7,
success_growth_factor: float = 1.1,
circuit_open_duration_sec: float = 30.0,
throttle_threshold: float = 0.10,
):
self.model_id = model_id
self.current_limit = initial_concurrency
self.min_concurrency = min_concurrency
self.max_concurrency = max_concurrency
self.throttle_backoff_factor = throttle_backoff_factor
self.success_growth_factor = success_growth_factor
self.circuit_open_duration_sec = circuit_open_duration_sec
self.throttle_threshold = throttle_threshold
self._semaphore = asyncio.Semaphore(initial_concurrency)
self._active_count = 0
self._circuit_state = CircuitState.CLOSED
self._circuit_opened_at: Optional[float] = None
self.metrics = ThroughputMetrics()
logger.info(
"ConcurrencyManager initialized: model=%s, concurrency=%d",
model_id, initial_concurrency,
)
@property
def circuit_state(self) -> CircuitState:
return self._circuit_state
@property
def active_count(self) -> int:
return self._active_count
async def acquire(self) -> bool:
"""
Acquire a concurrency slot. Returns False if circuit is open.
"""
self.metrics.reset_if_window_expired()
# Circuit breaker check
if self._circuit_state == CircuitState.OPEN:
if time.time() - self._circuit_opened_at >= self.circuit_open_duration_sec:
self._circuit_state = CircuitState.HALF_OPEN
logger.info("Circuit half-open: probing Bedrock for %s", self.model_id)
else:
return False
if self._circuit_state == CircuitState.HALF_OPEN:
# Only allow 1 probe request
if self._active_count >= 1:
return False
await self._semaphore.acquire()
self._active_count += 1
self.metrics.invocations_total += 1
return True
async def release(self, success: bool, tokens_used: int = 0, throttled: bool = False):
"""
Release a concurrency slot and feed back result for adaptation.
"""
self._active_count -= 1
self._semaphore.release()
if throttled:
self.metrics.throttles_total += 1
await self._handle_throttle()
elif success:
self.metrics.successes_total += 1
self.metrics.tokens_processed += tokens_used
await self._handle_success()
async def _handle_throttle(self):
"""Reduce concurrency on throttle signal."""
if self._circuit_state == CircuitState.HALF_OPEN:
# Probe failed — reopen circuit
self._circuit_state = CircuitState.OPEN
self._circuit_opened_at = time.time()
logger.warning("Circuit reopened for %s after probe throttle", self.model_id)
return
new_limit = max(
self.min_concurrency,
int(self.current_limit * self.throttle_backoff_factor),
)
if new_limit != self.current_limit:
logger.warning(
"Throttle detected: reducing concurrency %d -> %d for %s",
self.current_limit, new_limit, self.model_id,
)
await self._resize_semaphore(new_limit)
# Open circuit if throttle rate exceeds threshold
if self.metrics.throttle_rate >= self.throttle_threshold:
self._circuit_state = CircuitState.OPEN
self._circuit_opened_at = time.time()
logger.error(
"Circuit OPEN for %s: throttle_rate=%.2f >= %.2f",
self.model_id, self.metrics.throttle_rate, self.throttle_threshold,
)
async def _handle_success(self):
"""Gradually increase concurrency on sustained success."""
if self._circuit_state == CircuitState.HALF_OPEN:
self._circuit_state = CircuitState.CLOSED
logger.info("Circuit closed for %s after successful probe", self.model_id)
return
# Only grow if throttle rate is near zero and we have enough samples
if (
self.metrics.throttle_rate < 0.01
and self.metrics.invocations_total >= 20
and self.current_limit < self.max_concurrency
):
new_limit = min(
self.max_concurrency,
int(self.current_limit * self.success_growth_factor),
)
if new_limit != self.current_limit:
logger.info(
"Growing concurrency %d -> %d for %s",
self.current_limit, new_limit, self.model_id,
)
await self._resize_semaphore(new_limit)
async def _resize_semaphore(self, new_limit: int):
"""Resize the semaphore to a new limit."""
delta = new_limit - self.current_limit
if delta > 0:
for _ in range(delta):
self._semaphore.release()
elif delta < 0:
for _ in range(abs(delta)):
await self._semaphore.acquire()
self.current_limit = new_limit
def get_status(self) -> dict:
"""Return current status for CloudWatch metrics emission."""
return {
"model_id": self.model_id,
"current_limit": self.current_limit,
"active_count": self._active_count,
"circuit_state": self._circuit_state.value,
"invocations_per_minute": round(self.metrics.invocations_per_minute, 1),
"throttle_rate": round(self.metrics.throttle_rate, 4),
"tokens_per_second": round(self.metrics.tokens_per_second, 1),
}
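A sketch of how a request handler might wrap a single Bedrock call with the manager above: acquire a slot, invoke, then release with the success or throttle signal so the adaptive limits stay honest. The invoke_fn callable and the usage-based token count are assumptions for illustration; only the acquire/release contract comes from the class itself.
async def invoke_with_concurrency_control(
    manager: ConcurrencyManager, invoke_fn
) -> dict | None:
    """Run one Bedrock invocation under the adaptive concurrency limiter."""
    if not await manager.acquire():
        return None  # Circuit open: caller should fall back to the degradation chain
    try:
        result = await invoke_fn()  # e.g., an async wrapper around invoke_model
        usage = result.get("usage", {})
        tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
        await manager.release(success=True, tokens_used=tokens)
        return result
    except Exception as exc:
        throttled = "ThrottlingException" in str(exc)
        await manager.release(success=False, throttled=throttled)
        raise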
Queue-Based Request Management
MangaAssist uses a priority queue system to ensure high-value requests (order issues, payment problems) are processed before low-priority requests (browsing recommendations) when Bedrock capacity is constrained.
Priority Queue Design
| Priority | Category | Examples | Target SLA | Queue |
|---|---|---|---|---|
| P0 — Critical | Order/Payment issues | "Where is my order?", "Payment failed" | < 2s | Real-time, dedicated Haiku pool |
| P1 — High | Purchase intent | "Recommend manga like...", "Is this in stock?" | < 3s | Real-time, shared Sonnet pool |
| P2 — Medium | Browsing assistance | "Tell me about this series", "What genres?" | < 5s | Real-time with degradation allowed |
| P3 — Low | Batch workloads | Embedding updates, report generation | < 1 hour | SQS batch queue |
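A minimal routing sketch for the table above, mapping classified intents to priority levels; the intent labels and the default rule are assumptions, since the real classifier lives in the orchestrator.
from enum import IntEnum

class Priority(IntEnum):
    P0_CRITICAL = 0   # Order/payment issues
    P1_HIGH = 1       # Purchase intent
    P2_MEDIUM = 2      # Browsing assistance
    P3_BATCH = 3       # Non-real-time workloads

# Hypothetical intent labels produced by the request classifier.
INTENT_PRIORITY = {
    "order_status": Priority.P0_CRITICAL,
    "payment_issue": Priority.P0_CRITICAL,
    "recommendation": Priority.P1_HIGH,
    "stock_check": Priority.P1_HIGH,
    "series_info": Priority.P2_MEDIUM,
}

def route(intent: str) -> Priority:
    """Unknown intents default to P2 so they never jump ahead of order issues."""
    return INTENT_PRIORITY.get(intent, Priority.P2_MEDIUM)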
Dead-Letter Queue Strategy
flowchart LR
subgraph "Primary Processing"
Q[Priority Queue] --> P[Processor]
P -->|invoke_model| B[Bedrock]
end
subgraph "Retry Logic"
B -->|ThrottlingException| R{Retry<br/>count < 3?}
R -->|Yes| EXP[Exponential Backoff<br/>1s, 2s, 4s + jitter]
EXP --> P
R -->|No| DLQ
end
subgraph "Dead-Letter Handling"
DLQ[Dead-Letter Queue<br/>MaxReceiveCount=3] --> ALARM[CloudWatch Alarm<br/>DLQ depth > 0]
ALARM --> SNS[SNS Notification]
DLQ --> PROC[DLQ Processor<br/>Scheduled Lambda]
PROC -->|Retryable?| Q
PROC -->|Non-retryable| LOG[Error Log +<br/>Customer Notification]
end
style DLQ fill:#d13212,color:#fff
style ALARM fill:#ff9900,color:#000
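The retry branch in the diagram backs off 1s, 2s, 4s with jitter before letting SQS redrive the message to the DLQ (MaxReceiveCount=3). A minimal helper consistent with that schedule is sketched below; the jitter range and the string-based throttle check are assumptions.
import asyncio
import random

async def invoke_with_backoff(invoke_fn, max_attempts: int = 3):
    """Retry throttled invocations with 1s, 2s, 4s delays plus random jitter."""
    for attempt in range(max_attempts):
        try:
            return await invoke_fn()
        except Exception as exc:
            if "ThrottlingException" not in str(exc) or attempt == max_attempts - 1:
                raise  # Non-retryable, or retries exhausted; SQS redrive handles the DLQ
            delay = (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)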
Backpressure Handling
When Bedrock capacity is saturated, MangaAssist must avoid cascading failures. The backpressure system uses three layers of defense.
Layer 1 — Circuit Breaker
The ConcurrencyManager above includes circuit breaker logic. When the throttle rate exceeds 10%, the circuit opens for 30 seconds, rejecting new requests immediately instead of queuing them behind failing invocations.
Layer 2 — Graceful Degradation
When the circuit is open, MangaAssist does not return errors to users. Instead, it falls back through a degradation chain; a minimal code sketch follows the numbered steps below:
1. Try Sonnet → THROTTLED
2. Try Haiku (simpler model, higher limits) → THROTTLED
3. Try cached response (DynamoDB, exact or fuzzy match) → MISS
4. Return template response based on detected intent
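A compact sketch of that chain: each step is tried in order and the first hit wins. The Sonnet/Haiku invokers, cache lookup, and template store are hypothetical helpers used only to show the control flow.
async def answer_with_degradation(query: str, intent: str, sonnet, haiku, cache, templates) -> str:
    """Walk the degradation chain so the user always receives a response."""
    for invoke in (sonnet, haiku):  # Steps 1-2: try Sonnet, then the cheaper Haiku
        try:
            return await invoke(query)
        except Exception as exc:
            if "ThrottlingException" not in str(exc):
                raise
    cached = await cache.lookup(query)  # Step 3: exact or fuzzy cached answer
    if cached:
        return cached
    return templates.for_intent(intent)  # Step 4: static template keyed by detected intent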
Layer 3 — Client Signaling via WebSocket
# WebSocket backpressure message to client
{
"type": "system",
"status": "high_load",
"message": "We're experiencing high demand. Your response may take a moment.",
"estimated_wait_sec": 8,
"queue_position": 42
}
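To push that message to a connected client, the orchestrator can call the API Gateway Management API for the WebSocket stage. The sketch below assumes the connection ID is tracked per user and that the endpoint URL placeholder is filled in with the deployed stage.
import json
import boto3

def send_backpressure_notice(
    connection_id: str,
    estimated_wait_sec: int,
    queue_position: int,
    endpoint_url: str = "https://<api-id>.execute-api.<region>.amazonaws.com/<stage>",
):
    """Push a high-load notice to a connected WebSocket client."""
    client = boto3.client("apigatewaymanagementapi", endpoint_url=endpoint_url)
    client.post_to_connection(
        ConnectionId=connection_id,
        Data=json.dumps({
            "type": "system",
            "status": "high_load",
            "message": "We're experiencing high demand. Your response may take a moment.",
            "estimated_wait_sec": estimated_wait_sec,
            "queue_position": queue_position,
        }).encode("utf-8"),
    )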
Token Throughput Metrics Dashboard
Key Metrics for CloudWatch
| Metric | Unit | Target | Alarm Threshold | Description |
|---|---|---|---|---|
| TokensProcessedPerSecond | Count/sec | > 5,000 | < 2,000 | Total input + output tokens processed across all models |
| InvocationsPerMinute | Count/min | > 1,000 | < 500 | Successful Bedrock invocations per minute |
| ThrottleRate | Ratio | < 0.02 | > 0.05 | ThrottlingExceptions / total invocations |
| EffectiveConcurrency | Count | 60-80% of limit | < 40% of limit | Active concurrent invocations / configured limit |
| QueueDepth | Count | < 100 | > 500 | Messages waiting in priority queue |
| BatchQueueAge | Seconds | < 600 | > 3,600 | Age of oldest message in batch queue |
| DLQDepth | Count | 0 | > 0 | Messages in dead-letter queue |
| DegradationRate | Ratio | < 0.01 | > 0.05 | Requests served by fallback / total requests |
Batch Inference Processor with SQS
import json
import asyncio
import logging
import boto3
from botocore.exceptions import ClientError
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger("mangaassist.batch")
@dataclass
class BatchJob:
"""Represents a batch inference job from SQS."""
message_id: str
receipt_handle: str
job_type: str # "embedding_update" | "report_generation" | "catalog_enrichment"
payload: dict
priority: int # 0 = highest
retry_count: int = 0
class BatchInferenceProcessor:
"""
SQS-based batch inference processor for non-real-time MangaAssist workloads.
Processes jobs from an SQS FIFO queue using a dedicated Haiku concurrency
pool, separate from real-time user-facing traffic. Jobs that fail after
max retries are sent to a dead-letter queue.
Designed for:
- Product description enrichment (new manga titles)
- Embedding regeneration (catalog updates)
- Weekly recommendation batch generation
- Customer sentiment report generation
"""
def __init__(
self,
queue_url: str,
dlq_url: str,
concurrency_manager: "ConcurrencyManager",
max_workers: int = 5,
visibility_timeout_sec: int = 300,
max_retries: int = 3,
):
self.queue_url = queue_url
self.dlq_url = dlq_url
self.concurrency_manager = concurrency_manager
self.max_workers = max_workers
self.visibility_timeout_sec = visibility_timeout_sec
self.max_retries = max_retries
self.sqs = boto3.client("sqs")
self.bedrock = boto3.client("bedrock-runtime")
self._running = False
self._workers: list[asyncio.Task] = []
async def start(self):
"""Start batch processing workers."""
self._running = True
self._workers = [
asyncio.create_task(self._worker_loop(i))
for i in range(self.max_workers)
]
logger.info(
"BatchInferenceProcessor started: workers=%d, queue=%s",
self.max_workers, self.queue_url,
)
async def stop(self):
"""Gracefully stop all workers."""
self._running = False
for worker in self._workers:
worker.cancel()
await asyncio.gather(*self._workers, return_exceptions=True)
logger.info("BatchInferenceProcessor stopped")
async def _worker_loop(self, worker_id: int):
"""Main worker loop: poll SQS, process, acknowledge."""
while self._running:
try:
                # boto3 calls are blocking; in production wrap them with asyncio.to_thread
                # (or use aioboto3) so the worker does not stall the event loop.
                response = self.sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=20,
VisibilityTimeout=self.visibility_timeout_sec,
AttributeNames=["ApproximateReceiveCount"],
)
messages = response.get("Messages", [])
if not messages:
continue
for msg in messages:
job = self._parse_message(msg)
if job:
await self._process_job(worker_id, job)
except asyncio.CancelledError:
break
except Exception:
logger.exception("Worker %d encountered error", worker_id)
await asyncio.sleep(5)
def _parse_message(self, message: dict) -> Optional[BatchJob]:
"""Parse SQS message into a BatchJob."""
try:
body = json.loads(message["Body"])
receive_count = int(
message.get("Attributes", {}).get("ApproximateReceiveCount", "1")
)
return BatchJob(
message_id=message["MessageId"],
receipt_handle=message["ReceiptHandle"],
job_type=body["job_type"],
payload=body["payload"],
priority=body.get("priority", 2),
retry_count=receive_count - 1,
)
except (KeyError, json.JSONDecodeError):
logger.error("Failed to parse message: %s", message["MessageId"])
return None
async def _process_job(self, worker_id: int, job: BatchJob):
"""Process a single batch job through Bedrock."""
acquired = await self.concurrency_manager.acquire()
if not acquired:
logger.warning(
"Worker %d: circuit open, returning job %s to queue",
worker_id, job.message_id,
)
return # Message becomes visible again after visibility timeout
try:
prompt = self._build_prompt(job)
response = self.bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500,
}),
)
result = json.loads(response["body"].read())
await self._store_result(job, result)
# Delete message on success
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=job.receipt_handle,
)
await self.concurrency_manager.release(success=True, tokens_used=200)
logger.info("Job %s completed by worker %d", job.message_id, worker_id)
except ClientError as e:
error_code = e.response["Error"]["Code"]
throttled = error_code == "ThrottlingException"
await self.concurrency_manager.release(
success=False, throttled=throttled,
)
if job.retry_count >= self.max_retries:
await self._send_to_dlq(job, str(e))
else:
logger.warning(
"Job %s failed (attempt %d/%d): %s",
job.message_id, job.retry_count + 1, self.max_retries, error_code,
)
except Exception as e:
await self.concurrency_manager.release(success=False)
logger.exception("Unexpected error processing job %s", job.message_id)
if job.retry_count >= self.max_retries:
await self._send_to_dlq(job, str(e))
def _build_prompt(self, job: BatchJob) -> str:
"""Build prompt based on job type."""
if job.job_type == "catalog_enrichment":
return (
f"Generate a concise product description for this manga title:\n"
f"Title: {job.payload['title']}\n"
f"Author: {job.payload['author']}\n"
f"Genre: {job.payload['genre']}\n"
f"Format: JSON with keys: description, tags, audience"
)
elif job.job_type == "embedding_update":
return (
f"Summarize this manga for embedding generation:\n"
f"{job.payload['content']}\n"
f"Output: single paragraph, max 100 words"
)
elif job.job_type == "report_generation":
return (
f"Analyze these customer interaction metrics:\n"
f"{json.dumps(job.payload['metrics'])}\n"
f"Output: JSON with keys: summary, trends, recommendations"
)
return job.payload.get("prompt", "")
async def _store_result(self, job: BatchJob, result: dict):
"""Store batch job result to appropriate destination."""
# Implementation varies by job type — DynamoDB, OpenSearch, S3
logger.info("Stored result for job %s (type=%s)", job.message_id, job.job_type)
async def _send_to_dlq(self, job: BatchJob, error: str):
"""Send failed job to dead-letter queue for investigation."""
self.sqs.send_message(
QueueUrl=self.dlq_url,
MessageBody=json.dumps({
"original_job": {
"message_id": job.message_id,
"job_type": job.job_type,
"payload": job.payload,
},
"error": error,
"retry_count": job.retry_count,
}),
)
# Delete from primary queue
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=job.receipt_handle,
)
logger.error(
"Job %s sent to DLQ after %d retries: %s",
job.message_id, job.retry_count, error,
)
Throughput Strategy Comparison
| Strategy | Requests/Min Capacity | Latency Impact | Complexity | Best For |
|---|---|---|---|---|
| Token minimization | +60-165% over baseline | -40-60% per request | Low | All workloads — do this first |
| Micro-batching | +30-50% for similar queries | +50-100ms (batch window) | Medium | High-traffic identical queries |
| SQS batch queue | Offloads 20-30% of traffic | N/A (async) | Medium | Non-real-time workloads |
| Adaptive concurrency | +20-40% utilization | Neutral | High | Maximizing Bedrock capacity usage |
| Priority queuing | Neutral (redistribution) | -30% for P0 requests | Medium | Mixed-priority workloads |
| Circuit breaker | Prevents cascading failure | +0ms (fast reject) | Medium | Overload protection |
| Model downgrade (Sonnet to Haiku) | +200-300% | Reduced (Haiku is faster) | Low | Degradation fallback |
| Conversation summarization | +100-400% | -50-80% per request | Medium | Multi-turn conversations |
Combined Throughput Impact for MangaAssist
Baseline: ~400 requests/min
+ Token minimization: ~700 requests/min (+75%)
+ Batch offload (non-real-time): ~850 requests/min (+21%, real-time freed)
+ Adaptive concurrency: ~1,020 requests/min (+20%)
+ Priority queuing + degradation: ~1,100 requests/min (sustained with graceful fallback)
Target achieved: 1,000+ requests/min sustained
Key Takeaways
- Token reduction is the highest-ROI throughput optimization — fewer tokens per request means faster processing, which means more requests per second. Always optimize prompts before adding infrastructure complexity.
- Separate real-time from batch traffic — batch workloads (embeddings, reports, enrichment) should never compete with user-facing chat for Bedrock capacity. SQS queues with dedicated concurrency pools enforce this isolation.
- Adaptive concurrency beats static limits — static concurrency settings either waste capacity (set too low) or cause throttle storms (set too high). Adaptive systems continuously probe for the optimal operating point.
- Backpressure must be end-to-end — circuit breakers at the Bedrock layer are necessary but not sufficient. Graceful degradation (cached responses, template fallbacks, model downgrades) ensures users always get a response.
- Dead-letter queues are not optional — every queue needs a DLQ with monitoring. A growing DLQ is an early warning of misconfigured retries, prompt errors, or Bedrock service issues.