
Deployment Pattern Selection

MangaAssist context: a Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: a useful answer in under 3 seconds at a scale of 1M messages/day.


Skill Mapping

Dimension      Value
Certification  AWS Certified AI Practitioner (AIF-C01)
Task           2.2 — Select and implement model deployment strategies
Skill          2.2.1 — Deploy FMs using Lambda on-demand, Bedrock provisioned throughput, SageMaker hybrid
This File      Choosing patterns based on latency/throughput/cost, SageMaker auto-scaling

Skill Scope

This file covers the practical selection process for FM deployment patterns: how to weigh latency requirements, throughput demands, and cost constraints when choosing a pattern. It goes deep on SageMaker auto-scaling configuration, Bedrock throughput planning, and Lambda concurrency tuning, and includes production-grade scoring algorithms and real-time pattern switching for MangaAssist.


Mind Map

mindmap
  root((Deployment Pattern Selection))
    Latency Analysis
      P50 vs P99 targets
      Cold start impact
      Network hops
      Model inference time
      Response streaming
      End-to-end budget
    Throughput Planning
      Requests per second
      Tokens per minute
      Concurrent sessions
      Burst capacity
      Sustained load
      Queue depth
    Cost Optimization
      Per-token pricing
      Instance reservations
      Spot instances
      Right-sizing
      Time-of-day scheduling
      Budget guardrails
    SageMaker Auto-Scaling
      Target tracking
      Step scaling
      Scheduled scaling
      Scale-in cooldown
      Scale-out speed
      Custom metrics
    Pattern Switching
      Real-time routing
      Health-based failover
      Cost-based switching
      Latency-based switching
      A/B deployment
      Canary releases
    Monitoring
      CloudWatch metrics
      Custom dashboards
      Alerting thresholds
      Cost anomaly detection
      Latency percentiles
      Error rate tracking

1. Latency-Based Pattern Selection

1.1 Latency Budget Decomposition

Every MangaAssist request must complete within 3 seconds end-to-end. Understanding where time is spent determines which deployment pattern can meet the budget.

graph LR
    subgraph "3-Second Latency Budget"
        A[WebSocket<br/>Receive<br/>50ms] --> B[Auth +<br/>Session<br/>100ms]
        B --> C[RAG<br/>Retrieval<br/>200ms]
        C --> D[FM<br/>Inference<br/>1500ms MAX]
        D --> E[Post-<br/>Processing<br/>100ms]
        E --> F[WebSocket<br/>Send<br/>50ms]
    end

    style D fill:#ffcdd2
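
The stage budgets above sum to 2.0 seconds, leaving roughly one second of headroom against the 3-second target — headroom we read as slack for retries and network jitter (an interpretation, not a stated requirement). A quick arithmetic check:

# Sanity-check the latency budget from the diagram above.
BUDGET_MS = {
    "websocket_receive": 50,
    "auth_session": 100,
    "rag_retrieval": 200,
    "fm_inference_max": 1500,
    "post_processing": 100,
    "websocket_send": 50,
}
spent_ms = sum(BUDGET_MS.values())  # 2000 ms
headroom_ms = 3000 - spent_ms       # ~1000 ms for retries/jitter (assumption)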

1.2 Latency Analyzer

"""
MangaAssist — Latency-based deployment pattern selector.
Analyzes historical latency data to recommend the optimal pattern.
"""

import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from collections import deque

logger = logging.getLogger(__name__)


class DeploymentPattern(Enum):
    """Available deployment patterns."""
    LAMBDA_ON_DEMAND = "lambda_on_demand"
    BEDROCK_PROVISIONED = "bedrock_provisioned"
    SAGEMAKER_REALTIME = "sagemaker_realtime"
    SAGEMAKER_SERVERLESS = "sagemaker_serverless"


@dataclass
class LatencyProfile:
    """Latency profile for a deployment pattern."""
    pattern: DeploymentPattern
    p50_ms: float
    p90_ms: float
    p99_ms: float
    cold_start_ms: float
    cold_start_probability: float  # 0.0 - 1.0
    warmup_invocations: int  # requests needed to warm up

    @property
    def effective_p99_ms(self) -> float:
        """P99 latency accounting for cold starts."""
        if self.cold_start_probability >= 0.01:  # >1% cold starts
            return max(self.p99_ms, self.cold_start_ms)
        return self.p99_ms


# Reference latency profiles (measured for MangaAssist)
LATENCY_PROFILES = {
    DeploymentPattern.LAMBDA_ON_DEMAND: LatencyProfile(
        pattern=DeploymentPattern.LAMBDA_ON_DEMAND,
        p50_ms=1200,
        p90_ms=2000,
        p99_ms=3500,
        cold_start_ms=4500,
        cold_start_probability=0.05,
        warmup_invocations=0,
    ),
    DeploymentPattern.BEDROCK_PROVISIONED: LatencyProfile(
        pattern=DeploymentPattern.BEDROCK_PROVISIONED,
        p50_ms=600,
        p90_ms=900,
        p99_ms=1400,
        cold_start_ms=0,
        cold_start_probability=0.0,
        warmup_invocations=0,
    ),
    DeploymentPattern.SAGEMAKER_REALTIME: LatencyProfile(
        pattern=DeploymentPattern.SAGEMAKER_REALTIME,
        p50_ms=500,
        p90_ms=800,
        p99_ms=1200,
        cold_start_ms=0,
        cold_start_probability=0.0,
        warmup_invocations=0,
    ),
    DeploymentPattern.SAGEMAKER_SERVERLESS: LatencyProfile(
        pattern=DeploymentPattern.SAGEMAKER_SERVERLESS,
        p50_ms=700,
        p90_ms=1500,
        p99_ms=5000,
        cold_start_ms=8000,
        cold_start_probability=0.10,
        warmup_invocations=0,
    ),
}


class LatencyAnalyzer:
    """
    Analyzes latency requirements and recommends deployment patterns.
    Tracks real-time latency measurements for dynamic re-evaluation.
    """

    def __init__(self, target_p99_ms: float = 2500.0, window_size: int = 1000):
        self.target_p99_ms = target_p99_ms
        self._measurements: dict[DeploymentPattern, deque] = {
            pattern: deque(maxlen=window_size)
            for pattern in DeploymentPattern
        }

    def record_latency(self, pattern: DeploymentPattern, latency_ms: float) -> None:
        """Record an observed latency measurement."""
        self._measurements[pattern].append(latency_ms)

    def get_observed_percentile(
        self, pattern: DeploymentPattern, percentile: float
    ) -> Optional[float]:
        """Get observed latency at a given percentile (0-100)."""
        measurements = list(self._measurements[pattern])
        if len(measurements) < 10:
            return None

        measurements.sort()
        index = int(len(measurements) * percentile / 100)
        return measurements[min(index, len(measurements) - 1)]

    def recommend_pattern(
        self,
        max_p99_ms: float,
        min_throughput_rps: float,
        max_monthly_cost_usd: float,
        needs_custom_model: bool = False,
    ) -> list[dict]:
        """
        Recommend deployment patterns ranked by fitness.
        Returns scored recommendations with reasoning.
        """
        candidates = []

        for pattern, profile in LATENCY_PROFILES.items():
            # Hard filters
            if needs_custom_model and pattern != DeploymentPattern.SAGEMAKER_REALTIME:
                continue

            if profile.effective_p99_ms > max_p99_ms * 1.5:
                continue  # Allow some headroom but skip way-too-slow options

            # Score components (0-100 each)
            latency_score = self._score_latency(profile, max_p99_ms)
            throughput_score = self._score_throughput(pattern, min_throughput_rps)
            cost_score = self._score_cost(pattern, min_throughput_rps, max_monthly_cost_usd)
            ops_score = self._score_operational_complexity(pattern)

            # Weighted total
            total_score = (
                latency_score * 0.35
                + throughput_score * 0.25
                + cost_score * 0.25
                + ops_score * 0.15
            )

            candidates.append({
                "pattern": pattern.value,
                "total_score": round(total_score, 1),
                "latency_score": round(latency_score, 1),
                "throughput_score": round(throughput_score, 1),
                "cost_score": round(cost_score, 1),
                "ops_score": round(ops_score, 1),
                "p99_ms": profile.effective_p99_ms,
                "meets_latency": profile.effective_p99_ms <= max_p99_ms,
            })

        # Sort by total score descending
        candidates.sort(key=lambda x: x["total_score"], reverse=True)
        return candidates

    def _score_latency(self, profile: LatencyProfile, target_p99: float) -> float:
        """Score latency fitness (0-100). Higher = better latency."""
        effective = profile.effective_p99_ms
        if effective <= target_p99 * 0.5:
            return 100.0
        if effective <= target_p99:
            # Linear from 100 at half the target down to 50 at the target,
            # so the score is continuous with the overshoot branch below.
            return 100.0 - 50.0 * (effective - target_p99 * 0.5) / (target_p99 * 0.5)
        # Exceeds target: penalize heavily (50 at target, 0 at 2x target)
        overshoot = (effective - target_p99) / target_p99
        return max(0.0, 50.0 * (1.0 - overshoot))

    def _score_throughput(
        self, pattern: DeploymentPattern, target_rps: float
    ) -> float:
        """Score throughput capability (0-100)."""
        max_rps = {
            DeploymentPattern.LAMBDA_ON_DEMAND: 1000,
            DeploymentPattern.BEDROCK_PROVISIONED: 500,
            DeploymentPattern.SAGEMAKER_REALTIME: 200,
            DeploymentPattern.SAGEMAKER_SERVERLESS: 50,
        }
        available = max_rps.get(pattern, 100)
        if available >= target_rps * 2:
            return 100.0
        if available >= target_rps:
            return 70.0 + 30.0 * (available - target_rps) / target_rps
        return max(0.0, 70.0 * available / target_rps)

    def _score_cost(
        self,
        pattern: DeploymentPattern,
        rps: float,
        max_monthly: float,
    ) -> float:
        """Score cost efficiency (0-100). Higher = cheaper."""
        # Estimated monthly cost at given RPS
        monthly_estimates = {
            DeploymentPattern.LAMBDA_ON_DEMAND: rps * 86400 * 30 * 0.0005,
            DeploymentPattern.BEDROCK_PROVISIONED: 10800 + rps * 86400 * 30 * 0.0002,
            DeploymentPattern.SAGEMAKER_REALTIME: 1400 * 30 + rps * 86400 * 30 * 0.0001,
            DeploymentPattern.SAGEMAKER_SERVERLESS: rps * 86400 * 30 * 0.0008,
        }
        estimated = monthly_estimates.get(pattern, max_monthly)
        if estimated <= max_monthly * 0.3:
            return 100.0
        if estimated <= max_monthly:
            ratio = (estimated - max_monthly * 0.3) / (max_monthly * 0.7)
            return 100.0 - 60.0 * ratio
        # Over budget
        overshoot = (estimated - max_monthly) / max_monthly
        return max(0.0, 40.0 * (1.0 - overshoot))

    def _score_operational_complexity(self, pattern: DeploymentPattern) -> float:
        """Score operational simplicity (0-100). Higher = simpler."""
        scores = {
            DeploymentPattern.LAMBDA_ON_DEMAND: 95,
            DeploymentPattern.BEDROCK_PROVISIONED: 85,
            DeploymentPattern.SAGEMAKER_REALTIME: 40,
            DeploymentPattern.SAGEMAKER_SERVERLESS: 70,
        }
        return float(scores.get(pattern, 50))
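
A usage sketch for the selector with MangaAssist-flavored constraints (the cost ceiling here is illustrative, not a stated budget):

# Example invocation — constraint values are illustrative
analyzer = LatencyAnalyzer(target_p99_ms=2500.0)
ranked = analyzer.recommend_pattern(
    max_p99_ms=2500.0,           # leaves 500ms for non-inference stages at p99
    min_throughput_rps=40.0,     # peak RPS from Section 2
    max_monthly_cost_usd=30000.0,
)
for rec in ranked:
    print(rec["pattern"], rec["total_score"], rec["meets_latency"])
# Lambda and SageMaker serverless are filtered out by the 1.5x headroom
# rule (cold starts blow out their effective p99); bedrock_provisioned
# and sagemaker_realtime remain, ranked by weighted score.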

2. Throughput-Based Selection

2.1 Throughput Requirements Analysis

graph TD
    A[MangaAssist<br/>1M msgs/day] --> B[Calculate RPS]
    B --> C["Average: 11.6 RPS<br/>(1M / 86400)"]
    C --> D[Peak Multiplier<br/>3-5x average]
    D --> E["Peak: 35-58 RPS"]

    E --> F{Sustained or<br/>Bursty?}
    F -- "Sustained<br/>(18:00-24:00)" --> G[Provisioned Throughput<br/>Reserved capacity]
    F -- "Bursty<br/>(flash sales)" --> H[Lambda On-Demand<br/>Elastic scaling]

    G --> I[Size Model Units]
    H --> J[Set Concurrency Limits]

    style A fill:#e3f2fd
    style E fill:#fff9c4
    style G fill:#c8e6c9
    style H fill:#fff3e0
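
The diagram's numbers, reproduced as a back-of-envelope check:

# RPS arithmetic behind the diagram above
daily_messages = 1_000_000
avg_rps = daily_messages / 86_400      # ~11.6 RPS
peak_low, peak_high = avg_rps * 3, avg_rps * 5
print(f"avg {avg_rps:.1f} RPS, peak {peak_low:.0f}-{peak_high:.0f} RPS")
# avg 11.6 RPS, peak 35-58 RPS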

2.2 Throughput Calculator

"""
MangaAssist — Throughput calculator for deployment pattern sizing.
Determines required capacity based on traffic patterns.
"""

import math
from dataclasses import dataclass


@dataclass
class TrafficProfile:
    """Traffic profile for capacity planning."""
    daily_messages: int
    peak_multiplier: float  # Peak RPS / Average RPS
    peak_hours: int  # Hours of peak traffic per day
    avg_input_tokens: int
    avg_output_tokens: int
    burst_factor: float  # Max instantaneous / Peak RPS


@dataclass
class CapacityRequirement:
    """Computed capacity requirement."""
    avg_rps: float
    peak_rps: float
    burst_rps: float
    peak_tokens_per_minute: int
    sustained_tokens_per_hour: int
    recommended_pattern: str
    sizing_details: dict


class ThroughputCalculator:
    """
    Calculates throughput requirements and recommends sizing
    for each deployment pattern.
    """

    # Bedrock model unit capacity estimates (tokens/min per model unit)
    BEDROCK_MU_CAPACITY = {
        "claude-3-sonnet": 40000,  # tokens/min per model unit
        "claude-3-haiku": 80000,
    }

    # SageMaker instance throughput estimates (requests/sec)
    SAGEMAKER_INSTANCE_RPS = {
        "ml.g5.xlarge": 5,
        "ml.g5.2xlarge": 8,
        "ml.g5.12xlarge": 25,
        "ml.p4d.24xlarge": 50,
    }

    def calculate(self, profile: TrafficProfile) -> CapacityRequirement:
        """Calculate capacity requirements from a traffic profile."""
        avg_rps = profile.daily_messages / 86400
        peak_rps = avg_rps * profile.peak_multiplier
        burst_rps = peak_rps * profile.burst_factor

        # Token calculations
        avg_tokens_per_request = (
            profile.avg_input_tokens + profile.avg_output_tokens
        )
        peak_tokens_per_minute = int(peak_rps * 60 * avg_tokens_per_request)
        sustained_tokens_per_hour = int(
            peak_rps * 3600 * avg_tokens_per_request
        )

        # Determine recommended pattern
        pattern, sizing = self._recommend(
            peak_rps, burst_rps, peak_tokens_per_minute, profile
        )

        return CapacityRequirement(
            avg_rps=round(avg_rps, 2),
            peak_rps=round(peak_rps, 2),
            burst_rps=round(burst_rps, 2),
            peak_tokens_per_minute=peak_tokens_per_minute,
            sustained_tokens_per_hour=sustained_tokens_per_hour,
            recommended_pattern=pattern,
            sizing_details=sizing,
        )

    def _recommend(
        self,
        peak_rps: float,
        burst_rps: float,
        peak_tpm: int,
        profile: TrafficProfile,
    ) -> tuple[str, dict]:
        """Recommend pattern and sizing based on throughput needs."""
        # Check if Lambda can handle it
        lambda_max_concurrency = 1000
        lambda_avg_duration_s = 2.0
        lambda_capacity_rps = lambda_max_concurrency / lambda_avg_duration_s

        if burst_rps < lambda_capacity_rps * 0.7:
            lambda_concurrency = math.ceil(burst_rps * lambda_avg_duration_s * 1.2)
            return "lambda_on_demand", {
                "reserved_concurrency": min(lambda_concurrency, 1000),
                "provisioned_concurrency": math.ceil(peak_rps * lambda_avg_duration_s),
                "memory_mb": 1024,
                "timeout_s": 30,
            }

        # Bedrock provisioned sizing
        model_units_sonnet = math.ceil(
            peak_tpm / self.BEDROCK_MU_CAPACITY["claude-3-sonnet"]
        )
        model_units_haiku = math.ceil(
            peak_tpm / self.BEDROCK_MU_CAPACITY["claude-3-haiku"]
        )

        if model_units_sonnet <= 10:
            return "bedrock_provisioned", {
                "sonnet_model_units": model_units_sonnet,
                "haiku_model_units": model_units_haiku,
                "commitment": "OneMonth" if peak_rps > 20 else "NoCommitment",
                "estimated_hourly_cost": model_units_sonnet * 23 + model_units_haiku * 5,
            }

        # SageMaker for very high throughput
        best_instance = "ml.g5.2xlarge"
        instance_rps = self.SAGEMAKER_INSTANCE_RPS[best_instance]
        instance_count = math.ceil(burst_rps / instance_rps)

        return "sagemaker_realtime", {
            "instance_type": best_instance,
            "min_instances": math.ceil(peak_rps / instance_rps),
            "max_instances": instance_count + 2,
            "auto_scaling_target": instance_rps,
        }


# MangaAssist throughput calculation
manga_profile = TrafficProfile(
    daily_messages=1_000_000,
    peak_multiplier=3.5,
    peak_hours=6,
    avg_input_tokens=500,
    avg_output_tokens=300,
    burst_factor=2.0,
)

calculator = ThroughputCalculator()
requirement = calculator.calculate(manga_profile)
# Result: peak ~40 RPS, burst ~81 RPS, ~1.9M peak tokens/min
# With these inputs the calculator returns lambda_on_demand
# (reserved_concurrency ≈ 195): burst sits well under the ~350 RPS
# Lambda headroom. The case for provisioned throughput during the
# sustained evening peak comes from the TCO comparison in Section 3.

3. Cost-Based Selection

3.1 Total Cost of Ownership Model

"""
MangaAssist — Total Cost of Ownership (TCO) calculator.
Compares deployment patterns across all cost dimensions.
"""

from dataclasses import dataclass


@dataclass
class TCOInputs:
    """Inputs for TCO calculation."""
    daily_messages: int
    avg_input_tokens: int
    avg_output_tokens: int
    peak_hours_per_day: int
    off_peak_ratio: float  # off-peak traffic as fraction of peak
    engineer_hourly_rate: float  # USD
    ops_hours_per_month: float  # hours spent on operations


@dataclass
class TCOResult:
    """TCO calculation result."""
    pattern: str
    compute_cost: float
    token_cost: float
    ops_cost: float
    total_monthly: float
    cost_per_message: float
    breakdown: dict


class TCOCalculator:
    """
    Calculates total cost of ownership for each deployment pattern,
    including compute, API, and operational costs.
    """

    def calculate_all(self, inputs: TCOInputs) -> list[TCOResult]:
        """Calculate TCO for all deployment patterns."""
        results = [
            self._calc_lambda_ondemand(inputs),
            self._calc_bedrock_provisioned(inputs),
            self._calc_sagemaker(inputs),
            self._calc_hybrid(inputs),
        ]
        results.sort(key=lambda r: r.total_monthly)
        return results

    def _calc_lambda_ondemand(self, inputs: TCOInputs) -> TCOResult:
        """Lambda + Bedrock on-demand TCO."""
        monthly_messages = inputs.daily_messages * 30

        # Assume 70% Haiku, 30% Sonnet split
        haiku_msgs = monthly_messages * 0.7
        sonnet_msgs = monthly_messages * 0.3

        haiku_token_cost = (
            haiku_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
            + haiku_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
        )
        sonnet_token_cost = (
            sonnet_msgs * inputs.avg_input_tokens / 1_000_000 * 3.0
            + sonnet_msgs * inputs.avg_output_tokens / 1_000_000 * 15.0
        )
        token_cost = haiku_token_cost + sonnet_token_cost

        # Lambda compute: ~$0.0000167/GB-s * 1GB * 2s per invocation
        lambda_cost = monthly_messages * 0.0000167 * 1.0 * 2.0
        # Lambda requests: $0.20 per 1M
        lambda_request_cost = monthly_messages / 1_000_000 * 0.20
        compute_cost = lambda_cost + lambda_request_cost

        # Low ops: ~2 hrs/month
        ops_cost = 2 * inputs.engineer_hourly_rate

        total = compute_cost + token_cost + ops_cost

        return TCOResult(
            pattern="lambda_on_demand",
            compute_cost=round(compute_cost, 2),
            token_cost=round(token_cost, 2),
            ops_cost=round(ops_cost, 2),
            total_monthly=round(total, 2),
            cost_per_message=round(total / monthly_messages, 6),
            breakdown={
                "haiku_token_cost": round(haiku_token_cost, 2),
                "sonnet_token_cost": round(sonnet_token_cost, 2),
                "lambda_compute": round(lambda_cost, 2),
                "lambda_requests": round(lambda_request_cost, 2),
            },
        )

    def _calc_bedrock_provisioned(self, inputs: TCOInputs) -> TCOResult:
        """Bedrock provisioned throughput TCO."""
        # 2 Sonnet MUs during peak (6 hrs), 1 Haiku MU during business (12 hrs)
        sonnet_hourly = 23.0 * 2  # 2 model units
        haiku_hourly = 5.0 * 1
        off_peak_haiku_hourly = 5.0 * 1

        daily_compute = (
            sonnet_hourly * inputs.peak_hours_per_day
            + haiku_hourly * (12 - inputs.peak_hours_per_day)
        )
        # Off-peak: on-demand (no provisioned)
        off_peak_hours = 24 - 12
        daily_off_peak_msgs = (
            inputs.daily_messages * inputs.off_peak_ratio * off_peak_hours / 24
        )
        off_peak_token_cost_daily = (
            daily_off_peak_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
            + daily_off_peak_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
        )

        compute_cost = daily_compute * 30
        token_cost = off_peak_token_cost_daily * 30
        ops_cost = 4 * inputs.engineer_hourly_rate  # 4 hrs/month

        total = compute_cost + token_cost + ops_cost

        return TCOResult(
            pattern="bedrock_provisioned",
            compute_cost=round(compute_cost, 2),
            token_cost=round(token_cost, 2),
            ops_cost=round(ops_cost, 2),
            total_monthly=round(total, 2),
            cost_per_message=round(total / (inputs.daily_messages * 30), 6),
            breakdown={
                "sonnet_provisioned_daily": round(sonnet_hourly * inputs.peak_hours_per_day, 2),
                "haiku_provisioned_daily": round(haiku_hourly * (12 - inputs.peak_hours_per_day), 2),
                "off_peak_on_demand_daily": round(off_peak_token_cost_daily, 2),
            },
        )

    def _calc_sagemaker(self, inputs: TCOInputs) -> TCOResult:
        """SageMaker real-time endpoint TCO."""
        # 2x ml.g5.2xlarge 24/7 + auto-scaling to 4 during peak
        base_instance_cost_hr = 1.89
        base_count = 2
        peak_count = 4

        daily_base_hours = (24 - inputs.peak_hours_per_day) * base_count
        daily_peak_hours = inputs.peak_hours_per_day * peak_count
        daily_instance_cost = (daily_base_hours + daily_peak_hours) * base_instance_cost_hr

        compute_cost = daily_instance_cost * 30
        token_cost = 0  # No per-token cost for self-hosted
        ops_cost = 12 * inputs.engineer_hourly_rate  # 12 hrs/month (highest)

        total = compute_cost + token_cost + ops_cost

        return TCOResult(
            pattern="sagemaker_realtime",
            compute_cost=round(compute_cost, 2),
            token_cost=round(token_cost, 2),
            ops_cost=round(ops_cost, 2),
            total_monthly=round(total, 2),
            cost_per_message=round(total / (inputs.daily_messages * 30), 6),
            breakdown={
                "base_instance_hours": round(daily_base_hours * 30, 1),
                "peak_instance_hours": round(daily_peak_hours * 30, 1),
                "total_instance_hours": round((daily_base_hours + daily_peak_hours) * 30, 1),
            },
        )

    def _calc_hybrid(self, inputs: TCOInputs) -> TCOResult:
        """Hybrid deployment TCO (MangaAssist production)."""
        monthly_msgs = inputs.daily_messages * 30

        # Peak: 25% of messages via provisioned Sonnet
        provisioned_msgs = monthly_msgs * 0.25
        # Simple queries: 50% via on-demand Haiku
        haiku_msgs = monthly_msgs * 0.50
        # Complex queries: 15% via on-demand Sonnet
        sonnet_od_msgs = monthly_msgs * 0.15
        # Manga-specific: 10% via the fine-tuned SageMaker model
        sagemaker_msgs = monthly_msgs * 0.10

        # Provisioned compute (1 Sonnet MU for 6 hrs peak)
        provisioned_compute = 23.0 * 1 * inputs.peak_hours_per_day * 30

        # On-demand token costs
        haiku_cost = (
            haiku_msgs * inputs.avg_input_tokens / 1_000_000 * 0.25
            + haiku_msgs * inputs.avg_output_tokens / 1_000_000 * 1.25
        )
        sonnet_od_cost = (
            sonnet_od_msgs * inputs.avg_input_tokens / 1_000_000 * 3.0
            + sonnet_od_msgs * inputs.avg_output_tokens / 1_000_000 * 15.0
        )

        # SageMaker: 1x ml.g5.2xlarge 24/7
        sagemaker_cost = 1.89 * 24 * 30

        compute_cost = provisioned_compute + sagemaker_cost
        token_cost = haiku_cost + sonnet_od_cost
        ops_cost = 8 * inputs.engineer_hourly_rate  # 8 hrs/month

        total = compute_cost + token_cost + ops_cost

        return TCOResult(
            pattern="hybrid",
            compute_cost=round(compute_cost, 2),
            token_cost=round(token_cost, 2),
            ops_cost=round(ops_cost, 2),
            total_monthly=round(total, 2),
            cost_per_message=round(total / monthly_msgs, 6),
            breakdown={
                "provisioned_compute": round(provisioned_compute, 2),
                "haiku_on_demand": round(haiku_cost, 2),
                "sonnet_on_demand": round(sonnet_od_cost, 2),
                "sagemaker_instance": round(sagemaker_cost, 2),
            },
        )
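
A comparison run with assumed MangaAssist inputs (the engineer rate and off-peak ratio are illustrative; note that the per-pattern methods hard-code their own ops hours, so ops_hours_per_month is informational here):

# Example TCO comparison — inputs are illustrative
inputs = TCOInputs(
    daily_messages=1_000_000,
    avg_input_tokens=500,
    avg_output_tokens=300,
    peak_hours_per_day=6,
    off_peak_ratio=0.3,          # assumption: off-peak at 30% of peak traffic
    engineer_hourly_rate=150.0,  # assumption
    ops_hours_per_month=8.0,
)

tco = TCOCalculator()
for result in tco.calculate_all(inputs):  # cheapest first
    print(f"{result.pattern:20s} ${result.total_monthly:>10,.2f}"
          f"  (${result.cost_per_message:.6f}/msg)")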

4. SageMaker Auto-Scaling Deep Dive

4.1 Auto-Scaling Architecture

graph TB
    subgraph "SageMaker Auto-Scaling"
        CW[CloudWatch Metrics] --> ASG[Application<br/>Auto Scaling]
        ASG --> EP[SageMaker Endpoint<br/>1-8 instances]

        subgraph "Scaling Policies"
            TT[Target Tracking<br/>InvocationsPerInstance = 10]
            SS[Step Scaling<br/>ModelLatency > 2000ms]
            SC[Scheduled Scaling<br/>Peak hours: min=3]
        end

        TT --> ASG
        SS --> ASG
        SC --> ASG
    end

    subgraph "Metrics Pipeline"
        EP --> M1[InvocationsPerInstance]
        EP --> M2[ModelLatency]
        EP --> M3[CPUUtilization]
        EP --> M4[GPUMemoryUtilization]
        M1 --> CW
        M2 --> CW
        M3 --> CW
        M4 --> CW
    end

    style ASG fill:#fff9c4
    style EP fill:#c8e6c9

4.2 Production Auto-Scaling Configuration

"""
MangaAssist — SageMaker auto-scaling configuration.
Implements target tracking, step scaling, and scheduled scaling.
"""

import logging
import boto3

logger = logging.getLogger(__name__)


class SageMakerAutoScaler:
    """
    Configures comprehensive auto-scaling for SageMaker endpoints.
    Combines target tracking, step scaling, and scheduled actions.
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self.autoscaling = boto3.client(
            "application-autoscaling", region_name=region
        )
        self.sm_client = boto3.client("sagemaker", region_name=region)

    def setup_complete_scaling(
        self,
        endpoint_name: str,
        variant_name: str = "AllTraffic",
        min_instances: int = 1,
        max_instances: int = 8,
    ) -> dict:
        """
        Set up complete auto-scaling with all three policy types.
        Returns policy ARNs for monitoring.
        """
        resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"

        # 1. Register scalable target
        self.autoscaling.register_scalable_target(
            ServiceNamespace="sagemaker",
            ResourceId=resource_id,
            ScalableDimension="sagemaker:variant:DesiredInstanceCount",
            MinCapacity=min_instances,
            MaxCapacity=max_instances,
        )

        # 2. Target tracking: maintain N invocations per instance
        tt_policy = self._create_target_tracking_policy(
            endpoint_name, variant_name, resource_id,
            target_invocations=10,
        )

        # 3. Step scaling: react to latency spikes
        step_policy = self._create_step_scaling_policy(
            endpoint_name, variant_name, resource_id
        )

        # 4. Scheduled scaling: pre-scale for known peaks
        schedules = self._create_scheduled_scaling(
            endpoint_name, resource_id, min_instances, max_instances
        )

        return {
            "target_tracking_policy": tt_policy,
            "step_scaling_policy": step_policy,
            "scheduled_actions": schedules,
        }

    def _create_target_tracking_policy(
        self,
        endpoint_name: str,
        variant_name: str,
        resource_id: str,
        target_invocations: int = 10,
    ) -> str:
        """Create target tracking scaling policy."""
        response = self.autoscaling.put_scaling_policy(
            PolicyName=f"{endpoint_name}-target-tracking",
            ServiceNamespace="sagemaker",
            ResourceId=resource_id,
            ScalableDimension="sagemaker:variant:DesiredInstanceCount",
            PolicyType="TargetTrackingScaling",
            TargetTrackingScalingPolicyConfiguration={
                "TargetValue": float(target_invocations),
                "CustomizedMetricSpecification": {
                    "MetricName": "InvocationsPerInstance",
                    "Namespace": "AWS/SageMaker",
                    "Dimensions": [
                        {"Name": "EndpointName", "Value": endpoint_name},
                        {"Name": "VariantName", "Value": variant_name},
                    ],
                    "Statistic": "Average",
                },
                "ScaleInCooldown": 300,   # 5 min cool-down before scale-in
                "ScaleOutCooldown": 60,   # 1 min cool-down before scale-out
            },
        )
        arn = response["PolicyARN"]
        logger.info(f"Target tracking policy created: {arn}")
        return arn

    def _create_step_scaling_policy(
        self,
        endpoint_name: str,
        variant_name: str,
        resource_id: str,
    ) -> str:
        """Create step scaling policy triggered by high latency."""
        # CloudWatch client for the latency alarm wired up after the policy
        cw = boto3.client("cloudwatch", region_name=self.region)

        # Step scaling policy
        response = self.autoscaling.put_scaling_policy(
            PolicyName=f"{endpoint_name}-latency-step",
            ServiceNamespace="sagemaker",
            ResourceId=resource_id,
            ScalableDimension="sagemaker:variant:DesiredInstanceCount",
            PolicyType="StepScaling",
            StepScalingPolicyConfiguration={
                "AdjustmentType": "ChangeInCapacity",
                "StepAdjustments": [
                    {
                        "MetricIntervalLowerBound": 0,
                        "MetricIntervalUpperBound": 500,
                        "ScalingAdjustment": 1,
                    },
                    {
                        "MetricIntervalLowerBound": 500,
                        "ScalingAdjustment": 2,
                    },
                ],
                "Cooldown": 120,
            },
        )
        policy_arn = response["PolicyARN"]

        # Create alarm that triggers the step policy
        cw.put_metric_alarm(
            AlarmName=f"{endpoint_name}-high-latency",
            Namespace="AWS/SageMaker",
            MetricName="ModelLatency",
            Dimensions=[
                {"Name": "EndpointName", "Value": endpoint_name},
                {"Name": "VariantName", "Value": variant_name},
            ],
            Statistic="Average",
            Period=60,
            EvaluationPeriods=2,
            Threshold=2000000,  # 2 seconds in microseconds
            ComparisonOperator="GreaterThanThreshold",
            AlarmActions=[policy_arn],
        )

        logger.info(f"Step scaling policy created: {policy_arn}")
        return policy_arn

    def _create_scheduled_scaling(
        self,
        endpoint_name: str,
        resource_id: str,
        min_instances: int,
        max_instances: int,
    ) -> list[str]:
        """Create scheduled scaling for peak/off-peak hours (JST)."""
        schedules = []

        # Peak hours: 18:00 JST (09:00 UTC) — min 3 instances
        self.autoscaling.put_scheduled_action(
            ServiceNamespace="sagemaker",
            ScheduledActionName=f"{endpoint_name}-peak-start",
            ResourceId=resource_id,
            ScalableDimension="sagemaker:variant:DesiredInstanceCount",
            Schedule="cron(0 9 * * ? *)",  # 09:00 UTC = 18:00 JST
            ScalableTargetAction={
                "MinCapacity": 3,
                "MaxCapacity": max_instances,
            },
        )
        schedules.append(f"{endpoint_name}-peak-start")

        # Off-peak: 00:00 JST (15:00 UTC) — min 1 instance
        self.autoscaling.put_scheduled_action(
            ServiceNamespace="sagemaker",
            ScheduledActionName=f"{endpoint_name}-offpeak-start",
            ResourceId=resource_id,
            ScalableDimension="sagemaker:variant:DesiredInstanceCount",
            Schedule="cron(0 15 * * ? *)",  # 15:00 UTC = 00:00 JST
            ScalableTargetAction={
                "MinCapacity": min_instances,
                "MaxCapacity": max(max_instances // 2, 2),
            },
        )
        schedules.append(f"{endpoint_name}-offpeak-start")

        logger.info(f"Scheduled scaling actions created: {schedules}")
        return schedules

    def get_scaling_status(self, endpoint_name: str) -> dict:
        """Get current scaling status and activity."""
        resource_id = f"endpoint/{endpoint_name}/variant/AllTraffic"

        # Get current target
        targets = self.autoscaling.describe_scalable_targets(
            ServiceNamespace="sagemaker",
            ResourceIds=[resource_id],
        )

        # Get scaling activities
        activities = self.autoscaling.describe_scaling_activities(
            ServiceNamespace="sagemaker",
            ResourceId=resource_id,
            MaxResults=5,
        )

        target = targets["ScalableTargets"][0] if targets["ScalableTargets"] else {}
        recent_activities = [
            {
                "cause": a.get("Cause", ""),
                "status": a["StatusCode"],
                "start": str(a.get("StartTime", "")),
                "end": str(a.get("EndTime", "")),
            }
            for a in activities.get("ScalingActivities", [])
        ]

        return {
            "current_min": target.get("MinCapacity"),
            "current_max": target.get("MaxCapacity"),
            "recent_activities": recent_activities,
        }
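
Wiring this up for a MangaAssist endpoint might look like the following (the endpoint name is hypothetical):

# Example setup — endpoint name is hypothetical
scaler = SageMakerAutoScaler(region="ap-northeast-1")
policies = scaler.setup_complete_scaling(
    endpoint_name="mangaassist-manga-expert",
    variant_name="AllTraffic",
    min_instances=1,
    max_instances=8,
)

status = scaler.get_scaling_status("mangaassist-manga-expert")
print(status["current_min"], status["current_max"], status["recent_activities"])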

4.3 Scaling Timeline Visualization

graph LR
    subgraph "Scaling Response Timeline"
        A["00:00 JST<br/>Min: 1 instance"] --> B["09:00 JST<br/>Business ramp-up"]
        B --> C["12:00 JST<br/>Lunch peak<br/>Scale to 2"]
        C --> D["17:00 JST<br/>Pre-peak warm-up"]
        D --> E["18:00 JST<br/>Scheduled: Min 3"]
        E --> F["20:00 JST<br/>Peak traffic<br/>Scale to 4-5"]
        F --> G["23:00 JST<br/>Peak declining"]
        G --> H["00:00 JST<br/>Scheduled: Min 1<br/>Scale-in begins"]
    end

    style E fill:#ffcdd2
    style F fill:#ffcdd2
    style H fill:#c8e6c9

5. Dynamic Pattern Switching

5.1 Real-Time Pattern Evaluation

"""
MangaAssist — Dynamic pattern switching.
Evaluates and switches deployment patterns based on real-time conditions.
"""

import time
import logging
from dataclasses import dataclass
from collections import deque
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class PatternHealth:
    """Real-time health metrics for a deployment pattern."""
    pattern: str
    is_available: bool
    current_latency_p50_ms: float
    current_latency_p99_ms: float
    error_rate: float  # 0.0 - 1.0
    capacity_utilization: float  # 0.0 - 1.0
    last_updated: float  # timestamp


class DynamicPatternSwitcher:
    """
    Monitors deployment pattern health and switches dynamically
    when a pattern degrades below thresholds.
    """

    def __init__(
        self,
        latency_threshold_ms: float = 2500,
        error_rate_threshold: float = 0.05,
        capacity_threshold: float = 0.85,
    ):
        self.latency_threshold = latency_threshold_ms
        self.error_rate_threshold = error_rate_threshold
        self.capacity_threshold = capacity_threshold

        self._pattern_health: dict[str, PatternHealth] = {}
        self._switch_history: deque = deque(maxlen=100)
        self._current_primary = "bedrock_provisioned"

    def update_health(self, health: PatternHealth) -> None:
        """Update health metrics for a deployment pattern."""
        self._pattern_health[health.pattern] = health

    def evaluate_and_switch(self) -> Optional[str]:
        """
        Evaluate all patterns and switch primary if needed.
        Returns the new primary pattern, or None if no switch needed.
        """
        current = self._pattern_health.get(self._current_primary)
        if current is None:
            return None

        # Check if current primary is degraded
        is_degraded = (
            not current.is_available
            or current.current_latency_p99_ms > self.latency_threshold
            or current.error_rate > self.error_rate_threshold
            or current.capacity_utilization > self.capacity_threshold
        )

        if not is_degraded:
            return None

        # Find best alternative
        fallback_priority = [
            "bedrock_provisioned",
            "bedrock_ondemand_haiku",
            "bedrock_ondemand_sonnet",
            "sagemaker_realtime",
        ]

        for candidate in fallback_priority:
            if candidate == self._current_primary:
                continue

            candidate_health = self._pattern_health.get(candidate)
            if candidate_health is None:
                continue

            if (
                candidate_health.is_available
                and candidate_health.error_rate < self.error_rate_threshold
                and candidate_health.capacity_utilization < self.capacity_threshold
            ):
                old_primary = self._current_primary
                self._current_primary = candidate

                self._switch_history.append({
                    "timestamp": time.time(),
                    "from": old_primary,
                    "to": candidate,
                    "reason": self._get_switch_reason(current),
                })

                logger.warning(
                    "Switching primary deployment pattern",
                    extra={
                        "from": old_primary,
                        "to": candidate,
                        "reason": self._get_switch_reason(current),
                    },
                )
                return candidate

        logger.error("No healthy deployment pattern available")
        return None

    def _get_switch_reason(self, health: PatternHealth) -> str:
        """Determine why a switch is necessary."""
        reasons = []
        if not health.is_available:
            reasons.append("unavailable")
        if health.current_latency_p99_ms > self.latency_threshold:
            reasons.append(f"high_latency({health.current_latency_p99_ms:.0f}ms)")
        if health.error_rate > self.error_rate_threshold:
            reasons.append(f"high_errors({health.error_rate:.1%})")
        if health.capacity_utilization > self.capacity_threshold:
            reasons.append(f"capacity({health.capacity_utilization:.1%})")
        return ", ".join(reasons) or "unknown"

    def get_routing_weights(self) -> dict[str, float]:
        """
        Get traffic routing weights for all patterns.
        Primary gets most traffic; healthy alternatives get a small share.
        """
        weights = {}
        for pattern, health in self._pattern_health.items():
            if not health.is_available:
                weights[pattern] = 0.0
            elif pattern == self._current_primary:
                weights[pattern] = 0.80
            elif health.error_rate < self.error_rate_threshold:
                weights[pattern] = 0.05  # Small canary allocation
            else:
                weights[pattern] = 0.0

        # Normalize
        total = sum(weights.values())
        if total > 0:
            weights = {k: round(v / total, 3) for k, v in weights.items()}

        return weights
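
A sketch of the evaluation loop, with the primary deliberately degraded to force a failover (the health numbers are illustrative):

# Example: degrade the primary and watch the switcher fail over
switcher = DynamicPatternSwitcher()
switcher.update_health(PatternHealth(
    pattern="bedrock_provisioned", is_available=True,
    current_latency_p50_ms=900.0, current_latency_p99_ms=3200.0,  # > 2500ms threshold
    error_rate=0.01, capacity_utilization=0.65, last_updated=time.time(),
))
switcher.update_health(PatternHealth(
    pattern="bedrock_ondemand_haiku", is_available=True,
    current_latency_p50_ms=600.0, current_latency_p99_ms=1200.0,
    error_rate=0.003, capacity_utilization=0.0, last_updated=time.time(),
))

new_primary = switcher.evaluate_and_switch()
# -> "bedrock_ondemand_haiku", reason: high_latency(3200ms)
print(new_primary, switcher.get_routing_weights())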

5.2 Pattern Health Dashboard

graph TB
    subgraph "Health Monitor"
        H1[Bedrock Provisioned<br/>P99: 600ms ✓<br/>Errors: 0.1% ✓<br/>Capacity: 65%] --> Router
        H2[Bedrock On-Demand Haiku<br/>P99: 1200ms ✓<br/>Errors: 0.3% ✓<br/>Capacity: N/A] --> Router
        H3[Bedrock On-Demand Sonnet<br/>P99: 1800ms ✓<br/>Errors: 0.2% ✓<br/>Capacity: N/A] --> Router
        H4[SageMaker Endpoint<br/>P99: 500ms ✓<br/>Errors: 0.1% ✓<br/>Capacity: 45%] --> Router
    end

    Router[Traffic Router<br/>Weights ≈ 84/5/5/5] --> Response[Client Response]

    style H1 fill:#c8e6c9
    style H2 fill:#c8e6c9
    style H3 fill:#c8e6c9
    style H4 fill:#c8e6c9
    style Router fill:#fff9c4

6. Monitoring and Observability

6.1 Pattern Selection Metrics

"""
MangaAssist — Deployment pattern monitoring.
Tracks selection decisions, performance, and cost across patterns.
"""

import json
import logging
import boto3

logger = logging.getLogger(__name__)


class DeploymentPatternMonitor:
    """
    Publishes custom CloudWatch metrics for deployment pattern monitoring.
    Tracks routing decisions, latency by pattern, and cost accumulation.
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.cw = boto3.client("cloudwatch", region_name=region)
        self.namespace = "MangaAssist/Deployment"

    def record_routing_decision(
        self,
        pattern: str,
        reason: str,
        latency_ms: float,
        cost_usd: float,
        success: bool,
    ) -> None:
        """Record a routing decision and its outcome."""
        metric_data = [
            {
                "MetricName": "RoutingDecision",
                "Dimensions": [
                    {"Name": "Pattern", "Value": pattern},
                    {"Name": "Reason", "Value": reason},
                ],
                "Value": 1,
                "Unit": "Count",
            },
            {
                "MetricName": "InferenceLatency",
                "Dimensions": [
                    {"Name": "Pattern", "Value": pattern},
                ],
                "Value": latency_ms,
                "Unit": "Milliseconds",
            },
            {
                "MetricName": "InferenceCost",
                "Dimensions": [
                    {"Name": "Pattern", "Value": pattern},
                ],
                "Value": cost_usd,
                "Unit": "None",
            },
            {
                "MetricName": "InferenceSuccess",
                "Dimensions": [
                    {"Name": "Pattern", "Value": pattern},
                ],
                "Value": 1 if success else 0,
                "Unit": "Count",
            },
        ]

        self.cw.put_metric_data(
            Namespace=self.namespace,
            MetricData=metric_data,
        )

    def record_pattern_switch(
        self,
        from_pattern: str,
        to_pattern: str,
        reason: str,
    ) -> None:
        """Record a deployment pattern switch event."""
        self.cw.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "PatternSwitch",
                    "Dimensions": [
                        {"Name": "FromPattern", "Value": from_pattern},
                        {"Name": "ToPattern", "Value": to_pattern},
                        {"Name": "Reason", "Value": reason},
                    ],
                    "Value": 1,
                    "Unit": "Count",
                },
            ],
        )

    def create_dashboard(self, dashboard_name: str = "MangaAssist-Deployment") -> None:
        """Create a CloudWatch dashboard for deployment monitoring."""
        dashboard_body = {
            "widgets": [
                {
                    "type": "metric",
                    "properties": {
                        "title": "Routing Decisions by Pattern",
                        "metrics": [
                            [self.namespace, "RoutingDecision", "Pattern", p]
                            for p in [
                                "provisioned-sonnet",
                                "ondemand-haiku",
                                "ondemand-sonnet",
                                "sagemaker-custom",
                            ]
                        ],
                        "period": 300,
                        "stat": "Sum",
                    },
                },
                {
                    "type": "metric",
                    "properties": {
                        "title": "Inference Latency P99 by Pattern",
                        "metrics": [
                            [self.namespace, "InferenceLatency", "Pattern", p]
                            for p in [
                                "provisioned-sonnet",
                                "ondemand-haiku",
                                "ondemand-sonnet",
                                "sagemaker-custom",
                            ]
                        ],
                        "period": 300,
                        "stat": "p99",
                    },
                },
                {
                    "type": "metric",
                    "properties": {
                        "title": "Cumulative Daily Cost",
                        "metrics": [
                            [self.namespace, "InferenceCost", "Pattern", p]
                            for p in [
                                "provisioned-sonnet",
                                "ondemand-haiku",
                                "ondemand-sonnet",
                                "sagemaker-custom",
                            ]
                        ],
                        "period": 3600,
                        "stat": "Sum",
                    },
                },
            ],
        }

        self.cw.put_dashboard(
            DashboardName=dashboard_name,
            DashboardBody=json.dumps(dashboard_body),
        )
        logger.info(f"Dashboard created: {dashboard_name}")
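
Recording a single routed request from the orchestrator might look like this (the reason label is a hypothetical routing tag; pattern names follow the dashboard above):

# Example: record one routed request — values are illustrative
monitor = DeploymentPatternMonitor(region="ap-northeast-1")
monitor.record_routing_decision(
    pattern="ondemand-haiku",
    reason="simple_query",   # hypothetical routing-reason label
    latency_ms=850.0,
    cost_usd=0.0005,
    success=True,
)
monitor.create_dashboard()   # put_dashboard overwrites an existing dashboard by name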

Key Takeaways

  1. Latency budgets drive pattern selection — MangaAssist's 3-second end-to-end target leaves only 1.5s for FM inference; this eliminates Lambda without provisioned concurrency (cold starts push effective p99 to ~4.5s in the measured profile) and favors Bedrock provisioned throughput or SageMaker real-time, whose p99 stays well inside the budget.

  2. Throughput sizing is a math problem — at 1M messages/day with a 3.5x peak multiplier, MangaAssist needs ~40 RPS of peak capacity; in the TCO model above that maps to 2 Sonnet model units of provisioned throughput or 2-4 SageMaker g5.2xlarge instances.

  3. TCO includes operational cost — SageMaker may have the lowest per-message cost at scale, but 12 hours/month of engineer time for GPU management, model updates, and scaling tuning adds $1,200-2,400/month that Bedrock avoids entirely.

  4. SageMaker auto-scaling requires three policy types — target tracking handles steady growth, step scaling reacts to latency spikes, and scheduled scaling pre-provisions for known peaks; using only one leaves gaps.

  5. Scale-out cooldown must be aggressive (60s) — new SageMaker instances take 5-15 minutes to become available, so the scaling trigger must fire as early as possible; scale-in cooldown should be conservative (300s) to avoid flapping.

  6. Dynamic pattern switching provides resilience — monitoring all patterns in real time and automatically routing away from degraded ones ensures MangaAssist maintains its 3-second SLA even when individual deployment targets experience issues.

  7. The hybrid approach optimizes for each traffic segment — peak hours use provisioned throughput (cost-efficient at high volume), simple queries use Haiku on-demand (cheapest per request), complex queries use Sonnet (quality-first), and manga-specific queries use a fine-tuned SageMaker model (best accuracy).