Synchronous and Asynchronous Processing Patterns
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Task | 2.4 — Select and implement FM API integration patterns |
| Skill | 2.4.1 — Flexible Model Interaction |
| This File | 02 — Sync/Async Processing Patterns (SQS producer/consumer, batch processing, request transformation) |
Skill Scope
This file covers the implementation details of synchronous and asynchronous processing patterns for MangaAssist. Synchronous patterns handle real-time chat through direct Bedrock InvokeModel calls. Asynchronous patterns use SQS queues for deferred workloads like catalog enrichment, batch synopsis generation, and content moderation. Request transformation bridges the gap between API Gateway's inbound format and what Bedrock expects, handling Japanese text encoding, prompt assembly, and response shaping.
Mind Map
mindmap
root((Sync/Async<br/>Processing<br/>Patterns))
Sync Processing
Direct InvokeModel
ECS request handler
Redis cache layer
DynamoDB session read
Timeout enforcement
Async Processing
SQS Standard queues
SQS FIFO queues
Dead-letter queues (DLQ)
Lambda consumers
ECS long-poll consumers
Batch Processing
Batch window configuration
Partial batch failure handling
Concurrency control
Cost optimization via batching
Progress tracking
Request Transformation
API GW to internal format
Internal to Bedrock format
Bedrock response to client format
Japanese text normalization
Token counting pre-flight
Producer/Consumer
SQS message format design
Visibility timeout tuning
Message deduplication
Callback/notification on complete
Poison message handling
1. Synchronous Processing Pattern
1.1 ECS Request Handler
The synchronous handler in the ECS Fargate orchestrator receives requests from API Gateway, assembles the prompt with session context, calls Bedrock, and returns the response within the 3-second budget.
"""
Synchronous request handler for MangaAssist chat.
Runs inside ECS Fargate containers behind an ALB.
"""
import json
import time
import logging
import uuid
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
import redis
import boto3
from boto3.dynamodb.conditions import Key
logger = logging.getLogger(__name__)
@dataclass
class ChatRequest:
"""Parsed chat request from the client."""
session_id: str
message: str
preferred_language: str = "ja"
model_preference: str = "auto" # auto | fast | quality
request_id: str = ""
def __post_init__(self):
if not self.request_id:
self.request_id = str(uuid.uuid4())
@dataclass
class ChatResponse:
"""Structured response back to the client."""
request_id: str
session_id: str
response_text: str
model_used: str
latency_ms: int
cached: bool = False
tokens_input: int = 0
tokens_output: int = 0
def to_dict(self) -> dict:
return asdict(self)
class SyncChatHandler:
"""
Handles synchronous chat requests with caching, session management,
and Bedrock invocation within a 3-second SLA.
"""
def __init__(
self,
bedrock_manager, # BedrockClientManager from file 01
redis_client: redis.Redis,
dynamodb_table_name: str = "manga-assist-sessions",
cache_ttl: int = 300,
):
self.bedrock = bedrock_manager
self.redis = redis_client
self.ddb = boto3.resource("dynamodb").Table(dynamodb_table_name)
self.cache_ttl = cache_ttl
def handle(self, raw_request: Dict[str, Any]) -> ChatResponse:
"""
Process a synchronous chat request end-to-end.
Timeline budget (3000ms):
- Parse + validate: ~10ms
- Cache check: ~5ms
- Session load: ~50ms
- Prompt build: ~5ms
- Bedrock invocation: ~2500ms (Sonnet) / ~800ms (Haiku)
- Cache store + save: ~100ms
- Serialize response: ~5ms
"""
start = time.time()
# 1. Parse request
request = self._parse_request(raw_request)
logger.info("Chat request | session=%s | lang=%s", request.session_id, request.preferred_language)
# 2. Check response cache
cache_key = self._build_cache_key(request)
cached = self._check_cache(cache_key)
if cached:
elapsed = int((time.time() - start) * 1000)
logger.info("Cache hit | session=%s | latency=%dms", request.session_id, elapsed)
return ChatResponse(
request_id=request.request_id,
session_id=request.session_id,
response_text=cached["response_text"],
model_used=cached["model_used"],
latency_ms=elapsed,
cached=True,
)
# 3. Load conversation history
history = self._load_session(request.session_id)
# 4. Build prompt body
model_key = self._select_model(request)
body = self._build_body(request.message, history, model_key)
# 5. Invoke Bedrock
result = self.bedrock.invoke_model(model_key, body)
# 6. Extract response text
response_text = result["content"][0]["text"]
tokens_in = result.get("usage", {}).get("input_tokens", 0)
tokens_out = result.get("usage", {}).get("output_tokens", 0)
        # 7. Update session and cache (synchronous here; offloading these
        #    writes to a background thread would reclaim ~100ms of the budget)
self._update_session(request.session_id, request.message, response_text)
self._store_cache(cache_key, {"response_text": response_text, "model_used": model_key})
elapsed = int((time.time() - start) * 1000)
logger.info(
"Chat complete | session=%s | model=%s | tokens=%d/%d | latency=%dms",
request.session_id, model_key, tokens_in, tokens_out, elapsed,
)
return ChatResponse(
request_id=request.request_id,
session_id=request.session_id,
response_text=response_text,
model_used=model_key,
latency_ms=elapsed,
tokens_input=tokens_in,
tokens_output=tokens_out,
)
def _parse_request(self, raw: dict) -> ChatRequest:
"""Extract and validate fields from the raw request."""
data = raw.get("data", {})
return ChatRequest(
session_id=data.get("sessionId", str(uuid.uuid4())),
message=data["message"],
preferred_language=data.get("preferredLanguage", "ja"),
model_preference=data.get("modelPreference", "auto"),
)
def _select_model(self, request: ChatRequest) -> str:
"""Choose a model based on request preference and message complexity."""
if request.model_preference == "fast":
return "haiku"
if request.model_preference == "quality":
return "sonnet"
# "auto" — use Haiku for short messages, Sonnet for complex ones
if len(request.message) < 50:
return "haiku"
return "sonnet"
def _build_cache_key(self, request: ChatRequest) -> str:
"""Build a Redis cache key from the request."""
import hashlib
content = f"{request.session_id}:{request.message}"
digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
return f"manga:chat:{digest}"
def _check_cache(self, key: str) -> Optional[dict]:
"""Check Redis for a cached response."""
try:
data = self.redis.get(key)
if data:
return json.loads(data)
except redis.RedisError as exc:
logger.warning("Redis cache check failed: %s", exc)
return None
def _store_cache(self, key: str, value: dict) -> None:
"""Store a response in Redis with TTL."""
try:
self.redis.setex(key, self.cache_ttl, json.dumps(value, ensure_ascii=False))
except redis.RedisError as exc:
logger.warning("Redis cache store failed: %s", exc)
def _load_session(self, session_id: str) -> list:
"""Load conversation history from DynamoDB."""
try:
response = self.ddb.query(
KeyConditionExpression=Key("sessionId").eq(session_id),
ScanIndexForward=True,
Limit=20,
)
return response.get("Items", [])
except Exception as exc:
logger.warning("Session load failed: %s", exc)
return []
def _update_session(self, session_id: str, user_msg: str, assistant_msg: str) -> None:
"""Append the latest turn to the session in DynamoDB."""
try:
timestamp = int(time.time() * 1000)
self.ddb.put_item(Item={
"sessionId": session_id,
"timestamp": timestamp,
"role": "user",
"content": user_msg,
})
self.ddb.put_item(Item={
"sessionId": session_id,
"timestamp": timestamp + 1,
"role": "assistant",
"content": assistant_msg,
})
except Exception as exc:
logger.error("Session update failed: %s", exc)
def _build_body(self, message: str, history: list, model_key: str) -> dict:
"""Construct the Bedrock request body."""
messages = []
for item in history[-10:]:
messages.append({"role": item["role"], "content": item["content"]})
messages.append({"role": "user", "content": message})
        return {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,  # required field for the Anthropic Messages API
            "messages": messages,
            "system": "You are MangaAssist, a helpful JP manga store chatbot. Use polite keigo (敬語).",
        }
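The mind map lists timeout enforcement under sync processing, but the handler above simply returns whatever latency Bedrock delivers. One hedged way to hold the 3-second SLA is a worker-thread deadline around the model call; `invoke_with_deadline`, `fast_invoke`, and the fallback string below are illustrative names, not part of the MangaAssist codebase:

```python
import concurrent.futures
import time

SLA_SECONDS = 3.0
FALLBACK_TEXT = "申し訳ございません、処理に時間がかかっています。もう一度お試しください。"

def invoke_with_deadline(invoke_fn, budget_s: float = SLA_SECONDS) -> str:
    """Run invoke_fn in a worker thread; return a fallback if it misses the budget."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(invoke_fn)
    try:
        return future.result(timeout=budget_s)
    except concurrent.futures.TimeoutError:
        # The underlying call keeps running in the worker thread; the client
        # still receives an answer inside the SLA.
        return FALLBACK_TEXT
    finally:
        pool.shutdown(wait=False)  # do not block on the in-flight call

def fast_invoke() -> str:
    time.sleep(0.05)  # stand-in for a quick Haiku response
    return "おすすめのマンガはこちらです。"

print(invoke_with_deadline(fast_invoke))
```

Note the orphaned Bedrock call still completes (and bills) in the background; in production you would also cache its result so the retry becomes a cache hit.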
2. Asynchronous Processing with SQS
2.1 Queue Architecture
flowchart TB
subgraph "Producers"
direction TB
P1[Admin API<br/>Catalog Enrichment]
P2[ECS Orchestrator<br/>Deferred Tasks]
P3[EventBridge<br/>Scheduled Jobs]
end
subgraph "SQS Queues"
direction TB
Q1[manga-enrichment-queue.fifo<br/>FIFO | Dedup 5min | Vis 300s]
Q2[manga-deferred-queue<br/>Standard | Vis 180s]
Q3[manga-moderation-queue<br/>Standard | Vis 120s]
DLQ1[manga-enrichment-dlq.fifo<br/>maxReceiveCount: 3]
DLQ2[manga-deferred-dlq<br/>maxReceiveCount: 5]
DLQ3[manga-moderation-dlq<br/>maxReceiveCount: 3]
Q1 -.->|after 3 failures| DLQ1
Q2 -.->|after 5 failures| DLQ2
Q3 -.->|after 3 failures| DLQ3
end
subgraph "Consumers"
direction TB
C1[Lambda<br/>Batch=10, Concurrency=5]
C2[ECS Task<br/>Long Poll 20s]
C3[Lambda<br/>Batch=1, Concurrency=10]
end
P1 --> Q1
P2 --> Q2
P3 --> Q1
P3 --> Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3
C1 --> Bedrock[Amazon Bedrock]
C2 --> Bedrock
C3 --> Bedrock
C1 --> DDB[(DynamoDB<br/>Catalog)]
C2 --> SNS[SNS<br/>Callbacks]
C3 --> S3[S3<br/>Moderation Results]
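The redrive edges in the diagram are configured on the source queue, not the DLQ. A sketch of the `CreateQueue` attributes for the enrichment FIFO queue, assuming the queue names from the diagram (the account ID is a placeholder; note a FIFO queue's DLQ must itself be FIFO):

```python
import json

def fifo_queue_attributes(dlq_arn: str, max_receive_count: int = 3) -> dict:
    """CreateQueue attributes for the enrichment FIFO queue in the diagram."""
    return {
        "FifoQueue": "true",
        "ContentBasedDeduplication": "false",  # producer supplies explicit dedup IDs
        "VisibilityTimeout": "300",            # "Vis 300s" in the diagram
        "ReceiveMessageWaitTimeSeconds": "20", # long polling by default
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive_count,  # "after 3 failures" edge
        }),
    }

attrs = fifo_queue_attributes(
    "arn:aws:sqs:us-east-1:123456789012:manga-enrichment-dlq.fifo"
)
# With boto3 (not executed here):
#   sqs.create_queue(QueueName="manga-enrichment-queue.fifo", Attributes=attrs)
```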
2.2 SQS Producer — Catalog Enrichment
"""
SQS producer for manga catalog enrichment.
Sends enrichment jobs to a FIFO queue for ordered, deduplicated processing.
"""
import json
import time
import hashlib
import logging
from typing import List, Dict, Optional
import boto3
logger = logging.getLogger(__name__)
class CatalogEnrichmentProducer:
"""
Produces enrichment jobs for manga catalog items.
Uses FIFO queue to ensure ordered processing per manga series.
"""
def __init__(self, queue_url: str, region: str = "us-east-1"):
self.sqs = boto3.client("sqs", region_name=region)
self.queue_url = queue_url
def enrich_single(
self,
manga_id: str,
title: str,
title_jp: str,
enrichment_type: str = "synopsis",
priority: str = "normal",
) -> str:
"""
Send a single enrichment job to the queue.
Args:
manga_id: Unique manga identifier
title: English title
title_jp: Japanese title (原題)
enrichment_type: Type of enrichment (synopsis|tags|recommendations)
priority: Job priority (high|normal|low)
Returns:
SQS MessageId
"""
message_body = {
"jobType": "catalog_enrichment",
"enrichmentType": enrichment_type,
"manga": {
"id": manga_id,
"title": title,
"titleJp": title_jp,
},
"priority": priority,
"requestedAt": int(time.time()),
}
# FIFO: group by manga series for ordered processing
# Dedup: prevent duplicate enrichment within 5-minute window
        dedup_id = hashlib.sha256(
            f"{manga_id}:{enrichment_type}:{int(time.time() // 300)}".encode()
        ).hexdigest()  # 64 hex chars — already under SQS's 128-char dedup ID limit
response = self.sqs.send_message(
QueueUrl=self.queue_url,
MessageBody=json.dumps(message_body, ensure_ascii=False),
MessageGroupId=f"manga-{manga_id[:8]}",
MessageDeduplicationId=dedup_id,
MessageAttributes={
"enrichmentType": {
"DataType": "String",
"StringValue": enrichment_type,
},
"priority": {
"DataType": "String",
"StringValue": priority,
},
},
)
msg_id = response["MessageId"]
logger.info(
"Enrichment job queued | manga=%s | type=%s | msgId=%s",
manga_id, enrichment_type, msg_id,
)
return msg_id
def enrich_batch(self, items: List[Dict], enrichment_type: str = "synopsis") -> Dict:
"""
Send a batch of enrichment jobs (up to 10 per SQS batch).
Returns:
Dict with 'successful' and 'failed' counts
"""
results = {"successful": 0, "failed": 0, "message_ids": []}
# SQS SendMessageBatch supports up to 10 messages
for batch_start in range(0, len(items), 10):
batch = items[batch_start:batch_start + 10]
entries = []
for idx, item in enumerate(batch):
                dedup_id = hashlib.sha256(
                    f"{item['id']}:{enrichment_type}:{int(time.time() // 300)}".encode()
                ).hexdigest()
entries.append({
"Id": str(idx),
"MessageBody": json.dumps({
"jobType": "catalog_enrichment",
"enrichmentType": enrichment_type,
"manga": item,
"requestedAt": int(time.time()),
}, ensure_ascii=False),
"MessageGroupId": f"manga-{item['id'][:8]}",
"MessageDeduplicationId": dedup_id,
})
response = self.sqs.send_message_batch(
QueueUrl=self.queue_url,
Entries=entries,
)
results["successful"] += len(response.get("Successful", []))
results["failed"] += len(response.get("Failed", []))
results["message_ids"].extend(
[m["MessageId"] for m in response.get("Successful", [])]
)
logger.info(
"Batch enrichment queued | total=%d | success=%d | failed=%d",
len(items), results["successful"], results["failed"],
)
return results
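The `time.time() // 300` term in the deduplication ID implements a five-minute tumbling bucket: every request for the same manga and enrichment type inside one bucket hashes to the same ID, so the FIFO queue silently drops the repeats. Isolated for illustration (timestamps are arbitrary examples):

```python
import hashlib

def dedup_id(manga_id: str, enrichment_type: str, now: float, window_s: int = 300) -> str:
    """Time-bucketed dedup ID: identical within one window, so FIFO drops repeats."""
    bucket = int(now // window_s)
    return hashlib.sha256(
        f"{manga_id}:{enrichment_type}:{bucket}".encode()
    ).hexdigest()

a = dedup_id("manga-001", "synopsis", now=999_900.0)
b = dedup_id("manga-001", "synopsis", now=1_000_100.0)  # same 300s bucket
c = dedup_id("manga-001", "synopsis", now=1_000_200.0)  # next bucket
assert a == b and a != c
```

One caveat: two requests seconds apart but straddling a bucket boundary get different IDs, so this guarantees "at most one per window" only approximately; SQS's own 5-minute deduplication interval acts as the hard limit.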
2.3 SQS Consumer — Lambda Batch Processor
"""
Lambda consumer for manga catalog enrichment queue.
Processes batches of enrichment jobs with partial failure reporting.
"""
import json
import time
import logging
from typing import Dict, Any, List
import boto3
logger = logging.getLogger(__name__)
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
dynamodb = boto3.resource("dynamodb").Table("manga-catalog")
ENRICHMENT_PROMPTS = {
"synopsis": (
"Generate a compelling 2-paragraph synopsis for this manga. "
"Write in both Japanese and English.\n\n"
"Title: {title}\nJapanese Title: {titleJp}\n\n"
"Provide the response as JSON with keys 'synopsis_en' and 'synopsis_jp'."
),
"tags": (
"Generate relevant tags/genres for this manga title. "
"Provide 5-10 tags in both English and Japanese.\n\n"
"Title: {title}\nJapanese Title: {titleJp}\n\n"
"Respond as JSON with keys 'tags_en' (list) and 'tags_jp' (list)."
),
"recommendations": (
"Based on this manga title, suggest 5 similar manga that fans would enjoy. "
"Include titles in both English and Japanese.\n\n"
"Title: {title}\nJapanese Title: {titleJp}\n\n"
"Respond as JSON array with objects having 'title_en' and 'title_jp' keys."
),
}
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
Process SQS batch of enrichment jobs.
Uses partial batch failure reporting — returns only failed message IDs
so SQS can retry them while acknowledging successful ones.
Event source mapping config:
- BatchSize: 10
- MaximumBatchingWindowInSeconds: 30
- FunctionResponseTypes: ["ReportBatchItemFailures"]
"""
failed_items = []
batch_size = len(event["Records"])
logger.info("Processing enrichment batch | size=%d", batch_size)
for record in event["Records"]:
message_id = record["messageId"]
try:
body = json.loads(record["body"])
manga = body["manga"]
enrichment_type = body.get("enrichmentType", "synopsis")
# Build prompt from template
prompt_template = ENRICHMENT_PROMPTS.get(enrichment_type, ENRICHMENT_PROMPTS["synopsis"])
prompt = prompt_template.format(
title=manga.get("title", "Unknown"),
titleJp=manga.get("titleJp", "不明"),
)
# Invoke Bedrock (use Haiku for cost efficiency on batch jobs)
start = time.time()
response = bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"temperature": 0.5,
"messages": [{"role": "user", "content": prompt}],
}),
)
latency_ms = int((time.time() - start) * 1000)
result = json.loads(response["body"].read())
enrichment_data = result["content"][0]["text"]
# Parse the model's JSON response
try:
parsed_enrichment = json.loads(enrichment_data)
except json.JSONDecodeError:
parsed_enrichment = {"raw_text": enrichment_data}
            # Store enrichment in DynamoDB catalog. Note: this nested SET
            # assumes the item already carries an "enrichment" map attribute;
            # initialize it (e.g. to {}) when the catalog item is created.
dynamodb.update_item(
Key={"mangaId": manga["id"]},
UpdateExpression="SET #enrichment.#etype = :data, #enrichment.#updated = :ts",
ExpressionAttributeNames={
"#enrichment": "enrichment",
"#etype": enrichment_type,
"#updated": "lastUpdated",
},
ExpressionAttributeValues={
":data": parsed_enrichment,
":ts": int(time.time()),
},
)
logger.info(
"Enrichment complete | manga=%s | type=%s | latency=%dms",
manga["id"], enrichment_type, latency_ms,
)
except Exception as exc:
logger.error(
"Enrichment failed | msgId=%s | error=%s",
message_id, str(exc),
)
failed_items.append({"itemIdentifier": message_id})
logger.info(
"Batch complete | processed=%d | failed=%d",
batch_size, len(failed_items),
)
# Return failed items for SQS retry
return {"batchItemFailures": failed_items}
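The event source mapping parameters from the docstring could be expressed as `create_event_source_mapping` kwargs roughly as follows — a sketch; the ARN and function name are placeholders:

```python
def enrichment_event_source_config(queue_arn: str, function_name: str) -> dict:
    """Kwargs for lambda_client.create_event_source_mapping per the docstring above."""
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": 10,
        "MaximumBatchingWindowInSeconds": 30,
        # Without this, any raised exception forces SQS to retry the WHOLE batch,
        # not just the failed records.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "ScalingConfig": {"MaximumConcurrency": 5},  # matches "Concurrency=5" in the diagram
    }

config = enrichment_event_source_config(
    "arn:aws:sqs:us-east-1:123456789012:manga-enrichment-queue.fifo",
    "manga-enrichment-consumer",
)
# lambda_client.create_event_source_mapping(**config)  # not executed here
```

For a FIFO source, effective concurrency is also bounded by the number of distinct `MessageGroupId`s in flight, regardless of `ScalingConfig`.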
3. Request Transformation Pipeline
3.1 Transformation Flow
flowchart LR
subgraph "Inbound Transform"
A[Client JSON] --> B[Normalize Unicode<br/>NFC for JP]
B --> C[Validate Fields]
C --> D[Sanitize Input]
D --> E[Internal Format]
end
subgraph "Model Transform"
E --> F[Load Session<br/>Context]
F --> G[Build Messages<br/>Array]
G --> H[Add System<br/>Prompt]
H --> I[Bedrock Request<br/>Body]
end
subgraph "Outbound Transform"
J[Bedrock Response] --> K[Extract Text]
K --> L[Post-process<br/>Japanese]
L --> M[Build Client<br/>Response]
M --> N[Client JSON]
end
I --> J
3.2 Request Transformer Implementation
"""
Request transformation pipeline for MangaAssist.
Handles the full lifecycle: client → internal → Bedrock → client.
"""
import json
import re
import unicodedata
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TransformContext:
"""Carries state through the transformation pipeline."""
raw_input: Dict[str, Any]
session_id: str = ""
user_message: str = ""
language: str = "ja"
conversation_history: List[Dict] = None
bedrock_body: Dict = None
bedrock_response: Dict = None
client_response: Dict = None
metadata: Dict = None
def __post_init__(self):
self.conversation_history = self.conversation_history or []
self.metadata = self.metadata or {}
class InboundTransformer:
"""Transforms client input into internal format."""
def transform(self, ctx: TransformContext) -> TransformContext:
"""Apply all inbound transformations."""
raw = ctx.raw_input
data = raw.get("data", {})
# Extract and normalize the user message
message = data.get("message", "")
message = self._normalize_unicode(message)
message = self._sanitize(message)
ctx.user_message = message
ctx.session_id = data.get("sessionId", "")
ctx.language = self._detect_language(message, data.get("preferredLanguage", "ja"))
ctx.metadata = {
"original_length": len(data.get("message", "")),
"normalized_length": len(message),
"detected_language": ctx.language,
"client_version": raw.get("metadata", {}).get("clientVersion", "unknown"),
}
logger.info(
"Inbound transform | session=%s | lang=%s | chars=%d",
ctx.session_id, ctx.language, len(message),
)
return ctx
def _normalize_unicode(self, text: str) -> str:
"""Normalize to NFC (required for consistent Japanese text)."""
return unicodedata.normalize("NFC", text)
def _sanitize(self, text: str) -> str:
"""Remove zero-width chars and excessive whitespace."""
text = re.sub(r"[\u200b-\u200f\u2028-\u202f\ufeff]", "", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def _detect_language(self, text: str, preferred: str) -> str:
"""Simple language detection based on character ranges."""
jp_chars = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff\u4e00-\u9fff]", text))
total = max(len(text), 1)
if jp_chars / total > 0.3:
return "ja"
return preferred
class ModelTransformer:
"""Transforms internal format into Bedrock request body."""
SYSTEM_PROMPTS = {
"ja": (
"あなたはMangaAssistです。日本のマンガ書店のチャットボットです。"
"お客様のマンガ探し、シリーズや作者に関する質問への回答、"
"おすすめの提案、注文のサポートを行います。丁寧な敬語を使用してください。"
),
"en": (
"You are MangaAssist, a chatbot for a Japanese manga store. "
"Help customers find manga, answer questions about series and authors, "
"provide recommendations, and assist with orders."
),
}
def transform(self, ctx: TransformContext) -> TransformContext:
"""Build the Bedrock request body from internal format."""
messages = []
# Add conversation history
for turn in ctx.conversation_history[-10:]:
messages.append({
"role": turn["role"],
"content": turn["content"],
})
# Add current message
messages.append({
"role": "user",
"content": ctx.user_message,
})
system_prompt = self.SYSTEM_PROMPTS.get(ctx.language, self.SYSTEM_PROMPTS["en"])
ctx.bedrock_body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"temperature": 0.3,
"system": system_prompt,
"messages": messages,
}
ctx.metadata["prompt_messages"] = len(messages)
ctx.metadata["system_language"] = ctx.language
return ctx
class OutboundTransformer:
"""Transforms Bedrock response into client format."""
def transform(self, ctx: TransformContext) -> TransformContext:
"""Build the client response from Bedrock output."""
if not ctx.bedrock_response:
ctx.client_response = {
"error": "No response from model",
"code": "MODEL_NO_RESPONSE",
}
return ctx
content = ctx.bedrock_response.get("content", [{}])
response_text = content[0].get("text", "") if content else ""
# Post-process Japanese text
response_text = self._post_process_japanese(response_text, ctx.language)
usage = ctx.bedrock_response.get("usage", {})
ctx.client_response = {
"status": "success",
"data": {
"message": response_text,
"sessionId": ctx.session_id,
"language": ctx.language,
},
"usage": {
"inputTokens": usage.get("input_tokens", 0),
"outputTokens": usage.get("output_tokens", 0),
},
"metadata": {
"model": ctx.bedrock_response.get("model", "unknown"),
"stopReason": ctx.bedrock_response.get("stop_reason", "unknown"),
},
}
return ctx
def _post_process_japanese(self, text: str, language: str) -> str:
"""Apply Japanese-specific post-processing."""
if language != "ja":
return text
# Ensure proper spacing between Japanese and ASCII text
text = re.sub(r"([\u3040-\u9fff])([a-zA-Z0-9])", r"\1 \2", text)
text = re.sub(r"([a-zA-Z0-9])([\u3040-\u9fff])", r"\1 \2", text)
return text
class TransformPipeline:
"""Orchestrates the full transformation pipeline."""
def __init__(self):
self.inbound = InboundTransformer()
self.model = ModelTransformer()
self.outbound = OutboundTransformer()
def process_inbound(self, raw_input: Dict) -> TransformContext:
"""Transform client input through inbound + model stages."""
ctx = TransformContext(raw_input=raw_input)
ctx = self.inbound.transform(ctx)
ctx = self.model.transform(ctx)
return ctx
def process_outbound(self, ctx: TransformContext) -> Dict:
"""Transform Bedrock response into client format."""
ctx = self.outbound.transform(ctx)
return ctx.client_response
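To see concretely why NFC normalization must precede cache-key computation (and how the character-range heuristic behaves), here is a self-contained illustration; `detect_language` mirrors `InboundTransformer._detect_language` and the sample strings are arbitrary:

```python
import re
import unicodedata

def detect_language(text: str, preferred: str = "en") -> str:
    """Same character-range heuristic as InboundTransformer._detect_language."""
    jp = len(re.findall(r"[\u3040-\u309f\u30a0-\u30ff\u4e00-\u9fff]", text))
    return "ja" if jp / max(len(text), 1) > 0.3 else preferred

# Why NFC matters for cache keys: decomposed カ + combining dakuten (U+3099)
# renders identically to precomposed ガ (U+30AC) but hashes differently.
decomposed = "\u30ab\u3099"
assert unicodedata.normalize("NFC", decomposed) == "\u30ac"  # ガ
assert decomposed != "\u30ac"

print(detect_language("ワンピースの最新刊はありますか"))       # → ja
print(detect_language("Do you have One Piece vol. 108?"))     # → en
```

Without the NFC step, visually identical queries would miss the Redis cache and produce duplicate Bedrock invocations.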
4. Batch Processing Patterns
4.1 Batch Window Strategy
flowchart TD
subgraph "Batch Window Configuration"
A[SQS Event Source Mapping] --> B{Check Conditions}
B -->|BatchSize reached<br/>10 messages| C[Invoke Lambda]
B -->|Window timeout<br/>30 seconds| C
B -->|Neither| D[Continue collecting]
end
subgraph "Lambda Batch Processing"
C --> E[Receive 1-10 records]
E --> F{Process each record}
F -->|Success| G[Acknowledge]
F -->|Failure| H[Report in<br/>batchItemFailures]
G --> I[Return response]
H --> I
end
I --> J{SQS evaluates<br/>failures}
J -->|Has failures| K[Retry failed messages<br/>after visibility timeout]
J -->|No failures| L[Delete all messages]
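The "retry after visibility timeout" edge depends on the visibility timeout being sized correctly. AWS's guidance for Lambda + SQS is at least six times the function timeout, plus the batching window, so a batch being retried inside Lambda is not redelivered mid-flight. A rule-of-thumb sketch (the 30-second figures are illustrative):

```python
def recommended_visibility_timeout(function_timeout_s: int,
                                   batch_window_s: int = 0) -> int:
    """AWS guidance for Lambda + SQS event sources: visibility timeout of at
    least 6x the function timeout, plus the maximum batching window."""
    return 6 * function_timeout_s + batch_window_s

# Enrichment consumer: 30s function timeout, 30s batching window
print(recommended_visibility_timeout(30, 30))  # → 210
```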
4.2 ECS Long-Poll Consumer
For workloads that need more control than Lambda provides (long-running enrichments, stateful processing), the ECS consumer uses SQS long polling.
"""
ECS-based SQS long-poll consumer for deferred inference tasks.
Runs as a persistent process in an ECS Fargate task.
"""
import json
import time
import signal
import logging
import threading
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
logger = logging.getLogger(__name__)
class ECSQueueConsumer:
"""
Long-polling SQS consumer running in ECS Fargate.
Supports concurrent message processing and graceful shutdown.
"""
def __init__(
self,
queue_url: str,
max_workers: int = 5,
visibility_timeout: int = 180,
wait_time_seconds: int = 20,
region: str = "us-east-1",
):
self.sqs = boto3.client("sqs", region_name=region)
self.bedrock = boto3.client("bedrock-runtime", region_name=region)
self.queue_url = queue_url
self.max_workers = max_workers
self.visibility_timeout = visibility_timeout
self.wait_time = wait_time_seconds
self._running = True
self._executor = ThreadPoolExecutor(max_workers=max_workers)
# Graceful shutdown handling
signal.signal(signal.SIGTERM, self._shutdown_handler)
signal.signal(signal.SIGINT, self._shutdown_handler)
def _shutdown_handler(self, signum, frame):
"""Handle graceful shutdown signal from ECS."""
logger.info("Shutdown signal received (sig=%d), draining...", signum)
self._running = False
def run(self):
"""Main consumer loop — runs until shutdown signal."""
logger.info(
"Consumer started | queue=%s | workers=%d",
self.queue_url, self.max_workers,
)
while self._running:
try:
response = self.sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=self.wait_time,
VisibilityTimeout=self.visibility_timeout,
MessageAttributeNames=["All"],
)
messages = response.get("Messages", [])
if not messages:
continue
logger.info("Received %d messages", len(messages))
# Process messages concurrently
futures = {
self._executor.submit(self._process_message, msg): msg
for msg in messages
}
                    # Bound the wait by the visibility timeout; a TimeoutError
                    # raised here is caught by the outer loop, and unfinished
                    # messages simply reappear on the queue for redelivery.
                    for future in as_completed(futures, timeout=self.visibility_timeout):
msg = futures[future]
try:
future.result()
# Delete successfully processed message
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=msg["ReceiptHandle"],
)
except Exception as exc:
logger.error(
"Message processing failed | msgId=%s | error=%s",
msg["MessageId"], str(exc),
)
# Message will return to queue after visibility timeout
except Exception as exc:
logger.error("Consumer loop error: %s", exc)
time.sleep(5)
logger.info("Consumer shutdown complete")
self._executor.shutdown(wait=True)
def _process_message(self, message: Dict[str, Any]) -> None:
"""Process a single SQS message."""
body = json.loads(message["Body"])
job_type = body.get("jobType", "unknown")
logger.info(
"Processing message | msgId=%s | jobType=%s",
message["MessageId"], job_type,
)
if job_type == "deferred_inference":
self._handle_deferred_inference(body)
elif job_type == "batch_recommendation":
self._handle_batch_recommendation(body)
else:
logger.warning("Unknown job type: %s", job_type)
def _handle_deferred_inference(self, body: Dict) -> None:
"""Handle a deferred inference job."""
prompt = body.get("prompt", "")
model_id = body.get("modelId", "anthropic.claude-3-haiku-20240307-v1:0")
callback_url = body.get("callbackUrl")
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
# If callback URL provided, notify the requester
if callback_url:
self._send_callback(callback_url, {
"status": "completed",
"jobId": body.get("jobId"),
"result": result["content"][0]["text"],
})
def _handle_batch_recommendation(self, body: Dict) -> None:
"""Handle batch recommendation generation."""
manga_ids = body.get("mangaIds", [])
for manga_id in manga_ids:
# Process each manga recommendation
logger.info("Generating recommendations for manga=%s", manga_id)
# Implementation would invoke Bedrock and store results
def _send_callback(self, url: str, payload: Dict) -> None:
"""Send completion callback via SNS or HTTP."""
import urllib.request
try:
req = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=10)
except Exception as exc:
logger.error("Callback failed | url=%s | error=%s", url, str(exc))
if __name__ == "__main__":
import os
consumer = ECSQueueConsumer(
queue_url=os.environ["SQS_QUEUE_URL"],
max_workers=int(os.environ.get("MAX_WORKERS", "5")),
)
consumer.run()
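The SIGTERM handling above only works if ECS gives the task time to drain. ECS sends SIGTERM, waits `stopTimeout` seconds, then SIGKILLs; the window must cover one long poll (20s) plus in-flight Bedrock work. A container-definition fragment as a sketch (image URI and name are placeholders):

```python
consumer_container = {
    "name": "manga-deferred-consumer",
    "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/manga-consumer:latest",
    "essential": True,
    # Fargate maximum is 120s; default is 30s, which can cut off a batch
    # still waiting on a slow Bedrock invocation.
    "stopTimeout": 120,
}
```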
5. Dead-Letter Queue Processing
5.1 DLQ Reprocessor
"""
DLQ reprocessor for MangaAssist.
Inspects failed messages, classifies errors, and either retries or archives.
"""
import json
import time
import logging
from typing import Dict, List
from enum import Enum
import boto3
logger = logging.getLogger(__name__)
class FailureClass(Enum):
TRANSIENT = "transient" # Retry will likely succeed
PERMANENT = "permanent" # Message is malformed, won't ever succeed
THROTTLE = "throttle" # Rate limited, retry after delay
UNKNOWN = "unknown"
class DLQReprocessor:
"""Inspect and reprocess messages from dead-letter queues."""
def __init__(self, dlq_url: str, source_queue_url: str, region: str = "us-east-1"):
self.sqs = boto3.client("sqs", region_name=region)
self.dlq_url = dlq_url
self.source_queue_url = source_queue_url
def classify_failure(self, message: Dict) -> FailureClass:
"""Classify the failure type based on message attributes and body."""
body = json.loads(message.get("Body", "{}"))
# Check approximate receive count
receive_count = int(
message.get("Attributes", {}).get("ApproximateReceiveCount", "0")
)
# Messages that failed many times are likely permanent failures
if receive_count > 10:
return FailureClass.PERMANENT
        # Check for known transient error markers. This assumes the consumer
        # annotated the body with an "_error" field before the message was
        # driven to the DLQ; absent that, classification falls through to UNKNOWN.
        error_msg = body.get("_error", "").lower()
if "throttl" in error_msg or "rate" in error_msg:
return FailureClass.THROTTLE
if "timeout" in error_msg or "connection" in error_msg:
return FailureClass.TRANSIENT
if "validation" in error_msg or "malformed" in error_msg:
return FailureClass.PERMANENT
return FailureClass.UNKNOWN
def reprocess(self, max_messages: int = 100) -> Dict:
"""
Process messages from the DLQ.
- Transient/throttle: re-enqueue to source queue
- Permanent: archive to S3
- Unknown: flag for manual review
"""
stats = {"requeued": 0, "archived": 0, "flagged": 0}
processed = 0
while processed < max_messages:
response = self.sqs.receive_message(
QueueUrl=self.dlq_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=5,
AttributeNames=["All"],
)
messages = response.get("Messages", [])
if not messages:
break
for msg in messages:
failure_class = self.classify_failure(msg)
                if failure_class in (FailureClass.TRANSIENT, FailureClass.THROTTLE):
                    # Re-enqueue to the source queue. A FIFO source queue also
                    # requires MessageGroupId (and a fresh deduplication ID),
                    # or this send_message call will be rejected.
                    self.sqs.send_message(
                        QueueUrl=self.source_queue_url,
                        MessageBody=msg["Body"],
                    )
stats["requeued"] += 1
elif failure_class == FailureClass.PERMANENT:
# Archive (would store to S3 in production)
logger.info("Archiving permanent failure | msgId=%s", msg["MessageId"])
stats["archived"] += 1
else:
logger.warning("Unknown failure | msgId=%s", msg["MessageId"])
stats["flagged"] += 1
# Delete from DLQ after handling
self.sqs.delete_message(
QueueUrl=self.dlq_url,
ReceiptHandle=msg["ReceiptHandle"],
)
processed += 1
logger.info("DLQ reprocessing complete | stats=%s", stats)
return stats
6. CloudWatch Metrics and Monitoring
6.1 Custom Metrics Architecture
flowchart LR
subgraph "Sources"
ECS[ECS Handler] -->|put_metric_data| CW
Lambda[Lambda Consumer] -->|put_metric_data| CW
APIGW[API Gateway] -->|Built-in| CW
end
subgraph "CloudWatch"
CW[CloudWatch Metrics]
CW --> D1[Dashboard:<br/>MangaAssist-Ops]
CW --> A1[Alarm:<br/>SyncLatency > 3s]
CW --> A2[Alarm:<br/>DLQ depth > 100]
CW --> A3[Alarm:<br/>Error rate > 5%]
end
A1 --> SNS[SNS Topic]
A2 --> SNS
A3 --> SNS
SNS --> OPS[Ops Team<br/>PagerDuty]
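The "SyncLatency > 3s" alarm from the diagram could be defined with `put_metric_alarm` kwargs along these lines. One subtlety: CloudWatch matches dimensions exactly, so the alarm must name the same dimension set the publisher writes (Environment, Model, Cached), which in practice means one alarm per model. The topic ARN is a placeholder:

```python
def sync_latency_alarm(sns_topic_arn: str, model: str = "sonnet") -> dict:
    """Kwargs for cloudwatch.put_metric_alarm implementing "SyncLatency > 3s"."""
    return {
        "AlarmName": f"MangaAssist-SyncLatency-{model}",
        "Namespace": "MangaAssist/Operations",
        "MetricName": "SyncLatency",
        # Must match the publisher's dimension set exactly.
        "Dimensions": [
            {"Name": "Environment", "Value": "prod"},
            {"Name": "Model", "Value": model},
            {"Name": "Cached", "Value": "False"},
        ],
        "ExtendedStatistic": "p99",   # alarm on tail latency, not the mean
        "Period": 60,
        "EvaluationPeriods": 3,       # three consecutive breaching minutes
        "Threshold": 3000.0,          # milliseconds, matching the 3s SLA
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
        "AlarmActions": [sns_topic_arn],
    }

alarm = sync_latency_alarm("arn:aws:sns:us-east-1:123456789012:manga-ops")
```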
6.2 Metrics Publisher
"""
CloudWatch custom metrics publisher for MangaAssist.
Publishes latency, error rates, and queue depth metrics.
"""
import time
import logging
from typing import List, Dict
from datetime import datetime
import boto3
logger = logging.getLogger(__name__)
class MangaMetricsPublisher:
"""Publishes custom CloudWatch metrics for MangaAssist operations."""
NAMESPACE = "MangaAssist/Operations"
def __init__(self, environment: str = "prod"):
self.cw = boto3.client("cloudwatch")
self.environment = environment
self._buffer: List[Dict] = []
        self._buffer_limit = 20  # flush in small batches; PutMetricData accepts up to 1,000 metrics (1 MB) per call
def record_sync_latency(self, model: str, latency_ms: float, cached: bool = False):
"""Record synchronous invocation latency."""
self._buffer.append({
"MetricName": "SyncLatency",
"Dimensions": [
{"Name": "Environment", "Value": self.environment},
{"Name": "Model", "Value": model},
{"Name": "Cached", "Value": str(cached)},
],
"Value": latency_ms,
"Unit": "Milliseconds",
"Timestamp": datetime.utcnow(),
})
self._flush_if_full()
def record_async_job(self, job_type: str, success: bool, processing_ms: float):
"""Record async job completion."""
self._buffer.append({
"MetricName": "AsyncJobDuration",
"Dimensions": [
{"Name": "Environment", "Value": self.environment},
{"Name": "JobType", "Value": job_type},
{"Name": "Status", "Value": "success" if success else "failure"},
],
"Value": processing_ms,
"Unit": "Milliseconds",
"Timestamp": datetime.utcnow(),
})
self._flush_if_full()
def record_error(self, operation: str, error_type: str):
"""Record an error occurrence."""
self._buffer.append({
"MetricName": "ErrorCount",
"Dimensions": [
{"Name": "Environment", "Value": self.environment},
{"Name": "Operation", "Value": operation},
{"Name": "ErrorType", "Value": error_type},
],
"Value": 1,
"Unit": "Count",
"Timestamp": datetime.utcnow(),
})
self._flush_if_full()
def _flush_if_full(self):
"""Flush buffer when it reaches the CloudWatch batch limit."""
if len(self._buffer) >= self._buffer_limit:
self.flush()
def flush(self):
"""Send buffered metrics to CloudWatch."""
if not self._buffer:
return
try:
self.cw.put_metric_data(
Namespace=self.NAMESPACE,
MetricData=self._buffer,
)
logger.debug("Flushed %d metrics to CloudWatch", len(self._buffer))
except Exception as exc:
logger.error("Metrics flush failed: %s", exc)
finally:
self._buffer.clear()
Key Takeaways
| # | Takeaway |
|---|---|
| 1 | Sync path budget — Allocate ~2500ms for Bedrock InvokeModel out of the 3000ms SLA; everything else (DDB, Redis, network) must fit in ~500ms. |
| 2 | SQS FIFO for ordered workloads — Catalog enrichment uses FIFO with MessageGroupId per manga series to prevent out-of-order updates. |
| 3 | Partial batch failure — Lambda returns batchItemFailures so SQS only retries the specific messages that failed, not the entire batch. |
| 4 | Haiku for batch jobs — Async enrichment uses Haiku ($0.25/$1.25 per 1M) instead of Sonnet ($3/$15) — 12x cheaper for non-interactive workloads. |
| 5 | DLQ classification — Separate transient (retry), permanent (archive), and throttle (delayed retry) failures to avoid wasting compute on unrecoverable messages. |
| 6 | Graceful ECS shutdown — Long-poll consumers trap SIGTERM and drain in-flight work, so no messages are lost during task replacement or scale-in. |
| 7 | Separated transform stages — The inbound/model/outbound pipeline isolates concerns and makes each stage independently testable. |
| 8 | Normalize before caching — Japanese NFC normalization must happen at the inbound stage, before cache-key computation or prompt construction. |