Resource Allocation Architecture for GenAI Applications
AWS AIP-C01 Task 4.2 — Skill 4.2.5: Right-size resources to optimize FM application throughput
Context: MangaAssist JP manga e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. 1M messages/day, peak 20K concurrent sessions.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Responsible AI & Monitoring | Task 4.2 — Optimize FM application performance | Skill 4.2.5 — Right-size resources to optimize FM application throughput |
Skill scope: Design resource allocation strategies that match compute, memory, storage, and throughput capacity to the actual demands of each GenAI service layer — from FM invocation through orchestration to data retrieval — ensuring cost-efficient utilization without sacrificing latency or availability targets.
Mind Map — Resource Allocation Dimensions
mindmap
root((Resource<br/>Allocation))
Capacity Planning
Token Processing Budget
Tokens per Second Required
Bedrock Model Units
Peak vs Sustained Load
Buffer Margin 20-30%
Compute Sizing
ECS vCPU per Invocation
Memory per Context Window
Task Count at Peak
Fargate Spot Mix
Storage & Retrieval
OpenSearch Base OCU
OpenSearch Search OCU
DynamoDB RCU/WCU
ElastiCache Node Type
Utilization Monitoring
Prompt/Completion Patterns
Input Token Distribution
Output Token Distribution
Input-to-Output Ratio
Waste Detection
Service Utilization
ECS CPU & Memory %
Bedrock Throttle Rate
Cache Hit Efficiency
Search OCU Saturation
Cost Efficiency
Cost per Message
Cost per Intent Category
Idle Resource Detection
Reserved vs On-Demand ROI
Auto-Scaling
GenAI-Aware Metrics
Active Bedrock Invocations
Pending Queue Depth
Token Queue Backlog
WebSocket Connections
Scaling Policies
Target Tracking
Step Scaling
Predictive Scaling
Scheduled Actions
Cooldown & Stability
Scale-Out Aggressiveness
Scale-In Conservatism
Oscillation Prevention
Warm Pool Strategy
Cost Allocation
Tagging Strategy
By Intent Category
By Tier (Sonnet/Haiku)
By Environment
By Feature
Budget Controls
Daily Token Budget
Per-User Cost Cap
Alert Thresholds
Auto-Throttle on Overspend
Optimization
Model Tiering ROI
Cache Savings Attribution
Spot Savings Tracking
Reserved Capacity Amortization
Token Processing Capacity Model
The fundamental equation for GenAI resource allocation starts with token throughput. Every downstream resource must be sized to support the token pipeline.
The Capacity Equation
Required processing capacity = (requests/sec) x (avg_tokens/request) x (processing_time/token)
The first two factors give the required token throughput (tokens/sec); multiplying by the per-token processing time converts that throughput into the concurrent processing capacity each layer must sustain.
For MangaAssist at 1M messages/day:
| Parameter | Value | Derivation |
|---|---|---|
| Messages per day | 1,000,000 | Given traffic volume |
| Average requests per second (sustained) | ~11.6 req/s | 1M / 86,400 seconds |
| Peak requests per second | ~230 req/s | 20K concurrent sessions, each issuing ~1 request per ~87s session (≈20x the sustained rate) |
| Avg input tokens per request | 1,200 tokens | System prompt (400) + context (500) + user query (100) + history (200) |
| Avg output tokens per request | 350 tokens | Typical manga recommendation response |
| Avg total tokens per request | 1,550 tokens | Input + output |
| Peak token throughput required | ~356,500 tokens/sec | 230 req/s x 1,550 tokens |
| Processing time per token (Sonnet) | ~15ms | Amortized generation rate, including time-to-first-token |
| Processing time per token (Haiku) | ~5ms | Faster; suited to simple intent routing and lookups |
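A quick back-of-the-envelope check of the peak throughput figure, using the values from the table above (the ~230 req/s peak is the scenario assumption):
peak_rps = 230                    # peak requests/sec (scenario assumption)
avg_input_tokens = 1_200
avg_output_tokens = 350

tokens_per_request = avg_input_tokens + avg_output_tokens    # 1,550
peak_tokens_per_sec = peak_rps * tokens_per_request          # 356,500
peak_tokens_per_min = peak_tokens_per_sec * 60                # ~21.4M

print(f"Peak throughput: {peak_tokens_per_sec:,} tokens/sec "
      f"({peak_tokens_per_min:,} tokens/min)")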
Mapping to Bedrock Model Units
Bedrock Provisioned Throughput = Peak tokens/min / model_unit_capacity
| Model | Unit Capacity | Required Units (peak, if 100% of traffic) | Buffer (25%) | Total Units |
|---|---|---|---|---|
| Claude 3 Sonnet | 50,000 tokens/min | ~428 | 107 | 535 |
| Claude 3 Haiku | 100,000 tokens/min | ~214 | 54 | 268 |
| Blended (70% Haiku / 30% Sonnet) | — | ~278 effective | 70 | 348 |
MangaAssist design decision: Route 70% of traffic to Haiku (simple lookups, intent classification, FAQ) and 30% to Sonnet (complex recommendations, multi-turn reasoning). This reduces Bedrock cost by ~45% vs all-Sonnet while maintaining quality where it matters.
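In practice the split is enforced by a routing rule in the orchestrator after intent classification. A minimal sketch, assuming an illustrative intent taxonomy (the intent names and the default-to-Haiku fallback are assumptions, not the production classifier):
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"

# Illustrative intent taxonomy (not the production classifier output)
HAIKU_INTENTS = {"faq", "order_status", "simple_search", "intent_classification"}
SONNET_INTENTS = {"recommendation", "comparison", "multi_turn_reasoning"}

def select_model(intent: str) -> str:
    """Route simple intents to Haiku and complex reasoning to Sonnet."""
    if intent in SONNET_INTENTS:
        return SONNET_MODEL_ID
    if intent in HAIKU_INTENTS:
        return HAIKU_MODEL_ID
    # Unknown intents default to the cheaper tier; escalate to Sonnet on low confidence
    return HAIKU_MODEL_ID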
Utilization Monitoring for Prompt/Completion Patterns
Tracking the input-to-output token ratio reveals efficiency patterns — and waste.
Token Ratio Analysis
| Pattern | Input Tokens | Output Tokens | Ratio | Diagnosis |
|---|---|---|---|---|
| Efficient recommendation | 800 | 400 | 2:1 | Healthy — concise context, rich response |
| Bloated system prompt | 2,500 | 150 | 16.7:1 | Wasteful — paying for input tokens that produce little output |
| Over-retrieved RAG context | 3,000 | 300 | 10:1 | Wasteful — too many chunks retrieved from OpenSearch |
| Multi-turn with full history | 4,000 | 200 | 20:1 | Critical waste — history should be summarized |
| Simple FAQ response | 400 | 100 | 4:1 | Acceptable — consider caching these entirely |
| Complex manga comparison | 1,500 | 800 | 1.9:1 | Optimal — high-value response justifies input cost |
Target ratio for MangaAssist: 2:1 to 5:1 input-to-output ratio. Anything above 10:1 triggers a prompt optimization review.
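One way to operationalize the 10:1 review trigger is to publish per-request token counts and the ratio as custom CloudWatch metrics that an alarm can watch. A minimal sketch (the MangaAssist/Tokens namespace and the Intent dimension are assumptions):
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

def publish_token_metrics(intent: str, input_tokens: int, output_tokens: int) -> None:
    """Publish per-request token counts and the input:output ratio."""
    ratio = input_tokens / max(output_tokens, 1)
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Tokens",          # assumed custom namespace
        MetricData=[
            {"MetricName": "InputTokens", "Value": input_tokens, "Unit": "Count",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "OutputTokens", "Value": output_tokens, "Unit": "Count",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "InputOutputRatio", "Value": ratio, "Unit": "None",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
        ],
    )

# Example: a bloated prompt producing a short answer -> ratio 16.7, flagged for review
publish_token_metrics("faq", input_tokens=2_500, output_tokens=150)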
GenAI-Specific Resource Dimensions
Each service in the MangaAssist stack has distinct resource dimensions that must be allocated in proportion to the token pipeline.
| Service | Resource Dimension | Metric | Target Utilization | Scaling Trigger | Cost Impact |
|---|---|---|---|---|---|
| Bedrock Claude 3 Sonnet | Provisioned throughput (model units) | Invocations, InputTokenCount, OutputTokenCount | 70% of provisioned | >80% for 3 min | $63/model unit/hr |
| Bedrock Claude 3 Haiku | Provisioned throughput (model units) | Invocations, InputTokenCount, OutputTokenCount | 75% of provisioned | >85% for 3 min | $8/model unit/hr |
| ECS Fargate (orchestrator) | vCPU + Memory per task | CPUUtilization, MemoryUtilization | 65% CPU, 70% Memory | CPU >75% for 2 min | $0.04/vCPU-hr + $0.004/GB-hr |
| OpenSearch Serverless | OCU (base + search) | SearchLatency, IndexingRate | Search P95 < 50ms | P95 > 80ms for 5 min | $0.24/OCU-hr |
| ElastiCache Redis | Node type (memory + connections) | CurrConnections, BytesUsedForCache, CacheHitRate | Memory < 75%, Connections < 80% | Memory > 80% | $0.068/hr (r6g.large) |
| DynamoDB | RCU/WCU or on-demand | ConsumedReadCapacityUnits, ThrottledRequests | 70% of provisioned | ThrottledRequests > 0 | $0.25/WCU-mo, $0.05/RCU-mo |
| API Gateway WebSocket | Connections + message rate | ConnectionCount, MessageCount | < 80% of account limit | > 70% account limit | $1/million messages |
Auto-Scaling Configurations for GenAI Traffic
GenAI traffic is bursty and correlated — a new manga release can spike traffic 10x in minutes. Traditional CPU-based scaling is too slow.
ECS Auto-Scaling with GenAI-Aware Metrics
flowchart TD
subgraph Metrics["GenAI-Aware Scaling Metrics"]
A["Active Bedrock Invocations<br/>(custom CloudWatch metric)"]
B["WebSocket Connection Count<br/>(API Gateway metric)"]
C["Pending Queue Depth<br/>(SQS/internal metric)"]
D["ECS CPU Utilization<br/>(standard metric)"]
end
subgraph Policy["Scaling Policy Stack"]
E["Target Tracking Policy<br/>Target: 10 Bedrock invocations/task"]
F["Step Scaling Policy<br/>CPU > 75% → +2 tasks<br/>CPU > 90% → +5 tasks"]
G["Predictive Scaling<br/>Based on historical patterns<br/>Pre-scale for manga release events"]
H["Scheduled Scaling<br/>JP prime time: 19:00-23:00 JST<br/>Minimum 50 tasks"]
end
subgraph Guardrails["Scaling Guardrails"]
I["Min Tasks: 10<br/>Max Tasks: 200<br/>Scale-out cooldown: 60s<br/>Scale-in cooldown: 300s"]
end
A --> E
B --> E
C --> F
D --> F
E --> I
F --> I
G --> I
H --> I
I --> J["ECS Service<br/>Desired Count Updated"]
Step Scaling Policy Detail
| Metric Threshold | Action | Cooldown | Rationale |
|---|---|---|---|
| CPU > 60% for 2 min | Add 1 task | 60s | Gradual scale for normal growth |
| CPU > 75% for 1 min | Add 2 tasks | 60s | Faster response to increasing load |
| CPU > 90% for 30s | Add 5 tasks | 30s | Emergency scale for traffic spikes |
| CPU < 40% for 10 min | Remove 1 task | 300s | Conservative scale-in to avoid flapping |
| CPU < 25% for 15 min | Remove 2 tasks | 300s | More aggressive scale-in during quiet periods |
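A minimal sketch of wiring these policies up with Application Auto Scaling. The cluster/service names, the custom metric namespace, and the ActiveBedrockInvocationsPerTask metric are assumptions; the CPU step-scaling rows above would be attached the same way with PolicyType="StepScaling":
import boto3

aas = boto3.client("application-autoscaling", region_name="ap-northeast-1")
resource_id = "service/mangaassist-prod/orchestrator"   # assumed cluster/service names

# Register the ECS service with the guardrails from the diagram (min 10 / max 200 tasks)
aas.register_scalable_target(
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    MinCapacity=10,
    MaxCapacity=200,
)

# Target tracking on the custom "active Bedrock invocations per task" metric
aas.put_scaling_policy(
    PolicyName="bedrock-invocations-per-task",
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 10.0,                                   # 10 invocations per task
        "CustomizedMetricSpecification": {
            "MetricName": "ActiveBedrockInvocationsPerTask",   # assumed custom metric
            "Namespace": "MangaAssist/Orchestrator",           # assumed namespace
            "Statistic": "Average",
        },
        "ScaleOutCooldown": 60,
        "ScaleInCooldown": 300,
    },
)

# Scheduled floor for JP prime time (19:00-23:00 JST = 10:00-14:00 UTC);
# a matching action at 14:00 UTC would drop the floor back to 10 tasks.
aas.put_scheduled_action(
    ServiceNamespace="ecs",
    ScheduledActionName="jp-prime-time-floor",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    Schedule="cron(0 10 * * ? *)",
    ScalableTargetAction={"MinCapacity": 50, "MaxCapacity": 200},
)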
Cost Allocation Tagging Strategy
Every resource in the MangaAssist stack must be tagged for cost attribution. This enables per-feature, per-tier, and per-environment cost analysis.
flowchart LR
subgraph Tags["Cost Allocation Tags"]
direction TB
T1["app: mangaassist"]
T2["environment: prod / staging / dev"]
T3["intent: recommendation / search / faq / order"]
T4["tier: sonnet / haiku / cache-hit"]
T5["team: genai-platform"]
T6["cost-center: chatbot-ops"]
end
subgraph Resources["Tagged Resources"]
R1["Bedrock Model Invocations"]
R2["ECS Fargate Tasks"]
R3["OpenSearch Collections"]
R4["DynamoDB Tables"]
R5["ElastiCache Clusters"]
R6["API Gateway APIs"]
end
Tags --> Resources
subgraph Analysis["Cost Views"]
A1["Cost per Intent Category"]
A2["Cost per Model Tier"]
A3["Cost per Environment"]
A4["Cost Trend by Feature"]
end
Resources --> Analysis
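Once these tags are activated as cost allocation tags in the Billing console, the cost views on the right can be pulled programmatically. A minimal sketch using Cost Explorer, grouping spend by the intent tag (the date range is illustrative):
import boto3

ce = boto3.client("ce", region_name="us-east-1")   # Cost Explorer is served from us-east-1

response = ce.get_cost_and_usage(
    TimePeriod={"Start": "2024-06-01", "End": "2024-07-01"},   # illustrative month
    Granularity="MONTHLY",
    Metrics=["UnblendedCost"],
    Filter={"Tags": {"Key": "app", "Values": ["mangaassist"]}},
    GroupBy=[{"Type": "TAG", "Key": "intent"}],
)

# Each group key looks like "intent$recommendation"
for group in response["ResultsByTime"][0]["Groups"]:
    tag_value = group["Keys"][0]
    amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
    print(f"{tag_value}: ${amount:,.2f}")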
Architecture — Resource Allocation Across All Services
flowchart TB
subgraph Traffic["Incoming Traffic"]
U["Users<br/>1M msg/day<br/>Peak 20K concurrent"]
end
subgraph Gateway["API Gateway WebSocket"]
GW["Connection Limit: 100K<br/>Message Rate: 10K/sec<br/>Scaling: Managed"]
end
subgraph Orchestration["ECS Fargate — Orchestrator"]
direction TB
ECS["Task Definition:<br/>2 vCPU / 4 GB Memory<br/>Min: 10 / Max: 200 tasks<br/>Target: 65% CPU"]
ECS_AS["Auto-Scaling:<br/>Target Tracking (Bedrock invocations/task)<br/>Step Scaling (CPU)<br/>Predictive (JP prime time)"]
end
subgraph FM["Bedrock FM Layer"]
direction TB
SONNET["Claude 3 Sonnet<br/>Provisioned: 535 units<br/>Use: Complex reasoning<br/>30% of traffic"]
HAIKU["Claude 3 Haiku<br/>Provisioned: 268 units<br/>Use: Simple queries<br/>70% of traffic"]
end
subgraph Data["Data Layer"]
direction TB
OS["OpenSearch Serverless<br/>Base OCU: 4<br/>Search OCU: 4-12 (auto)<br/>Latency target: P95 < 50ms"]
DYNAMO["DynamoDB<br/>Mode: On-Demand<br/>Session table: ~2K RCU peak<br/>Auto-scaling backup provisioned"]
CACHE["ElastiCache Redis<br/>r6g.large (2 nodes)<br/>13 GB memory each<br/>Eviction: allkeys-lfu"]
end
U --> GW
GW --> ECS
ECS --> SONNET
ECS --> HAIKU
ECS --> OS
ECS --> DYNAMO
ECS --> CACHE
subgraph Monitor["Resource Monitoring"]
CW["CloudWatch Dashboards<br/>+ Custom Metrics<br/>+ Composite Alarms"]
end
ECS_AS -.-> ECS
ECS -.-> Monitor
FM -.-> Monitor
Data -.-> Monitor
Python — CapacityPlanner Class
"""
MangaAssist Resource Allocation — Capacity Planner
Calculates required capacity across all services based on traffic projections.
"""
import math
from dataclasses import dataclass
from typing import Dict
@dataclass
class TrafficProfile:
"""Defines expected traffic characteristics for MangaAssist."""
messages_per_day: int = 1_000_000
peak_concurrent_sessions: int = 20_000
avg_session_duration_sec: int = 87
avg_requests_per_session: float = 3.2
peak_to_average_ratio: float = 20.0 # peak can be 20x average
sonnet_traffic_pct: float = 0.30
haiku_traffic_pct: float = 0.70
# Token characteristics
avg_input_tokens: int = 1_200
avg_output_tokens: int = 350
max_input_tokens: int = 4_000
max_output_tokens: int = 1_000
@dataclass
class ServiceCapacity:
"""Capacity allocation for a single service."""
service_name: str
resource_type: str
current_allocation: float
required_allocation: float
recommended_allocation: float # with buffer
buffer_pct: float
utilization_target_pct: float
estimated_monthly_cost: float
scaling_policy: str
class CapacityPlanner:
"""
Calculates required capacity for every service in the MangaAssist stack.
Starts from traffic volume and derives per-service requirements using the
token processing capacity model:
required_compute = (req/s) x (tokens/req) x (processing_time/token)
"""
# Bedrock pricing and capacity constants
SONNET_TOKENS_PER_UNIT_MIN = 50_000
HAIKU_TOKENS_PER_UNIT_MIN = 100_000
SONNET_UNIT_COST_HR = 63.0
HAIKU_UNIT_COST_HR = 8.0
SONNET_MS_PER_TOKEN = 15
HAIKU_MS_PER_TOKEN = 5
# ECS Fargate constants
ECS_VCPU_PER_TASK = 2
ECS_MEMORY_GB_PER_TASK = 4
ECS_VCPU_COST_HR = 0.04
ECS_MEM_COST_GB_HR = 0.004
MAX_BEDROCK_INVOCATIONS_PER_TASK = 10 # concurrent invocations
# OpenSearch constants
OCU_COST_HR = 0.24
BASE_OCU_MIN = 2
SEARCH_OCU_MIN = 2
# ElastiCache constants
REDIS_MEMORY_PER_CACHED_RESPONSE_KB = 8 # avg embedding + response
REDIS_NODE_MEMORY_GB = 13 # r6g.large
# DynamoDB constants
RCU_COST_MONTH = 0.05
WCU_COST_MONTH = 0.25
def __init__(
self,
traffic: TrafficProfile,
buffer_pct: float = 0.25,
):
self.traffic = traffic
self.buffer_pct = buffer_pct
self._capacity_plan: Dict[str, ServiceCapacity] = {}
def calculate_sustained_rps(self) -> float:
"""Average requests per second across the day."""
return self.traffic.messages_per_day / 86_400
def calculate_peak_rps(self) -> float:
"""Peak requests per second: sustained rate scaled by the burst factor.
At a 20x peak-to-average ratio this is ~230 req/s, consistent with
~20K concurrent sessions each issuing roughly one request per ~87s session.
"""
return self.calculate_sustained_rps() * self.traffic.peak_to_average_ratio
def calculate_peak_tokens_per_sec(self) -> Dict[str, float]:
"""Peak token throughput by model tier."""
peak_rps = self.calculate_peak_rps()
total_tokens_per_req = (
self.traffic.avg_input_tokens + self.traffic.avg_output_tokens
)
sonnet_tps = (
peak_rps
* self.traffic.sonnet_traffic_pct
* total_tokens_per_req
)
haiku_tps = (
peak_rps
* self.traffic.haiku_traffic_pct
* total_tokens_per_req
)
return {
"sonnet_tokens_per_sec": sonnet_tps,
"haiku_tokens_per_sec": haiku_tps,
"total_tokens_per_sec": sonnet_tps + haiku_tps,
}
def plan_bedrock_capacity(self) -> Dict[str, ServiceCapacity]:
"""Calculate Bedrock provisioned throughput model units."""
tokens = self.calculate_peak_tokens_per_sec()
# Convert tokens/sec to tokens/min for Bedrock units
sonnet_tpm = tokens["sonnet_tokens_per_sec"] * 60
haiku_tpm = tokens["haiku_tokens_per_sec"] * 60
sonnet_units_raw = sonnet_tpm / self.SONNET_TOKENS_PER_UNIT_MIN
haiku_units_raw = haiku_tpm / self.HAIKU_TOKENS_PER_UNIT_MIN
sonnet_units_buffered = math.ceil(
sonnet_units_raw * (1 + self.buffer_pct)
)
haiku_units_buffered = math.ceil(
haiku_units_raw * (1 + self.buffer_pct)
)
self._capacity_plan["bedrock_sonnet"] = ServiceCapacity(
service_name="Bedrock Claude 3 Sonnet",
resource_type="Model Units (Provisioned Throughput)",
current_allocation=0,
required_allocation=sonnet_units_raw,
recommended_allocation=sonnet_units_buffered,
buffer_pct=self.buffer_pct,
utilization_target_pct=0.70,
estimated_monthly_cost=sonnet_units_buffered * self.SONNET_UNIT_COST_HR * 730,
scaling_policy="Manual provisioning with monthly review",
)
self._capacity_plan["bedrock_haiku"] = ServiceCapacity(
service_name="Bedrock Claude 3 Haiku",
resource_type="Model Units (Provisioned Throughput)",
current_allocation=0,
required_allocation=haiku_units_raw,
recommended_allocation=haiku_units_buffered,
buffer_pct=self.buffer_pct,
utilization_target_pct=0.75,
estimated_monthly_cost=haiku_units_buffered * self.HAIKU_UNIT_COST_HR * 730,
scaling_policy="Manual provisioning with monthly review",
)
return {
"sonnet": self._capacity_plan["bedrock_sonnet"],
"haiku": self._capacity_plan["bedrock_haiku"],
}
def plan_ecs_capacity(self) -> ServiceCapacity:
"""Calculate ECS Fargate task count for orchestrator."""
peak_rps = self.calculate_peak_rps()
# Each task handles MAX_BEDROCK_INVOCATIONS_PER_TASK concurrent
# Bedrock invocations (async I/O bound, not CPU bound)
tasks_required = math.ceil(
peak_rps / self.MAX_BEDROCK_INVOCATIONS_PER_TASK
)
tasks_buffered = math.ceil(tasks_required * (1 + self.buffer_pct))
cost_per_task_hr = (
self.ECS_VCPU_PER_TASK * self.ECS_VCPU_COST_HR
+ self.ECS_MEMORY_GB_PER_TASK * self.ECS_MEM_COST_GB_HR
)
self._capacity_plan["ecs_orchestrator"] = ServiceCapacity(
service_name="ECS Fargate Orchestrator",
resource_type=f"Tasks ({self.ECS_VCPU_PER_TASK} vCPU / {self.ECS_MEMORY_GB_PER_TASK} GB)",
current_allocation=0,
required_allocation=tasks_required,
recommended_allocation=tasks_buffered,
buffer_pct=self.buffer_pct,
utilization_target_pct=0.65,
estimated_monthly_cost=tasks_buffered * cost_per_task_hr * 730,
scaling_policy="Target tracking: 10 Bedrock invocations/task + step scaling on CPU",
)
return self._capacity_plan["ecs_orchestrator"]
def plan_opensearch_capacity(self) -> ServiceCapacity:
"""Calculate OpenSearch Serverless OCU requirements."""
peak_rps = self.calculate_peak_rps()
# Estimate: each search OCU handles ~50 vector search queries/sec
search_queries_per_sec = peak_rps * 0.6 # 60% of requests need RAG
search_ocu_required = max(
self.SEARCH_OCU_MIN,
math.ceil(search_queries_per_sec / 50),
)
search_ocu_buffered = math.ceil(
search_ocu_required * (1 + self.buffer_pct)
)
total_ocu = self.BASE_OCU_MIN + search_ocu_buffered
self._capacity_plan["opensearch"] = ServiceCapacity(
service_name="OpenSearch Serverless",
resource_type=f"OCU (Base: {self.BASE_OCU_MIN} + Search: {search_ocu_buffered})",
current_allocation=0,
required_allocation=self.BASE_OCU_MIN + search_ocu_required,
recommended_allocation=total_ocu,
buffer_pct=self.buffer_pct,
utilization_target_pct=0.70,
estimated_monthly_cost=total_ocu * self.OCU_COST_HR * 730,
scaling_policy="OpenSearch auto-scaling for search OCU; base OCU fixed",
)
return self._capacity_plan["opensearch"]
def plan_elasticache_capacity(self) -> ServiceCapacity:
"""Calculate ElastiCache Redis capacity for semantic cache."""
# Target: cache top 30% of queries (reduces Bedrock calls significantly)
daily_unique_queries = int(self.traffic.messages_per_day * 0.4)
cache_target_entries = int(daily_unique_queries * 0.30)
memory_required_gb = (
cache_target_entries * self.REDIS_MEMORY_PER_CACHED_RESPONSE_KB
) / (1024 * 1024)
nodes_required = max(2, math.ceil(
memory_required_gb / (self.REDIS_NODE_MEMORY_GB * 0.75)
)) # 75% memory target
self._capacity_plan["elasticache"] = ServiceCapacity(
service_name="ElastiCache Redis",
resource_type=f"r6g.large nodes ({self.REDIS_NODE_MEMORY_GB} GB each)",
current_allocation=0,
required_allocation=nodes_required,
recommended_allocation=nodes_required,
buffer_pct=0.0, # node count already accounts for replication
utilization_target_pct=0.75,
estimated_monthly_cost=nodes_required * 0.068 * 730,
scaling_policy="Vertical scaling (node type upgrade) + eviction policy allkeys-lfu",
)
return self._capacity_plan["elasticache"]
def plan_dynamodb_capacity(self) -> ServiceCapacity:
"""Calculate DynamoDB capacity for session storage."""
peak_rps = self.calculate_peak_rps()
# Each request reads session history (1-3 reads) and writes 1 update
peak_rcu = math.ceil(peak_rps * 2.5) # avg 2.5 reads per request
peak_wcu = math.ceil(peak_rps * 1.2) # 1 write + occasional metadata
# On-demand cost estimate (conservative: assumes peak rate sustained all month; simpler, no throttle risk)
on_demand_monthly = (
peak_rcu * 86_400 * 30 * 0.25 / 1_000_000 # read request units
+ peak_wcu * 86_400 * 30 * 1.25 / 1_000_000 # write request units
)
self._capacity_plan["dynamodb"] = ServiceCapacity(
service_name="DynamoDB Session Table",
resource_type=f"On-Demand (peak ~{peak_rcu} RCU / ~{peak_wcu} WCU equivalent)",
current_allocation=0,
required_allocation=peak_rcu,
recommended_allocation=peak_rcu,
buffer_pct=0.0, # on-demand auto-scales
utilization_target_pct=0.80,
estimated_monthly_cost=on_demand_monthly,
scaling_policy="On-demand mode (auto-scales to traffic)",
)
return self._capacity_plan["dynamodb"]
def generate_full_plan(self) -> Dict[str, ServiceCapacity]:
"""Generate capacity plan for all services."""
self.plan_bedrock_capacity()
self.plan_ecs_capacity()
self.plan_opensearch_capacity()
self.plan_elasticache_capacity()
self.plan_dynamodb_capacity()
return self._capacity_plan
def print_summary(self) -> None:
"""Print a formatted summary of the capacity plan."""
plan = self.generate_full_plan()
total_monthly = 0.0
print("=" * 80)
print("MANGAASSIST RESOURCE ALLOCATION — CAPACITY PLAN")
print(f"Traffic: {self.traffic.messages_per_day:,} msg/day | "
f"Peak: {self.traffic.peak_concurrent_sessions:,} concurrent")
print(f"Buffer: {self.buffer_pct:.0%}")
print("=" * 80)
for key, cap in plan.items():
print(f"\n--- {cap.service_name} ---")
print(f" Resource: {cap.resource_type}")
print(f" Required: {cap.required_allocation:.1f}")
print(f" Recommended (with buffer): {cap.recommended_allocation:.1f}")
print(f" Utilization target: {cap.utilization_target_pct:.0%}")
print(f" Scaling: {cap.scaling_policy}")
print(f" Est. monthly cost: ${cap.estimated_monthly_cost:,.2f}")
total_monthly += cap.estimated_monthly_cost
print("\n" + "=" * 80)
print(f"TOTAL ESTIMATED MONTHLY COST: ${total_monthly:,.2f}")
print("=" * 80)
# --- Usage ---
if __name__ == "__main__":
traffic = TrafficProfile(
messages_per_day=1_000_000,
peak_concurrent_sessions=20_000,
)
planner = CapacityPlanner(traffic=traffic, buffer_pct=0.25)
planner.print_summary()
Python — ResourceUtilizationMonitor
"""
MangaAssist Resource Allocation — Utilization Monitor
Tracks real-time utilization across services and detects inefficiency.
"""
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
import boto3
logger = logging.getLogger(__name__)
class UtilizationStatus(Enum):
"""Health status based on utilization level."""
UNDER_UTILIZED = "under_utilized" # below 30% — wasting money
HEALTHY = "healthy" # 30-70% — optimal range
ELEVATED = "elevated" # 70-85% — approaching limits
CRITICAL = "critical" # above 85% — scaling needed NOW
@dataclass
class UtilizationSnapshot:
"""Point-in-time utilization reading for a service."""
service_name: str
metric_name: str
current_value: float
target_value: float
utilization_pct: float
status: UtilizationStatus
timestamp: datetime
cost_impact_hourly: float = 0.0
recommendation: str = ""
@dataclass
class TokenEfficiencyReport:
"""Analysis of token usage patterns for waste detection."""
total_requests: int
avg_input_tokens: float
avg_output_tokens: float
input_output_ratio: float
waste_pct: float # % of requests with ratio > 10:1
top_waste_intents: List[Dict[str, float]] = field(default_factory=list)
estimated_monthly_savings_if_optimized: float = 0.0
class ResourceUtilizationMonitor:
"""
Monitors resource utilization across MangaAssist services and flags
inefficiency — both over-provisioning (cost waste) and under-provisioning
(performance risk).
"""
UTILIZATION_THRESHOLDS = {
"under_utilized": 0.30,
"healthy_upper": 0.70,
"elevated_upper": 0.85,
}
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self.cloudwatch = boto3.client("cloudwatch", region_name=region)
self.bedrock = boto3.client("bedrock-runtime", region_name=region)
self._snapshots: List[UtilizationSnapshot] = []
def _classify_utilization(self, pct: float) -> UtilizationStatus:
"""Classify utilization percentage into status."""
if pct < self.UTILIZATION_THRESHOLDS["under_utilized"]:
return UtilizationStatus.UNDER_UTILIZED
elif pct < self.UTILIZATION_THRESHOLDS["healthy_upper"]:
return UtilizationStatus.HEALTHY
elif pct < self.UTILIZATION_THRESHOLDS["elevated_upper"]:
return UtilizationStatus.ELEVATED
else:
return UtilizationStatus.CRITICAL
def _get_cloudwatch_avg(
self,
namespace: str,
metric_name: str,
dimensions: List[Dict[str, str]],
period_minutes: int = 5,
) -> Optional[float]:
"""Fetch average metric value from CloudWatch."""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace=namespace,
MetricName=metric_name,
Dimensions=dimensions,
StartTime=datetime.utcnow() - timedelta(minutes=period_minutes),
EndTime=datetime.utcnow(),
Period=period_minutes * 60,
Statistics=["Average"],
)
datapoints = sorted(
response.get("Datapoints", []),
key=lambda d: d["Timestamp"],
)
if datapoints:
# Use the most recent datapoint (get_metric_statistics does not sort them)
return datapoints[-1]["Average"]
return None
except Exception as e:
logger.error(f"CloudWatch query failed: {e}")
return None
def check_ecs_utilization(
self,
cluster_name: str = "mangaassist-prod",
service_name: str = "orchestrator",
) -> UtilizationSnapshot:
"""Monitor ECS Fargate orchestrator CPU and memory utilization."""
cpu_pct = self._get_cloudwatch_avg(
namespace="AWS/ECS",
metric_name="CPUUtilization",
dimensions=[
{"Name": "ClusterName", "Value": cluster_name},
{"Name": "ServiceName", "Value": service_name},
],
)
if cpu_pct is None:
cpu_pct = 0.0
status = self._classify_utilization(cpu_pct / 100)
recommendation = ""
if status == UtilizationStatus.UNDER_UTILIZED:
recommendation = (
"ECS tasks are under-utilized. Consider reducing min task "
"count or switching to smaller task size to save costs."
)
elif status == UtilizationStatus.CRITICAL:
recommendation = (
"ECS CPU critical. Immediate scale-out needed. Check if "
"auto-scaling policy is responding. Consider increasing "
"max task count."
)
snapshot = UtilizationSnapshot(
service_name="ECS Fargate Orchestrator",
metric_name="CPUUtilization",
current_value=cpu_pct,
target_value=65.0,
utilization_pct=cpu_pct,
status=status,
timestamp=datetime.utcnow(),
recommendation=recommendation,
)
self._snapshots.append(snapshot)
return snapshot
def check_bedrock_utilization(
self,
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
provisioned_units: int = 535,
) -> UtilizationSnapshot:
"""Monitor Bedrock model invocation rate vs provisioned capacity."""
invocations = self._get_cloudwatch_avg(
namespace="AWS/Bedrock",
metric_name="Invocations",
dimensions=[
{"Name": "ModelId", "Value": model_id},
],
)
if invocations is None:
invocations = 0.0
# Estimate utilization as invocations / theoretical max
max_invocations_per_min = provisioned_units * 2 # rough estimate
utilization_pct = (
(invocations / max_invocations_per_min * 100)
if max_invocations_per_min > 0 else 0.0
)
status = self._classify_utilization(utilization_pct / 100)
snapshot = UtilizationSnapshot(
service_name=f"Bedrock {model_id.split('.')[1].split('-')[0].title()}",
metric_name="InvocationUtilization",
current_value=invocations,
target_value=max_invocations_per_min * 0.70,
utilization_pct=utilization_pct,
status=status,
timestamp=datetime.utcnow(),
)
self._snapshots.append(snapshot)
return snapshot
def check_dynamodb_utilization(
self,
table_name: str = "mangaassist-sessions",
) -> UtilizationSnapshot:
"""Monitor DynamoDB consumed capacity vs provisioned/limits."""
consumed_rcu = self._get_cloudwatch_avg(
namespace="AWS/DynamoDB",
metric_name="ConsumedReadCapacityUnits",
dimensions=[
{"Name": "TableName", "Value": table_name},
],
)
throttle_count = self._get_cloudwatch_avg(
namespace="AWS/DynamoDB",
metric_name="ReadThrottleEvents",
dimensions=[
{"Name": "TableName", "Value": table_name},
],
)
consumed_rcu = consumed_rcu or 0.0
throttle_count = throttle_count or 0.0
# For on-demand: flag if throttle events detected
status = UtilizationStatus.HEALTHY
recommendation = ""
if throttle_count > 0:
status = UtilizationStatus.CRITICAL
recommendation = (
f"DynamoDB throttling detected ({throttle_count:.0f} events). "
"On-demand table may have hit partition throughput limits. "
"Check for hot partitions in session key design."
)
snapshot = UtilizationSnapshot(
service_name="DynamoDB Sessions",
metric_name="ConsumedReadCapacity",
current_value=consumed_rcu,
target_value=0.0, # on-demand has no fixed target
utilization_pct=0.0,
status=status,
timestamp=datetime.utcnow(),
recommendation=recommendation,
)
self._snapshots.append(snapshot)
return snapshot
def check_elasticache_utilization(
self,
cluster_id: str = "mangaassist-cache",
) -> UtilizationSnapshot:
"""Monitor ElastiCache memory and connection utilization."""
memory_pct = self._get_cloudwatch_avg(
namespace="AWS/ElastiCache",
metric_name="DatabaseMemoryUsagePercentage",
dimensions=[
{"Name": "CacheClusterId", "Value": cluster_id},
],
)
memory_pct = memory_pct or 0.0
status = self._classify_utilization(memory_pct / 100)
recommendation = ""
if status == UtilizationStatus.CRITICAL:
recommendation = (
"ElastiCache memory critical. Verify allkeys-lfu eviction "
"is active. Consider upgrading node type or reducing "
"cache TTL for low-value entries."
)
elif status == UtilizationStatus.UNDER_UTILIZED:
recommendation = (
"ElastiCache memory under-utilized. Consider downgrading "
"node type to save costs, or increasing cache coverage."
)
snapshot = UtilizationSnapshot(
service_name="ElastiCache Redis",
metric_name="MemoryUsagePercentage",
current_value=memory_pct,
target_value=75.0,
utilization_pct=memory_pct,
status=status,
timestamp=datetime.utcnow(),
recommendation=recommendation,
)
self._snapshots.append(snapshot)
return snapshot
def analyze_token_efficiency(
self,
invocation_logs: List[Dict],
) -> TokenEfficiencyReport:
"""
Analyze token usage patterns to detect waste.
A request with input:output ratio > 10:1 is flagged as wasteful —
it means large prompts are producing small outputs (bloated system
prompt, over-retrieved RAG context, or unnecessary history).
"""
if not invocation_logs:
return TokenEfficiencyReport(
total_requests=0,
avg_input_tokens=0,
avg_output_tokens=0,
input_output_ratio=0,
waste_pct=0,
)
total_input = sum(log["input_tokens"] for log in invocation_logs)
total_output = sum(log["output_tokens"] for log in invocation_logs)
n = len(invocation_logs)
avg_input = total_input / n
avg_output = total_output / n
ratio = avg_input / avg_output if avg_output > 0 else float("inf")
# Count wasteful requests (ratio > 10:1)
wasteful = [
log for log in invocation_logs
if log["output_tokens"] > 0
and log["input_tokens"] / log["output_tokens"] > 10
]
waste_pct = len(wasteful) / n * 100
# Group waste by intent
intent_waste: Dict[str, List[float]] = {}
for log in wasteful:
intent = log.get("intent", "unknown")
if intent not in intent_waste:
intent_waste[intent] = []
intent_waste[intent].append(
log["input_tokens"] / log["output_tokens"]
)
top_waste = sorted(
[
{"intent": k, "avg_ratio": sum(v) / len(v), "count": len(v)}
for k, v in intent_waste.items()
],
key=lambda x: x["count"],
reverse=True,
)[:5]
# Estimate savings: if wasteful requests reduced to 5:1 ratio
wasted_tokens = sum(
log["input_tokens"] - (log["output_tokens"] * 5)
for log in wasteful
if log["input_tokens"] / max(log["output_tokens"], 1) > 5
)
# Rough cost: $0.003 per 1K input tokens (Sonnet)
monthly_savings = (wasted_tokens / 1000) * 0.003 * 30
return TokenEfficiencyReport(
total_requests=n,
avg_input_tokens=avg_input,
avg_output_tokens=avg_output,
input_output_ratio=ratio,
waste_pct=waste_pct,
top_waste_intents=top_waste,
estimated_monthly_savings_if_optimized=monthly_savings,
)
def run_full_check(self) -> List[UtilizationSnapshot]:
"""Run utilization checks across all MangaAssist services."""
self._snapshots.clear()
self.check_ecs_utilization()
self.check_bedrock_utilization(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
provisioned_units=161,
)
self.check_bedrock_utilization(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
provisioned_units=187,
)
self.check_dynamodb_utilization()
self.check_elasticache_utilization()
return self._snapshots
def get_alerts(self) -> List[UtilizationSnapshot]:
"""Return only non-healthy snapshots requiring attention."""
return [
s for s in self._snapshots
if s.status != UtilizationStatus.HEALTHY
]
# --- Usage ---
if __name__ == "__main__":
monitor = ResourceUtilizationMonitor(region="ap-northeast-1")
snapshots = monitor.run_full_check()
for snap in snapshots:
icon = {
UtilizationStatus.UNDER_UTILIZED: "[WASTE]",
UtilizationStatus.HEALTHY: "[ OK ]",
UtilizationStatus.ELEVATED: "[WARN ]",
UtilizationStatus.CRITICAL: "[CRIT ]",
}[snap.status]
print(f"{icon} {snap.service_name}: {snap.metric_name} = "
f"{snap.current_value:.1f} ({snap.utilization_pct:.1f}%)")
if snap.recommendation:
print(f" -> {snap.recommendation}")
Reference Table — Service x Metric x Target x Scaling x Cost
| Service | Key Metric | Target Utilization | Scaling Trigger | Scaling Action | Approx. Monthly Cost |
|---|---|---|---|---|---|
| Bedrock Sonnet | Invocations, InputTokenCount, OutputTokenCount | 70% of provisioned units | >80% sustained 3 min | Increase provisioned units (manual) | ~$45,990/unit ($63/hr x 730 hr) |
| Bedrock Haiku | Invocations, InputTokenCount, OutputTokenCount | 75% of provisioned units | >85% sustained 3 min | Increase provisioned units (manual) | ~$5,840/unit ($8/hr x 730 hr) |
| ECS Fargate | CPUUtilization | 65% | >75% for 2 min (step), 10 invocations/task (target) | Add 1-5 tasks | ~$70/task |
| OpenSearch Serverless (Search) | SearchLatency P95 | P95 < 50ms | P95 > 80ms for 5 min | Auto-scale search OCU | ~$175/OCU |
| OpenSearch Serverless (Base) | IndexingRate | Stable throughput | N/A (fixed base OCU) | Manual OCU adjustment | ~$175/OCU |
| ElastiCache Redis | DatabaseMemoryUsagePercentage | < 75% | > 80% for 5 min | Vertical scaling (node upgrade) | ~$50/node |
| DynamoDB (On-Demand) | ReadThrottleEvents | 0 throttle events | ThrottleEvents > 0 | Auto-scales (check hot key) | Variable (~$200-800) |
| API Gateway WebSocket | ConnectionCount | < 80% of limit | > 70% of account limit | Request limit increase | ~$1/million msgs |
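As a concrete example of turning one of these triggers into an alert, a sketch of a CloudWatch alarm on DynamoDB read throttling (the table name and the SNS topic ARN are placeholders):
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

# Any read throttling on the session table indicates a hot partition or capacity issue
cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-sessions-read-throttle",
    Namespace="AWS/DynamoDB",
    MetricName="ReadThrottleEvents",
    Dimensions=[{"Name": "TableName", "Value": "mangaassist-sessions"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:ap-northeast-1:123456789012:mangaassist-ops-alerts"],  # placeholder topic
)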
Key Takeaways
- Start from tokens, derive everything else: The token processing capacity model is the foundation. Every service allocation flows from (req/s) x (tokens/req) x (processing_time/token).
- Monitor input-to-output ratio: A ratio above 10:1 means prompt bloat or over-retrieved context. This is the single most actionable efficiency metric for GenAI.
- GenAI traffic needs GenAI-aware scaling: CPU-based auto-scaling is necessary but not sufficient. Custom metrics like "active Bedrock invocations per task" capture the actual bottleneck.
- Tag everything for cost visibility: Without intent-level and tier-level cost tagging, you cannot optimize what you cannot measure.
- Buffer for burstiness: GenAI traffic is correlated and bursty (manga releases, flash sales). A 25% buffer over calculated peak prevents throttling during real-world spikes.