Capacity Planning & Auto-Scaling for GenAI Applications
AWS AIP-C01 Task 4.2 — Skill 4.2.5: Right-size resources to optimize FM application throughput

Context: MangaAssist JP manga e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. 1M messages/day, peak 20K concurrent sessions.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Responsible AI & Monitoring | Task 4.2 — Optimize FM application performance | Skill 4.2.5 — Right-size resources to optimize FM application throughput |
Focus: Deep-dive into bottom-up capacity modeling, per-service sizing, auto-scaling policy design, and reserved vs on-demand cost optimization for every layer of the MangaAssist stack.
Bottom-Up Capacity Model
Start from the one number that matters — 1M messages/day — and derive every service requirement mathematically.
flowchart TB
subgraph Input["Traffic Input"]
T["1M messages/day<br/>Peak 20K concurrent sessions"]
end
subgraph Step1["Step 1 — Request Rate"]
RPS_AVG["Sustained: 11.6 req/s<br/>(1M / 86,400)"]
RPS_PEAK["Peak: ~740 req/s<br/>(20K sessions / 27s think time)"]
end
subgraph Step2["Step 2 — Token Throughput"]
TOK["Peak tokens/sec:<br/>Sonnet: 223,200 t/s (30%)<br/>Haiku: 520,800 t/s (70%)<br/>Total: 744,000 t/s"]
end
subgraph Step3["Step 3 — Per-Service Sizing"]
direction TB
BED["Bedrock Units<br/>Sonnet: ~335 units<br/>Haiku: ~391 units"]
ECS["ECS Tasks<br/>60 tasks (buffered)<br/>2 vCPU / 4 GB each"]
OS["OpenSearch OCU<br/>Avg: 8 OCU<br/>Peak: 16 OCU"]
DDB["DynamoDB<br/>On-Demand<br/>~575 RCU / ~276 WCU peak"]
EC["ElastiCache<br/>2x r6g.large<br/>13 GB each"]
end
subgraph Step4["Step 4 — Cost Envelope"]
COST["Monthly estimate<br/>per capacity plan"]
end
T --> Step1
Step1 --> Step2
Step2 --> Step3
Step3 --> Step4
Detailed Derivation
| Step | Calculation | Result |
|---|---|---|
| Daily messages | Given | 1,000,000 |
| Sustained RPS | 1,000,000 / 86,400 | 11.6 req/s |
| Peak concurrent sessions | Given | 20,000 |
| Avg think time (sec between requests) | 87s session / 3.2 requests | ~27s |
| Peak RPS | 20,000 / 27 | ~740 active req/s (not all hit Bedrock simultaneously) |
| Bedrock peak RPS (adjusted for cache hits) | 740 x 0.65 (35% cache hit) | ~480 Bedrock req/s |
| Sonnet peak RPS | 480 x 0.30 | 144 req/s |
| Haiku peak RPS | 480 x 0.70 | 336 req/s |
| Sonnet peak tokens/min | 144 x 1,550 tokens x 60 | 13,392,000 t/min |
| Haiku peak tokens/min | 336 x 1,550 tokens x 60 | 31,248,000 t/min |
| Sonnet model units needed | 13,392,000 / 50,000 x 1.25 buffer | ~335 units |
| Haiku model units needed | 31,248,000 / 100,000 x 1.25 buffer | ~391 units |
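The table's arithmetic can be replayed in a few lines. The per-model-unit throughputs (50K and 100K tokens/min) are the illustrative figures used above, not published Bedrock quotas:

```python
import math

# Replay of the bottom-up capacity derivation above.
DAILY_MESSAGES = 1_000_000
PEAK_SESSIONS = 20_000
THINK_TIME_S = 27              # 87s session / 3.2 requests
CACHE_HIT = 0.35               # semantic cache hit rate
TOKENS_PER_REQ = 1_550         # avg input + output tokens
SONNET_SHARE, HAIKU_SHARE = 0.30, 0.70
SONNET_UNIT_TPM, HAIKU_UNIT_TPM = 50_000, 100_000  # illustrative tokens/min per unit
BUFFER = 1.25

sustained_rps = DAILY_MESSAGES / 86_400              # ~11.6 req/s
peak_rps = PEAK_SESSIONS / THINK_TIME_S              # ~740 active req/s
bedrock_rps = peak_rps * (1 - CACHE_HIT)             # ~480 req/s after cache

sonnet_tpm = bedrock_rps * SONNET_SHARE * TOKENS_PER_REQ * 60
haiku_tpm = bedrock_rps * HAIKU_SHARE * TOKENS_PER_REQ * 60
sonnet_units = math.ceil(sonnet_tpm / SONNET_UNIT_TPM * BUFFER)  # ~335-336
haiku_units = math.ceil(haiku_tpm / HAIKU_UNIT_TPM * BUFFER)     # ~391-392
```

Small differences from the table (e.g. 336 vs 335 units) come from carrying full precision instead of rounding intermediate rates.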
GenAI Traffic Patterns — Why Sizing Is Hard
GenAI traffic is fundamentally different from traditional web traffic. Understanding these patterns is critical for correct capacity planning.
Pattern 1 — Bursty and Correlated
When a new manga chapter drops (e.g., a popular series like One Piece), thousands of users ask similar questions within minutes. This creates correlated bursts where cache hit rate initially drops (novel queries) before recovering.
Normal traffic: ▁▂▃▂▁▂▃▂▁▂▃▂ (smooth, predictable)
Manga release: ▁▂▃▇█████▇▅▃▂ (sharp spike, slow decay)
Flash sale event: ▁▁▁▁▁▂████████ (step function, sustained high)
Pattern 2 — Variable-Length Requests
Unlike REST APIs with predictable payload sizes, GenAI requests vary enormously in token count:
| Request Type | Input Tokens | Output Tokens | Total | Duration |
|---|---|---|---|---|
| Simple FAQ ("Is this in stock?") | 400 | 80 | 480 | ~0.5s |
| Manga recommendation (multi-turn) | 2,500 | 500 | 3,000 | ~3s |
| Complex comparison (5 series) | 3,500 | 900 | 4,400 | ~5s |
| Order troubleshooting (with history) | 4,000 | 600 | 4,600 | ~4s |
This means average resource consumption per request has high variance, making simple "requests per second" capacity models insufficient.
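To make the variance concrete, here is the token spread across the request mix above, assuming (purely for illustration) an equal share of each request type — real traffic skews toward simple FAQs:

```python
import statistics

# Total tokens per request, from the request-type table above.
totals = [480, 3_000, 4_400, 4_600]

mean_tokens = statistics.mean(totals)              # 3,120
cv = statistics.pstdev(totals) / mean_tokens       # coefficient of variation ~0.53

# A pure requests/sec capacity model implicitly assumes cv ~ 0.
# Here it is large, so token throughput, not request count,
# is the binding constraint.
```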
Pattern 3 — Time-of-Day Correlation (JP Market)
JST Hour: 06 08 10 12 14 16 18 20 22 00 02 04
Traffic %: 5% 8% 12% 15% 12% 15% 18% 25% 22% 10% 5% 3%
▂ ▃ ▅ ▆ ▅ ▆ ▇ █ ▇ ▄ ▂ ▁
↑ Prime time: 18:00-23:00 JST
Peak period (18:00-23:00 JST) carries ~65% of daily traffic in ~21% of the day. Scheduled scaling must pre-warm resources before 18:00 JST.
Per-Service Sizing Deep-Dive
ECS Fargate — Orchestrator Sizing
The orchestrator is I/O-bound, not CPU-bound. It spends most of its time waiting for Bedrock, OpenSearch, and DynamoDB responses. The key constraint is concurrent outbound connections, not raw CPU.
| Factor | Value | Notes |
|---|---|---|
| vCPU per task | 2 | Sufficient for async I/O orchestration |
| Memory per task | 4 GB | Handles prompt assembly for 4K token context windows |
| Max concurrent Bedrock calls per task | 10 | Async HTTP client with connection pooling |
| Max concurrent OpenSearch calls per task | 20 | Faster than Bedrock, higher concurrency OK |
| Peak Bedrock RPS (after cache) | 480 | From capacity model above |
| Tasks needed for Bedrock concurrency | 480 / 10 = 48 | Primary sizing constraint |
| With 25% buffer | 60 tasks | Recommended minimum at peak |
| Minimum tasks (off-peak) | 10 | Floor for availability |
| Maximum tasks | 200 | Account-level ceiling |
Fargate Spot consideration: The orchestrator is stateless (session state in DynamoDB, cache in Redis). Spot interruptions cause a single request to fail and retry. At 70% Spot / 30% On-Demand, savings are ~50% on compute.
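The task-count and Spot-blend figures above reduce to a short calculation. The ~70% Spot discount is an assumption for illustration; actual Spot pricing varies by region and time:

```python
import math

# ECS task sizing from the Bedrock-concurrency constraint above.
BEDROCK_PEAK_RPS = 480              # after 35% cache hit
MAX_BEDROCK_CALLS_PER_TASK = 10     # async HTTP client pool limit
BUFFER = 1.25

tasks_needed = math.ceil(BEDROCK_PEAK_RPS / MAX_BEDROCK_CALLS_PER_TASK)  # 48
tasks_buffered = math.ceil(tasks_needed * BUFFER)                         # 60

# Blended compute cost at 70% Spot / 30% On-Demand,
# assuming ~70% Spot discount (illustrative).
SPOT_SHARE, SPOT_DISCOUNT = 0.70, 0.70
blended_cost_factor = SPOT_SHARE * (1 - SPOT_DISCOUNT) + (1 - SPOT_SHARE)  # ~0.51
```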
DynamoDB — Session Storage
| Access Pattern | Frequency | Size | Consistency |
|---|---|---|---|
| Read session context | Every request | ~4 KB (eventually consistent) | Eventually consistent |
| Read conversation history | Every multi-turn request (70%) | 1-10 KB (depends on turns) | Eventually consistent |
| Write session update | Every request | ~2 KB | Standard write |
| Write conversation turn | Every request | ~1 KB | Standard write |
On-Demand vs Provisioned:
| Mode | Peak Cost/Month | Throttle Risk | Management Overhead |
|---|---|---|---|
| On-Demand | ~$600 | Very low (auto-scales to 2x previous peak) | Zero |
| Provisioned + Auto-Scaling | ~$400 | Medium (scaling lag on sudden spikes) | High (tune targets, alarms) |
| Provisioned with Reserved | ~$280 | Medium | Very high (1-year commit) |
MangaAssist recommendation: On-Demand for the session table. The ~50% cost premium over provisioned auto-scaling (and ~115% over reserved provisioned) is worth the zero-throttle guarantee for user-facing chatbot latency. DynamoDB is typically <5% of total GenAI stack cost, so optimizing it aggressively yields small absolute savings with meaningful risk.
OpenSearch Serverless — OCU Planning
OpenSearch Serverless uses OCUs (OpenSearch Compute Units). Each collection requires:
- Base OCU: always-on for indexing. Minimum 2 OCU.
- Search OCU: scales with query load. Minimum 2 OCU.
| OCU Type | Minimum | Peak Estimate | Auto-Scales | Cost/OCU/Hour |
|---|---|---|---|---|
| Base (indexing) | 2 | 4 | Yes (with lag) | $0.24 |
| Search | 2 | 12 | Yes (faster) | $0.24 |
| Total at peak | 4 | 16 | — | — |
| Monthly cost (sustained peak) | — | — | — | ~$2,803 |
Scaling behavior: OpenSearch Serverless auto-scales search OCUs, but there is a lag of 5-15 minutes to add capacity. During sudden spikes (manga release events), search latency degrades before OCUs scale. Mitigation: maintain higher base search OCU during known event windows.
Vector search specifics: Manga embedding search uses k-NN with an HNSW index. Memory requirements scale with:
- Embedding count: 500K manga products x 1536 dimensions x 4 bytes = ~3 GB
- HNSW graph overhead: ~2x embedding size = ~6 GB
- Total resident size: ~9 GB; with ~8 GB usable per search OCU, the 2-OCU search minimum is effectively consumed by the index alone
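Both the index-memory estimate and the peak OCU cost check out arithmetically (exact figures are ~2.9 GB + ~5.7 GB ≈ 8.6 GB, which the text rounds to ~3 + ~6 ≈ 9 GB):

```python
# Vector index memory estimate for the manga catalog above.
EMBEDDINGS = 500_000
DIMS = 1536
BYTES_PER_FLOAT = 4

raw_gb = EMBEDDINGS * DIMS * BYTES_PER_FLOAT / 1024**3   # ~2.86 GB of raw vectors
hnsw_gb = raw_gb * 2                                     # HNSW graph overhead ~2x
total_gb = raw_gb + hnsw_gb                              # ~8.6 GB resident

# Cost check for sustained peak: 16 OCU x $0.24/OCU-hour x 730 hours/month.
peak_monthly_usd = 16 * 0.24 * 730                       # ~$2,803
```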
ElastiCache Redis — Semantic Cache Sizing
| Component | Memory per Entry | Count | Total Memory |
|---|---|---|---|
| Cached query embedding (1536-dim float32) | 6 KB | 120,000 entries | ~703 MB |
| Cached response text | 2 KB avg | 120,000 entries | ~234 MB |
| Session state (active) | 4 KB | 20,000 concurrent | ~78 MB |
| Metadata + overhead | 30% of data | — | ~305 MB |
| Total | — | — | ~1.32 GB |
With r6g.large nodes (13 GB usable each), 2 nodes in a replication group provide:
- 13 GB primary + 13 GB replica = 26 GB total
- Utilization at ~1.32 GB = ~10% (room for growth)
- Consider: r6g.medium (6.38 GB) would save ~40% and still provide headroom
Eviction policy: allkeys-lfu (Least Frequently Used) ensures popular manga queries stay cached while rarely-asked queries are evicted first.
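The memory budget in the sizing table reduces to a few lines, and confirms the r6g.medium downsizing observation (its ~6.38 GB comfortably holds the ~1.3 GB working set):

```python
KB = 1024

# Cache memory budget from the sizing table above.
entries = 120_000
embedding_kb, response_kb = 6, 2       # per cached query
sessions, session_kb = 20_000, 4       # active session state

data_mb = (entries * (embedding_kb + response_kb) + sessions * session_kb) / KB
total_mb = data_mb * 1.30              # +30% metadata and overhead
total_gb = total_mb / KB               # ~1.3 GB

fits_r6g_medium = total_gb < 6.38      # r6g.medium usable memory
```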
Auto-Scaling Policy Design
Target Tracking vs Step Scaling — When to Use Each
| Policy Type | Best For | Response Time | Stability | Use in MangaAssist |
|---|---|---|---|---|
| Target Tracking | Steady metric to maintain (e.g., CPU = 65%) | 1-3 minutes | High (built-in cooldown) | ECS primary policy |
| Step Scaling | Multi-threshold response (different urgency levels) | Configurable | Medium (manual cooldowns) | ECS secondary policy |
| Predictive Scaling | Recurring patterns | Pre-emptive | High | JP prime time pre-warming |
| Scheduled Scaling | Known events | Exact timing | High | Manga release events |
ECS Auto-Scaling — Full Policy Configuration
flowchart TB
subgraph Primary["Primary: Target Tracking"]
TT["Target: 10 active Bedrock<br/>invocations per ECS task<br/>Scale-out cooldown: 60s<br/>Scale-in cooldown: 300s"]
end
subgraph Secondary["Secondary: Step Scaling (CPU)"]
SS1["CPU 60-75%: +1 task"]
SS2["CPU 75-90%: +3 tasks"]
SS3["CPU >90%: +5 tasks"]
SS4["CPU <40%: -1 task"]
SS5["CPU <25%: -2 tasks"]
end
subgraph Predictive["Predictive: JP Prime Time"]
PS["18:00 JST: scale to 50 tasks<br/>23:00 JST: allow scale-in<br/>Based on 4-week history"]
end
subgraph Scheduled["Scheduled: Known Events"]
SC["Manga release: 30 min before,<br/>scale to 100 tasks<br/>Flash sale: 1 hr before,<br/>scale to 150 tasks"]
end
subgraph Result["Effective Desired Count"]
R["MAX(target_tracking, step_scaling,<br/>predictive, scheduled, minimum)"]
end
Primary --> Result
Secondary --> Result
Predictive --> Result
Scheduled --> Result
DynamoDB Auto-Scaling (if using provisioned mode)
| Parameter | Read Capacity | Write Capacity |
|---|---|---|
| Target utilization | 70% | 70% |
| Minimum capacity | 100 RCU | 50 WCU |
| Maximum capacity | 5,000 RCU | 2,000 WCU |
| Scale-out cooldown | 60 seconds | 60 seconds |
| Scale-in cooldown | 900 seconds (15 min) | 900 seconds (15 min) |
Note: DynamoDB auto-scaling reacts to the utilization ratio ConsumedReadCapacityUnits / ProvisionedReadCapacityUnits. Scale-in cooldown is deliberately long (15 min) because DynamoDB limits provisioned-capacity decreases to a small number per table per day under default quotas.
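If the session table is switched to provisioned mode, the read-capacity row of the table above maps onto a single Application Auto Scaling policy. A sketch of the request payload (the table name is illustrative; the boto3 `application-autoscaling` call itself is omitted):

```python
def read_scaling_policy(table_name: str, target_pct: float = 70.0) -> dict:
    """Build the put_scaling_policy kwargs for DynamoDB read capacity."""
    return {
        "PolicyName": f"{table_name}-read-target",
        "ServiceNamespace": "dynamodb",
        "ResourceId": f"table/{table_name}",
        "ScalableDimension": "dynamodb:table:ReadCapacityUnits",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": target_pct,
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": "DynamoDBReadCapacityUtilization",
            },
            "ScaleOutCooldown": 60,    # seconds, per the table above
            "ScaleInCooldown": 900,    # 15 min, matching the decrease limit
        },
    }

policy = read_scaling_policy("mangaassist-sessions")
```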
ElastiCache Scaling Strategy
ElastiCache Redis does not auto-scale horizontally in the traditional sense. Scaling options:
| Scaling Dimension | Method | Downtime | Automation |
|---|---|---|---|
| Vertical (larger node) | Modify replication group | Brief failover | CloudWatch alarm + Lambda |
| Horizontal read (more replicas) | Add read replica | None | Manual / CloudFormation |
| Cluster mode (sharding) | Enable cluster mode | Migration required | One-time architectural change |
MangaAssist approach: Start with vertical scaling triggered by memory threshold. If memory > 80% for 15 minutes, a Lambda function initiates a node type upgrade. For read throughput, add replicas to distribute read load.
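The decision logic of that memory-triggered vertical scale-up can be sketched as a pure function (the node-type ladder is illustrative; the Lambda would pass the result to an ElastiCache modify-replication-group call, not shown here):

```python
from typing import Optional

# Illustrative upgrade ladder for the replication group's node type.
NODE_LADDER = [
    "cache.r6g.medium",
    "cache.r6g.large",
    "cache.r6g.xlarge",
    "cache.r6g.2xlarge",
]

def next_node_type(current: str, memory_pct: float, minutes_high: int) -> Optional[str]:
    """Return the next larger node type if memory > 80% for 15+ minutes, else None."""
    if memory_pct <= 80 or minutes_high < 15:
        return None  # trigger has not fired
    idx = NODE_LADDER.index(current)
    if idx + 1 >= len(NODE_LADDER):
        return None  # already at the top of the ladder
    return NODE_LADDER[idx + 1]
```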
Reserved Capacity vs On-Demand — Cost Optimization Matrix
| Service | On-Demand Monthly | 1-Year Reserved | Savings | Recommendation |
|---|---|---|---|---|
| Bedrock Sonnet (335 units) | $15.4M | N/A (Provisioned Throughput model) | See PT pricing | Use PT for baseline, on-demand for burst |
| Bedrock Haiku (391 units) | $2.3M | N/A (Provisioned Throughput model) | See PT pricing | Use PT for baseline, on-demand for burst |
| ECS Fargate (avg 30 tasks) | ~$2,100 | Savings Plan: ~$1,470 | 30% | Compute Savings Plan for steady-state |
| OpenSearch (avg 8 OCU) | ~$1,402 | Reserved: ~$980 | 30% | Reserve base OCU, on-demand for search scaling |
| ElastiCache (2x r6g.large) | ~$99 | Reserved: ~$63 | 36% | Reserve — always-on service |
| DynamoDB (on-demand) | ~$600 | Reserved: ~$280 | 53% | Stay on-demand — flexibility > savings |
Strategy: Reserve always-on baseline capacity (ElastiCache, OpenSearch base OCU, ECS Compute Savings Plan). Use on-demand for variable components (DynamoDB, OpenSearch search OCU, ECS burst).
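Applying that strategy to the matrix figures (Bedrock excluded, since its Provisioned Throughput pricing is modeled separately) lands in the 25-35% savings band:

```python
# Monthly figures from the cost matrix above, Bedrock excluded.
on_demand = {"ecs": 2_100, "opensearch": 1_402, "elasticache": 99, "dynamodb": 600}
# Reserved/Savings Plan rates; DynamoDB deliberately stays on-demand.
reserved = {"ecs": 1_470, "opensearch": 980, "elasticache": 63}

baseline = sum(on_demand.values())                          # all on-demand
blended = sum(reserved.get(k, v) for k, v in on_demand.items())
savings_pct = (baseline - blended) / baseline * 100         # ~26%
```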
Python — TrafficPatternAnalyzer
"""
MangaAssist Capacity Planning — Traffic Pattern Analyzer
Analyzes historical traffic to identify patterns, seasonality, and anomalies
for accurate capacity planning.
"""
import math
import statistics
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List
@dataclass
class HourlyTraffic:
"""Traffic measurement for one hour."""
timestamp: datetime
request_count: int
avg_latency_ms: float
p95_latency_ms: float
bedrock_invocations: int
cache_hit_rate: float
error_rate: float
avg_input_tokens: float
avg_output_tokens: float
@dataclass
class TrafficPattern:
"""Identified traffic pattern with characteristics."""
pattern_type: str # "daily_cycle", "weekly_cycle", "event_spike", "trend"
description: str
peak_hour_jst: int
peak_multiplier: float # peak / average ratio
duration_hours: float
predictability_score: float # 0-1, how consistent this pattern is
recommended_action: str
@dataclass
class CapacityRecommendation:
"""Per-service capacity recommendation based on traffic analysis."""
service: str
current_capacity: float
recommended_min: float
recommended_max: float
recommended_baseline: float
scaling_trigger: str
estimated_savings_pct: float
confidence: float # 0-1
class TrafficPatternAnalyzer:
"""
Analyzes MangaAssist traffic history to identify patterns and generate
capacity recommendations.
Uses 4 weeks of hourly data to detect:
- Daily cycles (JP prime time pattern)
- Weekly cycles (weekend vs weekday)
- Event-driven spikes (manga releases, sales)
- Growth trends
"""
# JP prime time hours (JST)
JP_PRIME_START = 18
JP_PRIME_END = 23
def __init__(self, history: List[HourlyTraffic]):
self.history = sorted(history, key=lambda h: h.timestamp)
self._daily_profiles: Dict[int, List[float]] = defaultdict(list)
self._weekly_profiles: Dict[int, List[float]] = defaultdict(list)
def _build_profiles(self) -> None:
"""Build hourly and daily-of-week traffic profiles."""
for entry in self.history:
hour = entry.timestamp.hour
day_of_week = entry.timestamp.weekday()
self._daily_profiles[hour].append(entry.request_count)
self._weekly_profiles[day_of_week].append(entry.request_count)
def analyze_daily_pattern(self) -> TrafficPattern:
"""Identify the daily traffic cycle (JP prime time pattern)."""
self._build_profiles()
hourly_avgs = {
hour: statistics.mean(counts)
for hour, counts in self._daily_profiles.items()
if counts
}
if not hourly_avgs:
return TrafficPattern(
pattern_type="daily_cycle",
description="Insufficient data",
peak_hour_jst=20,
peak_multiplier=1.0,
duration_hours=0,
predictability_score=0.0,
recommended_action="Collect more data",
)
overall_avg = statistics.mean(hourly_avgs.values())
peak_hour = max(hourly_avgs, key=hourly_avgs.get)
peak_avg = hourly_avgs[peak_hour]
peak_multiplier = peak_avg / overall_avg if overall_avg > 0 else 1.0
# Calculate predictability: lower CV = more predictable
peak_hour_values = self._daily_profiles.get(peak_hour, [])
if len(peak_hour_values) > 1:
cv = statistics.stdev(peak_hour_values) / statistics.mean(peak_hour_values)
predictability = max(0, 1 - cv)
else:
predictability = 0.5
# Determine prime-time duration
prime_hours = [
h for h, avg in hourly_avgs.items()
if avg > overall_avg * 1.3
]
return TrafficPattern(
pattern_type="daily_cycle",
description=(
f"Peak traffic at {peak_hour}:00 JST with "
f"{peak_multiplier:.1f}x average. Prime time spans "
f"{len(prime_hours)} hours."
),
peak_hour_jst=peak_hour,
peak_multiplier=peak_multiplier,
duration_hours=len(prime_hours),
predictability_score=predictability,
            recommended_action=(
                f"Pre-scale resources 30 min before {peak_hour}:00 JST. "
                f"Maintain {peak_multiplier:.0f}x baseline capacity during "
                f"prime time window."
            ),
)
def detect_event_spikes(
self, std_dev_threshold: float = 3.0
) -> List[Dict]:
"""
Detect anomalous traffic spikes that exceed normal patterns.
These correspond to manga release events, flash sales, etc.
"""
if len(self.history) < 48:
return []
self._build_profiles()
spikes = []
for entry in self.history:
hour = entry.timestamp.hour
hour_values = self._daily_profiles.get(hour, [])
if len(hour_values) < 7:
continue
mean_val = statistics.mean(hour_values)
std_val = statistics.stdev(hour_values) if len(hour_values) > 1 else 0
if std_val > 0:
z_score = (entry.request_count - mean_val) / std_val
else:
z_score = 0
if z_score > std_dev_threshold:
spikes.append({
"timestamp": entry.timestamp,
"request_count": entry.request_count,
"expected": mean_val,
"z_score": z_score,
"multiplier": entry.request_count / mean_val,
"cache_hit_rate": entry.cache_hit_rate,
})
return sorted(spikes, key=lambda s: s["z_score"], reverse=True)
def analyze_token_patterns(self) -> Dict[str, float]:
"""Analyze token consumption patterns across time periods."""
if not self.history:
return {}
prime_time = [
e for e in self.history
if self.JP_PRIME_START <= e.timestamp.hour < self.JP_PRIME_END
]
off_peak = [
e for e in self.history
if not (self.JP_PRIME_START <= e.timestamp.hour < self.JP_PRIME_END)
]
def avg_ratio(entries: List[HourlyTraffic]) -> float:
if not entries:
return 0.0
ratios = [
e.avg_input_tokens / e.avg_output_tokens
for e in entries
if e.avg_output_tokens > 0
]
return statistics.mean(ratios) if ratios else 0.0
return {
"prime_time_input_output_ratio": avg_ratio(prime_time),
"off_peak_input_output_ratio": avg_ratio(off_peak),
"prime_time_avg_input_tokens": (
statistics.mean([e.avg_input_tokens for e in prime_time])
if prime_time else 0.0
),
"off_peak_avg_input_tokens": (
statistics.mean([e.avg_input_tokens for e in off_peak])
if off_peak else 0.0
),
"prime_time_cache_hit_rate": (
statistics.mean([e.cache_hit_rate for e in prime_time])
if prime_time else 0.0
),
"off_peak_cache_hit_rate": (
statistics.mean([e.cache_hit_rate for e in off_peak])
if off_peak else 0.0
),
}
def generate_capacity_recommendations(
self,
) -> List[CapacityRecommendation]:
"""
Generate per-service capacity recommendations based on observed
traffic patterns.
"""
daily = self.analyze_daily_pattern()
spikes = self.detect_event_spikes()
token_patterns = self.analyze_token_patterns()
peak_multiplier = daily.peak_multiplier
max_spike_multiplier = (
max(s["multiplier"] for s in spikes) if spikes else peak_multiplier
)
recommendations = []
# ECS Fargate recommendation
avg_rps = statistics.mean(
[e.request_count / 3600 for e in self.history]
) if self.history else 11.6
ecs_baseline = math.ceil(avg_rps / 10) # 10 invocations/task
ecs_peak = math.ceil(avg_rps * peak_multiplier / 10)
ecs_spike = math.ceil(avg_rps * max_spike_multiplier / 10)
recommendations.append(CapacityRecommendation(
service="ECS Fargate",
current_capacity=0,
recommended_min=max(10, ecs_baseline),
recommended_max=min(200, ecs_spike * 1.25),
recommended_baseline=ecs_peak,
scaling_trigger="Active Bedrock invocations > 10/task for 60s",
estimated_savings_pct=0.0,
confidence=daily.predictability_score,
))
# OpenSearch recommendation
search_rps = avg_rps * 0.6 # 60% need vector search
search_ocu_baseline = max(2, math.ceil(search_rps / 50))
search_ocu_peak = max(2, math.ceil(
search_rps * peak_multiplier / 50
))
recommendations.append(CapacityRecommendation(
service="OpenSearch Serverless (Search OCU)",
current_capacity=0,
recommended_min=search_ocu_baseline,
recommended_max=search_ocu_peak * 2,
recommended_baseline=search_ocu_peak,
scaling_trigger="SearchLatency P95 > 80ms for 5 min",
estimated_savings_pct=0.0,
confidence=daily.predictability_score * 0.8,
))
# ElastiCache recommendation based on cache hit patterns
cache_hit = token_patterns.get("prime_time_cache_hit_rate", 0.35)
if cache_hit < 0.30:
cache_action = (
"Cache hit rate below 30%. Consider increasing cache "
"capacity or adjusting similarity threshold."
)
else:
cache_action = "Cache performance healthy."
recommendations.append(CapacityRecommendation(
service="ElastiCache Redis",
current_capacity=0,
recommended_min=2,
recommended_max=4,
recommended_baseline=2,
scaling_trigger=f"Memory > 80% for 15 min. {cache_action}",
estimated_savings_pct=0.0,
confidence=0.9,
))
return recommendations
def print_analysis(self) -> None:
"""Print a comprehensive traffic analysis report."""
daily = self.analyze_daily_pattern()
spikes = self.detect_event_spikes()
token_patterns = self.analyze_token_patterns()
recommendations = self.generate_capacity_recommendations()
print("=" * 70)
print("MANGAASSIST TRAFFIC PATTERN ANALYSIS")
print("=" * 70)
print(f"\nDaily Pattern: {daily.description}")
print(f" Predictability: {daily.predictability_score:.0%}")
print(f" Action: {daily.recommended_action}")
if spikes:
print(f"\nDetected {len(spikes)} anomalous spikes:")
for s in spikes[:5]:
print(
f" {s['timestamp']}: {s['request_count']:,} requests "
f"({s['multiplier']:.1f}x expected, z={s['z_score']:.1f})"
)
print(f"\nToken Patterns:")
for key, val in token_patterns.items():
print(f" {key}: {val:.2f}")
print(f"\nCapacity Recommendations:")
for rec in recommendations:
print(f"\n {rec.service}:")
print(f" Baseline: {rec.recommended_baseline:.0f}")
print(f" Range: {rec.recommended_min:.0f} - {rec.recommended_max:.0f}")
print(f" Trigger: {rec.scaling_trigger}")
print(f" Confidence: {rec.confidence:.0%}")
# --- Usage ---
if __name__ == "__main__":
    # Simulate 4 weeks of hourly traffic data
import random
history = []
base_time = datetime(2025, 3, 1, 0, 0, 0)
for day in range(28):
for hour in range(24):
# Simulate JP traffic pattern
if 18 <= hour <= 22:
base_count = 80_000 # prime time
elif 10 <= hour <= 17:
base_count = 40_000 # daytime
elif 7 <= hour <= 9:
base_count = 25_000 # morning
else:
base_count = 10_000 # night
# Add noise
noise = random.gauss(0, base_count * 0.15)
count = max(1000, int(base_count + noise))
# Simulate a manga release spike on day 7 and 21
if day in [7, 21] and 19 <= hour <= 21:
count = int(count * 3.5)
cache_hit = random.uniform(0.30, 0.45)
history.append(HourlyTraffic(
timestamp=base_time + timedelta(days=day, hours=hour),
request_count=count,
avg_latency_ms=random.uniform(200, 800),
p95_latency_ms=random.uniform(500, 2000),
bedrock_invocations=int(count * (1 - cache_hit)),
cache_hit_rate=cache_hit,
error_rate=random.uniform(0.001, 0.01),
avg_input_tokens=random.uniform(900, 1500),
avg_output_tokens=random.uniform(200, 500),
))
analyzer = TrafficPatternAnalyzer(history)
analyzer.print_analysis()
Python — AutoScalingPolicyGenerator
"""
MangaAssist Capacity Planning — Auto-Scaling Policy Generator
Generates CloudFormation/CDK-compatible auto-scaling policy configurations
for each service in the MangaAssist stack.
"""
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ScalingPolicy:
"""Represents a single auto-scaling policy."""
policy_name: str
policy_type: str # "TargetTracking", "StepScaling", "Scheduled"
service: str
metric_namespace: str
metric_name: str
target_value: Optional[float] = None
step_adjustments: Optional[List[Dict[str, Any]]] = None
cooldown_scale_out: int = 60
cooldown_scale_in: int = 300
schedule_expression: Optional[str] = None
min_capacity: Optional[int] = None
max_capacity: Optional[int] = None
dimensions: Dict[str, str] = field(default_factory=dict)
class AutoScalingPolicyGenerator:
"""
Generates auto-scaling policies for every scalable service in the
MangaAssist stack. Outputs configurations compatible with CloudFormation,
CDK, or direct AWS API calls.
"""
def __init__(
self,
cluster_name: str = "mangaassist-prod",
service_name: str = "orchestrator",
region: str = "ap-northeast-1",
):
self.cluster_name = cluster_name
self.service_name = service_name
self.region = region
self._policies: List[ScalingPolicy] = []
def generate_ecs_target_tracking(self) -> ScalingPolicy:
"""
Primary ECS policy: target tracking on custom metric
'ActiveBedrockInvocationsPerTask'.
"""
policy = ScalingPolicy(
policy_name="ecs-bedrock-invocations-target",
policy_type="TargetTracking",
service="ECS",
metric_namespace="MangaAssist/Orchestrator",
metric_name="ActiveBedrockInvocationsPerTask",
target_value=10.0,
cooldown_scale_out=60,
cooldown_scale_in=300,
dimensions={
"ClusterName": self.cluster_name,
"ServiceName": self.service_name,
},
)
self._policies.append(policy)
return policy
def generate_ecs_step_scaling(self) -> ScalingPolicy:
"""
Secondary ECS policy: step scaling on CPU for emergency response.
"""
policy = ScalingPolicy(
policy_name="ecs-cpu-step-scaling",
policy_type="StepScaling",
service="ECS",
metric_namespace="AWS/ECS",
metric_name="CPUUtilization",
step_adjustments=[
{
"MetricIntervalLowerBound": 0,
"MetricIntervalUpperBound": 15,
"ScalingAdjustment": 1,
},
{
"MetricIntervalLowerBound": 15,
"MetricIntervalUpperBound": 30,
"ScalingAdjustment": 3,
},
{
"MetricIntervalLowerBound": 30,
"ScalingAdjustment": 5,
},
],
cooldown_scale_out=60,
cooldown_scale_in=300,
dimensions={
"ClusterName": self.cluster_name,
"ServiceName": self.service_name,
},
)
self._policies.append(policy)
return policy
def generate_ecs_scheduled_scaling(self) -> List[ScalingPolicy]:
"""
Scheduled scaling for JP prime time and known events.
"""
policies = []
# JP prime time: pre-warm at 17:30 JST (08:30 UTC)
prime_time = ScalingPolicy(
policy_name="ecs-jp-primetime-prewarm",
policy_type="Scheduled",
service="ECS",
metric_namespace="",
metric_name="",
schedule_expression="cron(30 8 * * ? *)", # 17:30 JST = 08:30 UTC
min_capacity=50,
max_capacity=200,
)
policies.append(prime_time)
# End of prime time: allow scale-in at 23:30 JST (14:30 UTC)
off_peak = ScalingPolicy(
policy_name="ecs-jp-offpeak",
policy_type="Scheduled",
service="ECS",
metric_namespace="",
metric_name="",
schedule_expression="cron(30 14 * * ? *)", # 23:30 JST = 14:30 UTC
min_capacity=10,
max_capacity=200,
)
policies.append(off_peak)
self._policies.extend(policies)
return policies
def generate_dynamodb_scaling(
self,
table_name: str = "mangaassist-sessions",
) -> List[ScalingPolicy]:
"""
DynamoDB auto-scaling policies for provisioned mode.
Only used if table is in provisioned mode (on-demand needs no policy).
"""
policies = []
for capacity_type, target, min_cap, max_cap in [
("Read", 70.0, 100, 5000),
("Write", 70.0, 50, 2000),
]:
policy = ScalingPolicy(
policy_name=f"dynamodb-{capacity_type.lower()}-target",
policy_type="TargetTracking",
service="DynamoDB",
metric_namespace="AWS/DynamoDB",
metric_name=f"Consumed{capacity_type}CapacityUnits",
target_value=target,
cooldown_scale_out=60,
cooldown_scale_in=900,
min_capacity=min_cap,
max_capacity=max_cap,
dimensions={"TableName": table_name},
)
policies.append(policy)
self._policies.extend(policies)
return policies
def to_cloudformation(self) -> Dict[str, Any]:
"""
Convert all generated policies to CloudFormation-compatible JSON.
"""
resources = {}
for policy in self._policies:
            # CamelCase the hyphenated policy name for a valid CFN logical ID,
            # e.g. "ecs-cpu-step-scaling" -> "EcsCpuStepScaling".
            safe_name = "".join(
                part.capitalize() for part in policy.policy_name.split("-")
            )
if policy.policy_type == "TargetTracking":
resources[f"{safe_name}Policy"] = {
"Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
"Properties": {
"PolicyName": policy.policy_name,
"PolicyType": "TargetTrackingScaling",
"TargetTrackingScalingPolicyConfiguration": {
"TargetValue": policy.target_value,
"CustomizedMetricSpecification": {
"MetricName": policy.metric_name,
"Namespace": policy.metric_namespace,
"Statistic": "Average",
"Dimensions": [
{"Name": k, "Value": v}
for k, v in policy.dimensions.items()
],
},
"ScaleOutCooldown": policy.cooldown_scale_out,
"ScaleInCooldown": policy.cooldown_scale_in,
},
},
}
elif policy.policy_type == "StepScaling":
resources[f"{safe_name}Policy"] = {
"Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
"Properties": {
"PolicyName": policy.policy_name,
"PolicyType": "StepScaling",
"StepScalingPolicyConfiguration": {
"AdjustmentType": "ChangeInCapacity",
"StepAdjustments": policy.step_adjustments,
"Cooldown": policy.cooldown_scale_out,
},
},
}
elif policy.policy_type == "Scheduled":
resources[f"{safe_name}Action"] = {
"Type": "AWS::ApplicationAutoScaling::ScheduledAction",
"Properties": {
"ScheduledActionName": policy.policy_name,
"Schedule": policy.schedule_expression,
"ScalableTargetAction": {
"MinCapacity": policy.min_capacity,
"MaxCapacity": policy.max_capacity,
},
},
}
return {"Resources": resources}
def print_summary(self) -> None:
"""Print a readable summary of all generated policies."""
print("=" * 70)
print("MANGAASSIST AUTO-SCALING POLICY SUMMARY")
print("=" * 70)
for policy in self._policies:
print(f"\n--- {policy.policy_name} ---")
print(f" Type: {policy.policy_type}")
print(f" Service: {policy.service}")
if policy.target_value:
print(f" Target: {policy.metric_name} = {policy.target_value}")
if policy.step_adjustments:
print(f" Steps: {json.dumps(policy.step_adjustments, indent=4)}")
if policy.schedule_expression:
print(f" Schedule: {policy.schedule_expression}")
print(f" Capacity: min={policy.min_capacity}, max={policy.max_capacity}")
print(f" Cooldowns: out={policy.cooldown_scale_out}s, "
f"in={policy.cooldown_scale_in}s")
def generate_all(self) -> List[ScalingPolicy]:
"""Generate all auto-scaling policies for the MangaAssist stack."""
self._policies.clear()
self.generate_ecs_target_tracking()
self.generate_ecs_step_scaling()
self.generate_ecs_scheduled_scaling()
self.generate_dynamodb_scaling()
return self._policies
# --- Usage ---
if __name__ == "__main__":
generator = AutoScalingPolicyGenerator(
cluster_name="mangaassist-prod",
service_name="orchestrator",
)
generator.generate_all()
generator.print_summary()
cfn = generator.to_cloudformation()
print("\n\nCloudFormation Output:")
print(json.dumps(cfn, indent=2))
Capacity Model Breakdown — Complete Mermaid Diagram
flowchart TB
subgraph Traffic["1M Messages / Day"]
T1["Sustained: 11.6 req/s"]
T2["Peak: ~480 Bedrock req/s<br/>(after 35% cache hit)"]
end
subgraph ModelTier["Model Tiering (70/30)"]
H["Haiku: 336 req/s<br/>Simple queries, intent routing"]
S["Sonnet: 144 req/s<br/>Recommendations, reasoning"]
end
subgraph Tokens["Token Budget"]
HT["Haiku: 520K tokens/min<br/>391 model units"]
ST["Sonnet: 223K tokens/min<br/>335 model units"]
end
subgraph Compute["ECS Orchestration"]
EC["60 tasks at peak<br/>2 vCPU / 4 GB each<br/>120 vCPU total"]
end
subgraph Data["Data Layer"]
OS["OpenSearch: 8-16 OCU<br/>~288 vector searches/sec"]
DDB["DynamoDB: On-Demand<br/>~575 RCU / ~276 WCU"]
CACHE["Redis: 2x r6g.large<br/>120K cached entries<br/>35% hit rate"]
end
subgraph Cost["Monthly Cost Estimate"]
C1["Bedrock: dominant cost<br/>(depends on PT pricing)"]
C2["ECS: ~$4,200<br/>(with Savings Plan)"]
C3["OpenSearch: ~$1,400-2,800"]
C4["DynamoDB: ~$600"]
C5["ElastiCache: ~$100"]
C6["API Gateway: ~$100"]
end
T1 --> T2
T2 --> ModelTier
H --> HT
S --> ST
T2 --> Compute
Compute --> Data
Data --> Cost
Tokens --> Cost
Key Takeaways
- Bottom-up modeling prevents surprises: Start from 1M messages/day and calculate per-service requirements mathematically. Every number in the capacity plan has a derivation, not a guess.
- GenAI traffic is not like web traffic: Bursty, correlated, variable-length requests demand auto-scaling policies that go beyond simple CPU thresholds. Custom metrics (Bedrock invocations per task) are essential.
- Layer your scaling policies: Target tracking for steady-state, step scaling for emergencies, predictive for daily cycles, scheduled for known events. Each layer catches what the others miss.
- Right-size DynamoDB as on-demand: For user-facing chatbot session storage, the on-demand cost premium is insurance against throttling. Reserve your optimization effort for the services that dominate cost (Bedrock, ECS).
- Reserved for baseline, on-demand for burst: Savings Plans and reserved instances for always-on components, on-demand for elastic scaling. This typically saves 25-35% without sacrificing elasticity.