
Capacity Planning & Auto-Scaling for GenAI Applications

AWS AIP-C01 Task 4.2 — Skill 4.2.5: Right-size resources to optimize FM application throughput

Context: MangaAssist, a JP manga e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. 1M messages/day, peak 20K concurrent sessions.


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Responsible AI & Monitoring | Task 4.2 — Optimize FM application performance | Skill 4.2.5 — Right-size resources to optimize FM application throughput |

Focus: Deep-dive into bottom-up capacity modeling, per-service sizing, auto-scaling policy design, and reserved vs on-demand cost optimization for every layer of the MangaAssist stack.


Bottom-Up Capacity Model

Start from the one number that matters — 1M messages/day — and derive every service requirement mathematically.

flowchart TB
    subgraph Input["Traffic Input"]
        T["1M messages/day<br/>Peak 20K concurrent sessions"]
    end

    subgraph Step1["Step 1 — Request Rate"]
        RPS_AVG["Sustained: 11.6 req/s<br/>(1M / 86,400)"]
        RPS_PEAK["Peak: ~230 req/s<br/>(20K / avg_think_time)"]
    end

    subgraph Step2["Step 2 — Token Throughput"]
        TOK["Peak tokens/sec:<br/>Sonnet: 106,950 t/s (30%)<br/>Haiku: 249,550 t/s (70%)<br/>Total: 356,500 t/s"]
    end

    subgraph Step3["Step 3 — Per-Service Sizing"]
        direction TB
        BED["Bedrock Units<br/>Sonnet: 535 units<br/>Haiku: 268 units"]
        ECS["ECS Tasks<br/>29 tasks (buffered)<br/>2 vCPU / 4 GB each"]
        OS["OpenSearch OCU<br/>Base: 2 + Search: 6<br/>Total: 8 OCU"]
        DDB["DynamoDB<br/>On-Demand<br/>~575 RCU / ~276 WCU peak"]
        EC["ElastiCache<br/>2x r6g.large<br/>13 GB each"]
    end

    subgraph Step4["Step 4 — Cost Envelope"]
        COST["Monthly estimate<br/>per capacity plan"]
    end

    T --> Step1
    Step1 --> Step2
    Step2 --> Step3
    Step3 --> Step4

Detailed Derivation

| Step | Calculation | Result |
|---|---|---|
| Daily messages | Given | 1,000,000 |
| Sustained RPS | 1,000,000 / 86,400 | 11.6 req/s |
| Peak concurrent sessions | Given | 20,000 |
| Avg think time (sec between requests) | 87s session / 3.2 requests | ~27s |
| Peak RPS | 20,000 / 27 | ~740 active req/s (not all hit Bedrock simultaneously) |
| Bedrock peak RPS (adjusted for cache hits) | 740 x 0.65 (35% cache hit) | ~480 Bedrock req/s |
| Sonnet peak RPS | 480 x 0.30 | 144 req/s |
| Haiku peak RPS | 480 x 0.70 | 336 req/s |
| Sonnet peak tokens/min | 144 x 1,550 tokens x 60 | 13,392,000 t/min |
| Haiku peak tokens/min | 336 x 1,550 tokens x 60 | 31,248,000 t/min |
| Sonnet model units needed | 13,392,000 / 50,000 x 1.25 buffer | ~335 units |
| Haiku model units needed | 31,248,000 / 100,000 x 1.25 buffer | ~391 units |
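
The derivation can be scripted so the plan recomputes whenever an assumption changes. A minimal sketch of the table above; the 50,000 and 100,000 tokens/min-per-model-unit figures are carried over from the table and should be verified against your account's actual Provisioned Throughput quotas:

import math

# Inputs carried over from the derivation table above
DAILY_MESSAGES = 1_000_000
PEAK_SESSIONS = 20_000
THINK_TIME_S = 27            # 87s session / 3.2 requests
CACHE_HIT = 0.35
MODEL_SHARE = {"sonnet": 0.30, "haiku": 0.70}
TOKENS_PER_REQ = 1_550       # avg input + output tokens
BUFFER = 1.25
UNIT_TPM = {"sonnet": 50_000, "haiku": 100_000}  # tokens/min per model unit

sustained_rps = DAILY_MESSAGES / 86_400          # ~11.6 req/s
peak_rps = PEAK_SESSIONS / THINK_TIME_S          # ~740 req/s
bedrock_rps = peak_rps * (1 - CACHE_HIT)         # ~480 req/s after cache

print(f"Sustained: {sustained_rps:.1f} req/s, peak Bedrock: {bedrock_rps:.0f} req/s")
for model, share in MODEL_SHARE.items():
    tpm = bedrock_rps * share * TOKENS_PER_REQ * 60
    units = math.ceil(tpm / UNIT_TPM[model] * BUFFER)
    print(f"{model}: {tpm:,.0f} tokens/min -> ~{units} model units")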

GenAI Traffic Patterns — Why Sizing Is Hard

GenAI traffic is fundamentally different from traditional web traffic. Understanding these patterns is critical for correct capacity planning.

Pattern 1 — Bursty and Correlated

When a new manga chapter drops (e.g., a popular series like One Piece), thousands of users ask similar questions within minutes. This creates correlated bursts where cache hit rate initially drops (novel queries) before recovering.

Normal traffic:     ▁▂▃▂▁▂▃▂▁▂▃▂  (smooth, predictable)
Manga release:      ▁▂▃▇█████▇▅▃▂  (sharp spike, slow decay)
Flash sale event:   ▁▁▁▁▁▂████████  (step function, sustained high)

Pattern 2 — Variable-Length Requests

Unlike REST APIs with predictable payload sizes, GenAI requests vary enormously in token count:

| Request Type | Input Tokens | Output Tokens | Total | Duration |
|---|---|---|---|---|
| Simple FAQ ("Is this in stock?") | 400 | 80 | 480 | ~0.5s |
| Manga recommendation (multi-turn) | 2,500 | 500 | 3,000 | ~3s |
| Complex comparison (5 series) | 3,500 | 900 | 4,400 | ~5s |
| Order troubleshooting (with history) | 4,000 | 600 | 4,600 | ~4s |

This means average resource consumption per request has high variance, making simple "requests per second" capacity models insufficient.
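
To make the variance concrete, a small sketch that computes expected tokens per request and the coefficient of variation for an assumed traffic mix (the 40/30/10/20 shares are illustrative, not production data; token counts come from the table above):

# Assumed request mix: (share of traffic, total tokens per request)
mix = [
    (0.40, 480),     # simple FAQ
    (0.30, 3_000),   # recommendation
    (0.10, 4_400),   # comparison
    (0.20, 4_600),   # troubleshooting
]

mean_tokens = sum(share * tokens for share, tokens in mix)
variance = sum(share * (tokens - mean_tokens) ** 2 for share, tokens in mix)
cv = variance ** 0.5 / mean_tokens

print(f"Expected tokens/request: {mean_tokens:,.0f}")   # ~2,450
print(f"Coefficient of variation: {cv:.2f}")            # ~0.70 -> size in tokens, not requests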

Pattern 3 — Time-of-Day Correlation (JP Market)

JST Hour:    06  08  10  12  14  16  18  20  22  00  02  04
Traffic %:    3   5   8  10   8  10  12  17  15   7   3   2
             ▂   ▃   ▅   ▆   ▅   ▆   ▇   █   ▇   ▄   ▂   ▁
                                        ↑ Prime time: 18:00-23:00 JST

Peak period (18:00-23:00 JST) carries ~44% of daily traffic in ~21% of the day, roughly double what a uniform distribution would put there. Scheduled scaling must pre-warm resources before 18:00 JST.


Per-Service Sizing Deep-Dive

ECS Fargate — Orchestrator Sizing

The orchestrator is I/O-bound, not CPU-bound. It spends most of its time waiting for Bedrock, OpenSearch, and DynamoDB responses. The key constraint is concurrent outbound connections, not raw CPU.

| Factor | Value | Notes |
|---|---|---|
| vCPU per task | 2 | Sufficient for async I/O orchestration |
| Memory per task | 4 GB | Handles prompt assembly for 4K token context windows |
| Max concurrent Bedrock calls per task | 10 | Async HTTP client with connection pooling |
| Max concurrent OpenSearch calls per task | 20 | Faster than Bedrock, higher concurrency OK |
| Peak Bedrock RPS (after cache) | 480 | From capacity model above |
| Tasks needed for Bedrock concurrency | 480 / 10 = 48 | Primary sizing constraint |
| With 25% buffer | 60 tasks | Recommended minimum at peak |
| Minimum tasks (off-peak) | 10 | Floor for availability |
| Maximum tasks | 200 | Account-level ceiling |

Fargate Spot consideration: The orchestrator is stateless (session state in DynamoDB, cache in Redis). Spot interruptions cause a single request to fail and retry. At 70% Spot / 30% On-Demand, savings are ~50% on compute.
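
A quick sizing sketch tying the table together: task count from the concurrency constraint, plus the blended Spot/On-Demand rate. The Fargate unit prices and the ~70% Spot discount are illustrative; verify against current ap-northeast-1 pricing:

import math

BEDROCK_PEAK_RPS = 480       # after 35% cache hit
CALLS_PER_TASK = 10          # max concurrent Bedrock calls per task
BUFFER = 1.25
SPOT_SHARE, SPOT_DISCOUNT = 0.70, 0.70   # Fargate Spot is ~70% off (varies by pool)

# Illustrative Fargate rates for a 2 vCPU / 4 GB task
VCPU_HR, GB_HR = 0.05056, 0.00553
task_hr = 2 * VCPU_HR + 4 * GB_HR

peak_tasks = math.ceil(BEDROCK_PEAK_RPS / CALLS_PER_TASK * BUFFER)   # 60
blended_hr = task_hr * (SPOT_SHARE * (1 - SPOT_DISCOUNT) + (1 - SPOT_SHARE))
print(f"Peak tasks: {peak_tasks}")
print(f"$/task-hour: on-demand {task_hr:.4f}, blended {blended_hr:.4f} (~50% savings)")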

DynamoDB — Session Storage

| Access Pattern | Frequency | Size | Consistency |
|---|---|---|---|
| Read session context | Every request | ~4 KB | Eventually consistent |
| Read conversation history | Every multi-turn request (70%) | 1-10 KB (depends on turns) | Eventually consistent |
| Write session update | Every request | ~2 KB | Standard write |
| Write conversation turn | Every request | ~1 KB | Standard write |

On-Demand vs Provisioned:

| Mode | Peak Cost/Month | Throttle Risk | Management Overhead |
|---|---|---|---|
| On-Demand | ~$600 | Very low (auto-scales to 2x previous peak) | Zero |
| Provisioned + Auto-Scaling | ~$400 | Medium (scaling lag on sudden spikes) | High (tune targets, alarms) |
| Provisioned with Reserved | ~$280 | Medium | Very high (1-year commit) |

MangaAssist recommendation: On-Demand for the session table. The cost premium (about 50% over provisioned auto-scaling, and roughly 2x over reserved provisioned) is worth the zero-throttle guarantee for user-facing chatbot latency. DynamoDB is typically <5% of total GenAI stack cost, so optimizing it aggressively yields small absolute savings while taking on meaningful risk.
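
For reference, the ~575 peak RCU figure in the capacity model can be reproduced from the access patterns above. The ~5 KB average history item is an assumption for illustration:

import math

PEAK_RPS = 480               # Bedrock-bound requests/s after cache hits
HISTORY_READ_SHARE = 0.70    # share of requests that read history
AVG_HISTORY_KB = 5           # assumed average history item size

def ec_read_rcu(item_kb: float) -> float:
    # Eventually consistent reads cost 0.5 RCU per 4 KB chunk
    return math.ceil(item_kb / 4) * 0.5

peak_rcu = PEAK_RPS * (
    ec_read_rcu(4)                                       # session context
    + HISTORY_READ_SHARE * ec_read_rcu(AVG_HISTORY_KB)   # history
)
print(f"Peak RCU: ~{peak_rcu:.0f}")   # ~576, matching the capacity model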

OpenSearch Serverless — OCU Planning

OpenSearch Serverless capacity is measured in OCUs (OpenSearch Compute Units). Each collection requires:

- Base OCU: always-on for indexing. Minimum 2 OCU.
- Search OCU: scales with query load. Minimum 2 OCU.

| OCU Type | Minimum | Peak Estimate | Auto-Scales | Cost/OCU/Hour |
|---|---|---|---|---|
| Base (indexing) | 2 | 4 | Yes (with lag) | $0.24 |
| Search | 2 | 12 | Yes (faster) | $0.24 |
| Total at peak | 4 | 16 | | |

Monthly cost at sustained peak: 16 OCU x $0.24 x 730 hrs ≈ $2,803.

Scaling behavior: OpenSearch Serverless auto-scales search OCUs, but there is a lag of 5-15 minutes to add capacity. During sudden spikes (manga release events), search latency degrades before OCUs scale. Mitigation: maintain higher base search OCU during known event windows.

Vector search specifics: manga embedding search uses k-NN with an HNSW index. Memory requirements scale with:

- Embedding count: 500K manga products x 1536 dimensions x 4 bytes = ~3 GB
- HNSW graph overhead: ~2x the embedding size = ~6 GB, so ~9 GB total
- Each search OCU exposes ~8 GB of usable memory, so the index alone requires the minimum of 2 search OCUs
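
The same arithmetic as a sketch (the 2x HNSW overhead and ~8 GB usable memory per OCU are rules of thumb, not published figures):

import math

EMBEDDINGS = 500_000
DIMS = 1_536
BYTES_PER_DIM = 4            # float32
HNSW_OVERHEAD = 2.0          # graph overhead as a multiple of raw vectors
OCU_USABLE_GB = 8            # assumed usable memory per search OCU

raw_gb = EMBEDDINGS * DIMS * BYTES_PER_DIM / 1024**3     # ~2.9 GB
total_gb = raw_gb * (1 + HNSW_OVERHEAD)                  # ~8.6 GB
min_search_ocu = max(2, math.ceil(total_gb / OCU_USABLE_GB))
print(f"Index footprint: ~{total_gb:.1f} GB -> minimum {min_search_ocu} search OCU")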

ElastiCache Redis — Semantic Cache Sizing

| Component | Memory per Entry | Count | Total Memory |
|---|---|---|---|
| Cached query embedding (1536-dim float32) | 6 KB | 120,000 entries | ~703 MB |
| Cached response text | 2 KB avg | 120,000 entries | ~234 MB |
| Session state (active) | 4 KB | 20,000 concurrent | ~78 MB |
| Metadata + overhead | 30% of data | | ~305 MB |
| Total | | | ~1.32 GB |

With r6g.large nodes (~13 GB usable each), a 2-node replication group provides: - 13 GB on the primary plus a 13 GB replica; the replica mirrors the primary for read throughput and failover, so effective dataset capacity remains 13 GB - Utilization at ~1.32 GB = ~10% (room for growth) - Consider: r6g.medium (6.38 GB) would save ~40% and still leave ample headroom (~20% utilization)

Eviction policy: allkeys-lfu (Least Frequently Used) ensures popular manga queries stay cached while rarely-asked queries are evicted first.
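
A memory-budget sketch using the entry sizes from the table above (usable-memory figures are from the r6g node specs):

ENTRIES = 120_000
EMBED_KB, RESPONSE_KB = 6, 2          # per cached query
SESSIONS, SESSION_KB = 20_000, 4      # active session state
OVERHEAD = 0.30                       # metadata + Redis structures

data_mb = (ENTRIES * (EMBED_KB + RESPONSE_KB) + SESSIONS * SESSION_KB) / 1024
total_gb = data_mb * (1 + OVERHEAD) / 1024               # ~1.3 GB

for node, usable_gb in [("r6g.medium", 6.38), ("r6g.large", 13.07)]:
    print(f"{node}: {total_gb / usable_gb:.0%} of usable memory")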


Auto-Scaling Policy Design

Target Tracking vs Step Scaling — When to Use Each

| Policy Type | Best For | Response Time | Stability | Use in MangaAssist |
|---|---|---|---|---|
| Target Tracking | Steady metric to maintain (e.g., CPU = 65%) | 1-3 minutes | High (built-in cooldown) | ECS primary policy |
| Step Scaling | Multi-threshold response (different urgency levels) | Configurable | Medium (manual cooldowns) | ECS secondary policy |
| Predictive Scaling | Recurring patterns | Pre-emptive | High | JP prime time pre-warming |
| Scheduled Scaling | Known events | Exact timing | High | Manga release events |

ECS Auto-Scaling — Full Policy Configuration

flowchart TB
    subgraph Primary["Primary: Target Tracking"]
        TT["Target: 10 active Bedrock<br/>invocations per ECS task<br/>Scale-out cooldown: 60s<br/>Scale-in cooldown: 300s"]
    end

    subgraph Secondary["Secondary: Step Scaling (CPU)"]
        SS1["CPU 60-75%: +1 task"]
        SS2["CPU 75-90%: +3 tasks"]
        SS3["CPU >90%: +5 tasks"]
        SS4["CPU <40%: -1 task"]
        SS5["CPU <25%: -2 tasks"]
    end

    subgraph Predictive["Predictive: JP Prime Time"]
        PS["18:00 JST: scale to 50 tasks<br/>23:00 JST: allow scale-in<br/>Based on 4-week history"]
    end

    subgraph Scheduled["Scheduled: Known Events"]
        SC["Manga release: +30 min before<br/>scale to 100 tasks<br/>Flash sale: +1 hr before<br/>scale to 150 tasks"]
    end

    subgraph Result["Effective Desired Count"]
        R["MAX(target_tracking, step_scaling,<br/>predictive, scheduled, minimum)"]
    end

    Primary --> Result
    Secondary --> Result
    Predictive --> Result
    Scheduled --> Result

DynamoDB Auto-Scaling (if using provisioned mode)

| Parameter | Read Capacity | Write Capacity |
|---|---|---|
| Target utilization | 70% | 70% |
| Minimum capacity | 100 RCU | 50 WCU |
| Maximum capacity | 5,000 RCU | 2,000 WCU |
| Scale-out cooldown | 60 seconds | 60 seconds |
| Scale-in cooldown | 900 seconds (15 min) | 900 seconds (15 min) |

Note: DynamoDB auto-scaling tracks the ratio ConsumedReadCapacityUnits / ProvisionedReadCapacityUnits (surfaced as the predefined DynamoDBReadCapacityUtilization metric). Scale-in cooldown is deliberately long (15 min) because DynamoDB can only decrease capacity 4 times per table per day by default.
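
In provisioned mode, the table above maps to two Application Auto Scaling calls per capacity dimension. A sketch of the read side with boto3 (table name from this design; repeat for the write dimension):

import boto3

aas = boto3.client("application-autoscaling", region_name="ap-northeast-1")

aas.register_scalable_target(
    ServiceNamespace="dynamodb",
    ResourceId="table/mangaassist-sessions",
    ScalableDimension="dynamodb:table:ReadCapacityUnits",
    MinCapacity=100,
    MaxCapacity=5000,
)
aas.put_scaling_policy(
    PolicyName="dynamodb-read-target",
    ServiceNamespace="dynamodb",
    ResourceId="table/mangaassist-sessions",
    ScalableDimension="dynamodb:table:ReadCapacityUnits",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
        "ScaleOutCooldown": 60,
        "ScaleInCooldown": 900,
    },
)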

ElastiCache Scaling Strategy

ElastiCache Redis does not auto-scale horizontally in the traditional sense. Scaling options:

| Scaling Dimension | Method | Downtime | Automation |
|---|---|---|---|
| Vertical (larger node) | Modify replication group | Brief failover | CloudWatch alarm + Lambda |
| Horizontal read (more replicas) | Add read replica | None | Manual / CloudFormation |
| Cluster mode (sharding) | Enable cluster mode | Migration required | One-time architectural change |

MangaAssist approach: Start with vertical scaling triggered by memory threshold. If memory > 80% for 15 minutes, a Lambda function initiates a node type upgrade. For read throughput, add replicas to distribute read load.
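
A minimal sketch of that memory-alarm Lambda, assuming a hypothetical replication group ID (mangaassist-cache) and a simple node-size ladder:

import boto3

SIZE_LADDER = ["cache.r6g.medium", "cache.r6g.large", "cache.r6g.xlarge"]

def lambda_handler(event, context):
    ec = boto3.client("elasticache")
    group = ec.describe_replication_groups(
        ReplicationGroupId="mangaassist-cache"
    )["ReplicationGroups"][0]
    current = group["CacheNodeType"]
    if current == SIZE_LADDER[-1]:
        return {"status": "at_max", "node_type": current}
    next_type = SIZE_LADDER[SIZE_LADDER.index(current) + 1]
    # Online vertical scaling; expect a brief failover during the resize
    ec.modify_replication_group(
        ReplicationGroupId="mangaassist-cache",
        CacheNodeType=next_type,
        ApplyImmediately=True,
    )
    return {"status": "scaling", "from": current, "to": next_type}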


Reserved Capacity vs On-Demand — Cost Optimization Matrix

| Service | On-Demand Monthly | 1-Year Reserved | Savings | Recommendation |
|---|---|---|---|---|
| Bedrock Sonnet (335 units) | $15.4M | N/A (Provisioned Throughput model) | See PT pricing | Use PT for baseline, on-demand for burst |
| Bedrock Haiku (391 units) | $2.3M | N/A (Provisioned Throughput model) | See PT pricing | Use PT for baseline, on-demand for burst |
| ECS Fargate (avg 30 tasks) | ~$2,100 | Savings Plan: ~$1,470 | 30% | Compute Savings Plan for steady-state |
| OpenSearch (avg 8 OCU) | ~$1,402 | Reserved: ~$980 | 30% | Reserve base OCU, on-demand for search scaling |
| ElastiCache (2x r6g.large) | ~$99 | Reserved: ~$63 | 36% | Reserve — always-on service |
| DynamoDB (on-demand) | ~$600 | Reserved: ~$280 | 53% | Stay on-demand — flexibility > savings |

Strategy: Reserve always-on baseline capacity (ElastiCache, OpenSearch base OCU, ECS Compute Savings Plan). Use on-demand for variable components (DynamoDB, OpenSearch search OCU, ECS burst).
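
Plugging the matrix into this strategy gives a rough blended estimate (Bedrock PT excluded, since it is priced separately):

# Monthly figures from the cost matrix above (Bedrock PT excluded)
baseline = {                      # (on-demand, reserved / savings plan)
    "ECS steady-state": (2_100, 1_470),
    "OpenSearch base": (1_402, 980),
    "ElastiCache": (99, 63),
}
DYNAMODB_ON_DEMAND = 600          # deliberately left on-demand

od_total = sum(od for od, _ in baseline.values()) + DYNAMODB_ON_DEMAND
mixed_total = sum(rs for _, rs in baseline.values()) + DYNAMODB_ON_DEMAND
print(f"All on-demand: ${od_total:,}/mo")
print(f"Reserved baseline + on-demand burst: ${mixed_total:,}/mo "
      f"({1 - mixed_total / od_total:.0%} savings)")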


Python — TrafficPatternAnalyzer

"""
MangaAssist Capacity Planning — Traffic Pattern Analyzer
Analyzes historical traffic to identify patterns, seasonality, and anomalies
for accurate capacity planning.
"""

import math
import statistics
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
from collections import defaultdict


@dataclass
class HourlyTraffic:
    """Traffic measurement for one hour."""
    timestamp: datetime
    request_count: int
    avg_latency_ms: float
    p95_latency_ms: float
    bedrock_invocations: int
    cache_hit_rate: float
    error_rate: float
    avg_input_tokens: float
    avg_output_tokens: float


@dataclass
class TrafficPattern:
    """Identified traffic pattern with characteristics."""
    pattern_type: str  # "daily_cycle", "weekly_cycle", "event_spike", "trend"
    description: str
    peak_hour_jst: int
    peak_multiplier: float  # peak / average ratio
    duration_hours: float
    predictability_score: float  # 0-1, how consistent this pattern is
    recommended_action: str


@dataclass
class CapacityRecommendation:
    """Per-service capacity recommendation based on traffic analysis."""
    service: str
    current_capacity: float
    recommended_min: float
    recommended_max: float
    recommended_baseline: float
    scaling_trigger: str
    estimated_savings_pct: float
    confidence: float  # 0-1


class TrafficPatternAnalyzer:
    """
    Analyzes MangaAssist traffic history to identify patterns and generate
    capacity recommendations.

    Uses 4 weeks of hourly data to detect:
    - Daily cycles (JP prime time pattern)
    - Weekly cycles (weekend vs weekday)
    - Event-driven spikes (manga releases, sales)
    - Growth trends
    """

    # JP prime time hours (JST)
    JP_PRIME_START = 18
    JP_PRIME_END = 23

    def __init__(self, history: List[HourlyTraffic]):
        self.history = sorted(history, key=lambda h: h.timestamp)
        self._daily_profiles: Dict[int, List[float]] = defaultdict(list)
        self._weekly_profiles: Dict[int, List[float]] = defaultdict(list)

    def _build_profiles(self) -> None:
        """Build hourly and day-of-week traffic profiles (idempotent)."""
        # Clear first so repeated calls do not double-count entries
        self._daily_profiles.clear()
        self._weekly_profiles.clear()
        for entry in self.history:
            hour = entry.timestamp.hour
            day_of_week = entry.timestamp.weekday()
            self._daily_profiles[hour].append(entry.request_count)
            self._weekly_profiles[day_of_week].append(entry.request_count)

    def analyze_daily_pattern(self) -> TrafficPattern:
        """Identify the daily traffic cycle (JP prime time pattern)."""
        self._build_profiles()

        hourly_avgs = {
            hour: statistics.mean(counts)
            for hour, counts in self._daily_profiles.items()
            if counts
        }

        if not hourly_avgs:
            return TrafficPattern(
                pattern_type="daily_cycle",
                description="Insufficient data",
                peak_hour_jst=20,
                peak_multiplier=1.0,
                duration_hours=0,
                predictability_score=0.0,
                recommended_action="Collect more data",
            )

        overall_avg = statistics.mean(hourly_avgs.values())
        peak_hour = max(hourly_avgs, key=hourly_avgs.get)
        peak_avg = hourly_avgs[peak_hour]
        peak_multiplier = peak_avg / overall_avg if overall_avg > 0 else 1.0

        # Calculate predictability: lower CV = more predictable
        peak_hour_values = self._daily_profiles.get(peak_hour, [])
        if len(peak_hour_values) > 1:
            cv = statistics.stdev(peak_hour_values) / statistics.mean(peak_hour_values)
            predictability = max(0, 1 - cv)
        else:
            predictability = 0.5

        # Determine prime-time duration and window start
        prime_hours = [
            h for h, avg in hourly_avgs.items()
            if avg > overall_avg * 1.3
        ]
        window_start = min(prime_hours) if prime_hours else peak_hour

        return TrafficPattern(
            pattern_type="daily_cycle",
            description=(
                f"Peak traffic at {peak_hour}:00 JST with "
                f"{peak_multiplier:.1f}x average. Prime time spans "
                f"{len(prime_hours)} hours."
            ),
            peak_hour_jst=peak_hour,
            peak_multiplier=peak_multiplier,
            duration_hours=len(prime_hours),
            predictability_score=predictability,
            recommended_action=(
                f"Pre-scale resources 30 min before {window_start}:00 JST. "
                f"Maintain {peak_multiplier:.0f}x baseline capacity during "
                f"the prime time window."
            ),
        )

    def detect_event_spikes(
        self, std_dev_threshold: float = 3.0
    ) -> List[Dict]:
        """
        Detect anomalous traffic spikes that exceed normal patterns.
        These correspond to manga release events, flash sales, etc.
        """
        if len(self.history) < 48:
            return []

        self._build_profiles()

        spikes = []
        for entry in self.history:
            hour = entry.timestamp.hour
            hour_values = self._daily_profiles.get(hour, [])

            if len(hour_values) < 7:
                continue

            mean_val = statistics.mean(hour_values)
            std_val = statistics.stdev(hour_values) if len(hour_values) > 1 else 0

            if std_val > 0:
                z_score = (entry.request_count - mean_val) / std_val
            else:
                z_score = 0

            if z_score > std_dev_threshold:
                spikes.append({
                    "timestamp": entry.timestamp,
                    "request_count": entry.request_count,
                    "expected": mean_val,
                    "z_score": z_score,
                    "multiplier": entry.request_count / mean_val,
                    "cache_hit_rate": entry.cache_hit_rate,
                })

        return sorted(spikes, key=lambda s: s["z_score"], reverse=True)

    def analyze_token_patterns(self) -> Dict[str, float]:
        """Analyze token consumption patterns across time periods."""
        if not self.history:
            return {}

        prime_time = [
            e for e in self.history
            if self.JP_PRIME_START <= e.timestamp.hour < self.JP_PRIME_END
        ]
        off_peak = [
            e for e in self.history
            if not (self.JP_PRIME_START <= e.timestamp.hour < self.JP_PRIME_END)
        ]

        def avg_ratio(entries: List[HourlyTraffic]) -> float:
            if not entries:
                return 0.0
            ratios = [
                e.avg_input_tokens / e.avg_output_tokens
                for e in entries
                if e.avg_output_tokens > 0
            ]
            return statistics.mean(ratios) if ratios else 0.0

        return {
            "prime_time_input_output_ratio": avg_ratio(prime_time),
            "off_peak_input_output_ratio": avg_ratio(off_peak),
            "prime_time_avg_input_tokens": (
                statistics.mean([e.avg_input_tokens for e in prime_time])
                if prime_time else 0.0
            ),
            "off_peak_avg_input_tokens": (
                statistics.mean([e.avg_input_tokens for e in off_peak])
                if off_peak else 0.0
            ),
            "prime_time_cache_hit_rate": (
                statistics.mean([e.cache_hit_rate for e in prime_time])
                if prime_time else 0.0
            ),
            "off_peak_cache_hit_rate": (
                statistics.mean([e.cache_hit_rate for e in off_peak])
                if off_peak else 0.0
            ),
        }

    def generate_capacity_recommendations(
        self,
    ) -> List[CapacityRecommendation]:
        """
        Generate per-service capacity recommendations based on observed
        traffic patterns.
        """
        daily = self.analyze_daily_pattern()
        spikes = self.detect_event_spikes()
        token_patterns = self.analyze_token_patterns()

        peak_multiplier = daily.peak_multiplier
        max_spike_multiplier = (
            max(s["multiplier"] for s in spikes) if spikes else peak_multiplier
        )

        recommendations = []

        # ECS Fargate recommendation
        avg_rps = statistics.mean(
            [e.request_count / 3600 for e in self.history]
        ) if self.history else 11.6

        ecs_baseline = math.ceil(avg_rps / 10)  # 10 invocations/task
        ecs_peak = math.ceil(avg_rps * peak_multiplier / 10)
        ecs_spike = math.ceil(avg_rps * max_spike_multiplier / 10)

        recommendations.append(CapacityRecommendation(
            service="ECS Fargate",
            current_capacity=0,
            recommended_min=max(10, ecs_baseline),
            recommended_max=min(200, ecs_spike * 1.25),
            recommended_baseline=ecs_peak,
            scaling_trigger="Active Bedrock invocations > 10/task for 60s",
            estimated_savings_pct=0.0,
            confidence=daily.predictability_score,
        ))

        # OpenSearch recommendation
        search_rps = avg_rps * 0.6  # 60% need vector search
        search_ocu_baseline = max(2, math.ceil(search_rps / 50))
        search_ocu_peak = max(2, math.ceil(
            search_rps * peak_multiplier / 50
        ))

        recommendations.append(CapacityRecommendation(
            service="OpenSearch Serverless (Search OCU)",
            current_capacity=0,
            recommended_min=search_ocu_baseline,
            recommended_max=search_ocu_peak * 2,
            recommended_baseline=search_ocu_peak,
            scaling_trigger="SearchLatency P95 > 80ms for 5 min",
            estimated_savings_pct=0.0,
            confidence=daily.predictability_score * 0.8,
        ))

        # ElastiCache recommendation based on cache hit patterns
        cache_hit = token_patterns.get("prime_time_cache_hit_rate", 0.35)
        if cache_hit < 0.30:
            cache_action = (
                "Cache hit rate below 30%. Consider increasing cache "
                "capacity or adjusting similarity threshold."
            )
        else:
            cache_action = "Cache performance healthy."

        recommendations.append(CapacityRecommendation(
            service="ElastiCache Redis",
            current_capacity=0,
            recommended_min=2,
            recommended_max=4,
            recommended_baseline=2,
            scaling_trigger=f"Memory > 80% for 15 min. {cache_action}",
            estimated_savings_pct=0.0,
            confidence=0.9,
        ))

        return recommendations

    def print_analysis(self) -> None:
        """Print a comprehensive traffic analysis report."""
        daily = self.analyze_daily_pattern()
        spikes = self.detect_event_spikes()
        token_patterns = self.analyze_token_patterns()
        recommendations = self.generate_capacity_recommendations()

        print("=" * 70)
        print("MANGAASSIST TRAFFIC PATTERN ANALYSIS")
        print("=" * 70)

        print(f"\nDaily Pattern: {daily.description}")
        print(f"  Predictability: {daily.predictability_score:.0%}")
        print(f"  Action: {daily.recommended_action}")

        if spikes:
            print(f"\nDetected {len(spikes)} anomalous spikes:")
            for s in spikes[:5]:
                print(
                    f"  {s['timestamp']}: {s['request_count']:,} requests "
                    f"({s['multiplier']:.1f}x expected, z={s['z_score']:.1f})"
                )

        print(f"\nToken Patterns:")
        for key, val in token_patterns.items():
            print(f"  {key}: {val:.2f}")

        print(f"\nCapacity Recommendations:")
        for rec in recommendations:
            print(f"\n  {rec.service}:")
            print(f"    Baseline: {rec.recommended_baseline:.0f}")
            print(f"    Range: {rec.recommended_min:.0f} - {rec.recommended_max:.0f}")
            print(f"    Trigger: {rec.scaling_trigger}")
            print(f"    Confidence: {rec.confidence:.0%}")


# --- Usage ---
if __name__ == "__main__":
    # Simulate 4 weeks of hourly traffic data
    import random

    history = []
    base_time = datetime(2025, 3, 1, 0, 0, 0)

    for day in range(28):
        for hour in range(24):
            # Simulate JP traffic pattern
            if 18 <= hour <= 22:
                base_count = 80_000  # prime time
            elif 10 <= hour <= 17:
                base_count = 40_000  # daytime
            elif 7 <= hour <= 9:
                base_count = 25_000  # morning
            else:
                base_count = 10_000  # night

            # Add noise
            noise = random.gauss(0, base_count * 0.15)
            count = max(1000, int(base_count + noise))

            # Simulate a manga release spike on day 7 and 21
            if day in [7, 21] and 19 <= hour <= 21:
                count = int(count * 3.5)

            cache_hit = random.uniform(0.30, 0.45)

            history.append(HourlyTraffic(
                timestamp=base_time + timedelta(days=day, hours=hour),
                request_count=count,
                avg_latency_ms=random.uniform(200, 800),
                p95_latency_ms=random.uniform(500, 2000),
                bedrock_invocations=int(count * (1 - cache_hit)),
                cache_hit_rate=cache_hit,
                error_rate=random.uniform(0.001, 0.01),
                avg_input_tokens=random.uniform(900, 1500),
                avg_output_tokens=random.uniform(200, 500),
            ))

    analyzer = TrafficPatternAnalyzer(history)
    analyzer.print_analysis()

Python — AutoScalingPolicyGenerator

"""
MangaAssist Capacity Planning — Auto-Scaling Policy Generator
Generates CloudFormation/CDK-compatible auto-scaling policy configurations
for each service in the MangaAssist stack.
"""

import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ScalingPolicy:
    """Represents a single auto-scaling policy."""
    policy_name: str
    policy_type: str  # "TargetTracking", "StepScaling", "Scheduled"
    service: str
    metric_namespace: str
    metric_name: str
    target_value: Optional[float] = None
    step_adjustments: Optional[List[Dict[str, Any]]] = None
    cooldown_scale_out: int = 60
    cooldown_scale_in: int = 300
    schedule_expression: Optional[str] = None
    min_capacity: Optional[int] = None
    max_capacity: Optional[int] = None
    dimensions: Dict[str, str] = field(default_factory=dict)


class AutoScalingPolicyGenerator:
    """
    Generates auto-scaling policies for every scalable service in the
    MangaAssist stack. Outputs configurations compatible with CloudFormation,
    CDK, or direct AWS API calls.
    """

    def __init__(
        self,
        cluster_name: str = "mangaassist-prod",
        service_name: str = "orchestrator",
        region: str = "ap-northeast-1",
    ):
        self.cluster_name = cluster_name
        self.service_name = service_name
        self.region = region
        self._policies: List[ScalingPolicy] = []

    def generate_ecs_target_tracking(self) -> ScalingPolicy:
        """
        Primary ECS policy: target tracking on custom metric
        'ActiveBedrockInvocationsPerTask'.
        """
        policy = ScalingPolicy(
            policy_name="ecs-bedrock-invocations-target",
            policy_type="TargetTracking",
            service="ECS",
            metric_namespace="MangaAssist/Orchestrator",
            metric_name="ActiveBedrockInvocationsPerTask",
            target_value=10.0,
            cooldown_scale_out=60,
            cooldown_scale_in=300,
            dimensions={
                "ClusterName": self.cluster_name,
                "ServiceName": self.service_name,
            },
        )
        self._policies.append(policy)
        return policy

    def generate_ecs_step_scaling(self) -> ScalingPolicy:
        """
        Secondary ECS policy: step scaling on CPU for emergency response.
        """
        policy = ScalingPolicy(
            policy_name="ecs-cpu-step-scaling",
            policy_type="StepScaling",
            service="ECS",
            metric_namespace="AWS/ECS",
            metric_name="CPUUtilization",
            # Interval bounds are relative to the CloudWatch alarm threshold
            # (assumed to be a CPU alarm at 60%), giving 60-75 / 75-90 / >90
            step_adjustments=[
                {
                    "MetricIntervalLowerBound": 0,
                    "MetricIntervalUpperBound": 15,
                    "ScalingAdjustment": 1,
                },
                {
                    "MetricIntervalLowerBound": 15,
                    "MetricIntervalUpperBound": 30,
                    "ScalingAdjustment": 3,
                },
                {
                    "MetricIntervalLowerBound": 30,
                    "ScalingAdjustment": 5,
                },
            ],
            cooldown_scale_out=60,
            cooldown_scale_in=300,
            dimensions={
                "ClusterName": self.cluster_name,
                "ServiceName": self.service_name,
            },
        )
        self._policies.append(policy)
        return policy

    def generate_ecs_scheduled_scaling(self) -> List[ScalingPolicy]:
        """
        Scheduled scaling for JP prime time and known events.
        """
        policies = []

        # JP prime time: pre-warm at 17:30 JST (08:30 UTC)
        prime_time = ScalingPolicy(
            policy_name="ecs-jp-primetime-prewarm",
            policy_type="Scheduled",
            service="ECS",
            metric_namespace="",
            metric_name="",
            schedule_expression="cron(30 8 * * ? *)",  # 17:30 JST = 08:30 UTC
            min_capacity=50,
            max_capacity=200,
        )
        policies.append(prime_time)

        # End of prime time: allow scale-in at 23:30 JST (14:30 UTC)
        off_peak = ScalingPolicy(
            policy_name="ecs-jp-offpeak",
            policy_type="Scheduled",
            service="ECS",
            metric_namespace="",
            metric_name="",
            schedule_expression="cron(30 14 * * ? *)",  # 23:30 JST = 14:30 UTC
            min_capacity=10,
            max_capacity=200,
        )
        policies.append(off_peak)

        self._policies.extend(policies)
        return policies

    def generate_dynamodb_scaling(
        self,
        table_name: str = "mangaassist-sessions",
    ) -> List[ScalingPolicy]:
        """
        DynamoDB auto-scaling policies for provisioned mode.
        Only used if the table is in provisioned mode (on-demand needs no
        policy). Uses the predefined utilization metrics; an empty metric
        namespace signals "predefined" to the CloudFormation generator.
        """
        policies = []

        for capacity_type, target, min_cap, max_cap in [
            ("Read", 70.0, 100, 5000),
            ("Write", 70.0, 50, 2000),
        ]:
            policy = ScalingPolicy(
                policy_name=f"dynamodb-{capacity_type.lower()}-target",
                policy_type="TargetTracking",
                service="DynamoDB",
                metric_namespace="",  # predefined metric
                metric_name=f"DynamoDB{capacity_type}CapacityUtilization",
                target_value=target,
                cooldown_scale_out=60,
                cooldown_scale_in=900,
                min_capacity=min_cap,
                max_capacity=max_cap,
                dimensions={"TableName": table_name},
            )
            policies.append(policy)

        self._policies.extend(policies)
        return policies

    # Logical IDs of AWS::ApplicationAutoScaling::ScalableTarget resources
    # assumed to be defined elsewhere in the same template.
    _TARGET_REFS = {
        "ECS": "EcsScalableTarget",
        "DynamoDB": "SessionTableScalableTarget",
    }

    def to_cloudformation(self) -> Dict[str, Any]:
        """
        Convert all generated policies to CloudFormation-compatible JSON.

        Scaling policies must reference a ScalableTarget by logical ID.
        Scheduled actions have no standalone Application Auto Scaling
        resource type in CloudFormation; they belong in the ScalableTarget's
        ScheduledActions property, so they are returned as a fragment.
        """
        resources: Dict[str, Any] = {}
        scheduled_fragment: List[Dict[str, Any]] = []

        for policy in self._policies:
            safe_name = "".join(
                part.capitalize() for part in policy.policy_name.split("-")
            )
            target_ref = {
                "Ref": self._TARGET_REFS.get(policy.service, "ScalableTarget")
            }

            if policy.policy_type == "TargetTracking":
                if policy.metric_namespace:
                    metric_spec = {
                        "CustomizedMetricSpecification": {
                            "MetricName": policy.metric_name,
                            "Namespace": policy.metric_namespace,
                            "Statistic": "Average",
                            "Dimensions": [
                                {"Name": k, "Value": v}
                                for k, v in policy.dimensions.items()
                            ],
                        }
                    }
                else:
                    # Empty namespace signals a predefined metric (DynamoDB)
                    metric_spec = {
                        "PredefinedMetricSpecification": {
                            "PredefinedMetricType": policy.metric_name
                        }
                    }
                resources[f"{safe_name}Policy"] = {
                    "Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
                    "Properties": {
                        "PolicyName": policy.policy_name,
                        "PolicyType": "TargetTrackingScaling",
                        "ScalingTargetId": target_ref,
                        "TargetTrackingScalingPolicyConfiguration": {
                            "TargetValue": policy.target_value,
                            **metric_spec,
                            "ScaleOutCooldown": policy.cooldown_scale_out,
                            "ScaleInCooldown": policy.cooldown_scale_in,
                        },
                    },
                }

            elif policy.policy_type == "StepScaling":
                resources[f"{safe_name}Policy"] = {
                    "Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
                    "Properties": {
                        "PolicyName": policy.policy_name,
                        "PolicyType": "StepScaling",
                        "ScalingTargetId": target_ref,
                        "StepScalingPolicyConfiguration": {
                            "AdjustmentType": "ChangeInCapacity",
                            "StepAdjustments": policy.step_adjustments,
                            "Cooldown": policy.cooldown_scale_out,
                        },
                    },
                }

            elif policy.policy_type == "Scheduled":
                scheduled_fragment.append({
                    "ScheduledActionName": policy.policy_name,
                    "Schedule": policy.schedule_expression,
                    "ScalableTargetAction": {
                        "MinCapacity": policy.min_capacity,
                        "MaxCapacity": policy.max_capacity,
                    },
                })

        return {
            "Resources": resources,
            # Merge into the ECS ScalableTarget's ScheduledActions property
            "ScheduledActionsFragment": scheduled_fragment,
        }

    def print_summary(self) -> None:
        """Print a readable summary of all generated policies."""
        print("=" * 70)
        print("MANGAASSIST AUTO-SCALING POLICY SUMMARY")
        print("=" * 70)

        for policy in self._policies:
            print(f"\n--- {policy.policy_name} ---")
            print(f"  Type: {policy.policy_type}")
            print(f"  Service: {policy.service}")
            if policy.target_value:
                print(f"  Target: {policy.metric_name} = {policy.target_value}")
            if policy.step_adjustments:
                print(f"  Steps: {json.dumps(policy.step_adjustments, indent=4)}")
            if policy.schedule_expression:
                print(f"  Schedule: {policy.schedule_expression}")
                print(f"  Capacity: min={policy.min_capacity}, max={policy.max_capacity}")
            print(f"  Cooldowns: out={policy.cooldown_scale_out}s, "
                  f"in={policy.cooldown_scale_in}s")

    def generate_all(self) -> List[ScalingPolicy]:
        """Generate all auto-scaling policies for the MangaAssist stack."""
        self._policies.clear()
        self.generate_ecs_target_tracking()
        self.generate_ecs_step_scaling()
        self.generate_ecs_scheduled_scaling()
        self.generate_dynamodb_scaling()
        return self._policies


# --- Usage ---
if __name__ == "__main__":
    generator = AutoScalingPolicyGenerator(
        cluster_name="mangaassist-prod",
        service_name="orchestrator",
    )
    generator.generate_all()
    generator.print_summary()

    cfn = generator.to_cloudformation()
    print("\n\nCloudFormation Output:")
    print(json.dumps(cfn, indent=2))

Capacity Model Breakdown — Complete Mermaid Diagram

flowchart TB
    subgraph Traffic["1M Messages / Day"]
        T1["Sustained: 11.6 req/s"]
        T2["Peak: ~480 Bedrock req/s<br/>(after 35% cache hit)"]
    end

    subgraph ModelTier["Model Tiering (70/30)"]
        H["Haiku: 336 req/s<br/>Simple queries, intent routing"]
        S["Sonnet: 144 req/s<br/>Recommendations, reasoning"]
    end

    subgraph Tokens["Token Budget"]
        HT["Haiku: 520K tokens/min<br/>391 model units"]
        ST["Sonnet: 223K tokens/min<br/>335 model units"]
    end

    subgraph Compute["ECS Orchestration"]
        EC["60 tasks at peak<br/>2 vCPU / 4 GB each<br/>120 vCPU total"]
    end

    subgraph Data["Data Layer"]
        OS["OpenSearch: 8-16 OCU<br/>~288 vector searches/sec"]
        DDB["DynamoDB: On-Demand<br/>~575 RCU / ~276 WCU"]
        CACHE["Redis: 2x r6g.large<br/>120K cached entries<br/>35% hit rate"]
    end

    subgraph Cost["Monthly Cost Estimate"]
        C1["Bedrock: dominant cost<br/>(depends on PT pricing)"]
        C2["ECS: ~$4,200<br/>(with Savings Plan)"]
        C3["OpenSearch: ~$1,400-2,800"]
        C4["DynamoDB: ~$600"]
        C5["ElastiCache: ~$100"]
        C6["API Gateway: ~$100"]
    end

    T1 --> T2
    T2 --> ModelTier
    H --> HT
    S --> ST
    T2 --> Compute
    Compute --> Data
    Data --> Cost
    Tokens --> Cost

Key Takeaways

  1. Bottom-up modeling prevents surprises: Start from 1M messages/day, calculate per-service requirements mathematically. Every number in the capacity plan has a derivation, not a guess.

  2. GenAI traffic is not like web traffic: Bursty, correlated, variable-length requests demand auto-scaling policies that go beyond simple CPU thresholds. Custom metrics (Bedrock invocations per task) are essential.

  3. Layer your scaling policies: Target tracking for steady-state, step scaling for emergencies, predictive for daily cycles, scheduled for known events. Each layer catches what the others miss.

  4. Right-size DynamoDB as on-demand: For user-facing chatbot session storage, the on-demand cost premium is insurance against throttling. Reserve your optimization effort for the services that dominate cost (Bedrock, ECS).

  5. Reserved for baseline, on-demand for burst: Savings Plans and reserved instances for always-on components, on-demand for elastic scaling. This typically saves 25-35% without sacrificing elasticity.