Asynchronous Processing and Request Validation Patterns
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIP-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model interaction systems for generative AI applications |
| Skill | 2.4.1 — Create flexible model interaction systems (Bedrock APIs for synchronous requests, language-specific SDKs and SQS for asynchronous processing, API Gateway for custom API clients with request validation) |
1. SQS Queue Design for FM Requests
1.1 Queue Architecture Overview
flowchart TB
subgraph Producers["Request Producers"]
API["API Gateway<br/>REST Endpoints"]
WS["WebSocket API<br/>Batch Submissions"]
CRON["EventBridge<br/>Scheduled Jobs"]
STEP["Step Functions<br/>Workflow Steps"]
end
subgraph Queues["SQS Queue Topology"]
direction TB
subgraph Standard["Standard Queues (At-Least-Once)"]
HIGH["manga-fm-high-priority<br/>Visibility: 60s<br/>Retention: 4 days"]
NORMAL["manga-fm-normal<br/>Visibility: 120s<br/>Retention: 7 days"]
LOW["manga-fm-low-priority<br/>Visibility: 300s<br/>Retention: 14 days"]
end
subgraph FIFO["FIFO Queues (Exactly-Once, Ordered)"]
ORDERED["manga-fm-ordered.fifo<br/>MessageGroupId: per-user<br/>Dedup: 5 min window"]
SESSION["manga-fm-session.fifo<br/>MessageGroupId: session_id<br/>Ordered per session"]
end
subgraph DLQs["Dead Letter Queues"]
DLQ_STD["manga-fm-dlq<br/>Max receives: 3<br/>Retention: 14 days"]
DLQ_FIFO["manga-fm-dlq.fifo<br/>Max receives: 3<br/>Retention: 14 days"]
end
end
subgraph Consumers["Request Consumers"]
LAMBDA["Lambda Workers<br/>(short tasks, < 15 min)"]
ECS["ECS Fargate Tasks<br/>(long-running analysis)"]
EC2["EC2 Workers<br/>(GPU-adjacent, bulk)"]
end
subgraph Results["Result Delivery"]
DDB["DynamoDB<br/>Status Tracking"]
S3["S3 Bucket<br/>Result Storage"]
SNS["SNS Topic<br/>Completion Notify"]
CALLBACK["Webhook<br/>Callback URL"]
end
API --> HIGH
API --> NORMAL
WS --> LOW
CRON --> LOW
STEP --> ORDERED
HIGH --> LAMBDA
NORMAL --> LAMBDA
NORMAL --> ECS
LOW --> ECS
LOW --> EC2
ORDERED --> LAMBDA
SESSION --> LAMBDA
HIGH -->|"3 failures"| DLQ_STD
NORMAL -->|"3 failures"| DLQ_STD
LOW -->|"3 failures"| DLQ_STD
ORDERED -->|"3 failures"| DLQ_FIFO
LAMBDA --> DDB
LAMBDA --> S3
ECS --> DDB
ECS --> S3
LAMBDA --> SNS
ECS --> SNS
SNS --> CALLBACK
style Standard fill:#e8f5e9,stroke:#388e3c
style FIFO fill:#e3f2fd,stroke:#1565c0
style DLQs fill:#fce4ec,stroke:#c62828
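The producer-side routing in the diagram reduces to a small helper that picks a queue from the request's priority and assembles the `send_message` arguments. A sketch; the queue URLs and account ID are placeholders:

```python
import json

# Queue names from the topology above; URLs are illustrative placeholders.
QUEUE_URLS = {
    "high": "https://sqs.us-east-1.amazonaws.com/123456789012/manga-fm-high-priority",
    "normal": "https://sqs.us-east-1.amazonaws.com/123456789012/manga-fm-normal",
    "low": "https://sqs.us-east-1.amazonaws.com/123456789012/manga-fm-low-priority",
}

def build_send_kwargs(request_body: dict) -> dict:
    """Route an FM request to a priority queue and build SQS send_message kwargs."""
    priority = request_body.get("priority", "normal")
    if priority not in QUEUE_URLS:
        priority = "normal"  # Unknown priorities fall back to the normal queue
    return {
        "QueueUrl": QUEUE_URLS[priority],
        "MessageBody": json.dumps(request_body, ensure_ascii=False),
        "MessageAttributes": {
            "priority": {"DataType": "String", "StringValue": priority}
        },
    }
```

The kwargs dict is passed directly to `boto3.client("sqs").send_message(**kwargs)`; keeping the routing decision in a pure function makes it trivially unit-testable.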
1.2 FIFO vs Standard Queue Selection
| Factor | Standard Queue | FIFO Queue | MangaAssist Decision |
|---|---|---|---|
| Throughput | Nearly unlimited | 300 msg/s per API action unbatched; 3,000 msg/s with batching; higher in high-throughput mode | Standard for bulk; FIFO for session-ordered |
| Ordering | Best-effort | Strict per MessageGroupId | FIFO needed for multi-turn context preservation |
| Deduplication | Not built-in | 5-min dedup window | FIFO avoids duplicate FM calls on retries |
| Delivery | At-least-once | Exactly-once processing | FIFO avoids double billing from duplicate invocations |
| Cost | $0.40/1M requests | $0.50/1M requests | Standard for non-ordered batch work |
| Max message size | 256 KB | 256 KB | Both sufficient for FM request payloads |
| Retention | 1 min to 14 days | 1 min to 14 days | 7 days standard, 14 days DLQ for investigation |
MangaAssist queue assignment rules:
- Standard queue — Batch catalog enrichment, content moderation, genre classification (order irrelevant).
- FIFO queue — Multi-turn conversation continuations, ordered per-user request sequences where context must be preserved.
- High-priority standard — Partner API real-time requests that narrowly miss the sync SLA but need fast turnaround.
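For the FIFO queues, per-session ordering and retry-safe deduplication come down to two send parameters. A minimal sketch, assuming the producer knows the session ID; a content hash keeps duplicate retries of the same payload inside the 5-minute dedup window from triggering a second FM call:

```python
import hashlib
import json

def fifo_send_params(session_id: str, request_body: dict) -> dict:
    """Build the FIFO-specific send_message parameters.

    MessageGroupId scopes strict ordering to one session; the content-based
    MessageDeduplicationId is stable across retries of an identical payload,
    so SQS suppresses the duplicate within the 5-minute dedup window.
    """
    payload = json.dumps(request_body, sort_keys=True, ensure_ascii=False)
    dedup_id = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return {
        "MessageGroupId": session_id,        # Ordering scope: one group per session
        "MessageDeduplicationId": dedup_id,  # Stable hash: duplicate retries collapse
    }
```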
1.3 Dead Letter Queue (DLQ) Configuration
"""
DLQ configuration and monitoring for MangaAssist FM request queues.
"""
import json
import time
import logging
from datetime import datetime, timedelta
import boto3
logger = logging.getLogger(__name__)
class DLQManager:
"""
Manages dead letter queues for failed FM requests.
Provides monitoring, redriving, and alerting capabilities.
DLQ triggers:
- Bedrock ThrottlingException after all retries exhausted
- Bedrock ModelTimeoutException (model overloaded)
- Malformed request body that fails deserialization
- Lambda/ECS consumer timeout before processing completes
"""
def __init__(self, region: str = "us-east-1"):
self._sqs = boto3.client("sqs", region_name=region)
self._cloudwatch = boto3.client("cloudwatch", region_name=region)
self._sns = boto3.client("sns", region_name=region)
self._dlq_urls = {
"standard": self._get_queue_url("manga-fm-dlq"),
"fifo": self._get_queue_url("manga-fm-dlq.fifo")
}
self._alert_topic_arn = "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
def _get_queue_url(self, queue_name: str) -> str:
"""Resolve queue URL from name."""
response = self._sqs.get_queue_url(QueueName=queue_name)
return response["QueueUrl"]
def get_dlq_depth(self, queue_type: str = "standard") -> int:
"""
Get the current approximate message count in a DLQ.
Args:
queue_type: 'standard' or 'fifo'
Returns:
Approximate number of messages in the DLQ
"""
url = self._dlq_urls[queue_type]
attrs = self._sqs.get_queue_attributes(
QueueUrl=url,
AttributeNames=["ApproximateNumberOfMessages"]
)
return int(attrs["Attributes"]["ApproximateNumberOfMessages"])
def sample_dlq_messages(
self, queue_type: str = "standard", max_messages: int = 5
) -> list:
"""
Sample messages from the DLQ without deleting them.
Visibility timeout set to 0 so messages remain available.
"""
url = self._dlq_urls[queue_type]
response = self._sqs.receive_message(
QueueUrl=url,
MaxNumberOfMessages=min(max_messages, 10),
VisibilityTimeout=0, # Peek — do not hide message
MessageAttributeNames=["All"],
AttributeNames=["All"]
)
messages = response.get("Messages", [])
sampled = []
for msg in messages:
body = json.loads(msg["Body"])
sampled.append({
"message_id": msg["MessageId"],
"request_id": body.get("request_id", "unknown"),
"prompt_preview": body.get("prompt", "")[:100],
"model_id": body.get("model_id", "unknown"),
"sent_timestamp": msg["Attributes"].get("SentTimestamp"),
"receive_count": int(msg["Attributes"].get("ApproximateReceiveCount", 0)),
"first_receive": msg["Attributes"].get("ApproximateFirstReceiveTimestamp")
})
return sampled
def redrive_messages(
self,
queue_type: str = "standard",
target_queue_name: str = "manga-fm-normal",
max_messages: int = 100,
filter_fn: callable = None
) -> dict:
"""
Redrive messages from DLQ back to a source queue for reprocessing.
Args:
queue_type: Which DLQ to drain
target_queue_name: Destination queue for reprocessing
max_messages: Maximum messages to redrive
filter_fn: Optional filter — only redrive if filter_fn(body) returns True
Returns:
Summary with counts of redriven, filtered, and failed messages
"""
dlq_url = self._dlq_urls[queue_type]
target_url = self._get_queue_url(target_queue_name)
redriven = 0
filtered = 0
failed = 0
processed = 0
while processed < max_messages:
batch_size = min(10, max_messages - processed)
response = self._sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=batch_size,
VisibilityTimeout=30,
MessageAttributeNames=["All"],
WaitTimeSeconds=1
)
messages = response.get("Messages", [])
if not messages:
break # DLQ is empty
for msg in messages:
processed += 1
body = json.loads(msg["Body"])
# Apply optional filter
if filter_fn and not filter_fn(body):
filtered += 1
# Release visibility so message stays in DLQ
self._sqs.change_message_visibility(
QueueUrl=dlq_url,
ReceiptHandle=msg["ReceiptHandle"],
VisibilityTimeout=0
)
continue
try:
# Send to target queue
send_kwargs = {
"QueueUrl": target_url,
"MessageBody": msg["Body"],
"MessageAttributes": {
k: v for k, v in msg.get("MessageAttributes", {}).items()
}
}
# Add FIFO attributes if target is FIFO
if target_queue_name.endswith(".fifo"):
send_kwargs["MessageGroupId"] = body.get(
"message_group_id", "redrive"
)
send_kwargs["MessageDeduplicationId"] = (
f"redrive-{msg['MessageId']}"
)
self._sqs.send_message(**send_kwargs)
# Delete from DLQ after successful send
self._sqs.delete_message(
QueueUrl=dlq_url,
ReceiptHandle=msg["ReceiptHandle"]
)
redriven += 1
except Exception as e:
logger.error(f"Redrive failed for {msg['MessageId']}: {e}")
failed += 1
summary = {
"redriven": redriven,
"filtered": filtered,
"failed": failed,
"total_processed": processed
}
logger.info(f"DLQ redrive complete: {summary}")
return summary
def check_and_alert(self, threshold: int = 50):
"""
Check DLQ depth and send SNS alert if threshold exceeded.
Called periodically by EventBridge scheduled rule.
"""
for queue_type in ["standard", "fifo"]:
depth = self.get_dlq_depth(queue_type)
# Emit CloudWatch metric
self._cloudwatch.put_metric_data(
Namespace="MangaAssist/SQS",
MetricData=[{
"MetricName": "DLQDepth",
"Value": depth,
"Unit": "Count",
"Dimensions": [
{"Name": "QueueType", "Value": queue_type},
{"Name": "Service", "Value": "MangaAssist"}
]
}]
)
if depth > threshold:
self._sns.publish(
TopicArn=self._alert_topic_arn,
Subject=f"[MangaAssist] DLQ Alert: {queue_type} depth = {depth}",
Message=json.dumps({
"alert": "DLQ depth threshold exceeded",
"queue_type": queue_type,
"depth": depth,
"threshold": threshold,
"timestamp": datetime.utcnow().isoformat(),
"action_required": (
"Investigate failed FM requests. Common causes: "
"Bedrock throttling, malformed requests, consumer timeouts. "
"Sample messages with DLQManager.sample_dlq_messages() "
"and redrive after root cause is resolved."
)
}, indent=2)
)
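A typical `filter_fn` for `redrive_messages` keeps only messages that failed for a transient reason once the root cause (e.g. Bedrock throttling) has cleared. The `failure_reason` field here is an assumption; it presumes the consumer annotates the body before the message is dead-lettered:

```python
def transient_failures_only(body: dict) -> bool:
    """Redrive filter: keep throttling/timeout failures, skip malformed requests."""
    reason = body.get("failure_reason", "")  # Assumed consumer-written field
    return reason in ("ThrottlingException", "ModelTimeoutException")

# Usage (hypothetical):
# dlq = DLQManager()
# dlq.redrive_messages(queue_type="standard", filter_fn=transient_failures_only)
```

Malformed-request failures stay in the DLQ for forensic analysis, since redriving them would only fail a fourth time.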
1.4 Visibility Timeout Strategy
Visibility Timeout Guidelines for FM Request Queues:
Queue: manga-fm-high-priority
Model: Haiku (fast)
Expected processing: 2-5s
Visibility timeout: 60s
Rationale: 12x headroom for cold starts and transient delays.
If Lambda times out at 30s, message reappears quickly.
Queue: manga-fm-normal
Model: Haiku or Sonnet
Expected processing: 5-30s
Visibility timeout: 120s
Rationale: Sonnet can take 10-15s; add buffer for retries within
the consumer. 2 min prevents premature re-delivery.
Queue: manga-fm-low-priority
Model: Sonnet (complex analysis)
Expected processing: 30s-5min
Visibility timeout: 300s
Rationale: Long-running enrichment tasks. If an ECS task needs longer,
it extends visibility via the ChangeMessageVisibility API.
Queue: manga-fm-ordered.fifo
Model: Varies
Expected processing: 2-30s
Visibility timeout: 60s
Rationale: Strict per-group ordering means a stuck message blocks its
whole group. Keep the timeout tight so the next message in the group
is released promptly after a failure.
DLQ: manga-fm-dlq / manga-fm-dlq.fifo
Visibility timeout: 30s (only for sampling/investigation)
Retention: 14 days (maximum for forensic analysis)
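The ChangeMessageVisibility extension mentioned for the low-priority queue is usually run as a heartbeat thread beside the long task. A sketch with the SQS client injected so it stays testable; extending at half-timeout intervals is a rule of thumb, not an API requirement:

```python
import threading

def start_visibility_heartbeat(sqs, queue_url, receipt_handle,
                               timeout_seconds=300, stop_event=None):
    """Periodically extend a message's visibility while a long task runs.

    Re-extends by `timeout_seconds` at half-timeout intervals so the message
    never reappears while the worker still owns it. Returns the stop event;
    set it when processing finishes (the thread then exits on its own).
    """
    stop_event = stop_event or threading.Event()

    def beat():
        # Event.wait returns False on timeout (keep beating), True once set (stop)
        while not stop_event.wait(timeout_seconds / 2):
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=timeout_seconds,
            )

    threading.Thread(target=beat, daemon=True).start()
    return stop_event
```

The worker deletes the message and sets the event on success; on failure it sets the event without deleting, and the message reappears after the final timeout expires.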
2. Request Validation Schemas and Mapping Templates
2.1 RequestSchema — Comprehensive Validation Framework
"""
Comprehensive request validation framework for MangaAssist FM API.
Handles JSON Schema validation, Unicode-aware sanitization,
content-length checks, and request normalization.
"""
import re
import json
import logging
from typing import Optional
from dataclasses import dataclass, field
import jsonschema
from jsonschema import Draft7Validator, FormatChecker
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""Result of a request validation operation."""
valid: bool
errors: list = field(default_factory=list)
warnings: list = field(default_factory=list)
normalized_body: Optional[dict] = None
estimated_tokens: int = 0
class RequestSchema:
"""
Request validation engine for MangaAssist FM API endpoints.
Supports:
- JSON Schema Draft-7 validation
- Unicode-aware string validation (CJK characters)
- Token estimation for cost prediction
- Request normalization and default injection
- Custom format validators (manga_id, isbn, etc.)
Usage:
schema = RequestSchema()
result = schema.validate("manga_query", request_body)
if result.valid:
# Use result.normalized_body for FM invocation
pass
else:
# Return result.errors to client
pass
"""
# Approximate token-to-character ratios
CHARS_PER_TOKEN = {
"en": 4.0, # English: ~4 chars per token
"ja": 1.5, # Japanese: ~1.5 chars per token (CJK is dense)
"mixed": 2.5 # Mixed content estimate
}
# Maximum input tokens per model to prevent excessive cost
MAX_INPUT_TOKENS = {
"anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"anthropic.claude-3-haiku-20240307-v1:0": 200000,
}
# Practical per-request token limits for cost control
COST_GUARD_TOKENS = {
"anthropic.claude-3-sonnet-20240229-v1:0": 8000, # ~$0.024 input
"anthropic.claude-3-haiku-20240307-v1:0": 16000, # ~$0.004 input
}
SCHEMAS = {
"manga_query": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MangaAssist Query Request",
"type": "object",
"required": ["query"],
"properties": {
"query": {
"type": "string",
"minLength": 1,
"maxLength": 8000,
"description": "User query text. Supports Japanese Unicode (Hiragana, Katakana, Kanji)."
},
"model": {
"type": "string",
"enum": ["sonnet", "haiku"],
"default": "haiku",
"description": "Model selection. Haiku for quick lookups, Sonnet for detailed analysis."
},
"max_tokens": {
"type": "integer",
"minimum": 1,
"maximum": 4096,
"default": 512
},
"temperature": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5
},
"conversation_id": {
"type": "string",
"format": "uuid"
},
"language": {
"type": "string",
"enum": ["en", "ja", "auto"],
"default": "auto"
},
"context": {
"type": "object",
"properties": {
"user_id": {"type": "string"},
"session_id": {"type": "string"},
"previous_manga": {
"type": "array",
"items": {"type": "string"},
"maxItems": 10
}
}
}
},
"additionalProperties": False
},
"batch_analysis": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MangaAssist Batch Analysis Request",
"type": "object",
"required": ["items"],
"properties": {
"items": {
"type": "array",
"minItems": 1,
"maxItems": 100,
"items": {
"type": "object",
"required": ["manga_id", "analysis_type"],
"properties": {
"manga_id": {
"type": "string",
"pattern": "^MNG-[A-Z0-9]{8}$"
},
"analysis_type": {
"type": "string",
"enum": [
"genre_classification",
"content_summary",
"similar_series",
"reading_level",
"full_enrichment"
]
},
"additional_context": {
"type": "string",
"maxLength": 2000
}
},
"additionalProperties": False
}
},
"callback_url": {
"type": "string",
"format": "uri",
"pattern": "^https://"
},
"priority": {
"type": "string",
"enum": ["high", "normal", "low"],
"default": "normal"
},
"notification": {
"type": "object",
"properties": {
"sns_topic_arn": {"type": "string"},
"email": {"type": "string", "format": "email"}
}
}
},
"additionalProperties": False
},
"streaming_chat": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MangaAssist Streaming Chat Request",
"type": "object",
"required": ["messages"],
"properties": {
"messages": {
"type": "array",
"minItems": 1,
"maxItems": 50,
"items": {
"type": "object",
"required": ["role", "content"],
"properties": {
"role": {
"type": "string",
"enum": ["user", "assistant"]
},
"content": {
"type": "string",
"minLength": 1,
"maxLength": 16000
}
}
}
},
"model": {
"type": "string",
"enum": ["sonnet", "haiku"],
"default": "sonnet"
},
"max_tokens": {
"type": "integer",
"minimum": 1,
"maximum": 4096,
"default": 2048
},
"stream": {
"type": "boolean",
"default": True
},
"session_id": {
"type": "string",
"format": "uuid"
}
},
"additionalProperties": False
}
}
def __init__(self):
"""Initialize validators with custom format checkers."""
self._format_checker = FormatChecker()
self._register_custom_formats()
self._validators = {}
for name, schema in self.SCHEMAS.items():
self._validators[name] = Draft7Validator(
schema,
format_checker=self._format_checker
)
def _register_custom_formats(self):
"""Register custom format validators for manga-specific data."""
@self._format_checker.checks("uuid", raises=jsonschema.FormatError)
def check_uuid(value):
pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
if not re.match(pattern, value):
raise jsonschema.FormatError(f"'{value}' is not a valid UUID")
return True
@self._format_checker.checks("manga_id", raises=jsonschema.FormatError)
def check_manga_id(value):
if not re.match(r'^MNG-[A-Z0-9]{8}$', value):
raise jsonschema.FormatError(
f"'{value}' is not a valid manga ID (expected MNG-XXXXXXXX)"
)
return True
@self._format_checker.checks("isbn", raises=jsonschema.FormatError)
def check_isbn(value):
clean = value.replace("-", "")
if len(clean) not in (10, 13) or not clean.replace("X", "").isdigit():
raise jsonschema.FormatError(f"'{value}' is not a valid ISBN")
return True
def validate(self, schema_name: str, body: dict) -> ValidationResult:
"""
Validate a request body against a named schema.
Performs:
1. JSON Schema structural validation
2. Unicode content checks
3. Token estimation for cost guard
4. Request normalization (default injection)
Args:
schema_name: Schema key from SCHEMAS dict
body: Parsed JSON request body
Returns:
ValidationResult with errors, warnings, and normalized body
"""
validator = self._validators.get(schema_name)
if not validator:
return ValidationResult(
valid=False,
errors=[{"path": "$", "message": f"Unknown schema: {schema_name}"}]
)
# Step 1: JSON Schema validation
errors = []
for error in validator.iter_errors(body):
errors.append({
"path": ".".join(str(p) for p in error.absolute_path) or "$",
"message": error.message,
"validator": error.validator,
"schema_path": list(error.schema_path)
})
if errors:
return ValidationResult(valid=False, errors=errors)
# Step 2: Unicode and content checks
warnings = []
content_warnings = self._check_unicode_content(body, schema_name)
warnings.extend(content_warnings)
# Step 3: Token estimation
estimated_tokens = self._estimate_input_tokens(body, schema_name)
model_key = body.get("model", "haiku")
model_id_map = {
"sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
"haiku": "anthropic.claude-3-haiku-20240307-v1:0"
}
model_id = model_id_map.get(model_key, model_id_map["haiku"])
cost_guard = self.COST_GUARD_TOKENS.get(model_id, 8000)
if estimated_tokens > cost_guard:
warnings.append({
"code": "TOKEN_LIMIT_WARNING",
"message": (
f"Estimated input tokens ({estimated_tokens}) exceeds "
f"cost guard limit ({cost_guard}) for {model_key}. "
f"Consider shortening the query or using a more cost-effective model."
)
})
# Step 4: Normalize — inject defaults
normalized = self._normalize_request(body, schema_name)
return ValidationResult(
valid=True,
errors=[],
warnings=warnings,
normalized_body=normalized,
estimated_tokens=estimated_tokens
)
def _check_unicode_content(self, body: dict, schema_name: str) -> list:
"""Check for Unicode-related issues in request content."""
warnings = []
# Extract text fields to check
text_fields = []
if schema_name == "manga_query":
text_fields.append(("query", body.get("query", "")))
elif schema_name == "streaming_chat":
for i, msg in enumerate(body.get("messages", [])):
text_fields.append((f"messages[{i}].content", msg.get("content", "")))
for field_name, text in text_fields:
if not text:
continue
# Check for mixed scripts that might indicate injection
has_cjk = bool(re.search(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', text))
has_cyrillic = bool(re.search(r'[\u0400-\u04ff]', text))
has_arabic = bool(re.search(r'[\u0600-\u06ff]', text))
unusual_scripts = sum([has_cyrillic, has_arabic])
if has_cjk and unusual_scripts > 0:
warnings.append({
"code": "MIXED_SCRIPT_WARNING",
"field": field_name,
"message": "Unusual script mixing detected. Content contains CJK with other non-Latin scripts."
})
# Check for control characters (except common whitespace)
control_chars = re.findall(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', text)
if control_chars:
warnings.append({
"code": "CONTROL_CHAR_WARNING",
"field": field_name,
"message": f"Content contains {len(control_chars)} control character(s)."
})
return warnings
def _estimate_input_tokens(self, body: dict, schema_name: str) -> int:
"""Estimate the number of input tokens for cost prediction."""
total_chars = 0
if schema_name == "manga_query":
query = body.get("query", "")
total_chars = len(query)
elif schema_name == "streaming_chat":
for msg in body.get("messages", []):
total_chars += len(msg.get("content", ""))
elif schema_name == "batch_analysis":
for item in body.get("items", []):
total_chars += len(item.get("additional_context", ""))
total_chars += 200 # Approximate overhead per item prompt
# Detect primary language for token estimation
language = body.get("language", "auto")
if language == "auto":
sample = body.get("query", "") or ""
cjk_count = len(re.findall(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', sample))
if cjk_count > len(sample) * 0.3:
language = "ja"
else:
language = "en"
chars_per_token = self.CHARS_PER_TOKEN.get(language, self.CHARS_PER_TOKEN["mixed"])
estimated_tokens = int(total_chars / chars_per_token)
# Add system prompt overhead (~200 tokens for MangaAssist)
estimated_tokens += 200
return estimated_tokens
def _normalize_request(self, body: dict, schema_name: str) -> dict:
"""
Inject default values and normalize the request body.
Returns a new dict without modifying the original.
"""
schema = self.SCHEMAS.get(schema_name, {})
properties = schema.get("properties", {})
normalized = dict(body)
for prop_name, prop_schema in properties.items():
if prop_name not in normalized and "default" in prop_schema:
normalized[prop_name] = prop_schema["default"]
# Normalize nested item defaults for batch requests
if schema_name == "batch_analysis" and "items" in normalized:
item_schema = properties.get("items", {}).get("items", {}).get("properties", {})
for item in normalized["items"]:
for prop_name, prop_def in item_schema.items():
if prop_name not in item and "default" in prop_def:
item[prop_name] = prop_def["default"]
return normalized
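The estimator's arithmetic can be sanity-checked standalone with the same character-per-token ratios and the Bedrock input prices from the context header ($3 per 1M input tokens for Sonnet, $0.25 for Haiku):

```python
# Ratios and overhead mirror the RequestSchema constants above
CHARS_PER_TOKEN = {"en": 4.0, "ja": 1.5, "mixed": 2.5}
INPUT_PRICE_PER_TOKEN = {"sonnet": 3.00 / 1_000_000, "haiku": 0.25 / 1_000_000}
SYSTEM_PROMPT_OVERHEAD = 200  # Approximate MangaAssist system prompt size

def estimate_cost_usd(text: str, language: str, model: str) -> tuple[int, float]:
    """Estimate input tokens and the input-side cost of one FM call."""
    tokens = int(len(text) / CHARS_PER_TOKEN[language]) + SYSTEM_PROMPT_OVERHEAD
    return tokens, tokens * INPUT_PRICE_PER_TOKEN[model]

# A 1200-character Japanese query on Haiku:
tokens, cost = estimate_cost_usd("あ" * 1200, "ja", "haiku")
# tokens = 1200 / 1.5 + 200 = 1000; cost = 1000 * $0.25/1M = $0.00025
```

The dense CJK ratio matters: the same 1200 characters estimated as English would be only 500 tokens, so misdetecting the language understates Japanese costs by roughly half.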
2.2 API Gateway Model Definitions (CloudFormation)
# CloudFormation resource definitions for API Gateway request models.
# These models enable API Gateway-level validation before Lambda is invoked.
# Note: API Gateway model schemas must use the JSON Schema draft 4 dialect
# (API Gateway supports only a subset of draft 4).
Resources:
MangaQueryModel:
Type: AWS::ApiGateway::Model
Properties:
RestApiId: !Ref MangaFMApi
ContentType: "application/json"
Name: "MangaQueryModel"
Description: "Validates manga query request structure"
Schema:
$schema: "http://json-schema.org/draft-07/schema#"
title: "MangaQueryRequest"
type: object
required:
- query
properties:
query:
type: string
minLength: 1
maxLength: 4000
model:
type: string
enum:
- sonnet
- haiku
max_tokens:
type: integer
minimum: 1
maximum: 4096
temperature:
type: number
minimum: 0.0
maximum: 1.0
conversation_id:
type: string
pattern: "^[a-f0-9-]{36}$"
language:
type: string
enum:
- en
- ja
- auto
BatchAnalysisModel:
Type: AWS::ApiGateway::Model
Properties:
RestApiId: !Ref MangaFMApi
ContentType: "application/json"
Name: "BatchAnalysisModel"
Description: "Validates batch manga analysis request"
Schema:
$schema: "http://json-schema.org/draft-07/schema#"
title: "BatchAnalysisRequest"
type: object
required:
- items
properties:
items:
type: array
minItems: 1
maxItems: 100
items:
type: object
required:
- manga_id
- analysis_type
properties:
manga_id:
type: string
pattern: "^MNG-[A-Z0-9]{8}$"
analysis_type:
type: string
enum:
- genre_classification
- content_summary
- similar_series
- reading_level
- full_enrichment
callback_url:
type: string
pattern: "^https://"
priority:
type: string
enum:
- high
- normal
- low
RequestValidator:
Type: AWS::ApiGateway::RequestValidator
Properties:
RestApiId: !Ref MangaFMApi
Name: "MangaRequestValidator"
ValidateRequestBody: true
ValidateRequestParameters: true
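Outside CloudFormation, the same wiring is done per method with `put_method` on the boto3 `apigateway` client, which accepts `requestValidatorId` and a content-type-to-model map. A sketch that only builds the arguments; the IDs in the usage comment are placeholders:

```python
def build_put_method_kwargs(rest_api_id: str, resource_id: str,
                            validator_id: str, model_name: str) -> dict:
    """Arguments for apigateway.put_method that enable request body validation.

    Linking the validator and model makes API Gateway reject invalid bodies
    with a 400 before Lambda (and any Bedrock spend) is ever invoked.
    """
    return {
        "restApiId": rest_api_id,
        "resourceId": resource_id,
        "httpMethod": "POST",
        "authorizationType": "NONE",  # Placeholder; production would use an authorizer
        "requestValidatorId": validator_id,
        "requestModels": {"application/json": model_name},
    }

# Hypothetical usage:
# boto3.client("apigateway").put_method(
#     **build_put_method_kwargs("a1b2c3", "res123", "val456", "MangaQueryModel"))
```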
3. MappingTemplateEngine — VTL Template Processing
"""
Mapping template engine for API Gateway VTL (Velocity Template Language)
request/response transformation. Handles the logic for building and
managing VTL templates used in API Gateway integration mappings.
"""
import json
import re
import logging
from typing import Optional
from string import Template
logger = logging.getLogger(__name__)
class MappingTemplateEngine:
"""
Manages VTL mapping templates for API Gateway integrations.
Generates request and response mapping templates that transform
API Gateway payloads to/from Lambda and Bedrock formats.
MangaAssist uses mapping templates to:
1. Enrich incoming requests with context (request ID, timestamp, source IP)
2. Transform client-facing API shapes into internal Lambda event formats
3. Filter and reshape Lambda responses for client consumption
4. Handle error responses with appropriate status codes
Usage:
engine = MappingTemplateEngine()
# Get the request template for manga query endpoint
template = engine.get_request_template("manga_query")
# Get the response template with status code mapping
resp_template = engine.get_response_template("manga_query", status_code=200)
"""
# Request mapping templates
REQUEST_TEMPLATES = {
"manga_query": '''## MangaAssist Query - Request Mapping Template
## Transforms API Gateway request into Lambda event format
## Preserves Unicode content for Japanese manga queries
#set($inputRoot = $input.path('$'))
{
"httpMethod": "POST",
"resource": "/v1/manga/query",
"body": $input.json('$'),
"headers": {
"Content-Type": "$input.params().header.get('Content-Type')",
"Accept-Language": "$input.params().header.get('Accept-Language')",
"X-Api-Key": "$input.params().header.get('X-Api-Key')",
"X-Request-Id": "$context.requestId"
},
"requestContext": {
"requestId": "$context.requestId",
"stage": "$context.stage",
"identity": {
"sourceIp": "$context.identity.sourceIp",
"userAgent": "$context.identity.userAgent"
},
"requestTime": "$context.requestTime",
"requestTimeEpoch": $context.requestTimeEpoch,
"apiId": "$context.apiId"
},
"stageVariables": {
#if($stageVariables)
"environment": "$stageVariables.get('environment')",
"bedrock_region": "$stageVariables.get('bedrock_region')"
#else
"environment": "prod",
"bedrock_region": "us-east-1"
#end
}
}''',
"batch_analysis": '''## MangaAssist Batch Analysis - Request Mapping Template
## Transforms batch submission into Lambda event with SQS routing hints
#set($inputRoot = $input.path('$'))
{
"httpMethod": "POST",
"resource": "/v1/manga/batch",
"body": $input.json('$'),
"headers": {
"Content-Type": "$input.params().header.get('Content-Type')",
"X-Api-Key": "$input.params().header.get('X-Api-Key')",
"X-Request-Id": "$context.requestId",
"X-Batch-Size": "$inputRoot.items.size()"
},
"requestContext": {
"requestId": "$context.requestId",
"stage": "$context.stage",
"requestTimeEpoch": $context.requestTimeEpoch
},
"routingHints": {
"itemCount": $inputRoot.items.size(),
"priority": #if($inputRoot.priority)"$inputRoot.priority"#else"normal"#end,
"hasCallback": #if($inputRoot.callback_url)true#else false#end
}
}''',
"status_check": '''## MangaAssist Status Check - Request Mapping Template
## Passes path parameter for async request status lookup
{
"httpMethod": "GET",
"resource": "/v1/manga/status/{request_id}",
"pathParameters": {
"request_id": "$input.params('request_id')"
},
"requestContext": {
"requestId": "$context.requestId",
"stage": "$context.stage"
}
}'''
}
# Response mapping templates
RESPONSE_TEMPLATES = {
"manga_query_200": '''## MangaAssist Query - 200 Response Template
## Reshapes Lambda response for client consumption
#set($inputRoot = $input.path('$'))
#set($body = $input.path('$.body'))
{
"success": true,
"data": {
"response": $body.response,
"model": "$body.model",
"usage": {
"input_tokens": $body.usage.input_tokens,
"output_tokens": $body.usage.output_tokens
}
},
"metadata": {
"request_id": "$context.requestId",
"timestamp": "$context.requestTime",
"latency_ms": #if($body.latency_ms)$body.latency_ms#else 0#end
}
}''',
"manga_query_400": '''## MangaAssist Query - 400 Error Response Template
#set($inputRoot = $input.path('$'))
{
"success": false,
"error": {
"code": "VALIDATION_ERROR",
"message": "$inputRoot.body.error",
"details": $inputRoot.body.details
},
"metadata": {
"request_id": "$context.requestId",
"timestamp": "$context.requestTime"
}
}''',
"manga_query_429": '''## MangaAssist Query - 429 Throttle Response Template
{
"success": false,
"error": {
"code": "RATE_LIMITED",
"message": "Request rate exceeded. The foundation model is currently throttled.",
"retry_after_seconds": 5
},
"metadata": {
"request_id": "$context.requestId"
}
}''',
"manga_query_500": '''## MangaAssist Query - 500 Error Response Template
{
"success": false,
"error": {
"code": "INTERNAL_ERROR",
"message": "An unexpected error occurred processing your request."
},
"metadata": {
"request_id": "$context.requestId"
}
}''',
"batch_202": '''## MangaAssist Batch - 202 Accepted Response Template
#set($inputRoot = $input.path('$'))
{
"success": true,
"data": {
"message": "Batch submitted for processing",
"item_count": $inputRoot.body.item_count,
"request_ids": $inputRoot.body.request_ids,
"estimated_completion": "Results will be available within 5-30 minutes"
},
"metadata": {
"request_id": "$context.requestId",
"timestamp": "$context.requestTime"
}
}'''
}
def __init__(self):
"""Initialize the mapping template engine."""
self._request_templates = dict(self.REQUEST_TEMPLATES)
self._response_templates = dict(self.RESPONSE_TEMPLATES)
def get_request_template(self, endpoint: str) -> Optional[str]:
"""
Get the request mapping template for an endpoint.
Args:
endpoint: Endpoint key (e.g., 'manga_query', 'batch_analysis')
Returns:
VTL template string or None if not found
"""
return self._request_templates.get(endpoint)
def get_response_template(
self, endpoint: str, status_code: int = 200
) -> Optional[str]:
"""
Get the response mapping template for an endpoint + status code.
Args:
endpoint: Endpoint key
status_code: HTTP status code
Returns:
VTL template string or None if not found
"""
key = f"{endpoint}_{status_code}"
return self._response_templates.get(key)
def register_template(
self, name: str, template: str, template_type: str = "request"
):
"""Register a custom mapping template."""
if template_type == "request":
self._request_templates[name] = template
elif template_type == "response":
self._response_templates[name] = template
else:
raise ValueError(f"Invalid template_type: {template_type}")
def validate_template_syntax(self, template: str) -> dict:
"""
Basic syntax validation of a VTL template.
Checks for common VTL errors without full parsing.
Returns:
Dict with 'valid' bool and optional 'issues' list
"""
issues = []
# Check balanced #if/#end blocks
if_count = len(re.findall(r'#if\s*\(', template))
end_count = len(re.findall(r'#end\b', template))
foreach_count = len(re.findall(r'#foreach\s*\(', template))
expected_ends = if_count + foreach_count
if end_count != expected_ends:
issues.append({
"type": "UNBALANCED_BLOCKS",
"message": (
f"Found {if_count} #if + {foreach_count} #foreach "
f"but only {end_count} #end directives"
)
})
# Check for unclosed $input references
unclosed = re.findall(r'\$input\.[^,}\s\)]+(?=[^"\'}\s])', template)
if unclosed:
issues.append({
"type": "POTENTIAL_UNCLOSED_REFERENCE",
"message": f"Potentially unclosed references: {unclosed[:5]}"
})
# Check for JSON syntax issues in static parts
# Extract static JSON portions between VTL directives
static_parts = re.sub(r'#[a-z]+\([^)]*\)|#end|\$[a-zA-Z._\[\]()]+', '""', template)
try:
json.loads(static_parts)
except json.JSONDecodeError as e:
issues.append({
"type": "JSON_SYNTAX",
"message": f"Static JSON structure may have issues: {str(e)[:100]}"
})
return {
"valid": len(issues) == 0,
"issues": issues
}
def generate_integration_config(self, endpoint: str) -> dict:
"""
Generate the complete API Gateway integration configuration
for an endpoint, including all mapping templates and status codes.
Returns:
Dict ready for CloudFormation/CDK integration configuration
"""
request_template = self.get_request_template(endpoint)
# Collect all response templates for this endpoint
response_templates = {}
status_codes = [200, 400, 429, 500]
for code in status_codes:
template = self.get_response_template(endpoint, code)
if template:
response_templates[str(code)] = template
return {
"type": "AWS_PROXY", # or "AWS" for non-proxy
"requestTemplates": {
"application/json": request_template
} if request_template else {},
"integrationResponses": [
{
"statusCode": code,
"responseTemplates": {
"application/json": response_templates.get(str(code), "")
},
"selectionPattern": self._status_pattern(code)
}
for code in status_codes
if str(code) in response_templates
]
}
def _status_pattern(self, code: int) -> str:
"""Get the regex selection pattern for a status code."""
        patterns = {
            200: "",  # empty pattern = default response
            400: ".*4(?!29)\\d{2}.*",  # negative lookahead keeps 429 from being shadowed by the generic 4xx match
            429: ".*429.*",
            500: ".*5\\d{2}.*"
        }
return patterns.get(code, "")
4. Batch Processing Patterns for Bulk Manga Analysis
4.1 SQSFMBatchProcessor
"""
SQS-driven batch processor for bulk manga analysis using Bedrock.
Handles queue consumption, concurrent FM invocation, result collection,
and callback delivery.
"""
import json
import time
import logging
import traceback
from typing import Optional
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class BatchResult:
"""Result of processing a batch of FM requests."""
total: int = 0
succeeded: int = 0
failed: int = 0
throttled: int = 0
total_input_tokens: int = 0
total_output_tokens: int = 0
total_cost_usd: float = 0.0
processing_time_seconds: float = 0.0
results: list = field(default_factory=list)
errors: list = field(default_factory=list)
class SQSFMBatchProcessor:
"""
Processes FM requests from SQS queues in configurable batches.
Designed to run on ECS Fargate or EC2 as a long-running worker.
Features:
- Concurrent FM invocations with configurable parallelism
- Automatic visibility timeout extension for long batches
- Result storage to S3 with DynamoDB status tracking
- Callback notification via SNS or webhook
- Graceful shutdown with in-flight request completion
- Cost tracking per batch
Architecture:
SQS Queue --> SQSFMBatchProcessor --> Bedrock InvokeModel
--> S3 (results)
--> DynamoDB (status)
--> SNS/Webhook (notification)
"""
PRICING = {
"anthropic.claude-3-sonnet-20240229-v1:0": (3.00, 15.00),
"anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
}
def __init__(
self,
queue_url: str,
region: str = "us-east-1",
max_concurrent: int = 5,
batch_size: int = 10,
visibility_timeout: int = 120
):
self._queue_url = queue_url
self._batch_size = min(batch_size, 10) # SQS max per receive
self._visibility_timeout = visibility_timeout
self._max_concurrent = max_concurrent
self._running = True
# AWS clients
self._sqs = boto3.client("sqs", region_name=region)
bedrock_config = Config(
region_name=region,
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=60,
max_pool_connections=max_concurrent
)
self._bedrock = boto3.client("bedrock-runtime", config=bedrock_config)
self._s3 = boto3.client("s3", region_name=region)
self._dynamodb = boto3.resource("dynamodb", region_name=region)
self._sns = boto3.client("sns", region_name=region)
self._status_table = self._dynamodb.Table("manga-fm-request-status")
self._executor = ThreadPoolExecutor(max_workers=max_concurrent)
    def process_loop(self, max_iterations: Optional[int] = None):
"""
Main processing loop. Continuously polls SQS and processes batches.
Args:
max_iterations: Maximum polling iterations (None for infinite)
"""
iteration = 0
logger.info(
f"Starting batch processor: queue={self._queue_url}, "
f"concurrency={self._max_concurrent}, batch_size={self._batch_size}"
)
while self._running:
if max_iterations and iteration >= max_iterations:
break
iteration += 1
try:
messages = self._receive_batch()
if not messages:
time.sleep(1) # Brief pause on empty queue
continue
batch_result = self._process_batch(messages)
logger.info(
f"Batch complete: {batch_result.succeeded}/{batch_result.total} "
f"succeeded, {batch_result.failed} failed, "
f"cost=${batch_result.total_cost_usd:.4f}"
)
except Exception as e:
logger.error(f"Batch processing error: {e}\n{traceback.format_exc()}")
time.sleep(5) # Back off on unexpected errors
def _receive_batch(self) -> list:
"""Receive a batch of messages from SQS with long polling."""
response = self._sqs.receive_message(
QueueUrl=self._queue_url,
MaxNumberOfMessages=self._batch_size,
VisibilityTimeout=self._visibility_timeout,
WaitTimeSeconds=20, # Long polling
MessageAttributeNames=["All"],
AttributeNames=["All"]
)
return response.get("Messages", [])
def _process_batch(self, messages: list) -> BatchResult:
"""Process a batch of SQS messages concurrently."""
start_time = time.time()
batch_result = BatchResult(total=len(messages))
# Submit all messages for concurrent processing
futures = {}
for msg in messages:
future = self._executor.submit(self._process_single, msg)
futures[future] = msg
# Collect results
        # as_completed raises TimeoutError if the batch overruns; unfinished
        # messages are simply redelivered once their visibility expires
        for future in as_completed(futures, timeout=max(self._visibility_timeout - 10, 10)):
msg = futures[future]
try:
result = future.result()
batch_result.succeeded += 1
batch_result.total_input_tokens += result.get("input_tokens", 0)
batch_result.total_output_tokens += result.get("output_tokens", 0)
batch_result.total_cost_usd += result.get("cost_usd", 0)
batch_result.results.append(result)
# Delete message from queue on success
self._sqs.delete_message(
QueueUrl=self._queue_url,
ReceiptHandle=msg["ReceiptHandle"]
)
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "ThrottlingException":
batch_result.throttled += 1
# Do NOT delete — let visibility timeout expire for retry
logger.warning(f"Throttled: {msg['MessageId']}")
else:
batch_result.failed += 1
batch_result.errors.append({
"message_id": msg["MessageId"],
"error": str(e)
})
            except Exception as e:
                # Failed messages are not deleted; SQS redelivers them until
                # maxReceiveCount routes them to the DLQ
                batch_result.failed += 1
batch_result.errors.append({
"message_id": msg["MessageId"],
"error": str(e)
})
batch_result.processing_time_seconds = time.time() - start_time
return batch_result
def _process_single(self, message: dict) -> dict:
"""Process a single SQS message through Bedrock."""
body = json.loads(message["Body"])
request_id = body.get("request_id", message["MessageId"])
# Update status to processing
self._update_status(request_id, "processing")
prompt = body["prompt"]
model_id = body.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
max_tokens = body.get("max_tokens", 1024)
# Build Bedrock request
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}],
"system": (
"You are a manga analysis system. Provide structured, "
"accurate analysis of manga content. Respond in JSON format "
"when analysis_type requires structured output."
),
"temperature": 0.2
})
response = self._bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=request_body
)
response_body = json.loads(response["body"].read())
text = response_body["content"][0]["text"]
input_tokens = response_body["usage"]["input_tokens"]
output_tokens = response_body["usage"]["output_tokens"]
# Calculate cost
pricing = self.PRICING.get(model_id, (0, 0))
cost_usd = (
(input_tokens / 1_000_000) * pricing[0] +
(output_tokens / 1_000_000) * pricing[1]
)
# Store result in S3
result_key = f"fm-results/{request_id}.json"
result_data = {
"request_id": request_id,
"response": text,
"model_id": model_id,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": cost_usd,
"metadata": body.get("metadata", {}),
"processed_at": time.time()
}
self._s3.put_object(
Bucket="manga-fm-results",
Key=result_key,
Body=json.dumps(result_data),
ContentType="application/json"
)
# Update status to completed
self._update_status(request_id, "completed", result_key=result_key)
# Send callback notification if requested
callback_url = body.get("callback_url")
if callback_url:
self._send_callback(callback_url, result_data)
return result_data
def _update_status(
        self, request_id: str, status: str, result_key: Optional[str] = None
):
"""Update request status in DynamoDB."""
try:
update_expr = "SET #s = :status, updated_at = :ts"
expr_values = {
":status": status,
":ts": int(time.time())
}
expr_names = {"#s": "status"}
if result_key:
update_expr += ", result_key = :rk"
expr_values[":rk"] = result_key
self._status_table.update_item(
Key={"request_id": request_id},
UpdateExpression=update_expr,
ExpressionAttributeNames=expr_names,
ExpressionAttributeValues=expr_values
)
except Exception as e:
logger.warning(f"Status update failed for {request_id}: {e}")
def _send_callback(self, url: str, result: dict):
"""Send completion callback to the requester."""
import urllib.request
try:
req = urllib.request.Request(
url,
data=json.dumps(result).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST"
)
            with urllib.request.urlopen(req, timeout=10):
                pass  # response body unused; closing promptly frees the socket
except Exception as e:
logger.warning(f"Callback failed to {url}: {e}")
def shutdown(self):
"""Gracefully stop the processor."""
logger.info("Shutting down batch processor...")
self._running = False
        self._executor.shutdown(wait=True, cancel_futures=False)  # cancel_futures requires Python 3.9+
logger.info("Batch processor stopped")
4.2 Step Functions Orchestration for Complex Batch Workflows
{
"Comment": "MangaAssist Batch Analysis Workflow - Orchestrates multi-step manga analysis via Bedrock",
"StartAt": "ValidateBatchInput",
"States": {
"ValidateBatchInput": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-validate-batch",
"ResultPath": "$.validation",
"Next": "CheckValidation",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
},
"CheckValidation": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.validation.valid",
"BooleanEquals": true,
"Next": "SplitIntoBatches"
}
],
"Default": "ValidationFailed"
},
"SplitIntoBatches": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-split-batch",
"ResultPath": "$.batches",
"Next": "ProcessBatchesMap"
},
"ProcessBatchesMap": {
"Type": "Map",
"ItemsPath": "$.batches.chunks",
"MaxConcurrency": 5,
"Iterator": {
"StartAt": "InvokeBedrock",
"States": {
"InvokeBedrock": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId.$": "$.model_id",
"ContentType": "application/json",
"Accept": "application/json",
"Body": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens.$": "$.max_tokens",
"messages": [
{
"role": "user",
"content.$": "$.prompt"
}
],
"temperature": 0.2
}
},
"ResultPath": "$.fm_response",
"Next": "StoreResult",
"Retry": [
{
"ErrorEquals": ["Bedrock.ThrottlingException"],
"IntervalSeconds": 5,
"MaxAttempts": 5,
"BackoffRate": 2.0
},
{
"ErrorEquals": ["Bedrock.ModelTimeoutException"],
"IntervalSeconds": 10,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "RecordFailure",
"ResultPath": "$.error"
}
]
},
"StoreResult": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-store-result",
"End": true
},
"RecordFailure": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-record-failure",
"End": true
}
}
},
"ResultPath": "$.batch_results",
"Next": "AggregateResults"
},
"AggregateResults": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-aggregate-results",
"ResultPath": "$.summary",
"Next": "NotifyCompletion"
},
"NotifyCompletion": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:manga-fm-results",
"Subject": "MangaAssist Batch Analysis Complete",
"Message.$": "States.JsonToString($.summary)"
},
"End": true
},
"ValidationFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:manga-fm-alerts",
"Subject": "MangaAssist Batch Validation Failed",
"Message.$": "States.JsonToString($.validation.errors)"
},
"End": true
}
}
}
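Each Map iteration above reads `$.model_id`, `$.max_tokens`, and `$.prompt`, so every chunk that `manga-split-batch` emits into `$.batches.chunks` must carry at least those keys. A sketch of the shape (the surrounding field names are illustrative, not prescribed by the state machine):

```python
import json

# Minimum shape of one Map-iteration item: the InvokeBedrock state reads
# $.model_id, $.max_tokens, and $.prompt from each chunk
chunk = {
    "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
    "max_tokens": 512,
    "prompt": "Classify the primary genre of 'Frieren: Beyond Journey's End'.",
}

# SplitIntoBatches would place {"chunks": [chunk, ...]} at $.batches
split_output = {"chunks": [chunk]}

serialized = json.dumps(split_output)
print(len(json.loads(serialized)["chunks"]))  # 1
```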
5. Visibility Timeout Extension Pattern
"""
Heartbeat-based visibility timeout extension for long-running FM requests.
Prevents SQS from re-delivering messages while processing is still active.
"""
import time
import threading
import logging
import boto3
logger = logging.getLogger(__name__)
class VisibilityExtender:
"""
Automatically extends SQS message visibility timeout while processing.
Runs a background thread that periodically extends visibility before
the current timeout expires.
This is critical for Bedrock requests that may take longer than the
initial visibility timeout (e.g., Sonnet with large prompts).
Usage:
with VisibilityExtender(sqs_client, queue_url, receipt_handle, 120) as ext:
# Process message — visibility auto-extends every 90 seconds
result = invoke_bedrock(prompt)
# On context exit, extension stops
"""
def __init__(
self,
sqs_client,
queue_url: str,
receipt_handle: str,
visibility_timeout: int = 120,
extension_margin_seconds: int = 30
):
self._sqs = sqs_client
self._queue_url = queue_url
self._receipt_handle = receipt_handle
self._visibility_timeout = visibility_timeout
self._margin = extension_margin_seconds
self._stop_event = threading.Event()
self._thread = None
def __enter__(self):
"""Start the visibility extension thread."""
self._thread = threading.Thread(
target=self._extend_loop,
daemon=True,
name="visibility-extender"
)
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop the visibility extension thread."""
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
return False
def _extend_loop(self):
"""Periodically extend visibility timeout until stopped."""
interval = self._visibility_timeout - self._margin
if interval <= 0:
interval = self._visibility_timeout // 2
while not self._stop_event.wait(timeout=interval):
try:
self._sqs.change_message_visibility(
QueueUrl=self._queue_url,
ReceiptHandle=self._receipt_handle,
VisibilityTimeout=self._visibility_timeout
)
logger.debug(
f"Extended visibility by {self._visibility_timeout}s "
f"for {self._receipt_handle[:20]}..."
)
except Exception as e:
logger.error(f"Visibility extension failed: {e}")
break # Stop extending — message may be re-delivered
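The heartbeat can be observed without touching AWS by running the extender against an in-memory stub. The `StubSQS` client and the condensed copy of the class below exist only for this demo:

```python
import threading
import time

class VisibilityExtender:
    """Condensed copy of the class above, inlined so this demo runs standalone."""
    def __init__(self, sqs_client, queue_url, receipt_handle,
                 visibility_timeout=120, extension_margin_seconds=30):
        self._sqs, self._queue_url = sqs_client, queue_url
        self._receipt_handle = receipt_handle
        self._visibility_timeout = visibility_timeout
        self._margin = extension_margin_seconds
        self._stop_event = threading.Event()
        self._thread = None
    def __enter__(self):
        self._thread = threading.Thread(target=self._extend_loop, daemon=True)
        self._thread.start()
        return self
    def __exit__(self, *exc):
        self._stop_event.set()
        self._thread.join(timeout=5)
        return False
    def _extend_loop(self):
        interval = max(self._visibility_timeout - self._margin, 1)
        while not self._stop_event.wait(timeout=interval):
            self._sqs.change_message_visibility(
                QueueUrl=self._queue_url,
                ReceiptHandle=self._receipt_handle,
                VisibilityTimeout=self._visibility_timeout,
            )

class StubSQS:
    """In-memory stand-in that records extension calls."""
    def __init__(self):
        self.calls = []
    def change_message_visibility(self, **kwargs):
        self.calls.append(kwargs)

stub = StubSQS()
# Tiny timeouts so the heartbeat fires quickly: interval = 2 - 1 = 1 second
with VisibilityExtender(stub, "https://sqs/queue", "receipt-123",
                        visibility_timeout=2, extension_margin_seconds=1):
    time.sleep(2.5)  # simulate a Bedrock call that outlives the heartbeat interval

print(len(stub.calls) >= 1)  # True: at least one extension fired
```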
6. Request/Response Transformation Pipeline
flowchart LR
subgraph Input["Client Request"]
RAW["Raw HTTP Request<br/>(JSON body + headers)"]
end
subgraph Validation["Validation Layer"]
SCHEMA["JSON Schema<br/>Validation"]
UNICODE["Unicode<br/>Sanitization"]
TOKEN["Token<br/>Estimation"]
COST["Cost Guard<br/>Check"]
end
subgraph Transform["Transformation Layer"]
VTL_REQ["VTL Request<br/>Mapping"]
ENRICH["Context<br/>Enrichment"]
ROUTE["Model<br/>Routing"]
end
subgraph FM["Foundation Model"]
BEDROCK["Bedrock<br/>InvokeModel"]
end
subgraph Output["Response"]
VTL_RESP["VTL Response<br/>Mapping"]
FILTER["Response<br/>Filtering"]
CLIENT["Client<br/>Response"]
end
RAW --> SCHEMA
SCHEMA -->|"valid"| UNICODE
SCHEMA -->|"invalid"| CLIENT
UNICODE --> TOKEN
TOKEN --> COST
COST -->|"within budget"| VTL_REQ
COST -->|"over budget"| CLIENT
VTL_REQ --> ENRICH
ENRICH --> ROUTE
ROUTE --> BEDROCK
BEDROCK --> VTL_RESP
VTL_RESP --> FILTER
FILTER --> CLIENT
style Validation fill:#fff3e0,stroke:#f57c00
style Transform fill:#e8f5e9,stroke:#388e3c
style FM fill:#e3f2fd,stroke:#1565c0
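The Token Estimation and Cost Guard stages in the pipeline reduce to pure functions. A sketch: the characters-per-token ratios are rough heuristics (not tokenizer-accurate, and CJK text is counted conservatively at one token per character), and the prices are the Sonnet/Haiku rates quoted earlier:

```python
PRICING = {  # USD per 1M tokens (input, output)
    "anthropic.claude-3-sonnet-20240229-v1:0": (3.00, 15.00),
    "anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
}

def estimate_tokens(text: str) -> int:
    # ~4 chars/token for English; non-ASCII (e.g. Japanese) tokenizes far
    # denser, so count each such character as a full token
    ascii_chars = sum(1 for c in text if ord(c) < 128)
    return ascii_chars // 4 + (len(text) - ascii_chars)

def within_budget(prompt: str, model_id: str,
                  max_output_tokens: int, budget_usd: float) -> bool:
    input_price, output_price = PRICING[model_id]
    # Worst case assumes the model emits the full max_tokens allowance
    worst_case = ((estimate_tokens(prompt) / 1_000_000) * input_price
                  + (max_output_tokens / 1_000_000) * output_price)
    return worst_case <= budget_usd

ok = within_budget("Recommend manga similar to Berserk.",
                   "anthropic.claude-3-haiku-20240307-v1:0",
                   max_output_tokens=1024, budget_usd=0.01)
print(ok)  # True: Haiku worst case here is well under a cent
```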
7. Queue Monitoring and Auto-Scaling
"""
CloudWatch-based monitoring and auto-scaling configuration for
SQS-backed FM processing queues.
"""
import json
import boto3
class QueueMonitorConfig:
"""
Generates CloudWatch alarms and auto-scaling policies
for MangaAssist FM request queues.
"""
@staticmethod
def generate_cloudwatch_alarms() -> list:
"""Generate CloudWatch alarm definitions for queue monitoring."""
return [
{
"AlarmName": "MangaFM-HighPriority-QueueDepth",
"MetricName": "ApproximateNumberOfMessagesVisible",
"Namespace": "AWS/SQS",
"Dimensions": [
{"Name": "QueueName", "Value": "manga-fm-high-priority"}
],
"Statistic": "Average",
"Period": 60,
"EvaluationPeriods": 2,
"Threshold": 100,
"ComparisonOperator": "GreaterThanThreshold",
"AlarmActions": [
"arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
],
"AlarmDescription": (
"High priority FM queue depth exceeds 100. "
"Indicates possible Bedrock throttling or consumer failure."
)
},
{
"AlarmName": "MangaFM-Normal-AgeOfOldestMessage",
"MetricName": "ApproximateAgeOfOldestMessage",
"Namespace": "AWS/SQS",
"Dimensions": [
{"Name": "QueueName", "Value": "manga-fm-normal"}
],
"Statistic": "Maximum",
"Period": 300,
"EvaluationPeriods": 2,
"Threshold": 600, # 10 minutes
"ComparisonOperator": "GreaterThanThreshold",
"AlarmActions": [
"arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
],
"AlarmDescription": (
"Oldest message in normal queue exceeds 10 minutes. "
"Processing is falling behind."
)
},
{
"AlarmName": "MangaFM-DLQ-NotEmpty",
"MetricName": "ApproximateNumberOfMessagesVisible",
"Namespace": "AWS/SQS",
"Dimensions": [
{"Name": "QueueName", "Value": "manga-fm-dlq"}
],
"Statistic": "Sum",
"Period": 300,
"EvaluationPeriods": 1,
"Threshold": 0,
"ComparisonOperator": "GreaterThanThreshold",
"AlarmActions": [
"arn:aws:sns:us-east-1:123456789012:manga-fm-alerts"
],
"AlarmDescription": (
"Dead letter queue has messages. "
"Investigate failed FM requests immediately."
)
}
]
@staticmethod
def generate_autoscaling_policy() -> dict:
"""
Generate ECS auto-scaling policy based on SQS queue depth.
Scales FM processor tasks up/down based on backlog.
"""
return {
"TargetTrackingScaling": {
"ServiceNamespace": "ecs",
"ScalableDimension": "ecs:service:DesiredCount",
"ResourceId": "service/manga-cluster/manga-fm-processor",
"MinCapacity": 1,
"MaxCapacity": 20,
"TargetTrackingPolicies": [
{
"PolicyName": "MangaFM-QueueDepthScaling",
"TargetValue": 10.0,
"CustomizedMetricSpecification": {
"MetricName": "BacklogPerTask",
"Namespace": "MangaAssist/SQS",
"Statistic": "Average",
"Unit": "Count"
},
"ScaleInCooldown": 300,
"ScaleOutCooldown": 60
}
]
},
"BacklogPerTaskMetric": {
"Description": (
"Custom metric: total queue messages / running task count. "
"Published by a Lambda function every 60 seconds."
),
"Formula": "ApproximateNumberOfMessagesVisible / RunningTaskCount",
"TargetValue": 10,
"Rationale": (
"Each ECS task processes ~10 messages per minute with Haiku. "
"Target of 10 backlog per task means ~1 minute of work queued."
)
}
}
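The BacklogPerTask metric described above reduces to one division plus a `put_metric_data` call. A sketch: the zero-task fallback and lazy boto3 import are implementation choices, not prescribed by the scaling policy:

```python
def backlog_per_task(visible_messages: int, running_tasks: int) -> float:
    """ApproximateNumberOfMessagesVisible / RunningTaskCount, guarding zero tasks."""
    # With zero tasks, report the raw backlog so the scale-out alarm still fires
    return visible_messages / running_tasks if running_tasks > 0 else float(visible_messages)

def publish_metric(value: float, region: str = "us-east-1") -> None:
    import boto3  # lazy import: sketch stays importable without boto3/credentials
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/SQS",
        MetricData=[{
            "MetricName": "BacklogPerTask",
            "Value": value,
            "Unit": "Count",
        }],
    )

print(backlog_per_task(120, 6))  # 20.0, above the 10.0 target: scale out
```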
8. Quick Reference — Queue Configuration Matrix
| Queue | Type | Visibility Timeout | Retention | Max Receives (DLQ) | Consumer | Use Case |
|---|---|---|---|---|---|---|
| manga-fm-high-priority | Standard | 60s | 4 days | 3 | Lambda (concurrency 50) | Partner API overflow, urgent requests |
| manga-fm-normal | Standard | 120s | 7 days | 3 | Lambda / ECS | General async analysis |
| manga-fm-low-priority | Standard | 300s | 14 days | 3 | ECS / EC2 | Batch enrichment, nightly jobs |
| manga-fm-ordered.fifo | FIFO | 60s | 7 days | 3 | Lambda | Session-ordered multi-turn |
| manga-fm-session.fifo | FIFO | 60s | 4 days | 3 | Lambda | Per-session context preservation |
| manga-fm-dlq | Standard | 30s | 14 days | N/A | Manual / Redrive | Failed request investigation |
| manga-fm-dlq.fifo | FIFO | 30s | 14 days | N/A | Manual / Redrive | Failed ordered request investigation |
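For the Manual / Redrive consumer in the matrix, SQS's managed redrive API (`StartMessageMoveTask`) can replay DLQ messages back to the source queue at a controlled rate. A sketch with hypothetical helper names; the lazy boto3 import keeps it runnable without credentials:

```python
def build_redrive_kwargs(dlq_arn: str, source_queue_arn: str,
                         rate_per_second: int = 5) -> dict:
    """Kwargs for sqs.start_message_move_task, SQS's managed DLQ redrive."""
    return {
        "SourceArn": dlq_arn,
        "DestinationArn": source_queue_arn,
        # Throttle the replay so it does not re-trigger Bedrock rate limits
        "MaxNumberOfMessagesPerSecond": rate_per_second,
    }

def redrive(dlq_arn: str, source_queue_arn: str) -> None:
    import boto3  # lazy import: sketch stays importable without boto3/credentials
    sqs = boto3.client("sqs")
    sqs.start_message_move_task(**build_redrive_kwargs(dlq_arn, source_queue_arn))

kwargs = build_redrive_kwargs(
    "arn:aws:sqs:us-east-1:123456789012:manga-fm-dlq",
    "arn:aws:sqs:us-east-1:123456789012:manga-fm-normal",
)
print(kwargs["MaxNumberOfMessagesPerSecond"])  # 5
```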
End of File 2 — Asynchronous Processing and Request Validation Patterns