Deployment Pattern Selection
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Task | 2.2 — Select and implement model deployment strategies |
| Skill | 2.2.1 — Deploy FMs using Lambda on-demand, Bedrock provisioned throughput, SageMaker hybrid |
| This File | Choosing patterns based on latency/throughput/cost, SageMaker auto-scaling |
Skill Scope
This file covers the practical selection process for FM deployment patterns: how to evaluate latency requirements, throughput demands, and cost constraints to choose the right pattern, with a deep focus on SageMaker auto-scaling configuration, Bedrock throughput planning, and Lambda concurrency tuning. It includes production-grade scoring algorithms and real-time pattern switching for MangaAssist.
Mind Map
mindmap
root((Deployment Pattern Selection))
Latency Analysis
P50 vs P99 targets
Cold start impact
Network hops
Model inference time
Response streaming
End-to-end budget
Throughput Planning
Requests per second
Tokens per minute
Concurrent sessions
Burst capacity
Sustained load
Queue depth
Cost Optimization
Per-token pricing
Instance reservations
Spot instances
Right-sizing
Time-of-day scheduling
Budget guardrails
SageMaker Auto-Scaling
Target tracking
Step scaling
Scheduled scaling
Scale-in cooldown
Scale-out speed
Custom metrics
Pattern Switching
Real-time routing
Health-based failover
Cost-based switching
Latency-based switching
A/B deployment
Canary releases
Monitoring
CloudWatch metrics
Custom dashboards
Alerting thresholds
Cost anomaly detection
Latency percentiles
Error rate tracking
1. Latency-Based Pattern Selection
1.1 Latency Budget Decomposition
Every MangaAssist request must complete within 3 seconds end-to-end. Understanding where time is spent determines which deployment pattern can meet the budget.
graph LR
subgraph "3-Second Latency Budget"
A[WebSocket<br/>Receive<br/>50ms] --> B[Auth +<br/>Session<br/>100ms]
B --> C[RAG<br/>Retrieval<br/>200ms]
C --> D[FM<br/>Inference<br/>1500ms MAX]
D --> E[Post-<br/>Processing<br/>100ms]
E --> F[WebSocket<br/>Send<br/>50ms]
end
style D fill:#ffcdd2
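The stage allocations above sum to 2,000 ms, leaving roughly 1,000 ms of headroom against the 3-second target for retries, queuing, and network jitter. A minimal sketch of that arithmetic (the stage names are illustrative, not production identifiers):

# Latency budget sanity check; stage values mirror the diagram above.
LATENCY_BUDGET_MS = {
    "websocket_receive": 50,
    "auth_session": 100,
    "rag_retrieval": 200,
    "fm_inference": 1500,  # hard ceiling for the model call
    "post_processing": 100,
    "websocket_send": 50,
}
TARGET_E2E_MS = 3000

allocated = sum(LATENCY_BUDGET_MS.values())   # 2000 ms
headroom = TARGET_E2E_MS - allocated          # 1000 ms left for retries/jitter
assert allocated <= TARGET_E2E_MS, "latency budget overspent"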
1.2 Latency Analyzer
"""
MangaAssist — Latency-based deployment pattern selector.
Analyzes historical latency data to recommend the optimal pattern.
"""
import logging
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class DeploymentPattern(Enum):
"""Available deployment patterns."""
LAMBDA_ON_DEMAND = "lambda_on_demand"
BEDROCK_PROVISIONED = "bedrock_provisioned"
SAGEMAKER_REALTIME = "sagemaker_realtime"
SAGEMAKER_SERVERLESS = "sagemaker_serverless"
@dataclass
class LatencyProfile:
"""Latency profile for a deployment pattern."""
pattern: DeploymentPattern
p50_ms: float
p90_ms: float
p99_ms: float
cold_start_ms: float
cold_start_probability: float # 0.0 - 1.0
warmup_invocations: int # requests needed to warm up
@property
def effective_p99_ms(self) -> float:
"""P99 latency accounting for cold starts."""
if self.cold_start_probability >= 0.01: # >1% cold starts
return max(self.p99_ms, self.cold_start_ms)
return self.p99_ms
# Reference latency profiles (measured for MangaAssist)
LATENCY_PROFILES = {
DeploymentPattern.LAMBDA_ON_DEMAND: LatencyProfile(
pattern=DeploymentPattern.LAMBDA_ON_DEMAND,
p50_ms=1200,
p90_ms=2000,
p99_ms=3500,
cold_start_ms=4500,
cold_start_probability=0.05,
warmup_invocations=0,
),
DeploymentPattern.BEDROCK_PROVISIONED: LatencyProfile(
pattern=DeploymentPattern.BEDROCK_PROVISIONED,
p50_ms=600,
p90_ms=900,
p99_ms=1400,
cold_start_ms=0,
cold_start_probability=0.0,
warmup_invocations=0,
),
DeploymentPattern.SAGEMAKER_REALTIME: LatencyProfile(
pattern=DeploymentPattern.SAGEMAKER_REALTIME,
p50_ms=500,
p90_ms=800,
p99_ms=1200,
cold_start_ms=0,
cold_start_probability=0.0,
warmup_invocations=0,
),
DeploymentPattern.SAGEMAKER_SERVERLESS: LatencyProfile(
pattern=DeploymentPattern.SAGEMAKER_SERVERLESS,
p50_ms=700,
p90_ms=1500,
p99_ms=5000,
cold_start_ms=8000,
cold_start_probability=0.10,
warmup_invocations=0,
),
}
class LatencyAnalyzer:
"""
Analyzes latency requirements and recommends deployment patterns.
Tracks real-time latency measurements for dynamic re-evaluation.
"""
def __init__(self, target_p99_ms: float = 2500.0, window_size: int = 1000):
self.target_p99_ms = target_p99_ms
self._measurements: dict[DeploymentPattern, deque] = {
pattern: deque(maxlen=window_size)
for pattern in DeploymentPattern
}
def record_latency(self, pattern: DeploymentPattern, latency_ms: float) -> None:
"""Record an observed latency measurement."""
self._measurements[pattern].append(latency_ms)
def get_observed_percentile(
self, pattern: DeploymentPattern, percentile: float
) -> Optional[float]:
"""Get observed latency at a given percentile (0-100)."""
measurements = list(self._measurements[pattern])
if len(measurements) < 10:
return None
measurements.sort()
index = int(len(measurements) * percentile / 100)
return measurements[min(index, len(measurements) - 1)]
def recommend_pattern(
self,
max_p99_ms: float,
min_throughput_rps: float,
max_monthly_cost_usd: float,
needs_custom_model: bool = False,
) -> list[dict]:
"""
Recommend deployment patterns ranked by fitness.
Returns scored recommendations with reasoning.
"""
candidates = []
for pattern, profile in LATENCY_PROFILES.items():
# Hard filters
if needs_custom_model and pattern != DeploymentPattern.SAGEMAKER_REALTIME:
continue
if profile.effective_p99_ms > max_p99_ms * 1.5:
continue # Allow some headroom but skip way-too-slow options
# Score components (0-100 each)
latency_score = self._score_latency(profile, max_p99_ms)
throughput_score = self._score_throughput(pattern, min_throughput_rps)
cost_score = self._score_cost(pattern, min_throughput_rps, max_monthly_cost_usd)
ops_score = self._score_operational_complexity(pattern)
# Weighted total
total_score = (
latency_score * 0.35
+ throughput_score * 0.25
+ cost_score * 0.25
+ ops_score * 0.15
)
candidates.append({
"pattern": pattern.value,
"total_score": round(total_score, 1),
"latency_score": round(latency_score, 1),
"throughput_score": round(throughput_score, 1),
"cost_score": round(cost_score, 1),
"ops_score": round(ops_score, 1),
"p99_ms": profile.effective_p99_ms,
"meets_latency": profile.effective_p99_ms <= max_p99_ms,
})
# Sort by total score descending
candidates.sort(key=lambda x: x["total_score"], reverse=True)
return candidates
    def _score_latency(self, profile: LatencyProfile, target_p99: float) -> float:
        """Score latency fitness (0-100). Higher = better latency."""
        effective = profile.effective_p99_ms
        if effective <= target_p99 * 0.5:
            return 100.0
        if effective <= target_p99:
            # Decay linearly from 100 at half the target to 40 at the target,
            # keeping the score continuous with the over-target branch below.
            ratio = (effective - target_p99 * 0.5) / (target_p99 * 0.5)
            return 100.0 - 60.0 * ratio
        # Exceeds target: penalize heavily, continuing down from 40
        overshoot = (effective - target_p99) / target_p99
        return max(0.0, 40.0 * (1.0 - overshoot))
def _score_throughput(
self, pattern: DeploymentPattern, target_rps: float
) -> float:
"""Score throughput capability (0-100)."""
max_rps = {
DeploymentPattern.LAMBDA_ON_DEMAND: 1000,
DeploymentPattern.BEDROCK_PROVISIONED: 500,
DeploymentPattern.SAGEMAKER_REALTIME: 200,
DeploymentPattern.SAGEMAKER_SERVERLESS: 50,
}
available = max_rps.get(pattern, 100)
if available >= target_rps * 2:
return 100.0
if available >= target_rps:
return 70.0 + 30.0 * (available - target_rps) / target_rps
return max(0.0, 70.0 * available / target_rps)
def _score_cost(
self,
pattern: DeploymentPattern,
rps: float,
max_monthly: float,
) -> float:
"""Score cost efficiency (0-100). Higher = cheaper."""
# Estimated monthly cost at given RPS
monthly_estimates = {
DeploymentPattern.LAMBDA_ON_DEMAND: rps * 86400 * 30 * 0.0005,
DeploymentPattern.BEDROCK_PROVISIONED: 10800 + rps * 86400 * 30 * 0.0002,
DeploymentPattern.SAGEMAKER_REALTIME: 1400 * 30 + rps * 86400 * 30 * 0.0001,
DeploymentPattern.SAGEMAKER_SERVERLESS: rps * 86400 * 30 * 0.0008,
}
estimated = monthly_estimates.get(pattern, max_monthly)
if estimated <= max_monthly * 0.3:
return 100.0
if estimated <= max_monthly:
ratio = (estimated - max_monthly * 0.3) / (max_monthly * 0.7)
return 100.0 - 60.0 * ratio
# Over budget
overshoot = (estimated - max_monthly) / max_monthly
return max(0.0, 40.0 * (1.0 - overshoot))
def _score_operational_complexity(self, pattern: DeploymentPattern) -> float:
"""Score operational simplicity (0-100). Higher = simpler."""
scores = {
DeploymentPattern.LAMBDA_ON_DEMAND: 95,
DeploymentPattern.BEDROCK_PROVISIONED: 85,
DeploymentPattern.SAGEMAKER_REALTIME: 40,
DeploymentPattern.SAGEMAKER_SERVERLESS: 70,
}
return float(scores.get(pattern, 50))
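A usage sketch for the analyzer, assuming the MangaAssist targets from this file (2.5 s p99, which leaves ~0.5 s of the 3-second budget for transport, and ~40 RPS peak); the monthly budget figure is illustrative, not a stated requirement:

analyzer = LatencyAnalyzer(target_p99_ms=2500.0)
recommendations = analyzer.recommend_pattern(
    max_p99_ms=2500.0,
    min_throughput_rps=40.0,
    max_monthly_cost_usd=30_000.0,  # illustrative budget
)
for rec in recommendations:
    print(rec["pattern"], rec["total_score"], rec["meets_latency"])
# Only bedrock_provisioned and sagemaker_realtime survive the hard filter:
# the cold-start-adjusted p99 of Lambda (4500 ms) and SageMaker serverless
# (8000 ms) exceeds 1.5x the 2500 ms target.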
2. Throughput-Based Selection
2.1 Throughput Requirements Analysis
graph TD
A[MangaAssist<br/>1M msgs/day] --> B[Calculate RPS]
B --> C["Average: 11.6 RPS<br/>(1M / 86400)"]
C --> D[Peak Multiplier<br/>3-5x average]
D --> E["Peak: 35-58 RPS"]
E --> F{Sustained or<br/>Bursty?}
F -- "Sustained<br/>(18:00-24:00)" --> G[Provisioned Throughput<br/>Reserved capacity]
F -- "Bursty<br/>(flash sales)" --> H[Lambda On-Demand<br/>Elastic scaling]
G --> I[Size Model Units]
H --> J[Set Concurrency Limits]
style A fill:#e3f2fd
style E fill:#fff9c4
style G fill:#c8e6c9
style H fill:#fff3e0
2.2 Throughput Calculator
"""
MangaAssist — Throughput calculator for deployment pattern sizing.
Determines required capacity based on traffic patterns.
"""
import math
from dataclasses import dataclass
@dataclass
class TrafficProfile:
"""Traffic profile for capacity planning."""
daily_messages: int
peak_multiplier: float # Peak RPS / Average RPS
peak_hours: int # Hours of peak traffic per day
avg_input_tokens: int
avg_output_tokens: int
burst_factor: float # Max instantaneous / Peak RPS
@dataclass
class CapacityRequirement:
"""Computed capacity requirement."""
avg_rps: float
peak_rps: float
burst_rps: float
peak_tokens_per_minute: int
sustained_tokens_per_hour: int
recommended_pattern: str
sizing_details: dict
class ThroughputCalculator:
"""
Calculates throughput requirements and recommends sizing
for each deployment pattern.
"""
# Bedrock model unit capacity estimates (tokens/min per model unit)
BEDROCK_MU_CAPACITY = {
"claude-3-sonnet": 40000, # tokens/min per model unit
"claude-3-haiku": 80000,
}
# SageMaker instance throughput estimates (requests/sec)
SAGEMAKER_INSTANCE_RPS = {
"ml.g5.xlarge": 5,
"ml.g5.2xlarge": 8,
"ml.g5.12xlarge": 25,
"ml.p4d.24xlarge": 50,
}
def calculate(self, profile: TrafficProfile) -> CapacityRequirement:
"""Calculate capacity requirements from a traffic profile."""
avg_rps = profile.daily_messages / 86400
peak_rps = avg_rps * profile.peak_multiplier
burst_rps = peak_rps * profile.burst_factor
# Token calculations
avg_tokens_per_request = (
profile.avg_input_tokens + profile.avg_output_tokens
)
peak_tokens_per_minute = int(peak_rps * 60 * avg_tokens_per_request)
sustained_tokens_per_hour = int(
peak_rps * 3600 * avg_tokens_per_request
)
# Determine recommended pattern
pattern, sizing = self._recommend(
peak_rps, burst_rps, peak_tokens_per_minute, profile
)
return CapacityRequirement(
avg_rps=round(avg_rps, 2),
peak_rps=round(peak_rps, 2),
burst_rps=round(burst_rps, 2),
peak_tokens_per_minute=peak_tokens_per_minute,
sustained_tokens_per_hour=sustained_tokens_per_hour,
recommended_pattern=pattern,
sizing_details=sizing,
)
def _recommend(
self,
peak_rps: float,
burst_rps: float,
peak_tpm: int,
profile: TrafficProfile,
) -> tuple[str, dict]:
"""Recommend pattern and sizing based on throughput needs."""
# Check if Lambda can handle it
lambda_max_concurrency = 1000
lambda_avg_duration_s = 2.0
lambda_capacity_rps = lambda_max_concurrency / lambda_avg_duration_s
if burst_rps < lambda_capacity_rps * 0.7:
lambda_concurrency = math.ceil(burst_rps * lambda_avg_duration_s * 1.2)
return "lambda_on_demand", {
"reserved_concurrency": min(lambda_concurrency, 1000),
"provisioned_concurrency": math.ceil(peak_rps * lambda_avg_duration_s),
"memory_mb": 1024,
"timeout_s": 30,
}
# Bedrock provisioned sizing
model_units_sonnet = math.ceil(
peak_tpm / self.BEDROCK_MU_CAPACITY["claude-3-sonnet"]
)
model_units_haiku = math.ceil(
peak_tpm / self.BEDROCK_MU_CAPACITY["claude-3-haiku"]
)
if model_units_sonnet <= 10:
return "bedrock_provisioned", {
"sonnet_model_units": model_units_sonnet,
"haiku_model_units": model_units_haiku,
"commitment": "OneMonth" if peak_rps > 20 else "NoCommitment",
"estimated_hourly_cost": model_units_sonnet * 23 + model_units_haiku * 5,
}
# SageMaker for very high throughput
best_instance = "ml.g5.2xlarge"
instance_rps = self.SAGEMAKER_INSTANCE_RPS[best_instance]
instance_count = math.ceil(burst_rps / instance_rps)
return "sagemaker_realtime", {
"instance_type": best_instance,
"min_instances": math.ceil(peak_rps / instance_rps),
"max_instances": instance_count + 2,
"auto_scaling_target": instance_rps,
}
# MangaAssist throughput calculation
manga_profile = TrafficProfile(
daily_messages=1_000_000,
peak_multiplier=3.5,
peak_hours=6,
avg_input_tokens=500,
avg_output_tokens=300,
burst_factor=2.0,
)
calculator = ThroughputCalculator()
requirement = calculator.calculate(manga_profile)
# Result: avg ~11.6 RPS, peak ~40.5 RPS, burst ~81 RPS, ~1.94M tokens/min peak
# With these inputs the calculator returns lambda_on_demand: the ~81 RPS burst
# sits well under the ~350 RPS Lambda headroom threshold, so the Bedrock and
# SageMaker sizing branches are never reached.
3. Cost-Based Selection
3.1 Total Cost of Ownership Model
"""
MangaAssist — Total Cost of Ownership (TCO) calculator.
Compares deployment patterns across all cost dimensions.
"""
from dataclasses import dataclass
@dataclass
class TCOInputs:
"""Inputs for TCO calculation."""
daily_messages: int
avg_input_tokens: int
avg_output_tokens: int
peak_hours_per_day: int
off_peak_ratio: float # off-peak traffic as fraction of peak
engineer_hourly_rate: float # USD
ops_hours_per_month: float # hours spent on operations
@dataclass
class TCOResult:
"""TCO calculation result."""
pattern: str
compute_cost: float
token_cost: float
ops_cost: float
total_monthly: float
cost_per_message: float
breakdown: dict
class TCOCalculator:
"""
Calculates total cost of ownership for each deployment pattern,
including compute, API, and operational costs.
"""
def calculate_all(self, inputs: TCOInputs) -> list[TCOResult]:
"""Calculate TCO for all deployment patterns."""
results = [
self._calc_lambda_ondemand(inputs),
self._calc_bedrock_provisioned(inputs),
self._calc_sagemaker(inputs),
self._calc_hybrid(inputs),
]
results.sort(key=lambda r: r.total_monthly)
return results
def _calc_lambda_ondemand(self, inputs: TCOInputs) -> TCOResult:
"""Lambda + Bedrock on-demand TCO."""
monthly_messages = inputs.daily_messages * 30
# Assume 70% Haiku, 30% Sonnet split
haiku_msgs = monthly_messages * 0.7
sonnet_msgs = monthly_messages * 0.3
haiku_token_cost = (
haiku_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
+ haiku_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
)
sonnet_token_cost = (
sonnet_msgs * inputs.avg_input_tokens / 1_000_000 * 3.0
+ sonnet_msgs * inputs.avg_output_tokens / 1_000_000 * 15.0
)
token_cost = haiku_token_cost + sonnet_token_cost
# Lambda compute: ~$0.0000167/GB-s * 1GB * 2s per invocation
lambda_cost = monthly_messages * 0.0000167 * 1.0 * 2.0
# Lambda requests: $0.20 per 1M
lambda_request_cost = monthly_messages / 1_000_000 * 0.20
compute_cost = lambda_cost + lambda_request_cost
# Low ops: ~2 hrs/month
ops_cost = 2 * inputs.engineer_hourly_rate
total = compute_cost + token_cost + ops_cost
return TCOResult(
pattern="lambda_on_demand",
compute_cost=round(compute_cost, 2),
token_cost=round(token_cost, 2),
ops_cost=round(ops_cost, 2),
total_monthly=round(total, 2),
cost_per_message=round(total / monthly_messages, 6),
breakdown={
"haiku_token_cost": round(haiku_token_cost, 2),
"sonnet_token_cost": round(sonnet_token_cost, 2),
"lambda_compute": round(lambda_cost, 2),
"lambda_requests": round(lambda_request_cost, 2),
},
)
def _calc_bedrock_provisioned(self, inputs: TCOInputs) -> TCOResult:
"""Bedrock provisioned throughput TCO."""
        # 2 Sonnet MUs during peak (6 hrs), 1 Haiku MU during business hours (12 hrs)
        sonnet_hourly = 23.0 * 2  # 2 model units
        haiku_hourly = 5.0 * 1
daily_compute = (
sonnet_hourly * inputs.peak_hours_per_day
+ haiku_hourly * (12 - inputs.peak_hours_per_day)
)
# Off-peak: on-demand (no provisioned)
off_peak_hours = 24 - 12
daily_off_peak_msgs = (
inputs.daily_messages * inputs.off_peak_ratio * off_peak_hours / 24
)
off_peak_token_cost_daily = (
daily_off_peak_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
+ daily_off_peak_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
)
compute_cost = daily_compute * 30
token_cost = off_peak_token_cost_daily * 30
ops_cost = 4 * inputs.engineer_hourly_rate # 4 hrs/month
total = compute_cost + token_cost + ops_cost
return TCOResult(
pattern="bedrock_provisioned",
compute_cost=round(compute_cost, 2),
token_cost=round(token_cost, 2),
ops_cost=round(ops_cost, 2),
total_monthly=round(total, 2),
cost_per_message=round(total / (inputs.daily_messages * 30), 6),
breakdown={
"sonnet_provisioned_daily": round(sonnet_hourly * inputs.peak_hours_per_day, 2),
"haiku_provisioned_daily": round(haiku_hourly * (12 - inputs.peak_hours_per_day), 2),
"off_peak_on_demand_daily": round(off_peak_token_cost_daily, 2),
},
)
def _calc_sagemaker(self, inputs: TCOInputs) -> TCOResult:
"""SageMaker real-time endpoint TCO."""
# 2x ml.g5.2xlarge 24/7 + auto-scaling to 4 during peak
base_instance_cost_hr = 1.89
base_count = 2
peak_count = 4
daily_base_hours = (24 - inputs.peak_hours_per_day) * base_count
daily_peak_hours = inputs.peak_hours_per_day * peak_count
daily_instance_cost = (daily_base_hours + daily_peak_hours) * base_instance_cost_hr
compute_cost = daily_instance_cost * 30
token_cost = 0 # No per-token cost for self-hosted
ops_cost = 12 * inputs.engineer_hourly_rate # 12 hrs/month (highest)
total = compute_cost + token_cost + ops_cost
return TCOResult(
pattern="sagemaker_realtime",
compute_cost=round(compute_cost, 2),
token_cost=round(token_cost, 2),
ops_cost=round(ops_cost, 2),
total_monthly=round(total, 2),
cost_per_message=round(total / (inputs.daily_messages * 30), 6),
breakdown={
"base_instance_hours": round(daily_base_hours * 30, 1),
"peak_instance_hours": round(daily_peak_hours * 30, 1),
"total_instance_hours": round((daily_base_hours + daily_peak_hours) * 30, 1),
},
)
def _calc_hybrid(self, inputs: TCOInputs) -> TCOResult:
"""Hybrid deployment TCO (MangaAssist production)."""
monthly_msgs = inputs.daily_messages * 30
        # Peak: 25% of messages via provisioned Sonnet
        provisioned_msgs = monthly_msgs * 0.25
        # Business hours: 50% of messages via on-demand Haiku
        haiku_msgs = monthly_msgs * 0.50
        # Complex queries: 15% via on-demand Sonnet
        sonnet_od_msgs = monthly_msgs * 0.15
        # Manga-specific: 10% via the fine-tuned SageMaker model
        sagemaker_msgs = monthly_msgs * 0.10
# Provisioned compute (1 Sonnet MU for 6 hrs peak)
provisioned_compute = 23.0 * 1 * inputs.peak_hours_per_day * 30
# On-demand token costs
haiku_cost = (
haiku_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
+ haiku_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
)
sonnet_od_cost = (
sonnet_od_msgs * inputs.avg_input_tokens / 1_000_000 * 3.0
+ sonnet_od_msgs * inputs.avg_output_tokens / 1_000_000 * 15.0
)
# SageMaker: 1x ml.g5.2xlarge 24/7
sagemaker_cost = 1.89 * 24 * 30
compute_cost = provisioned_compute + sagemaker_cost
token_cost = haiku_cost + sonnet_od_cost
ops_cost = 8 * inputs.engineer_hourly_rate # 8 hrs/month
total = compute_cost + token_cost + ops_cost
return TCOResult(
pattern="hybrid",
compute_cost=round(compute_cost, 2),
token_cost=round(token_cost, 2),
ops_cost=round(ops_cost, 2),
total_monthly=round(total, 2),
cost_per_message=round(total / monthly_msgs, 6),
breakdown={
"provisioned_compute": round(provisioned_compute, 2),
"haiku_on_demand": round(haiku_cost, 2),
"sonnet_on_demand": round(sonnet_od_cost, 2),
"sagemaker_instance": round(sagemaker_cost, 2),
},
)
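A usage sketch with MangaAssist-scale inputs. The engineer rate, ops hours, and off-peak ratio below are assumptions for illustration, not figures stated elsewhere in this file:

inputs = TCOInputs(
    daily_messages=1_000_000,
    avg_input_tokens=500,
    avg_output_tokens=300,
    peak_hours_per_day=6,
    off_peak_ratio=0.3,          # assumed off-peak traffic fraction
    engineer_hourly_rate=100.0,  # assumed fully loaded rate
    ops_hours_per_month=8.0,
)
for result in TCOCalculator().calculate_all(inputs):  # sorted cheapest first
    print(f"{result.pattern:20s} ${result.total_monthly:>12,.2f}  "
          f"${result.cost_per_message:.6f}/msg")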
4. SageMaker Auto-Scaling Deep Dive
4.1 Auto-Scaling Architecture
graph TB
subgraph "SageMaker Auto-Scaling"
CW[CloudWatch Metrics] --> ASG[Application<br/>Auto Scaling]
ASG --> EP[SageMaker Endpoint<br/>1-8 instances]
subgraph "Scaling Policies"
TT[Target Tracking<br/>InvocationsPerInstance = 10]
SS[Step Scaling<br/>ModelLatency > 2000ms]
SC[Scheduled Scaling<br/>Peak hours: min=3]
end
TT --> ASG
SS --> ASG
SC --> ASG
end
subgraph "Metrics Pipeline"
EP --> M1[InvocationsPerInstance]
EP --> M2[ModelLatency]
EP --> M3[CPUUtilization]
EP --> M4[GPUMemoryUtilization]
M1 --> CW
M2 --> CW
M3 --> CW
M4 --> CW
end
style ASG fill:#fff9c4
style EP fill:#c8e6c9
4.2 Production Auto-Scaling Configuration
"""
MangaAssist — SageMaker auto-scaling configuration.
Implements target tracking, step scaling, and scheduled scaling.
"""
import logging

import boto3
logger = logging.getLogger(__name__)
class SageMakerAutoScaler:
"""
Configures comprehensive auto-scaling for SageMaker endpoints.
Combines target tracking, step scaling, and scheduled actions.
"""
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self.autoscaling = boto3.client(
"application-autoscaling", region_name=region
)
self.sm_client = boto3.client("sagemaker", region_name=region)
def setup_complete_scaling(
self,
endpoint_name: str,
variant_name: str = "AllTraffic",
min_instances: int = 1,
max_instances: int = 8,
) -> dict:
"""
Set up complete auto-scaling with all three policy types.
Returns policy ARNs for monitoring.
"""
resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
# 1. Register scalable target
self.autoscaling.register_scalable_target(
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
MinCapacity=min_instances,
MaxCapacity=max_instances,
)
# 2. Target tracking: maintain N invocations per instance
tt_policy = self._create_target_tracking_policy(
endpoint_name, variant_name, resource_id,
target_invocations=10,
)
# 3. Step scaling: react to latency spikes
step_policy = self._create_step_scaling_policy(
endpoint_name, variant_name, resource_id
)
# 4. Scheduled scaling: pre-scale for known peaks
schedules = self._create_scheduled_scaling(
endpoint_name, resource_id, min_instances, max_instances
)
return {
"target_tracking_policy": tt_policy,
"step_scaling_policy": step_policy,
"scheduled_actions": schedules,
}
def _create_target_tracking_policy(
self,
endpoint_name: str,
variant_name: str,
resource_id: str,
target_invocations: int = 10,
) -> str:
"""Create target tracking scaling policy."""
response = self.autoscaling.put_scaling_policy(
PolicyName=f"{endpoint_name}-target-tracking",
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
PolicyType="TargetTrackingScaling",
TargetTrackingScalingPolicyConfiguration={
"TargetValue": float(target_invocations),
"CustomizedMetricSpecification": {
"MetricName": "InvocationsPerInstance",
"Namespace": "AWS/SageMaker",
"Dimensions": [
{"Name": "EndpointName", "Value": endpoint_name},
{"Name": "VariantName", "Value": variant_name},
],
"Statistic": "Average",
},
"ScaleInCooldown": 300, # 5 min cool-down before scale-in
"ScaleOutCooldown": 60, # 1 min cool-down before scale-out
},
)
arn = response["PolicyARN"]
logger.info(f"Target tracking policy created: {arn}")
return arn
def _create_step_scaling_policy(
self,
endpoint_name: str,
variant_name: str,
resource_id: str,
) -> str:
"""Create step scaling policy triggered by high latency."""
        # CloudWatch client for the latency alarm attached below
        cw = boto3.client("cloudwatch", region_name=self.region)
# Step scaling policy
response = self.autoscaling.put_scaling_policy(
PolicyName=f"{endpoint_name}-latency-step",
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
PolicyType="StepScaling",
StepScalingPolicyConfiguration={
"AdjustmentType": "ChangeInCapacity",
"StepAdjustments": [
{
"MetricIntervalLowerBound": 0,
"MetricIntervalUpperBound": 500,
"ScalingAdjustment": 1,
},
{
"MetricIntervalLowerBound": 500,
"ScalingAdjustment": 2,
},
],
"Cooldown": 120,
},
)
policy_arn = response["PolicyARN"]
# Create alarm that triggers the step policy
cw.put_metric_alarm(
AlarmName=f"{endpoint_name}-high-latency",
Namespace="AWS/SageMaker",
MetricName="ModelLatency",
Dimensions=[
{"Name": "EndpointName", "Value": endpoint_name},
{"Name": "VariantName", "Value": variant_name},
],
Statistic="Average",
Period=60,
EvaluationPeriods=2,
Threshold=2000000, # 2 seconds in microseconds
ComparisonOperator="GreaterThanThreshold",
AlarmActions=[policy_arn],
)
logger.info(f"Step scaling policy created: {policy_arn}")
return policy_arn
def _create_scheduled_scaling(
self,
endpoint_name: str,
resource_id: str,
min_instances: int,
max_instances: int,
) -> list[str]:
"""Create scheduled scaling for peak/off-peak hours (JST)."""
schedules = []
# Peak hours: 18:00 JST (09:00 UTC) — min 3 instances
self.autoscaling.put_scheduled_action(
ServiceNamespace="sagemaker",
ScheduledActionName=f"{endpoint_name}-peak-start",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
Schedule="cron(0 9 * * ? *)", # 09:00 UTC = 18:00 JST
ScalableTargetAction={
"MinCapacity": 3,
"MaxCapacity": max_instances,
},
)
schedules.append(f"{endpoint_name}-peak-start")
# Off-peak: 00:00 JST (15:00 UTC) — min 1 instance
self.autoscaling.put_scheduled_action(
ServiceNamespace="sagemaker",
ScheduledActionName=f"{endpoint_name}-offpeak-start",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
Schedule="cron(0 15 * * ? *)", # 15:00 UTC = 00:00 JST
ScalableTargetAction={
"MinCapacity": min_instances,
"MaxCapacity": max(max_instances // 2, 2),
},
)
schedules.append(f"{endpoint_name}-offpeak-start")
logger.info(f"Scheduled scaling actions created: {schedules}")
return schedules
def get_scaling_status(self, endpoint_name: str) -> dict:
"""Get current scaling status and activity."""
resource_id = f"endpoint/{endpoint_name}/variant/AllTraffic"
# Get current target
targets = self.autoscaling.describe_scalable_targets(
ServiceNamespace="sagemaker",
ResourceIds=[resource_id],
)
# Get scaling activities
activities = self.autoscaling.describe_scaling_activities(
ServiceNamespace="sagemaker",
ResourceId=resource_id,
MaxResults=5,
)
target = targets["ScalableTargets"][0] if targets["ScalableTargets"] else {}
recent_activities = [
{
"cause": a.get("Cause", ""),
"status": a["StatusCode"],
"start": str(a.get("StartTime", "")),
"end": str(a.get("EndTime", "")),
}
for a in activities.get("ScalingActivities", [])
]
return {
"current_min": target.get("MinCapacity"),
"current_max": target.get("MaxCapacity"),
"recent_activities": recent_activities,
}
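Wiring an endpoint up is then a single call. The endpoint name below is hypothetical, and the call publishes real Application Auto Scaling configuration, so it needs AWS credentials and an existing endpoint; treat it as a sketch:

scaler = SageMakerAutoScaler(region="ap-northeast-1")
policies = scaler.setup_complete_scaling(
    endpoint_name="mangaassist-manga-llm",  # hypothetical endpoint name
    min_instances=1,
    max_instances=8,
)
print(policies["scheduled_actions"])           # peak-start / offpeak-start
print(scaler.get_scaling_status("mangaassist-manga-llm"))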
4.3 Scaling Timeline Visualization
graph LR
subgraph "Scaling Response Timeline"
A["00:00 JST<br/>Min: 1 instance"] --> B["09:00 JST<br/>Business ramp-up"]
B --> C["12:00 JST<br/>Lunch peak<br/>Scale to 2"]
C --> D["17:00 JST<br/>Pre-peak warm-up"]
D --> E["18:00 JST<br/>Scheduled: Min 3"]
E --> F["20:00 JST<br/>Peak traffic<br/>Scale to 4-5"]
F --> G["23:00 JST<br/>Peak declining"]
G --> H["00:00 JST<br/>Scheduled: Min 1<br/>Scale-in begins"]
end
style E fill:#ffcdd2
style F fill:#ffcdd2
style H fill:#c8e6c9
5. Dynamic Pattern Switching
5.1 Real-Time Pattern Evaluation
"""
MangaAssist — Dynamic pattern switching.
Evaluates and switches deployment patterns based on real-time conditions.
"""
import time
import logging
from dataclasses import dataclass
from collections import deque
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class PatternHealth:
"""Real-time health metrics for a deployment pattern."""
pattern: str
is_available: bool
current_latency_p50_ms: float
current_latency_p99_ms: float
error_rate: float # 0.0 - 1.0
capacity_utilization: float # 0.0 - 1.0
last_updated: float # timestamp
class DynamicPatternSwitcher:
"""
Monitors deployment pattern health and switches dynamically
when a pattern degrades below thresholds.
"""
def __init__(
self,
latency_threshold_ms: float = 2500,
error_rate_threshold: float = 0.05,
capacity_threshold: float = 0.85,
):
self.latency_threshold = latency_threshold_ms
self.error_rate_threshold = error_rate_threshold
self.capacity_threshold = capacity_threshold
self._pattern_health: dict[str, PatternHealth] = {}
self._switch_history: deque = deque(maxlen=100)
self._current_primary = "bedrock_provisioned"
def update_health(self, health: PatternHealth) -> None:
"""Update health metrics for a deployment pattern."""
self._pattern_health[health.pattern] = health
def evaluate_and_switch(self) -> Optional[str]:
"""
Evaluate all patterns and switch primary if needed.
Returns the new primary pattern, or None if no switch needed.
"""
current = self._pattern_health.get(self._current_primary)
if current is None:
return None
# Check if current primary is degraded
is_degraded = (
not current.is_available
or current.current_latency_p99_ms > self.latency_threshold
or current.error_rate > self.error_rate_threshold
or current.capacity_utilization > self.capacity_threshold
)
if not is_degraded:
return None
# Find best alternative
fallback_priority = [
"bedrock_provisioned",
"bedrock_ondemand_haiku",
"bedrock_ondemand_sonnet",
"sagemaker_realtime",
]
for candidate in fallback_priority:
if candidate == self._current_primary:
continue
candidate_health = self._pattern_health.get(candidate)
if candidate_health is None:
continue
if (
candidate_health.is_available
and candidate_health.error_rate < self.error_rate_threshold
and candidate_health.capacity_utilization < self.capacity_threshold
):
old_primary = self._current_primary
self._current_primary = candidate
self._switch_history.append({
"timestamp": time.time(),
"from": old_primary,
"to": candidate,
"reason": self._get_switch_reason(current),
})
logger.warning(
"Switching primary deployment pattern",
extra={
"from": old_primary,
"to": candidate,
"reason": self._get_switch_reason(current),
},
)
return candidate
logger.error("No healthy deployment pattern available")
return None
def _get_switch_reason(self, health: PatternHealth) -> str:
"""Determine why a switch is necessary."""
reasons = []
if not health.is_available:
reasons.append("unavailable")
if health.current_latency_p99_ms > self.latency_threshold:
reasons.append(f"high_latency({health.current_latency_p99_ms:.0f}ms)")
if health.error_rate > self.error_rate_threshold:
reasons.append(f"high_errors({health.error_rate:.1%})")
if health.capacity_utilization > self.capacity_threshold:
reasons.append(f"capacity({health.capacity_utilization:.1%})")
return ", ".join(reasons) or "unknown"
def get_routing_weights(self) -> dict[str, float]:
"""
Get traffic routing weights for all patterns.
Primary gets most traffic; healthy alternatives get a small share.
"""
weights = {}
for pattern, health in self._pattern_health.items():
if not health.is_available:
weights[pattern] = 0.0
elif pattern == self._current_primary:
weights[pattern] = 0.80
elif health.error_rate < self.error_rate_threshold:
weights[pattern] = 0.05 # Small canary allocation
else:
weights[pattern] = 0.0
# Normalize
total = sum(weights.values())
if total > 0:
weights = {k: round(v / total, 3) for k, v in weights.items()}
return weights
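A self-contained sketch of one switch event using simulated health values: the degraded p99 on the provisioned pattern trips the 2,500 ms threshold, and the switcher falls back to on-demand Haiku:

switcher = DynamicPatternSwitcher()
switcher.update_health(PatternHealth(
    pattern="bedrock_provisioned", is_available=True,
    current_latency_p50_ms=900.0, current_latency_p99_ms=3200.0,  # degraded
    error_rate=0.01, capacity_utilization=0.70, last_updated=time.time(),
))
switcher.update_health(PatternHealth(
    pattern="bedrock_ondemand_haiku", is_available=True,
    current_latency_p50_ms=800.0, current_latency_p99_ms=1300.0,
    error_rate=0.002, capacity_utilization=0.0, last_updated=time.time(),
))
print(switcher.evaluate_and_switch())  # -> "bedrock_ondemand_haiku"
print(switcher.get_routing_weights())  # new primary ~0.94, old primary ~0.06 canary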
5.2 Pattern Health Dashboard
graph TB
subgraph "Health Monitor"
H1[Bedrock Provisioned<br/>P99: 600ms ✓<br/>Errors: 0.1% ✓<br/>Capacity: 65%] --> Router
H2[Bedrock On-Demand Haiku<br/>P99: 1200ms ✓<br/>Errors: 0.3% ✓<br/>Capacity: N/A] --> Router
H3[Bedrock On-Demand Sonnet<br/>P99: 1800ms ✓<br/>Errors: 0.2% ✓<br/>Capacity: N/A] --> Router
H4[SageMaker Endpoint<br/>P99: 500ms ✓<br/>Errors: 0.1% ✓<br/>Capacity: 45%] --> Router
end
Router["Traffic Router<br/>Weights: 80/5/5/5, normalized"] --> Response[Client Response]
style H1 fill:#c8e6c9
style H2 fill:#c8e6c9
style H3 fill:#c8e6c9
style H4 fill:#c8e6c9
style Router fill:#fff9c4
6. Monitoring and Observability
6.1 Pattern Selection Metrics
"""
MangaAssist — Deployment pattern monitoring.
Tracks selection decisions, performance, and cost across patterns.
"""
import json
import logging

import boto3
logger = logging.getLogger(__name__)
class DeploymentPatternMonitor:
"""
Publishes custom CloudWatch metrics for deployment pattern monitoring.
Tracks routing decisions, latency by pattern, and cost accumulation.
"""
    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region  # kept for dashboard widget definitions
        self.cw = boto3.client("cloudwatch", region_name=region)
        self.namespace = "MangaAssist/Deployment"
def record_routing_decision(
self,
pattern: str,
reason: str,
latency_ms: float,
cost_usd: float,
success: bool,
) -> None:
"""Record a routing decision and its outcome."""
metric_data = [
{
"MetricName": "RoutingDecision",
"Dimensions": [
{"Name": "Pattern", "Value": pattern},
{"Name": "Reason", "Value": reason},
],
"Value": 1,
"Unit": "Count",
},
{
"MetricName": "InferenceLatency",
"Dimensions": [
{"Name": "Pattern", "Value": pattern},
],
"Value": latency_ms,
"Unit": "Milliseconds",
},
{
"MetricName": "InferenceCost",
"Dimensions": [
{"Name": "Pattern", "Value": pattern},
],
"Value": cost_usd,
"Unit": "None",
},
{
"MetricName": "InferenceSuccess",
"Dimensions": [
{"Name": "Pattern", "Value": pattern},
],
"Value": 1 if success else 0,
"Unit": "Count",
},
]
self.cw.put_metric_data(
Namespace=self.namespace,
MetricData=metric_data,
)
def record_pattern_switch(
self,
from_pattern: str,
to_pattern: str,
reason: str,
) -> None:
"""Record a deployment pattern switch event."""
self.cw.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
"MetricName": "PatternSwitch",
"Dimensions": [
{"Name": "FromPattern", "Value": from_pattern},
{"Name": "ToPattern", "Value": to_pattern},
{"Name": "Reason", "Value": reason},
],
"Value": 1,
"Unit": "Count",
},
],
)
def create_dashboard(self, dashboard_name: str = "MangaAssist-Deployment") -> None:
"""Create a CloudWatch dashboard for deployment monitoring."""
dashboard_body = {
"widgets": [
{
"type": "metric",
"properties": {
"title": "Routing Decisions by Pattern",
"metrics": [
[self.namespace, "RoutingDecision", "Pattern", p]
for p in [
"provisioned-sonnet",
"ondemand-haiku",
"ondemand-sonnet",
"sagemaker-custom",
]
],
"period": 300,
"stat": "Sum",
},
},
{
"type": "metric",
"properties": {
"title": "Inference Latency P99 by Pattern",
"metrics": [
[self.namespace, "InferenceLatency", "Pattern", p]
for p in [
"provisioned-sonnet",
"ondemand-haiku",
"ondemand-sonnet",
"sagemaker-custom",
]
],
"period": 300,
"stat": "p99",
},
},
{
"type": "metric",
"properties": {
"title": "Cumulative Daily Cost",
"metrics": [
[self.namespace, "InferenceCost", "Pattern", p]
for p in [
"provisioned-sonnet",
"ondemand-haiku",
"ondemand-sonnet",
"sagemaker-custom",
]
],
"period": 3600,
"stat": "Sum",
},
},
],
}
self.cw.put_dashboard(
DashboardName=dashboard_name,
DashboardBody=json.dumps(dashboard_body),
)
logger.info(f"Dashboard created: {dashboard_name}")
Key Takeaways
- Latency budgets drive pattern selection — MangaAssist's 3-second end-to-end target leaves only 1.5 s for FM inference; this eliminates Lambda without provisioned concurrency (cold starts push p99 above 3.5 s) and favors Bedrock provisioned throughput or SageMaker for consistent sub-second inference.
- Throughput sizing is a math problem — at 1M messages/day with a 3.5x peak multiplier, MangaAssist needs ~40 RPS of peak capacity, which maps to 2 Sonnet model units of provisioned throughput or 2-4 SageMaker ml.g5.2xlarge instances.
- TCO includes operational cost — SageMaker may have the lowest per-message cost at scale, but 12 hours/month of engineer time for GPU management, model updates, and scaling tuning adds $1,200-2,400/month that Bedrock avoids entirely.
- SageMaker auto-scaling requires three policy types — target tracking handles steady growth, step scaling reacts to latency spikes, and scheduled scaling pre-provisions for known peaks; using only one leaves gaps.
- Scale-out must be aggressive (60 s cooldown) — new SageMaker instances take 5-15 minutes to become available, so the scaling trigger must fire as early as possible; scale-in cooldown should be conservative (300 s) to avoid flapping.
- Dynamic pattern switching provides resilience — monitoring all patterns in real time and automatically routing away from degraded ones keeps MangaAssist inside its 3-second SLA even when individual deployment targets experience issues.
- The hybrid approach optimizes each traffic segment — peak hours use provisioned throughput (cost-efficient at high volume), simple queries use Haiku on-demand (cheapest per request), complex queries use Sonnet on-demand (quality-first), and manga-specific queries use a fine-tuned SageMaker model (best accuracy).