
Resource Allocation Architecture for GenAI Applications

AWS AIP-C01 Task 4.2 — Skill 4.2.5: Right-size resources to optimize FM application throughput

Context: MangaAssist JP manga e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. 1M messages/day, peak 20K concurrent sessions.


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Responsible AI & Monitoring | Task 4.2 — Optimize FM application performance | Skill 4.2.5 — Right-size resources to optimize FM application throughput |

Skill scope: Design resource allocation strategies that match compute, memory, storage, and throughput capacity to the actual demands of each GenAI service layer — from FM invocation through orchestration to data retrieval — ensuring cost-efficient utilization without sacrificing latency or availability targets.


Mind Map — Resource Allocation Dimensions

```mermaid
mindmap
  root((Resource<br/>Allocation))
    Capacity Planning
      Token Processing Budget
        Tokens per Second Required
        Bedrock Model Units
        Peak vs Sustained Load
        Buffer Margin 20-30%
      Compute Sizing
        ECS vCPU per Invocation
        Memory per Context Window
        Task Count at Peak
        Fargate Spot Mix
      Storage & Retrieval
        OpenSearch Base OCU
        OpenSearch Search OCU
        DynamoDB RCU/WCU
        ElastiCache Node Type
    Utilization Monitoring
      Prompt/Completion Patterns
        Input Token Distribution
        Output Token Distribution
        Input-to-Output Ratio
        Waste Detection
      Service Utilization
        ECS CPU & Memory %
        Bedrock Throttle Rate
        Cache Hit Efficiency
        Search OCU Saturation
      Cost Efficiency
        Cost per Message
        Cost per Intent Category
        Idle Resource Detection
        Reserved vs On-Demand ROI
    Auto-Scaling
      GenAI-Aware Metrics
        Active Bedrock Invocations
        Pending Queue Depth
        Token Queue Backlog
        WebSocket Connections
      Scaling Policies
        Target Tracking
        Step Scaling
        Predictive Scaling
        Scheduled Actions
      Cooldown & Stability
        Scale-Out Aggressiveness
        Scale-In Conservatism
        Oscillation Prevention
        Warm Pool Strategy
    Cost Allocation
      Tagging Strategy
        By Intent Category
        By Tier (Sonnet/Haiku)
        By Environment
        By Feature
      Budget Controls
        Daily Token Budget
        Per-User Cost Cap
        Alert Thresholds
        Auto-Throttle on Overspend
      Optimization
        Model Tiering ROI
        Cache Savings Attribution
        Spot Savings Tracking
        Reserved Capacity Amortization
```

Token Processing Capacity Model

The fundamental equation for GenAI resource allocation starts with token throughput. Every downstream resource must be sized to support the token pipeline.

The Capacity Equation

Required token throughput (tokens/sec) = (requests/sec) x (avg_tokens/request)
Required concurrent compute = token_throughput x (processing_time/token)

For MangaAssist at 1M messages/day:

| Parameter | Value | Derivation |
|---|---|---|
| Messages per day | 1,000,000 | Given traffic volume |
| Average requests per second (sustained) | ~11.6 req/s | 1M / 86,400 seconds |
| Peak requests per second | ~230 req/s | 20K concurrent sessions x ~1 message per 87 s each |
| Avg input tokens per request | 1,200 tokens | System prompt (400) + context (500) + user query (100) + history (200) |
| Avg output tokens per request | 350 tokens | Typical manga recommendation response |
| Avg total tokens per request | 1,550 tokens | Input + output |
| Peak token throughput required | ~356,500 tokens/sec | 230 req/s x 1,550 tokens |
| Processing time per token (Sonnet) | ~15 ms | Time-to-first-token + generation |
| Processing time per token (Haiku) | ~5 ms | Faster for simple intent routing |
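
As a sanity check, the headline figures in this table reduce to a few lines of arithmetic:

```python
# Back-of-the-envelope check of the capacity table (no AWS calls).
messages_per_day = 1_000_000
sustained_rps = messages_per_day / 86_400    # ~11.6 req/s
peak_rps = 20_000 / 87                       # ~230 req/s: one message per session per ~87 s
tokens_per_request = 1_200 + 350             # avg input + output
peak_tokens_per_sec = peak_rps * tokens_per_request   # ~356,500 tokens/sec at 230 req/s
print(f"{sustained_rps:.1f} req/s sustained, {peak_rps:.0f} req/s peak, "
      f"{peak_tokens_per_sec:,.0f} tokens/sec peak")
```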

Mapping to Bedrock Model Units

Bedrock provisioned model units = (peak tokens/min) / (model unit capacity in tokens/min)
| Model | Unit Capacity | Required Units (peak, if 100% of traffic) | Buffer (25%) | Total Units |
|---|---|---|---|---|
| Claude 3 Sonnet | 50,000 tokens/min | ~428 | 107 | 535 |
| Claude 3 Haiku | 100,000 tokens/min | ~214 | 54 | 268 |
| Blended (70% Haiku / 30% Sonnet) | - | ~278 effective | 70 | 348 |

The single-model rows show what each tier would need if it carried all traffic on its own. With the 70/30 split actually deployed, the blended total of 348 units breaks down to roughly 161 Sonnet units and 188 Haiku units.

MangaAssist design decision: Route 70% of traffic to Haiku (simple lookups, intent classification, FAQ) and 30% to Sonnet (complex recommendations, multi-turn reasoning). At the unit prices listed below, this cuts Bedrock cost by roughly 65% versus an all-Sonnet deployment while maintaining quality where it matters.
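
The blended row and the ~65% savings figure above reduce to plain arithmetic using the unit capacities and prices in this section:

```python
import math

peak_tokens_per_min = 356_500 * 60                    # ~21.4M tokens/min at peak
sonnet_raw = peak_tokens_per_min * 0.30 / 50_000      # ~128 units for the 30% Sonnet share
haiku_raw = peak_tokens_per_min * 0.70 / 100_000      # ~150 units for the 70% Haiku share
sonnet_units = math.ceil(sonnet_raw * 1.25)           # ~161 with 25% buffer
haiku_units = math.ceil(haiku_raw * 1.25)             # ~188 with 25% buffer

blended_cost_hr = sonnet_units * 63 + haiku_units * 8  # ~$11.6K/hr
all_sonnet_cost_hr = 535 * 63                          # ~$33.7K/hr
print(f"blended: {sonnet_units + haiku_units} units, "
      f"savings vs all-Sonnet: {1 - blended_cost_hr / all_sonnet_cost_hr:.0%}")
```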


Utilization Monitoring for Prompt/Completion Patterns

Tracking the input-to-output token ratio reveals efficiency patterns — and waste.

Token Ratio Analysis

| Pattern | Input Tokens | Output Tokens | Ratio | Diagnosis |
|---|---|---|---|---|
| Efficient recommendation | 800 | 400 | 2:1 | Healthy — concise context, rich response |
| Bloated system prompt | 2,500 | 150 | 16.7:1 | Wasteful — paying for input tokens that produce little output |
| Over-retrieved RAG context | 3,000 | 300 | 10:1 | Wasteful — too many chunks retrieved from OpenSearch |
| Multi-turn with full history | 4,000 | 200 | 20:1 | Critical waste — history should be summarized |
| Simple FAQ response | 400 | 100 | 4:1 | Acceptable — consider caching these entirely |
| Complex manga comparison | 1,500 | 800 | 1.9:1 | Optimal — high-value response justifies input cost |

Target ratio for MangaAssist: 2:1 to 5:1 input-to-output ratio. Anything above 10:1 triggers a prompt optimization review.
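
A small helper sketch applying these thresholds per request (the function name and labels are illustrative):

```python
def classify_token_ratio(input_tokens: int, output_tokens: int) -> str:
    """Classify a request by input-to-output token ratio.

    Thresholds follow the MangaAssist targets: 2:1 to 5:1 is healthy,
    anything above 10:1 triggers a prompt optimization review.
    """
    if output_tokens == 0:
        return "review"      # no completion at all; inspect separately
    ratio = input_tokens / output_tokens
    if ratio <= 5:
        return "healthy"
    if ratio <= 10:
        return "watch"       # above target, below the review threshold
    return "optimize"        # prompt bloat, over-retrieval, or unsummarized history
```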


GenAI-Specific Resource Dimensions

Each service in the MangaAssist stack has distinct resource dimensions that must be allocated in proportion to the token pipeline.

| Service | Resource Dimension | Metric | Target Utilization | Scaling Trigger | Cost Impact |
|---|---|---|---|---|---|
| Bedrock Claude 3 Sonnet | Provisioned throughput (model units) | InvocationsPerMinute, TokensProcessed | 70% of provisioned | >80% for 3 min | $63/model unit/hr |
| Bedrock Claude 3 Haiku | Provisioned throughput (model units) | InvocationsPerMinute, TokensProcessed | 75% of provisioned | >85% for 3 min | $8/model unit/hr |
| ECS Fargate (orchestrator) | vCPU + memory per task | CPUUtilization, MemoryUtilization | 65% CPU, 70% memory | CPU >75% for 2 min | $0.04/vCPU-hr + $0.004/GB-hr |
| OpenSearch Serverless | OCU (base + search) | SearchLatency, IndexingRate | Search P95 < 50 ms | P95 > 80 ms for 5 min | $0.24/OCU-hr |
| ElastiCache Redis | Node type (memory + connections) | CurrConnections, BytesUsedForCache, CacheHitRate | Memory < 75%, connections < 80% | Memory > 80% | $0.068/hr (r6g.large) |
| DynamoDB | RCU/WCU or on-demand | ConsumedReadCapacityUnits, ThrottledRequests | 70% of provisioned | ThrottledRequests > 0 | $0.25/WCU-mo, $0.05/RCU-mo |
| API Gateway WebSocket | Connections + message rate | ConnectionCount, MessageCount | < 80% of account limit | > 70% of account limit | $1/million messages |

Auto-Scaling Configurations for GenAI Traffic

GenAI traffic is bursty and correlated — a new manga release can spike traffic 10x in minutes. Traditional CPU-based scaling is too slow.

ECS Auto-Scaling with GenAI-Aware Metrics

```mermaid
flowchart TD
    subgraph Metrics["GenAI-Aware Scaling Metrics"]
        A["Active Bedrock Invocations<br/>(custom CloudWatch metric)"]
        B["WebSocket Connection Count<br/>(API Gateway metric)"]
        C["Pending Queue Depth<br/>(SQS/internal metric)"]
        D["ECS CPU Utilization<br/>(standard metric)"]
    end

    subgraph Policy["Scaling Policy Stack"]
        E["Target Tracking Policy<br/>Target: 10 Bedrock invocations/task"]
        F["Step Scaling Policy<br/>CPU > 75% → +2 tasks<br/>CPU > 90% → +5 tasks"]
        G["Predictive Scaling<br/>Based on historical patterns<br/>Pre-scale for manga release events"]
        H["Scheduled Scaling<br/>JP prime time: 19:00-23:00 JST<br/>Minimum 50 tasks"]
    end

    subgraph Guardrails["Scaling Guardrails"]
        I["Min Tasks: 10<br/>Max Tasks: 200<br/>Scale-out cooldown: 60s<br/>Scale-in cooldown: 300s"]
    end

    A --> E
    B --> E
    C --> F
    D --> F

    E --> I
    F --> I
    G --> I
    H --> I

    I --> J["ECS Service<br/>Desired Count Updated"]
```
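
Target tracking on invocations per task requires publishing that metric yourself; it is not emitted by AWS automatically. A minimal sketch, assuming a hypothetical MangaAssist/GenAI namespace, where each orchestrator task reports its own in-flight count:

```python
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

def publish_active_invocations(in_flight: int) -> None:
    """Each orchestrator task publishes its own in-flight Bedrock invocation
    count; the target tracking policy reads the Average statistic across
    tasks and holds it near 10 invocations/task. Namespace and dimension
    names are illustrative, not an AWS convention.
    """
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/GenAI",
        MetricData=[{
            "MetricName": "ActiveBedrockInvocations",
            "Dimensions": [{"Name": "Service", "Value": "orchestrator"}],
            "Value": float(in_flight),
            "Unit": "Count",
        }],
    )
```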

Step Scaling Policy Detail

| Condition | Action | Cooldown | Rationale |
|---|---|---|---|
| CPU > 60% for 2 min | Add 1 task | 60s | Gradual scale for normal growth |
| CPU > 75% for 1 min | Add 2 tasks | 60s | Faster response to increasing load |
| CPU > 90% for 30s | Add 5 tasks | 30s | Emergency scale for traffic spikes |
| CPU < 40% for 10 min | Remove 1 task | 300s | Conservative scale-in to avoid flapping |
| CPU < 25% for 15 min | Remove 2 tasks | 300s | More aggressive scale-in during quiet periods |
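
A sketch of wiring two of the scale-out rows through Application Auto Scaling. The cluster and service names are assumptions, and the CloudWatch alarm (CPU threshold at 75%) that fires the policy is omitted:

```python
import boto3

autoscaling = boto3.client("application-autoscaling", region_name="ap-northeast-1")
resource_id = "service/mangaassist-prod/orchestrator"   # hypothetical cluster/service

# Guardrails from the diagram: min 10, max 200 tasks
autoscaling.register_scalable_target(
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    MinCapacity=10,
    MaxCapacity=200,
)

# Step policy attached to a 75% CPU alarm: 75-90% adds 2 tasks, >90% adds 5.
# Step bounds are expressed relative to the alarm threshold.
autoscaling.put_scaling_policy(
    PolicyName="orchestrator-cpu-step-scale-out",
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "Cooldown": 60,
        "MetricAggregationType": "Average",
        "StepAdjustments": [
            {"MetricIntervalLowerBound": 0.0, "MetricIntervalUpperBound": 15.0,
             "ScalingAdjustment": 2},
            {"MetricIntervalLowerBound": 15.0, "ScalingAdjustment": 5},
        ],
    },
)
```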

Cost Allocation Tagging Strategy

Every resource in the MangaAssist stack must be tagged for cost attribution. This enables per-feature, per-tier, and per-environment cost analysis.

```mermaid
flowchart LR
    subgraph Tags["Cost Allocation Tags"]
        direction TB
        T1["app: mangaassist"]
        T2["environment: prod / staging / dev"]
        T3["intent: recommendation / search / faq / order"]
        T4["tier: sonnet / haiku / cache-hit"]
        T5["team: genai-platform"]
        T6["cost-center: chatbot-ops"]
    end

    subgraph Resources["Tagged Resources"]
        R1["Bedrock Model Invocations"]
        R2["ECS Fargate Tasks"]
        R3["OpenSearch Collections"]
        R4["DynamoDB Tables"]
        R5["ElastiCache Clusters"]
        R6["API Gateway APIs"]
    end

    Tags --> Resources

    subgraph Analysis["Cost Views"]
        A1["Cost per Intent Category"]
        A2["Cost per Model Tier"]
        A3["Cost per Environment"]
        A4["Cost Trend by Feature"]
    end

    Resources --> Analysis
```
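
Static tags (app, environment, team, cost-center) can be applied in bulk with the Resource Groups Tagging API; a sketch with placeholder ARNs follows. Note that intent and tier are per-request dimensions, so in practice they are attributed through invocation logs or custom metrics rather than static resource tags.

```python
import boto3

tagging = boto3.client("resourcegroupstaggingapi", region_name="ap-northeast-1")

COMMON_TAGS = {
    "app": "mangaassist",
    "environment": "prod",
    "team": "genai-platform",
    "cost-center": "chatbot-ops",
}

# Placeholder ARNs: substitute the real stack's resources.
tagging.tag_resources(
    ResourceARNList=[
        "arn:aws:dynamodb:ap-northeast-1:123456789012:table/mangaassist-sessions",
        "arn:aws:elasticache:ap-northeast-1:123456789012:cluster:mangaassist-cache",
    ],
    Tags=COMMON_TAGS,
)
```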

Architecture — Resource Allocation Across All Services

```mermaid
flowchart TB
    subgraph Traffic["Incoming Traffic"]
        U["Users<br/>1M msg/day<br/>Peak 20K concurrent"]
    end

    subgraph Gateway["API Gateway WebSocket"]
        GW["Connection Limit: 100K<br/>Message Rate: 10K/sec<br/>Scaling: Managed"]
    end

    subgraph Orchestration["ECS Fargate — Orchestrator"]
        direction TB
        ECS["Task Definition:<br/>2 vCPU / 4 GB Memory<br/>Min: 10 / Max: 200 tasks<br/>Target: 65% CPU"]
        ECS_AS["Auto-Scaling:<br/>Target Tracking (Bedrock invocations/task)<br/>Step Scaling (CPU)<br/>Predictive (JP prime time)"]
    end

    subgraph FM["Bedrock FM Layer"]
        direction TB
        SONNET["Claude 3 Sonnet<br/>Provisioned: 535 units<br/>Use: Complex reasoning<br/>30% of traffic"]
        HAIKU["Claude 3 Haiku<br/>Provisioned: 268 units<br/>Use: Simple queries<br/>70% of traffic"]
    end

    subgraph Data["Data Layer"]
        direction TB
        OS["OpenSearch Serverless<br/>Base OCU: 4<br/>Search OCU: 4-12 (auto)<br/>Latency target: P95 < 50ms"]
        DYNAMO["DynamoDB<br/>Mode: On-Demand<br/>Session table: ~2K RCU peak<br/>Auto-scaling backup provisioned"]
        CACHE["ElastiCache Redis<br/>r6g.large (2 nodes)<br/>13 GB memory each<br/>Eviction: allkeys-lfu"]
    end

    U --> GW
    GW --> ECS
    ECS --> SONNET
    ECS --> HAIKU
    ECS --> OS
    ECS --> DYNAMO
    ECS --> CACHE

    subgraph Monitor["Resource Monitoring"]
        CW["CloudWatch Dashboards<br/>+ Custom Metrics<br/>+ Composite Alarms"]
    end

    ECS_AS -.-> ECS
    ECS -.-> Monitor
    FM -.-> Monitor
    Data -.-> Monitor
```
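
The "Composite Alarms" box can be realized by combining the per-service alarms into one capacity-pressure signal. A minimal sketch, assuming the child alarms (names are hypothetical) already exist:

```python
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

# Fires when any layer of the pipeline is saturated; each child alarm
# would be created separately with put_metric_alarm.
cloudwatch.put_composite_alarm(
    AlarmName="mangaassist-capacity-pressure",
    AlarmRule=(
        "ALARM(mangaassist-ecs-cpu-high) "
        "OR ALARM(mangaassist-bedrock-throttle-high) "
        "OR ALARM(mangaassist-opensearch-p95-high)"
    ),
    ActionsEnabled=True,
)
```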

Python — CapacityPlanner Class

"""
MangaAssist Resource Allocation — Capacity Planner
Calculates required capacity across all services based on traffic projections.
"""

import math
from dataclasses import dataclass
from typing import Dict


@dataclass
class TrafficProfile:
    """Defines expected traffic characteristics for MangaAssist."""
    messages_per_day: int = 1_000_000
    peak_concurrent_sessions: int = 20_000
    avg_session_duration_sec: int = 87
    avg_requests_per_session: float = 3.2
    peak_to_average_ratio: float = 20.0  # peak can be 20x average
    sonnet_traffic_pct: float = 0.30
    haiku_traffic_pct: float = 0.70

    # Token characteristics
    avg_input_tokens: int = 1_200
    avg_output_tokens: int = 350
    max_input_tokens: int = 4_000
    max_output_tokens: int = 1_000


@dataclass
class ServiceCapacity:
    """Capacity allocation for a single service."""
    service_name: str
    resource_type: str
    current_allocation: float
    required_allocation: float
    recommended_allocation: float  # with buffer
    buffer_pct: float
    utilization_target_pct: float
    estimated_monthly_cost: float
    scaling_policy: str


class CapacityPlanner:
    """
    Calculates required capacity for every service in the MangaAssist stack.

    Starts from traffic volume and derives per-service requirements using the
    token processing capacity model:
        required_compute = (req/s) x (tokens/req) x (processing_time/token)
    """

    # Bedrock pricing and capacity constants
    SONNET_TOKENS_PER_UNIT_MIN = 50_000
    HAIKU_TOKENS_PER_UNIT_MIN = 100_000
    SONNET_UNIT_COST_HR = 63.0
    HAIKU_UNIT_COST_HR = 8.0
    SONNET_MS_PER_TOKEN = 15
    HAIKU_MS_PER_TOKEN = 5

    # ECS Fargate constants
    ECS_VCPU_PER_TASK = 2
    ECS_MEMORY_GB_PER_TASK = 4
    ECS_VCPU_COST_HR = 0.04
    ECS_MEM_COST_GB_HR = 0.004
    MAX_BEDROCK_INVOCATIONS_PER_TASK = 10  # concurrent invocations

    # OpenSearch constants
    OCU_COST_HR = 0.24
    BASE_OCU_MIN = 2
    SEARCH_OCU_MIN = 2

    # ElastiCache constants
    REDIS_MEMORY_PER_CACHED_RESPONSE_KB = 8  # avg embedding + response
    REDIS_NODE_MEMORY_GB = 13  # r6g.large

    # DynamoDB constants
    RCU_COST_MONTH = 0.05
    WCU_COST_MONTH = 0.25

    def __init__(
        self,
        traffic: TrafficProfile,
        buffer_pct: float = 0.25,
    ):
        self.traffic = traffic
        self.buffer_pct = buffer_pct
        self._capacity_plan: Dict[str, ServiceCapacity] = {}

    def calculate_sustained_rps(self) -> float:
        """Average requests per second across the day."""
        return self.traffic.messages_per_day / 86_400

    def calculate_peak_rps(self) -> float:
        """Peak requests per second based on the concurrent session model.

        Assumes each concurrent session keeps roughly one request in flight
        per avg_session_duration_sec window (users read the response, then
        type), matching the ~230 req/s figure in the capacity table.
        """
        return (
            self.traffic.peak_concurrent_sessions
            / self.traffic.avg_session_duration_sec
        )

    def calculate_peak_tokens_per_sec(self) -> Dict[str, float]:
        """Peak token throughput by model tier."""
        peak_rps = self.calculate_peak_rps()
        total_tokens_per_req = (
            self.traffic.avg_input_tokens + self.traffic.avg_output_tokens
        )

        sonnet_tps = (
            peak_rps
            * self.traffic.sonnet_traffic_pct
            * total_tokens_per_req
        )
        haiku_tps = (
            peak_rps
            * self.traffic.haiku_traffic_pct
            * total_tokens_per_req
        )

        return {
            "sonnet_tokens_per_sec": sonnet_tps,
            "haiku_tokens_per_sec": haiku_tps,
            "total_tokens_per_sec": sonnet_tps + haiku_tps,
        }

    def plan_bedrock_capacity(self) -> Dict[str, ServiceCapacity]:
        """Calculate Bedrock provisioned throughput model units."""
        tokens = self.calculate_peak_tokens_per_sec()

        # Convert tokens/sec to tokens/min for Bedrock units
        sonnet_tpm = tokens["sonnet_tokens_per_sec"] * 60
        haiku_tpm = tokens["haiku_tokens_per_sec"] * 60

        sonnet_units_raw = sonnet_tpm / self.SONNET_TOKENS_PER_UNIT_MIN
        haiku_units_raw = haiku_tpm / self.HAIKU_TOKENS_PER_UNIT_MIN

        sonnet_units_buffered = math.ceil(
            sonnet_units_raw * (1 + self.buffer_pct)
        )
        haiku_units_buffered = math.ceil(
            haiku_units_raw * (1 + self.buffer_pct)
        )

        self._capacity_plan["bedrock_sonnet"] = ServiceCapacity(
            service_name="Bedrock Claude 3 Sonnet",
            resource_type="Model Units (Provisioned Throughput)",
            current_allocation=0,
            required_allocation=sonnet_units_raw,
            recommended_allocation=sonnet_units_buffered,
            buffer_pct=self.buffer_pct,
            utilization_target_pct=0.70,
            estimated_monthly_cost=sonnet_units_buffered * self.SONNET_UNIT_COST_HR * 730,
            scaling_policy="Manual provisioning with monthly review",
        )

        self._capacity_plan["bedrock_haiku"] = ServiceCapacity(
            service_name="Bedrock Claude 3 Haiku",
            resource_type="Model Units (Provisioned Throughput)",
            current_allocation=0,
            required_allocation=haiku_units_raw,
            recommended_allocation=haiku_units_buffered,
            buffer_pct=self.buffer_pct,
            utilization_target_pct=0.75,
            estimated_monthly_cost=haiku_units_buffered * self.HAIKU_UNIT_COST_HR * 730,
            scaling_policy="Manual provisioning with monthly review",
        )

        return {
            "sonnet": self._capacity_plan["bedrock_sonnet"],
            "haiku": self._capacity_plan["bedrock_haiku"],
        }

    def plan_ecs_capacity(self) -> ServiceCapacity:
        """Calculate ECS Fargate task count for orchestrator."""
        peak_rps = self.calculate_peak_rps()

        # Little's law: concurrent invocations = arrival rate x avg duration.
        # Generation dominates invocation time; use the Sonnet worst case
        # (avg output tokens x per-token latency).
        avg_invocation_sec = (
            self.traffic.avg_output_tokens * self.SONNET_MS_PER_TOKEN / 1000
        )
        concurrent_invocations = peak_rps * avg_invocation_sec

        # Each task handles MAX_BEDROCK_INVOCATIONS_PER_TASK concurrent
        # Bedrock invocations (async I/O bound, not CPU bound)
        tasks_required = math.ceil(
            concurrent_invocations / self.MAX_BEDROCK_INVOCATIONS_PER_TASK
        )
        tasks_buffered = math.ceil(tasks_required * (1 + self.buffer_pct))

        cost_per_task_hr = (
            self.ECS_VCPU_PER_TASK * self.ECS_VCPU_COST_HR
            + self.ECS_MEMORY_GB_PER_TASK * self.ECS_MEM_COST_GB_HR
        )

        self._capacity_plan["ecs_orchestrator"] = ServiceCapacity(
            service_name="ECS Fargate Orchestrator",
            resource_type=f"Tasks ({self.ECS_VCPU_PER_TASK} vCPU / {self.ECS_MEMORY_GB_PER_TASK} GB)",
            current_allocation=0,
            required_allocation=tasks_required,
            recommended_allocation=tasks_buffered,
            buffer_pct=self.buffer_pct,
            utilization_target_pct=0.65,
            estimated_monthly_cost=tasks_buffered * cost_per_task_hr * 730,
            scaling_policy="Target tracking: 10 Bedrock invocations/task + step scaling on CPU",
        )

        return self._capacity_plan["ecs_orchestrator"]

    def plan_opensearch_capacity(self) -> ServiceCapacity:
        """Calculate OpenSearch Serverless OCU requirements."""
        peak_rps = self.calculate_peak_rps()

        # Estimate: each search OCU handles ~50 vector search queries/sec
        search_queries_per_sec = peak_rps * 0.6  # 60% of requests need RAG
        search_ocu_required = max(
            self.SEARCH_OCU_MIN,
            math.ceil(search_queries_per_sec / 50),
        )
        search_ocu_buffered = math.ceil(
            search_ocu_required * (1 + self.buffer_pct)
        )

        total_ocu = self.BASE_OCU_MIN + search_ocu_buffered

        self._capacity_plan["opensearch"] = ServiceCapacity(
            service_name="OpenSearch Serverless",
            resource_type=f"OCU (Base: {self.BASE_OCU_MIN} + Search: {search_ocu_buffered})",
            current_allocation=0,
            required_allocation=self.BASE_OCU_MIN + search_ocu_required,
            recommended_allocation=total_ocu,
            buffer_pct=self.buffer_pct,
            utilization_target_pct=0.70,
            estimated_monthly_cost=total_ocu * self.OCU_COST_HR * 730,
            scaling_policy="OpenSearch auto-scaling for search OCU; base OCU fixed",
        )

        return self._capacity_plan["opensearch"]

    def plan_elasticache_capacity(self) -> ServiceCapacity:
        """Calculate ElastiCache Redis capacity for semantic cache."""
        # Target: cache top 30% of queries (reduces Bedrock calls significantly)
        # Assume ~40% of daily messages are distinct queries (rest are repeats)
        daily_unique_queries = int(self.traffic.messages_per_day * 0.4)
        cache_target_entries = int(daily_unique_queries * 0.30)
        memory_required_gb = (
            cache_target_entries * self.REDIS_MEMORY_PER_CACHED_RESPONSE_KB
        ) / (1024 * 1024)

        nodes_required = max(2, math.ceil(
            memory_required_gb / (self.REDIS_NODE_MEMORY_GB * 0.75)
        ))  # 75% memory target

        self._capacity_plan["elasticache"] = ServiceCapacity(
            service_name="ElastiCache Redis",
            resource_type=f"r6g.large nodes ({self.REDIS_NODE_MEMORY_GB} GB each)",
            current_allocation=0,
            required_allocation=nodes_required,
            recommended_allocation=nodes_required,
            buffer_pct=0.0,  # node count already accounts for replication
            utilization_target_pct=0.75,
            estimated_monthly_cost=nodes_required * 0.068 * 730,
            scaling_policy="Vertical scaling (node type upgrade) + eviction policy allkeys-lfu",
        )

        return self._capacity_plan["elasticache"]

    def plan_dynamodb_capacity(self) -> ServiceCapacity:
        """Calculate DynamoDB capacity for session storage."""
        peak_rps = self.calculate_peak_rps()

        # Each request reads session history (1-3 reads) and writes 1 update
        peak_rcu = math.ceil(peak_rps * 2.5)  # avg 2.5 reads per request
        peak_wcu = math.ceil(peak_rps * 1.2)  # 1 write + occasional metadata

        # On-demand pricing estimate (simpler, no throttle risk). Conservative:
        # assumes the peak rate is sustained all month.
        on_demand_monthly = (
            peak_rcu * 86_400 * 30 * 0.25 / 1_000_000  # read request units
            + peak_wcu * 86_400 * 30 * 1.25 / 1_000_000  # write request units
        )

        self._capacity_plan["dynamodb"] = ServiceCapacity(
            service_name="DynamoDB Session Table",
            resource_type=f"On-Demand (peak ~{peak_rcu} RCU / ~{peak_wcu} WCU equivalent)",
            current_allocation=0,
            required_allocation=peak_rcu,
            recommended_allocation=peak_rcu,
            buffer_pct=0.0,  # on-demand auto-scales
            utilization_target_pct=0.80,
            estimated_monthly_cost=on_demand_monthly,
            scaling_policy="On-demand mode (auto-scales to traffic)",
        )

        return self._capacity_plan["dynamodb"]

    def generate_full_plan(self) -> Dict[str, ServiceCapacity]:
        """Generate capacity plan for all services."""
        self.plan_bedrock_capacity()
        self.plan_ecs_capacity()
        self.plan_opensearch_capacity()
        self.plan_elasticache_capacity()
        self.plan_dynamodb_capacity()
        return self._capacity_plan

    def print_summary(self) -> None:
        """Print a formatted summary of the capacity plan."""
        plan = self.generate_full_plan()
        total_monthly = 0.0

        print("=" * 80)
        print("MANGAASSIST RESOURCE ALLOCATION — CAPACITY PLAN")
        print(f"Traffic: {self.traffic.messages_per_day:,} msg/day | "
              f"Peak: {self.traffic.peak_concurrent_sessions:,} concurrent")
        print(f"Buffer: {self.buffer_pct:.0%}")
        print("=" * 80)

        for key, cap in plan.items():
            print(f"\n--- {cap.service_name} ---")
            print(f"  Resource: {cap.resource_type}")
            print(f"  Required: {cap.required_allocation:.1f}")
            print(f"  Recommended (with buffer): {cap.recommended_allocation:.1f}")
            print(f"  Utilization target: {cap.utilization_target_pct:.0%}")
            print(f"  Scaling: {cap.scaling_policy}")
            print(f"  Est. monthly cost: ${cap.estimated_monthly_cost:,.2f}")
            total_monthly += cap.estimated_monthly_cost

        print("\n" + "=" * 80)
        print(f"TOTAL ESTIMATED MONTHLY COST: ${total_monthly:,.2f}")
        print("=" * 80)


# --- Usage ---
if __name__ == "__main__":
    traffic = TrafficProfile(
        messages_per_day=1_000_000,
        peak_concurrent_sessions=20_000,
    )
    planner = CapacityPlanner(traffic=traffic, buffer_pct=0.25)
    planner.print_summary()
```

Python — ResourceUtilizationMonitor

"""
MangaAssist Resource Allocation — Utilization Monitor
Tracks real-time utilization across services and detects inefficiency.
"""

import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum

import boto3

logger = logging.getLogger(__name__)


class UtilizationStatus(Enum):
    """Health status based on utilization level."""
    UNDER_UTILIZED = "under_utilized"   # below 30% — wasting money
    HEALTHY = "healthy"                  # 30-70% — optimal range
    ELEVATED = "elevated"               # 70-85% — approaching limits
    CRITICAL = "critical"               # above 85% — scaling needed NOW


@dataclass
class UtilizationSnapshot:
    """Point-in-time utilization reading for a service."""
    service_name: str
    metric_name: str
    current_value: float
    target_value: float
    utilization_pct: float
    status: UtilizationStatus
    timestamp: datetime
    cost_impact_hourly: float = 0.0
    recommendation: str = ""


@dataclass
class TokenEfficiencyReport:
    """Analysis of token usage patterns for waste detection."""
    total_requests: int
    avg_input_tokens: float
    avg_output_tokens: float
    input_output_ratio: float
    waste_pct: float  # % of requests with ratio > 10:1
    top_waste_intents: List[Dict[str, float]] = field(default_factory=list)
    estimated_monthly_savings_if_optimized: float = 0.0


class ResourceUtilizationMonitor:
    """
    Monitors resource utilization across MangaAssist services and flags
    inefficiency — both over-provisioning (cost waste) and under-provisioning
    (performance risk).
    """

    UTILIZATION_THRESHOLDS = {
        "under_utilized": 0.30,
        "healthy_upper": 0.70,
        "elevated_upper": 0.85,
    }

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self.cloudwatch = boto3.client("cloudwatch", region_name=region)
        self.bedrock = boto3.client("bedrock-runtime", region_name=region)
        self._snapshots: List[UtilizationSnapshot] = []

    def _classify_utilization(self, pct: float) -> UtilizationStatus:
        """Classify utilization percentage into status."""
        if pct < self.UTILIZATION_THRESHOLDS["under_utilized"]:
            return UtilizationStatus.UNDER_UTILIZED
        elif pct < self.UTILIZATION_THRESHOLDS["healthy_upper"]:
            return UtilizationStatus.HEALTHY
        elif pct < self.UTILIZATION_THRESHOLDS["elevated_upper"]:
            return UtilizationStatus.ELEVATED
        else:
            return UtilizationStatus.CRITICAL

    def _get_cloudwatch_avg(
        self,
        namespace: str,
        metric_name: str,
        dimensions: List[Dict[str, str]],
        period_minutes: int = 5,
    ) -> Optional[float]:
        """Fetch average metric value from CloudWatch."""
        try:
            response = self.cloudwatch.get_metric_statistics(
                Namespace=namespace,
                MetricName=metric_name,
                Dimensions=dimensions,
                StartTime=datetime.utcnow() - timedelta(minutes=period_minutes),
                EndTime=datetime.utcnow(),
                Period=period_minutes * 60,
                Statistics=["Average"],
            )
            datapoints = response.get("Datapoints", [])
            if datapoints:
                return datapoints[-1]["Average"]
            return None
        except Exception as e:
            logger.error(f"CloudWatch query failed: {e}")
            return None

    def check_ecs_utilization(
        self,
        cluster_name: str = "mangaassist-prod",
        service_name: str = "orchestrator",
    ) -> UtilizationSnapshot:
        """Monitor ECS Fargate orchestrator CPU and memory utilization."""
        cpu_pct = self._get_cloudwatch_avg(
            namespace="AWS/ECS",
            metric_name="CPUUtilization",
            dimensions=[
                {"Name": "ClusterName", "Value": cluster_name},
                {"Name": "ServiceName", "Value": service_name},
            ],
        )

        if cpu_pct is None:
            cpu_pct = 0.0

        status = self._classify_utilization(cpu_pct / 100)

        recommendation = ""
        if status == UtilizationStatus.UNDER_UTILIZED:
            recommendation = (
                "ECS tasks are under-utilized. Consider reducing min task "
                "count or switching to smaller task size to save costs."
            )
        elif status == UtilizationStatus.CRITICAL:
            recommendation = (
                "ECS CPU critical. Immediate scale-out needed. Check if "
                "auto-scaling policy is responding. Consider increasing "
                "max task count."
            )

        snapshot = UtilizationSnapshot(
            service_name="ECS Fargate Orchestrator",
            metric_name="CPUUtilization",
            current_value=cpu_pct,
            target_value=65.0,
            utilization_pct=cpu_pct,
            status=status,
            timestamp=datetime.utcnow(),
            recommendation=recommendation,
        )

        self._snapshots.append(snapshot)
        return snapshot

    def check_bedrock_utilization(
        self,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        provisioned_units: int = 161,
    ) -> UtilizationSnapshot:
        """Monitor Bedrock model invocation rate vs provisioned capacity."""
        invocations = self._get_cloudwatch_avg(
            namespace="AWS/Bedrock",
            metric_name="Invocations",
            dimensions=[
                {"Name": "ModelId", "Value": model_id},
            ],
        )

        if invocations is None:
            invocations = 0.0

        # Estimate utilization as invocations / theoretical max
        max_invocations_per_min = provisioned_units * 2  # rough estimate
        utilization_pct = (
            (invocations / max_invocations_per_min * 100)
            if max_invocations_per_min > 0 else 0.0
        )

        status = self._classify_utilization(utilization_pct / 100)

        snapshot = UtilizationSnapshot(
            service_name=f"Bedrock {model_id.split('.')[1].split('-')[0].title()}",
            metric_name="InvocationUtilization",
            current_value=invocations,
            target_value=max_invocations_per_min * 0.70,
            utilization_pct=utilization_pct,
            status=status,
            timestamp=datetime.utcnow(),
        )

        self._snapshots.append(snapshot)
        return snapshot

    def check_dynamodb_utilization(
        self,
        table_name: str = "mangaassist-sessions",
    ) -> UtilizationSnapshot:
        """Monitor DynamoDB consumed capacity vs provisioned/limits."""
        consumed_rcu = self._get_cloudwatch_avg(
            namespace="AWS/DynamoDB",
            metric_name="ConsumedReadCapacityUnits",
            dimensions=[
                {"Name": "TableName", "Value": table_name},
            ],
        )

        throttle_count = self._get_cloudwatch_avg(
            namespace="AWS/DynamoDB",
            metric_name="ReadThrottleEvents",
            dimensions=[
                {"Name": "TableName", "Value": table_name},
            ],
        )

        consumed_rcu = consumed_rcu or 0.0
        throttle_count = throttle_count or 0.0

        # For on-demand: flag if throttle events detected
        status = UtilizationStatus.HEALTHY
        recommendation = ""
        if throttle_count > 0:
            status = UtilizationStatus.CRITICAL
            recommendation = (
                f"DynamoDB throttling detected ({throttle_count:.0f} events). "
                "On-demand table may have hit partition throughput limits. "
                "Check for hot partitions in session key design."
            )

        snapshot = UtilizationSnapshot(
            service_name="DynamoDB Sessions",
            metric_name="ConsumedReadCapacity",
            current_value=consumed_rcu,
            target_value=0.0,  # on-demand has no fixed target
            utilization_pct=0.0,
            status=status,
            timestamp=datetime.utcnow(),
            recommendation=recommendation,
        )

        self._snapshots.append(snapshot)
        return snapshot

    def check_elasticache_utilization(
        self,
        cluster_id: str = "mangaassist-cache",
    ) -> UtilizationSnapshot:
        """Monitor ElastiCache memory and connection utilization."""
        memory_pct = self._get_cloudwatch_avg(
            namespace="AWS/ElastiCache",
            metric_name="DatabaseMemoryUsagePercentage",
            dimensions=[
                {"Name": "CacheClusterId", "Value": cluster_id},
            ],
        )

        memory_pct = memory_pct or 0.0
        status = self._classify_utilization(memory_pct / 100)

        recommendation = ""
        if status == UtilizationStatus.CRITICAL:
            recommendation = (
                "ElastiCache memory critical. Verify allkeys-lfu eviction "
                "is active. Consider upgrading node type or reducing "
                "cache TTL for low-value entries."
            )
        elif status == UtilizationStatus.UNDER_UTILIZED:
            recommendation = (
                "ElastiCache memory under-utilized. Consider downgrading "
                "node type to save costs, or increasing cache coverage."
            )

        snapshot = UtilizationSnapshot(
            service_name="ElastiCache Redis",
            metric_name="MemoryUsagePercentage",
            current_value=memory_pct,
            target_value=75.0,
            utilization_pct=memory_pct,
            status=status,
            timestamp=datetime.utcnow(),
            recommendation=recommendation,
        )

        self._snapshots.append(snapshot)
        return snapshot

    def analyze_token_efficiency(
        self,
        invocation_logs: List[Dict],
    ) -> TokenEfficiencyReport:
        """
        Analyze token usage patterns to detect waste.

        A request with input:output ratio > 10:1 is flagged as wasteful —
        it means large prompts are producing small outputs (bloated system
        prompt, over-retrieved RAG context, or unnecessary history).
        """
        if not invocation_logs:
            return TokenEfficiencyReport(
                total_requests=0,
                avg_input_tokens=0,
                avg_output_tokens=0,
                input_output_ratio=0,
                waste_pct=0,
            )

        total_input = sum(log["input_tokens"] for log in invocation_logs)
        total_output = sum(log["output_tokens"] for log in invocation_logs)
        n = len(invocation_logs)

        avg_input = total_input / n
        avg_output = total_output / n
        ratio = avg_input / avg_output if avg_output > 0 else float("inf")

        # Count wasteful requests (ratio > 10:1)
        wasteful = [
            log for log in invocation_logs
            if log["output_tokens"] > 0
            and log["input_tokens"] / log["output_tokens"] > 10
        ]
        waste_pct = len(wasteful) / n * 100

        # Group waste by intent
        intent_waste: Dict[str, List[float]] = {}
        for log in wasteful:
            intent = log.get("intent", "unknown")
            if intent not in intent_waste:
                intent_waste[intent] = []
            intent_waste[intent].append(
                log["input_tokens"] / log["output_tokens"]
            )

        top_waste = sorted(
            [
                {"intent": k, "avg_ratio": sum(v) / len(v), "count": len(v)}
                for k, v in intent_waste.items()
            ],
            key=lambda x: x["count"],
            reverse=True,
        )[:5]

        # Estimate savings: if wasteful requests reduced to 5:1 ratio
        wasted_tokens = sum(
            log["input_tokens"] - (log["output_tokens"] * 5)
            for log in wasteful
            if log["input_tokens"] / max(log["output_tokens"], 1) > 5
        )
        # Rough cost: $0.003 per 1K input tokens (Sonnet); x30 assumes the
        # supplied logs cover roughly one day of traffic
        monthly_savings = (wasted_tokens / 1000) * 0.003 * 30

        return TokenEfficiencyReport(
            total_requests=n,
            avg_input_tokens=avg_input,
            avg_output_tokens=avg_output,
            input_output_ratio=ratio,
            waste_pct=waste_pct,
            top_waste_intents=top_waste,
            estimated_monthly_savings_if_optimized=monthly_savings,
        )

    def run_full_check(self) -> List[UtilizationSnapshot]:
        """Run utilization checks across all MangaAssist services."""
        self._snapshots.clear()

        self.check_ecs_utilization()
        self.check_bedrock_utilization(
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            provisioned_units=161,
        )
        self.check_bedrock_utilization(
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
            provisioned_units=188,
        )
        self.check_dynamodb_utilization()
        self.check_elasticache_utilization()

        return self._snapshots

    def get_alerts(self) -> List[UtilizationSnapshot]:
        """Return only non-healthy snapshots requiring attention."""
        return [
            s for s in self._snapshots
            if s.status != UtilizationStatus.HEALTHY
        ]


# --- Usage ---
if __name__ == "__main__":
    monitor = ResourceUtilizationMonitor(region="ap-northeast-1")
    snapshots = monitor.run_full_check()

    for snap in snapshots:
        icon = {
            UtilizationStatus.UNDER_UTILIZED: "[WASTE]",
            UtilizationStatus.HEALTHY: "[ OK  ]",
            UtilizationStatus.ELEVATED: "[WARN ]",
            UtilizationStatus.CRITICAL: "[CRIT ]",
        }[snap.status]

        print(f"{icon} {snap.service_name}: {snap.metric_name} = "
              f"{snap.current_value:.1f} ({snap.utilization_pct:.1f}%)")
        if snap.recommendation:
            print(f"       -> {snap.recommendation}")

Reference Table — Service x Metric x Target x Scaling x Cost

| Service | Key Metric | Target Utilization | Scaling Trigger | Scaling Action | Approx. Monthly Cost |
|---|---|---|---|---|---|
| Bedrock Sonnet | InvocationsPerMinute | 70% of provisioned units | >80% sustained 3 min | Increase provisioned units (manual) | ~$46,000/unit ($63/hr x 730 hr) |
| Bedrock Haiku | InvocationsPerMinute | 75% of provisioned units | >85% sustained 3 min | Increase provisioned units (manual) | ~$5,840/unit |
| ECS Fargate | CPUUtilization | 65% | >75% for 2 min (step); 10 invocations/task (target) | Add 1-5 tasks | ~$70/task |
| OpenSearch Serverless (search) | SearchLatency P95 | P95 < 50 ms | P95 > 80 ms for 5 min | Auto-scale search OCU | ~$175/OCU |
| OpenSearch Serverless (base) | IndexingRate | Stable throughput | N/A (fixed base OCU) | Manual OCU adjustment | ~$175/OCU |
| ElastiCache Redis | DatabaseMemoryUsagePercentage | < 75% | > 80% for 5 min | Vertical scaling (node upgrade) | ~$50/node |
| DynamoDB (on-demand) | ReadThrottleEvents | 0 throttle events | ThrottleEvents > 0 | Auto-scales (check for hot keys) | Variable (~$200-800) |
| API Gateway WebSocket | ConnectionCount | < 80% of limit | > 70% of account limit | Request limit increase | ~$1/million msgs |

Key Takeaways

  1. Start from tokens, derive everything else: The token processing capacity model is the foundation. Every service allocation flows from (req/s) x (tokens/req) x (processing_time/token).

  2. Monitor input-to-output ratio: A ratio above 10:1 means prompt bloat or over-retrieved context. This is the single most actionable efficiency metric for GenAI.

  3. GenAI traffic needs GenAI-aware scaling: CPU-based auto-scaling is necessary but not sufficient. Custom metrics like "active Bedrock invocations per task" capture the actual bottleneck.

  4. Tag everything for cost visibility: Without intent-level and tier-level cost tagging, you cannot optimize what you cannot measure.

  5. Buffer for burstiness: GenAI traffic is correlated and bursty (manga releases, flash sales). A 25% buffer over calculated peak prevents throttling during real-world spikes.