RBAC and Least Privilege Patterns for FM Access
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.3 — Describe methods to integrate FM-powered applications into enterprise systems |
| Skill | 2.3.3 — Create secure access frameworks to ensure appropriate security controls (identity federation between FM services and enterprise systems, role-based access control for model and data access, least privilege API access to FMs) |
1. Fine-Grained Access Control Architecture
1.1 Access Control Layers
MangaAssist implements access control at four distinct layers, each enforced independently to provide defense-in-depth:
Layer 1: Organization Level — SCPs restrict entire accounts
Layer 2: Account Level — Permission boundaries limit teams
Layer 3: Role Level — IAM policies scope individual actions
Layer 4: Request Level — Application-layer RBAC per API call
flowchart TB
subgraph OrgLayer["Layer 1: Organization (SCPs)"]
SCP["Service Control Policies<br/>Block unapproved models org-wide"]
end
subgraph AccountLayer["Layer 2: Account (Permission Boundaries)"]
PB_Dev["Dev Team Boundary<br/>No production Bedrock access"]
PB_Ops["Ops Team Boundary<br/>No model customization"]
PB_Admin["Admin Team Boundary<br/>Full Bedrock scope"]
end
subgraph RoleLayer["Layer 3: Role (IAM Policies)"]
Role_ECS["ECS Task Role<br/>InvokeModel Haiku only"]
Role_Dev["Developer Role<br/>InvokeModel Sonnet + Haiku (staging)"]
Role_CI["CI/CD Role<br/>ListModels + GetModel only"]
end
subgraph AppLayer["Layer 4: Application (RBAC Engine)"]
RBAC["FMAccessController<br/>Per-request token limits, rate limits, cost guards"]
end
SCP --> PB_Dev
SCP --> PB_Ops
SCP --> PB_Admin
PB_Dev --> Role_Dev
PB_Ops --> Role_ECS
PB_Admin --> Role_ECS
PB_Admin --> Role_Dev
PB_Admin --> Role_CI
Role_ECS --> RBAC
Role_Dev --> RBAC
Role_CI --> RBAC
style OrgLayer fill:#ffebee,stroke:#c62828,stroke-width:2px
style AccountLayer fill:#fff3e0,stroke:#e65100,stroke-width:2px
style RoleLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style AppLayer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
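The Layer 4 RBAC engine appears above only as a box; a minimal sketch of the per-request check it performs might look like the following. The role names, token limits, and the `authorize` helper are illustrative assumptions, not the production FMAccessController:

```python
# Hypothetical Layer-4 policy table; role names and limits are assumptions.
ROLE_POLICY = {
    "chatbot": {
        "models": {"anthropic.claude-3-haiku-20240307-v1:0"},
        "max_tokens": 1024,
    },
    "developer": {
        "models": {
            "anthropic.claude-3-haiku-20240307-v1:0",
            "anthropic.claude-3-sonnet-20240229-v1:0",
        },
        "max_tokens": 4096,
    },
}

def authorize(role: str, model_id: str, max_tokens: int) -> tuple[bool, str]:
    """Per-request check applied after IAM has already allowed the call."""
    policy = ROLE_POLICY.get(role)
    if policy is None:
        return False, "unknown role"
    if model_id not in policy["models"]:
        return False, "model not allowed for this role"
    if max_tokens > policy["max_tokens"]:
        return False, "token limit exceeded"
    return True, "ok"
```

Even if an IAM misconfiguration let the chatbot role reach Sonnet, this check would still reject the request, and the denial would surface in the audit trail described in section 5.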
1.2 Why Four Layers?
A single layer of access control is brittle. If the IAM policy has a misconfiguration (e.g., a wildcard resource), the SCP still blocks unauthorized models. If the SCP is too broad, the permission boundary still constrains the team. If both fail, the application-layer RBAC engine catches the violation and logs it.
At MangaAssist's scale of 1M messages/day, a single misconfigured wildcard could send all traffic to the Sonnet endpoint ($3/$15 per 1M tokens) instead of Haiku, adding roughly $4,125/day at typical per-message token counts. Four independent layers make it far less likely that any single misconfiguration goes both unenforced and unnoticed.
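The ~$4,125/day figure can be reproduced with back-of-envelope arithmetic. The per-message token counts below (500 in, 200 out) are assumptions chosen to illustrate the delta, not measured traffic:

```python
MSGS_PER_DAY = 1_000_000
IN_TOKENS, OUT_TOKENS = 500, 200  # assumed tokens per message

def daily_cost(in_price_per_m: float, out_price_per_m: float) -> float:
    """Prices are USD per 1M tokens."""
    return (MSGS_PER_DAY * IN_TOKENS / 1e6) * in_price_per_m \
         + (MSGS_PER_DAY * OUT_TOKENS / 1e6) * out_price_per_m

sonnet = daily_cost(3.00, 15.00)   # 500 * 3 + 200 * 15 = 4500.0
haiku = daily_cost(0.25, 1.25)     # 500 * 0.25 + 200 * 1.25 = 375.0
delta = sonnet - haiku             # 4125.0 per day
```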
2. Service Control Policies (SCPs)
2.1 SCP: Restrict Bedrock Models Organization-Wide
This SCP ensures that across the entire AWS Organization, only approved Bedrock models can be invoked. Any new model must be explicitly added here before any account can use it.
{
"Version": "2012-10-17",
"Statement": [
    {
      "Sid": "DenyUnapprovedBedrockModels",
      "Effect": "Deny",
      "Action": [
        "bedrock:InvokeModel",
        "bedrock:InvokeModelWithResponseStream"
      ],
      "NotResource": [
        "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
        "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
      ]
    },
{
"Sid": "DenyBedrockModelCustomization",
"Effect": "Deny",
"Action": [
"bedrock:CreateModelCustomizationJob",
"bedrock:CreateProvisionedModelThroughput",
"bedrock:DeleteProvisionedModelThroughput"
],
"Resource": "*",
"Condition": {
"StringNotLike": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/manga-admin-role"
]
}
}
},
{
"Sid": "DenyBedrockOutsideApprovedRegions",
"Effect": "Deny",
"Action": "bedrock:*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"ap-northeast-1",
"us-east-1"
]
}
}
},
{
"Sid": "RequireBedrockVPCEndpoint",
"Effect": "Deny",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:SourceVpce": "vpce-manga-bedrock-01"
}
}
}
]
}
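Before deploying an SCP change, its intent can be checked against a table of test requests. The helper below is a local approximation of the three invoke-related Deny statements using the same model, region, and endpoint values; it is a unit-test aid, not the IAM policy engine:

```python
APPROVED_MODELS = {
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "anthropic.claude-3-haiku-20240307-v1:0",
}
APPROVED_REGIONS = {"ap-northeast-1", "us-east-1"}
REQUIRED_VPCE = "vpce-manga-bedrock-01"

def scp_allows_invoke(model_id: str, region: str, vpce_id: str) -> bool:
    """A request survives only if no Deny statement matches it."""
    return (
        model_id in APPROVED_MODELS
        and region in APPROVED_REGIONS
        and vpce_id == REQUIRED_VPCE
    )
```

Real enforcement happens org-wide in the SCP; a mirror like this is only useful for asserting policy intent in CI before the policy document is updated.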
2.2 SCP: Protect Security Infrastructure
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyCloudTrailModification",
"Effect": "Deny",
"Action": [
"cloudtrail:StopLogging",
"cloudtrail:DeleteTrail",
"cloudtrail:UpdateTrail"
],
"Resource": "*",
"Condition": {
"StringNotLike": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/manga-security-admin"
]
}
}
},
{
"Sid": "DenyGuardDutyDisable",
"Effect": "Deny",
"Action": [
"guardduty:DeleteDetector",
"guardduty:DisassociateFromMasterAccount",
"guardduty:UpdateDetector"
],
"Resource": "*"
},
{
"Sid": "DenySecurityHubDisable",
"Effect": "Deny",
"Action": [
"securityhub:DisableSecurityHub",
"securityhub:DeleteInsight"
],
"Resource": "*"
}
]
}
2.3 SCP Design Principles for FM Access
| Principle | Implementation | Why It Matters |
|---|---|---|
| Approved models only | Deny invoke on models outside the approved list | Prevents cost surprises from unapproved, pricier models |
| Approved regions only | Deny Bedrock actions outside ap-northeast-1 / us-east-1 | Data residency compliance for JP customers |
| VPC endpoint required | Deny invoke without sourceVpce condition | All FM traffic stays private |
| Protect audit trail | Deny CloudTrail/GuardDuty modification | Attackers cannot cover tracks |
| Admin-only customization | Deny model customization except admin role | Fine-tuning is a privileged operation |
3. Permission Boundaries for FM Teams
3.1 What Permission Boundaries Do
Permission boundaries set the maximum permissions that any IAM role within a team can have. Even if someone creates a new role with bedrock:* in the policy, the permission boundary ensures it can never exceed the defined ceiling.
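The ceiling semantics can be illustrated with set intersection: an action is effective only if both the identity policy and the boundary allow it. This is a deliberate simplification that ignores explicit Deny statements and resource conditions:

```python
def effective_actions(identity_policy: set, boundary: set) -> set:
    """Simplified model: effective permissions = identity policy ∩ boundary."""
    return identity_policy & boundary

attached = {"bedrock:InvokeModel", "bedrock:CreateModelCustomizationJob"}
ceiling = {"bedrock:InvokeModel", "bedrock:ListFoundationModels"}

# Even though the attached policy grants customization, the boundary strips it.
effective = effective_actions(attached, ceiling)
```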
3.2 Developer Team Permission Boundary
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockInvokeNonProduction",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream",
"bedrock:ListFoundationModels",
"bedrock:GetFoundationModel"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:ResourceTag/Environment": "production"
}
}
},
{
"Sid": "AllowDynamoDBDevelopment",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:DeleteItem"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions-dev",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions-dev/index/*",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products-dev",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products-dev/index/*"
]
},
{
"Sid": "AllowDynamoDBProductionReadOnly",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products"
]
},
{
"Sid": "AllowOpenSearchDevelopment",
"Effect": "Allow",
"Action": [
"aoss:APIAccessAll"
],
"Resource": [
"arn:aws:aoss:ap-northeast-1:123456789012:collection/manga-vectors-dev"
]
},
{
"Sid": "AllowCloudWatchMonitoring",
"Effect": "Allow",
"Action": [
"cloudwatch:GetMetricData",
"cloudwatch:ListMetrics",
"logs:GetLogEvents",
"logs:FilterLogEvents",
"logs:DescribeLogGroups"
],
"Resource": "*"
},
{
"Sid": "DenyProductionModification",
"Effect": "Deny",
"Action": [
"dynamodb:DeleteTable",
"dynamodb:CreateTable",
"dynamodb:UpdateTable",
"aoss:DeleteCollection",
"aoss:CreateCollection"
],
"Resource": "*"
}
]
}
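The combined effect of the boundary's tag-conditioned Allow and its final explicit Deny can be unit-tested locally. The helper below is an illustrative approximation of just those two statements (it is not an IAM evaluator, and actions covered by the other Allow statements are not modeled):

```python
# Mirrors the DenyProductionModification statement above.
ALWAYS_DENIED = {
    "dynamodb:DeleteTable", "dynamodb:CreateTable", "dynamodb:UpdateTable",
    "aoss:DeleteCollection", "aoss:CreateCollection",
}

def dev_boundary_allows(action: str, resource_tags: dict) -> bool:
    """Explicit Deny always wins; Bedrock invokes are blocked on
    resources tagged Environment=production."""
    if action in ALWAYS_DENIED:
        return False
    if action.startswith("bedrock:Invoke"):
        return resource_tags.get("Environment") != "production"
    # Other actions fall through to the remaining Allow statements (not modeled).
    return True
```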
3.3 Operations Team Permission Boundary
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockHaikuProductionOnly",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
]
},
{
"Sid": "AllowBedrockReadOnly",
"Effect": "Allow",
"Action": [
"bedrock:ListFoundationModels",
"bedrock:GetFoundationModel",
"bedrock:GetModelInvocationLoggingConfiguration"
],
"Resource": "*"
},
{
"Sid": "AllowFullMonitoring",
"Effect": "Allow",
"Action": [
"cloudwatch:*",
"logs:*",
"xray:*"
],
"Resource": "*"
},
{
"Sid": "AllowECSOperations",
"Effect": "Allow",
"Action": [
"ecs:DescribeServices",
"ecs:DescribeTasks",
"ecs:ListTasks",
"ecs:UpdateService",
"ecs:DescribeClusters"
],
"Resource": [
"arn:aws:ecs:ap-northeast-1:123456789012:cluster/manga-prod",
"arn:aws:ecs:ap-northeast-1:123456789012:service/manga-prod/*",
"arn:aws:ecs:ap-northeast-1:123456789012:task/manga-prod/*"
]
},
{
"Sid": "AllowDataReadOnly",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:Query",
"dynamodb:DescribeTable"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-*"
]
},
{
"Sid": "DenyModelCustomization",
"Effect": "Deny",
"Action": [
"bedrock:CreateModelCustomizationJob",
"bedrock:DeleteCustomModel",
"bedrock:CreateProvisionedModelThroughput",
"bedrock:DeleteProvisionedModelThroughput"
],
"Resource": "*"
}
]
}
3.4 Permission Boundary Manager
"""
permission_boundary_manager.py
Manages IAM permission boundaries for MangaAssist FM teams.
Ensures that no role within a team can exceed the defined ceiling,
regardless of what policies are attached to it.
"""
import json
import logging
import boto3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
@dataclass
class TeamBoundaryConfig:
"""Configuration for a team's permission boundary."""
team_name: str
boundary_policy_name: str
boundary_policy_document: Dict
applicable_role_prefix: str
environment_scope: List[str]
max_session_duration: int = 3600
require_mfa: bool = True
tags: Dict[str, str] = field(default_factory=dict)
class PermissionBoundaryManager:
"""
Creates and manages IAM permission boundaries for MangaAssist teams.
Permission boundaries are the second layer of defense (after SCPs).
They set an absolute ceiling on what any IAM entity within a team
can do, regardless of what policies are attached.
Key behaviors:
- Creates managed policies for each team boundary
- Attaches boundaries to all roles matching the team prefix
- Validates that no role exceeds its boundary
- Reports compliance drift to Security Hub
Teams and their boundaries:
dev-team -> Can invoke Bedrock in staging/dev, no production
ops-team -> Can invoke Haiku in production, full monitoring
admin-team -> Full Bedrock access, model customization
ci-cd -> List/Get models only, deploy ECS services
"""
def __init__(self, region: str = "ap-northeast-1"):
self.iam_client = boto3.client("iam")
self.region = region
self.account_id = boto3.client("sts").get_caller_identity()["Account"]
def create_boundary(self, config: TeamBoundaryConfig) -> str:
"""
Create or update a managed IAM policy for a team's permission boundary.
Returns the policy ARN.
"""
policy_arn = (
f"arn:aws:iam::{self.account_id}:policy/"
f"{config.boundary_policy_name}"
)
try:
# Check if policy already exists
self.iam_client.get_policy(PolicyArn=policy_arn)
# Policy exists: create a new version
policy_doc = json.dumps(config.boundary_policy_document)
self.iam_client.create_policy_version(
PolicyArn=policy_arn,
PolicyDocument=policy_doc,
SetAsDefault=True,
)
logger.info(
f"Updated permission boundary: {config.boundary_policy_name}"
)
except self.iam_client.exceptions.NoSuchEntityException:
# Policy does not exist: create it
policy_doc = json.dumps(config.boundary_policy_document)
response = self.iam_client.create_policy(
PolicyName=config.boundary_policy_name,
PolicyDocument=policy_doc,
Description=(
f"Permission boundary for {config.team_name} team "
f"in MangaAssist"
),
Tags=[
{"Key": "Team", "Value": config.team_name},
{"Key": "Project", "Value": "MangaAssist"},
{"Key": "ManagedBy", "Value": "PermissionBoundaryManager"},
],
)
policy_arn = response["Policy"]["Arn"]
logger.info(
f"Created permission boundary: {config.boundary_policy_name} "
f"({policy_arn})"
)
return policy_arn
def attach_boundary_to_team_roles(
self, config: TeamBoundaryConfig, policy_arn: str
) -> List[str]:
"""
Attach the permission boundary to all IAM roles matching the
team's role prefix.
Returns a list of role names that were updated.
"""
updated_roles = []
paginator = self.iam_client.get_paginator("list_roles")
for page in paginator.paginate():
for role in page["Roles"]:
role_name = role["RoleName"]
if role_name.startswith(config.applicable_role_prefix):
self.iam_client.put_role_permissions_boundary(
RoleName=role_name,
PermissionsBoundary=policy_arn,
)
updated_roles.append(role_name)
logger.info(
f"Attached boundary {config.boundary_policy_name} "
f"to role {role_name}"
)
logger.info(
f"Updated {len(updated_roles)} roles for team "
f"{config.team_name}"
)
return updated_roles
def validate_boundary_compliance(
self, config: TeamBoundaryConfig, policy_arn: str
) -> List[Dict[str, Any]]:
"""
Validate that all roles matching the team prefix have the
correct permission boundary attached.
Returns a list of non-compliant roles.
"""
non_compliant = []
paginator = self.iam_client.get_paginator("list_roles")
for page in paginator.paginate():
for role in page["Roles"]:
role_name = role["RoleName"]
if role_name.startswith(config.applicable_role_prefix):
current_boundary = role.get(
"PermissionsBoundary", {}
).get("PermissionsBoundaryArn", "")
if current_boundary != policy_arn:
non_compliant.append({
"role_name": role_name,
"expected_boundary": policy_arn,
"actual_boundary": current_boundary or "NONE",
"team": config.team_name,
"severity": "HIGH",
})
if non_compliant:
logger.warning(
f"Found {len(non_compliant)} non-compliant roles for "
f"team {config.team_name}"
)
else:
logger.info(
f"All roles for team {config.team_name} are compliant"
)
return non_compliant
def cleanup_old_policy_versions(self, policy_arn: str) -> int:
"""
Remove non-default policy versions to stay within the
AWS limit of 5 versions per managed policy.
"""
versions = self.iam_client.list_policy_versions(
PolicyArn=policy_arn
)["Versions"]
deleted_count = 0
for version in versions:
if not version["IsDefaultVersion"]:
self.iam_client.delete_policy_version(
PolicyArn=policy_arn,
VersionId=version["VersionId"],
)
deleted_count += 1
if deleted_count:
logger.info(
f"Cleaned up {deleted_count} old policy versions for "
f"{policy_arn}"
)
return deleted_count
def generate_compliance_report(
self, team_configs: List[TeamBoundaryConfig]
) -> Dict[str, Any]:
"""
Generate a compliance report across all teams showing boundary
attachment status, non-compliant roles, and remediation actions.
"""
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"account_id": self.account_id,
"region": self.region,
"teams": [],
"total_roles_checked": 0,
"total_non_compliant": 0,
}
for config in team_configs:
policy_arn = (
f"arn:aws:iam::{self.account_id}:policy/"
f"{config.boundary_policy_name}"
)
violations = self.validate_boundary_compliance(
config, policy_arn
)
team_entry = {
"team_name": config.team_name,
"boundary_policy": config.boundary_policy_name,
"role_prefix": config.applicable_role_prefix,
"violations": violations,
"compliant": len(violations) == 0,
}
report["teams"].append(team_entry)
report["total_non_compliant"] += len(violations)
return report
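The core of `validate_boundary_compliance` can be exercised without AWS by extracting the pure matching logic. The function below operates on role dicts shaped like `iam.list_roles` output and is a testing aid, not part of the manager class:

```python
from typing import Any, Dict, List

def find_noncompliant_roles(
    roles: List[Dict[str, Any]], prefix: str, expected_arn: str
) -> List[Dict[str, str]]:
    """Return team-prefixed roles whose boundary differs from the expected ARN."""
    violations = []
    for role in roles:
        name = role["RoleName"]
        if not name.startswith(prefix):
            continue
        actual = role.get("PermissionsBoundary", {}).get(
            "PermissionsBoundaryArn", ""
        )
        if actual != expected_arn:
            violations.append({"role_name": name, "actual": actual or "NONE"})
    return violations
```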
4. API Key Rotation and Token-Based Authentication
4.1 Why API Key Rotation Matters
MangaAssist uses API keys for service-to-service authentication (e.g., the ECS orchestrator calling internal microservices). These keys must be rotated regularly to limit the blast radius of a compromise.
Rotation strategy:
- API keys are stored in AWS Secrets Manager
- Secrets Manager triggers automatic rotation every 30 days
- A Lambda function generates the new key, tests it, and promotes it
- During rotation, both old and new keys are valid (dual-key window)
- The old key is revoked after the new key is confirmed working
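The dual-key window reduces to comparing the presented key against both stored stages with a constant-time comparison. A minimal standalone version (the `previous` parameter models the AWSPREVIOUS stage):

```python
import secrets
from typing import Optional, Tuple

def key_is_valid(provided: str, current: str,
                 previous: Optional[str] = None) -> Tuple[bool, str]:
    """Accept the current key, or the previous one during the rotation window.
    secrets.compare_digest avoids the timing side channel that '==' would leak."""
    if secrets.compare_digest(provided, current):
        return True, "current"
    if previous is not None and secrets.compare_digest(provided, previous):
        return True, "previous"
    return False, ""
```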
4.2 API Key Rotator
"""
api_key_rotator.py
Automated API key rotation for MangaAssist service-to-service authentication.
Integrates with AWS Secrets Manager for secure key lifecycle management.
"""
import json
import logging
import secrets
import string
import boto3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
logger = logging.getLogger(__name__)
@dataclass
class APIKeyConfig:
"""Configuration for an API key managed by the rotator."""
secret_name: str
description: str
key_length: int = 64
rotation_days: int = 30
dual_key_window_hours: int = 24
    allowed_services: Optional[list] = None
    rate_limit_per_minute: int = 1000
    tags: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.allowed_services is None:
self.allowed_services = []
if self.tags is None:
self.tags = {}
class APIKeyRotator:
"""
Manages the lifecycle of API keys for MangaAssist FM endpoints.
Handles:
- Secure key generation with cryptographic randomness
- Storage in AWS Secrets Manager with encryption
- Automatic rotation via Secrets Manager rotation Lambda
- Dual-key window for zero-downtime rotation
- Revocation and cleanup of expired keys
- Audit logging of all key operations
Rotation flow:
1. Secrets Manager triggers rotation Lambda
2. Lambda calls create_secret (generate new key)
3. Lambda calls set_secret (store in pending state)
4. Lambda calls test_secret (validate new key works)
5. Lambda calls finish_secret (promote new key, mark old as previous)
"""
def __init__(self, region: str = "ap-northeast-1"):
self.secrets_client = boto3.client(
"secretsmanager", region_name=region
)
self.apigw_client = boto3.client("apigatewayv2", region_name=region)
self.region = region
def create_api_key(self, config: APIKeyConfig) -> Dict[str, Any]:
"""
Create a new API key and store it in Secrets Manager.
The key is generated using Python's secrets module for
cryptographic randomness, ensuring it cannot be predicted.
"""
# Generate a cryptographically secure API key
alphabet = string.ascii_letters + string.digits
api_key = "manga_" + "".join(
secrets.choice(alphabet) for _ in range(config.key_length)
)
# Create the secret value
secret_value = {
"api_key": api_key,
"created_at": datetime.now(timezone.utc).isoformat(),
"rotation_days": config.rotation_days,
"allowed_services": config.allowed_services,
"rate_limit_per_minute": config.rate_limit_per_minute,
"version": "v1",
}
# Store in Secrets Manager
try:
response = self.secrets_client.create_secret(
Name=config.secret_name,
Description=config.description,
SecretString=json.dumps(secret_value),
Tags=[
{"Key": k, "Value": v}
for k, v in {
**config.tags,
"Project": "MangaAssist",
"ManagedBy": "APIKeyRotator",
"RotationDays": str(config.rotation_days),
}.items()
],
)
except self.secrets_client.exceptions.ResourceExistsException:
response = self.secrets_client.update_secret(
SecretId=config.secret_name,
SecretString=json.dumps(secret_value),
)
        logger.info(f"Stored API key secret: {config.secret_name}")
return {
"secret_name": config.secret_name,
"secret_arn": response.get("ARN", ""),
"key_prefix": api_key[:12] + "...", # Log prefix only
}
def enable_automatic_rotation(
self,
secret_name: str,
rotation_lambda_arn: str,
rotation_days: int = 30,
) -> Dict[str, Any]:
"""
Enable automatic rotation for an API key secret.
The rotation Lambda function handles the four-step rotation
process: createSecret, setSecret, testSecret, finishSecret.
"""
        response = self.secrets_client.rotate_secret(
            SecretId=secret_name,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={
                # ScheduleExpression and AutomaticallyAfterDays are mutually
                # exclusive; Duration (the rotation window) requires
                # ScheduleExpression.
                "ScheduleExpression": f"rate({rotation_days} days)",
                "Duration": "24h",  # Dual-key window
            },
        )
        logger.info(
            f"Enabled automatic rotation for {secret_name} "
            f"every {rotation_days} days"
        )
        return {
            "secret_name": secret_name,
            "rotation_enabled": True,
            "rotation_days": rotation_days,
            "rotation_version_id": response.get("VersionId", ""),
        }
def rotate_key_now(self, secret_name: str) -> Dict[str, Any]:
"""
Trigger an immediate key rotation outside the normal schedule.
Use this when a key is suspected of being compromised.
"""
# Get the current secret for comparison
current = self.secrets_client.get_secret_value(SecretId=secret_name)
current_value = json.loads(current["SecretString"])
current_version = current.get("VersionId", "unknown")
# Generate a new key
alphabet = string.ascii_letters + string.digits
key_length = len(current_value.get("api_key", "")) - 6 # minus prefix
new_key = "manga_" + "".join(
secrets.choice(alphabet) for _ in range(max(key_length, 64))
)
# Update the secret
new_value = {
**current_value,
"api_key": new_key,
"created_at": datetime.now(timezone.utc).isoformat(),
"previous_version": current_version,
"rotation_reason": "manual_emergency",
"version": f"v{int(current_value.get('version', 'v1')[1:]) + 1}",
}
self.secrets_client.update_secret(
SecretId=secret_name,
SecretString=json.dumps(new_value),
)
logger.warning(
f"Emergency key rotation completed for {secret_name} "
f"(old version: {current_version})"
)
return {
"secret_name": secret_name,
"old_version": current_version,
"new_key_prefix": new_key[:12] + "...",
"rotated_at": datetime.now(timezone.utc).isoformat(),
"reason": "manual_emergency",
}
def validate_key(
self, secret_name: str, provided_key: str
) -> Dict[str, Any]:
"""
Validate a provided API key against the stored secret.
Checks both the current and previous versions during the
dual-key rotation window.
"""
try:
# Check current version
current = self.secrets_client.get_secret_value(
SecretId=secret_name, VersionStage="AWSCURRENT"
)
current_value = json.loads(current["SecretString"])
if secrets.compare_digest(
provided_key, current_value.get("api_key", "")
):
return {
"valid": True,
"version": "current",
"allowed_services": current_value.get(
"allowed_services", []
),
"rate_limit": current_value.get(
"rate_limit_per_minute", 1000
),
}
# Check previous version (dual-key window)
try:
previous = self.secrets_client.get_secret_value(
SecretId=secret_name, VersionStage="AWSPREVIOUS"
)
previous_value = json.loads(previous["SecretString"])
if secrets.compare_digest(
provided_key, previous_value.get("api_key", "")
):
logger.info(
f"Key validated against previous version for "
f"{secret_name} (dual-key window)"
)
return {
"valid": True,
"version": "previous",
"allowed_services": previous_value.get(
"allowed_services", []
),
"rate_limit": previous_value.get(
"rate_limit_per_minute", 1000
),
"warning": "Using previous key version; update to current",
}
            except self.secrets_client.exceptions.ResourceNotFoundException:
                pass  # No previous version available
return {"valid": False, "reason": "Key does not match any version"}
except self.secrets_client.exceptions.ResourceNotFoundException:
return {"valid": False, "reason": f"Secret {secret_name} not found"}
def list_managed_keys(self) -> List[Dict[str, Any]]:
"""
List all API keys managed by this rotator.
Filters secrets by the ManagedBy tag.
"""
keys = []
paginator = self.secrets_client.get_paginator("list_secrets")
for page in paginator.paginate(
Filters=[
{
"Key": "tag-key",
"Values": ["ManagedBy"],
},
{
"Key": "tag-value",
"Values": ["APIKeyRotator"],
},
]
):
for secret in page.get("SecretList", []):
keys.append({
"name": secret["Name"],
"arn": secret["ARN"],
"last_rotated": (
secret.get("LastRotatedDate", "").isoformat()
if secret.get("LastRotatedDate")
else "never"
),
"rotation_enabled": secret.get(
"RotationEnabled", False
),
"next_rotation": (
secret.get("NextRotationDate", "").isoformat()
if secret.get("NextRotationDate")
else "N/A"
),
})
return keys
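A quick sanity check on the key format used by `create_api_key`: 64 characters drawn from a 62-symbol alphabet give roughly 381 bits of entropy, far beyond brute-force reach. The arithmetic below assumes the `manga_` prefix and 64-character body shown above:

```python
import math
import secrets
import string

alphabet = string.ascii_letters + string.digits  # 62 symbols
key = "manga_" + "".join(secrets.choice(alphabet) for _ in range(64))

# Each character contributes log2(62) ≈ 5.95 bits.
entropy_bits = 64 * math.log2(len(alphabet))  # ~381 bits
```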
5. FM Access Auditor
5.1 Audit Architecture
Every FM access decision in MangaAssist is logged to CloudTrail and CloudWatch Logs, creating an immutable audit trail. The FM Access Auditor queries these logs to detect anomalies, generate compliance reports, and trigger alerts.
flowchart LR
subgraph Sources["Audit Event Sources"]
Bedrock["Bedrock API Calls"]
Controller["FMAccessController Decisions"]
Cognito["Cognito Auth Events"]
APIGW["API Gateway Access Logs"]
end
subgraph Collection["Log Collection"]
CloudTrail["CloudTrail<br/>(Management + Data Events)"]
CWLogs["CloudWatch Logs<br/>(Application Logs)"]
end
subgraph Analysis["Audit Analysis"]
Auditor["FMAccessAuditor<br/>(Query + Analyze)"]
Athena["Athena<br/>(CloudTrail Queries)"]
Insights["CloudWatch Insights<br/>(Log Queries)"]
end
subgraph Output["Audit Outputs"]
Report["Compliance Report"]
Alert["Security Alerts"]
Dashboard["Security Dashboard"]
end
Bedrock --> CloudTrail
Controller --> CWLogs
Cognito --> CWLogs
APIGW --> CWLogs
CloudTrail --> Auditor
CWLogs --> Auditor
Auditor --> Athena
Auditor --> Insights
Auditor --> Report
Auditor --> Alert
Auditor --> Dashboard
style Sources fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style Collection fill:#fff3e0,stroke:#e65100,stroke-width:2px
style Analysis fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style Output fill:#fce4ec,stroke:#b71c1c,stroke-width:2px
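The off-hours rule the auditor applies (JST 22:00-06:00) is easy to get wrong because the window wraps midnight. Distilled into a standalone function matching the check in the implementation below:

```python
from datetime import datetime, timezone

OFF_HOURS_START, OFF_HOURS_END = 22, 6  # JST

def is_off_hours(event_time_utc: datetime) -> bool:
    """JST = UTC+9; the window wraps midnight, hence the 'or'."""
    jst_hour = (event_time_utc.hour + 9) % 24
    return jst_hour >= OFF_HOURS_START or jst_hour < OFF_HOURS_END
```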
5.2 FM Access Auditor Implementation
"""
fm_access_auditor.py
Audits all Foundation Model access in MangaAssist for compliance,
anomaly detection, and security reporting.
"""
import json
import logging
import boto3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from collections import defaultdict
logger = logging.getLogger(__name__)
@dataclass
class AuditFinding:
"""Represents a security finding from the audit process."""
finding_id: str
severity: str # CRITICAL, HIGH, MEDIUM, LOW, INFO
category: str # access_violation, anomaly, compliance, cost
title: str
description: str
affected_resource: str
recommended_action: str
evidence: Dict[str, Any] = field(default_factory=dict)
    timestamp: Optional[datetime] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now(timezone.utc)
class FMAccessAuditor:
"""
Audits Foundation Model access patterns in MangaAssist.
Capabilities:
- Query CloudTrail for Bedrock API call history
- Analyze access patterns for anomalies (unusual hours, spikes, etc.)
- Detect permission violations (denied requests, escalation attempts)
- Generate compliance reports for security reviews
- Trigger SNS alerts for critical findings
- Track cost attribution by role and model
Anomaly detection rules:
    1. More than 2x the average per-identity request count in the audit window
2. Access attempts outside normal business hours (JST 22:00-06:00)
3. More than 5 denied access attempts from a single identity in 1 hour
4. Sudden model tier upgrade (operator trying to invoke Sonnet)
5. Access from unexpected IP ranges
"""
ANOMALY_THRESHOLDS = {
"rpm_multiplier": 2.0,
"off_hours_start": 22, # 10 PM JST
"off_hours_end": 6, # 6 AM JST
"max_denials_per_hour": 5,
"cost_spike_multiplier": 3.0,
}
def __init__(
self,
region: str = "ap-northeast-1",
alert_topic_arn: Optional[str] = None,
):
self.region = region
self.cloudtrail_client = boto3.client(
"cloudtrail", region_name=region
)
self.logs_client = boto3.client("logs", region_name=region)
self.sns_client = boto3.client("sns", region_name=region)
self.cloudwatch = boto3.client("cloudwatch", region_name=region)
self.alert_topic_arn = alert_topic_arn
def audit_bedrock_access(
self, hours: int = 24
) -> List[AuditFinding]:
"""
Audit all Bedrock API calls in the specified time window.
Queries CloudTrail for InvokeModel events and analyzes them
for violations and anomalies.
"""
findings = []
start_time = datetime.now(timezone.utc) - timedelta(hours=hours)
events = self._query_cloudtrail_events(
event_source="bedrock.amazonaws.com",
start_time=start_time,
)
# Group events by identity for pattern analysis
events_by_identity = defaultdict(list)
events_by_model = defaultdict(list)
denied_events = []
for event in events:
identity = event.get("userIdentity", {}).get("arn", "unknown")
model = self._extract_model_from_event(event)
events_by_identity[identity].append(event)
events_by_model[model].append(event)
if event.get("errorCode"):
denied_events.append(event)
# Check 1: Excessive denied requests
denial_findings = self._check_denial_patterns(denied_events)
findings.extend(denial_findings)
# Check 2: Off-hours access
off_hours_findings = self._check_off_hours_access(events)
findings.extend(off_hours_findings)
# Check 3: Rate anomalies
rate_findings = self._check_rate_anomalies(events_by_identity)
findings.extend(rate_findings)
# Check 4: Unauthorized model access attempts
model_findings = self._check_model_access_violations(events)
findings.extend(model_findings)
# Alert on critical findings
critical_findings = [
f for f in findings if f.severity in ("CRITICAL", "HIGH")
]
if critical_findings and self.alert_topic_arn:
self._send_alert(critical_findings)
logger.info(
f"Audit complete: {len(events)} events, "
f"{len(findings)} findings "
f"({len(critical_findings)} critical/high)"
)
return findings
def _check_denial_patterns(
self, denied_events: List[Dict]
) -> List[AuditFinding]:
"""
Detect identities with excessive access denials,
which may indicate credential compromise or escalation attempts.
"""
findings = []
denials_by_identity = defaultdict(list)
for event in denied_events:
identity = event.get("userIdentity", {}).get("arn", "unknown")
denials_by_identity[identity].append(event)
threshold = self.ANOMALY_THRESHOLDS["max_denials_per_hour"]
for identity, events in denials_by_identity.items():
if len(events) >= threshold:
findings.append(AuditFinding(
finding_id=f"DENY-{hash(identity) % 100000:05d}",
severity="HIGH",
category="access_violation",
title="Excessive Bedrock Access Denials",
description=(
f"Identity '{identity}' had {len(events)} denied "
f"Bedrock access attempts (threshold: {threshold}). "
f"This may indicate a compromised credential or "
f"privilege escalation attempt."
),
affected_resource=identity,
recommended_action=(
"Review the identity's recent activity. If the "
"denials are unexpected, rotate credentials and "
"investigate the source IP addresses."
),
evidence={
"denial_count": len(events),
"threshold": threshold,
"error_codes": list(set(
e.get("errorCode", "") for e in events
)),
"source_ips": list(set(
e.get("sourceIPAddress", "") for e in events
)),
},
))
return findings
def _check_off_hours_access(
self, events: List[Dict]
) -> List[AuditFinding]:
"""
Detect Bedrock access outside normal business hours (JST).
Off-hours access may indicate automated abuse or compromised creds.
"""
findings = []
off_hours_events = []
for event in events:
event_time = event.get("eventTime", "")
if event_time:
try:
dt = datetime.fromisoformat(
event_time.replace("Z", "+00:00")
)
jst_hour = (dt.hour + 9) % 24 # UTC to JST
start = self.ANOMALY_THRESHOLDS["off_hours_start"]
end = self.ANOMALY_THRESHOLDS["off_hours_end"]
if jst_hour >= start or jst_hour < end:
off_hours_events.append(event)
except (ValueError, TypeError):
pass
if len(off_hours_events) > 10:
identities = set(
e.get("userIdentity", {}).get("arn", "unknown")
for e in off_hours_events
)
findings.append(AuditFinding(
finding_id=f"OFFHRS-{len(off_hours_events):05d}",
severity="MEDIUM",
category="anomaly",
title="Significant Off-Hours Bedrock Access",
description=(
f"{len(off_hours_events)} Bedrock API calls detected "
f"outside business hours (JST 22:00-06:00) from "
f"{len(identities)} unique identities."
),
affected_resource="bedrock.amazonaws.com",
recommended_action=(
"Verify these calls are from legitimate automated "
"processes. If unexpected, investigate the source "
"identities and restrict off-hours access."
),
evidence={
"off_hours_count": len(off_hours_events),
"unique_identities": list(identities)[:10],
},
))
return findings
def _check_rate_anomalies(
self, events_by_identity: Dict[str, List]
) -> List[AuditFinding]:
"""
Detect identities with request rates significantly above baseline.
"""
findings = []
multiplier = self.ANOMALY_THRESHOLDS["rpm_multiplier"]
# Calculate average rate across all identities
all_counts = [len(evts) for evts in events_by_identity.values()]
if not all_counts:
return findings
avg_count = sum(all_counts) / len(all_counts)
for identity, events in events_by_identity.items():
if len(events) > avg_count * multiplier and len(events) > 50:
findings.append(AuditFinding(
finding_id=f"RATE-{hash(identity) % 100000:05d}",
severity="MEDIUM",
category="anomaly",
title="Abnormal Bedrock Request Rate",
description=(
f"Identity '{identity}' made {len(events)} "
f"requests ({multiplier}x above average of "
f"{avg_count:.0f})."
),
affected_resource=identity,
recommended_action=(
"Review whether this rate is expected. Consider "
"adding or tightening rate limits for this identity."
),
evidence={
"request_count": len(events),
"average_count": round(avg_count),
"multiplier": multiplier,
},
))
return findings
def _check_model_access_violations(
self, events: List[Dict]
) -> List[AuditFinding]:
"""
Detect attempts to access unauthorized models (e.g., an operator
trying to invoke Sonnet).
"""
findings = []
sonnet_model_id = "anthropic.claude-3-sonnet"
for event in events:
if event.get("errorCode") == "AccessDeniedException":
model = self._extract_model_from_event(event)
if sonnet_model_id in model:
identity = event.get("userIdentity", {}).get(
"arn", "unknown"
)
findings.append(AuditFinding(
finding_id=f"MODEL-{hash(identity) % 100000:05d}",
severity="HIGH",
category="access_violation",
title="Unauthorized Premium Model Access Attempt",
description=(
f"Identity '{identity}' attempted to invoke "
f"premium model '{model}' but was denied. "
f"This role does not have permission for "
f"premium-tier models."
),
affected_resource=model,
recommended_action=(
"Verify this was not an intentional escalation "
"attempt. If the identity legitimately needs "
"Sonnet access, submit an access request through "
"the standard approval process."
),
evidence={
"model_id": model,
"identity": identity,
"error_code": event.get("errorCode", ""),
"source_ip": event.get("sourceIPAddress", ""),
"event_time": event.get("eventTime", ""),
},
))
return findings
def _query_cloudtrail_events(
self,
event_source: str,
start_time: datetime,
) -> List[Dict]:
"""Query CloudTrail for events from a specific service."""
events = []
try:
paginator = self.cloudtrail_client.get_paginator("lookup_events")
for page in paginator.paginate(
LookupAttributes=[
{
"AttributeKey": "EventSource",
"AttributeValue": event_source,
}
],
StartTime=start_time,
EndTime=datetime.now(timezone.utc),
):
for event in page.get("Events", []):
try:
cloud_trail_event = json.loads(
event.get("CloudTrailEvent", "{}")
)
events.append(cloud_trail_event)
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"Failed to query CloudTrail: {e}")
return events
def _extract_model_from_event(self, event: Dict) -> str:
"""Extract the model ID from a Bedrock CloudTrail event."""
resources = event.get("resources", [])
for resource in resources:
arn = resource.get("ARN", "")
if "foundation-model" in arn:
return arn.split("/")[-1]
request_params = event.get("requestParameters", {})
return request_params.get("modelId", "unknown")
def _send_alert(self, findings: List[AuditFinding]) -> None:
"""Send SNS alert for critical/high severity findings."""
if not self.alert_topic_arn:
return
message = {
"source": "MangaAssist-FMAccessAuditor",
"timestamp": datetime.now(timezone.utc).isoformat(),
"finding_count": len(findings),
"findings": [
{
"id": f.finding_id,
"severity": f.severity,
"title": f.title,
"description": f.description,
"action": f.recommended_action,
}
for f in findings
],
}
try:
self.sns_client.publish(
TopicArn=self.alert_topic_arn,
Subject=f"MangaAssist Security Alert: {len(findings)} findings",
Message=json.dumps(message, indent=2),
)
logger.info(
f"Sent alert for {len(findings)} critical/high findings"
)
except Exception as e:
logger.error(f"Failed to send SNS alert: {e}")
def generate_compliance_report(
self, hours: int = 168
) -> Dict[str, Any]:
"""
Generate a weekly compliance report summarizing all FM access.
Includes:
- Total API calls by model and role
- Access denial summary
- Anomaly detections
- Cost attribution
- Recommendations
"""
findings = self.audit_bedrock_access(hours=hours)
report = {
"report_type": "weekly_fm_access_compliance",
"generated_at": datetime.now(timezone.utc).isoformat(),
"period_hours": hours,
"findings_summary": {
"total": len(findings),
"critical": len(
[f for f in findings if f.severity == "CRITICAL"]
),
"high": len(
[f for f in findings if f.severity == "HIGH"]
),
"medium": len(
[f for f in findings if f.severity == "MEDIUM"]
),
"low": len(
[f for f in findings if f.severity == "LOW"]
),
},
"findings": [
{
"id": f.finding_id,
"severity": f.severity,
"category": f.category,
"title": f.title,
"description": f.description,
"action": f.recommended_action,
}
for f in findings
],
"compliance_status": (
"COMPLIANT" if not any(
f.severity in ("CRITICAL", "HIGH") for f in findings
)
else "NON_COMPLIANT"
),
}
return report
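The weekly report can feed a simple gate in CI or an ops dashboard. A minimal sketch of such a consumer (the `summarize_report` helper is illustrative, not part of the auditor above; it mirrors the report's own rule that any CRITICAL or HIGH finding means NON_COMPLIANT):

```python
from typing import Any, Dict, Tuple


def summarize_report(report: Dict[str, Any]) -> Tuple[str, str]:
    """Reduce a compliance report to (status, one-line summary)."""
    summary = report.get("findings_summary", {})
    # CRITICAL and HIGH findings are the blocking categories
    blocking = summary.get("critical", 0) + summary.get("high", 0)
    status = "NON_COMPLIANT" if blocking else "COMPLIANT"
    line = (
        f"{status}: {summary.get('total', 0)} findings "
        f"({blocking} blocking) over {report.get('period_hours', 0)}h"
    )
    return status, line


# Example with a synthetic report
sample = {
    "findings_summary": {
        "total": 3, "critical": 0, "high": 1, "medium": 2, "low": 0,
    },
    "period_hours": 168,
}
status, line = summarize_report(sample)
print(line)  # NON_COMPLIANT: 3 findings (1 blocking) over 168h
```

A CI job can exit non-zero on `NON_COMPLIANT`, turning the weekly report into an enforcement point rather than a document nobody reads.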
6. Token-Based Authentication Patterns
6.1 JWT Token Lifecycle for FM Access
stateDiagram-v2
[*] --> Unauthenticated: User opens app
Unauthenticated --> Authenticating: Login initiated
Authenticating --> TokenIssued: Cognito returns tokens
TokenIssued --> Active: JWT validated by Lambda authorizer
Active --> Active: API calls with valid token
Active --> Refreshing: Token nearing expiry (< 5 min)
Refreshing --> Active: Refresh token exchange succeeds
Refreshing --> Expired: Refresh token also expired
Active --> Expired: Token expires (60 min)
Expired --> Unauthenticated: Must re-authenticate
Active --> Revoked: Admin revokes session
Revoked --> Unauthenticated: Must re-authenticate
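The "nearing expiry" transition in the diagram is a client-side decision. A minimal sketch of that check (the 5-minute margin matches the diagram; the helper name is illustrative):

```python
import time
from typing import Dict, Optional

REFRESH_MARGIN_SECONDS = 300  # refresh when < 5 minutes remain


def should_refresh(claims: Dict, now: Optional[float] = None) -> bool:
    """True when the token's exp falls within the refresh margin.

    `claims` is the decoded JWT payload; Cognito sets `exp` as a
    Unix timestamp (tokens live 60 minutes by default here).
    """
    now = time.time() if now is None else now
    return claims.get("exp", 0) - now < REFRESH_MARGIN_SECONDS


# 10 minutes of validity left: keep using the token
print(should_refresh({"exp": 1600}, now=1000))  # False
# 2 minutes left: exchange the refresh token for new tokens
print(should_refresh({"exp": 1120}, now=1000))  # True
```

The refresh itself is a Cognito `InitiateAuth` call with `AuthFlow="REFRESH_TOKEN_AUTH"`; when that exchange also fails, the client falls back to full re-authentication, matching the `Refreshing --> Expired` transition.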
6.2 Token Validation in Lambda Authorizer
"""
lambda_authorizer.py
WebSocket API Gateway Lambda authorizer for MangaAssist.
Validates Cognito JWT tokens and generates IAM policies.
"""
import json
import logging
import time
import urllib.request
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
# Configuration
USER_POOL_ID = "ap-northeast-1_XXXXXXXXX"
REGION = "ap-northeast-1"
APP_CLIENT_ID = "xxxxxxxxxxxxxxxxxxxxxxxxxx"
# JWKS cache
_jwks_cache = None
_jwks_cache_time = 0
JWKS_CACHE_TTL = 3600 # 1 hour
def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Lambda authorizer for WebSocket API Gateway.
Validates the JWT token from the Authorization header and
returns an IAM policy that allows or denies the WebSocket
connection based on the user's role.
"""
token = _extract_token(event)
if not token:
logger.warning("No token found in request")
raise Exception("Unauthorized")
try:
claims = _validate_jwt(token)
except Exception as e:
logger.warning(f"Token validation failed: {e}")
raise Exception("Unauthorized")
# Extract role and build policy
role = claims.get("custom:role", "readonly")
user_id = claims.get("sub", "unknown")
method_arn = event.get("methodArn", "")
policy = _build_policy(user_id, role, method_arn)
# Add context for downstream services
policy["context"] = {
"user_id": user_id,
"role": role,
"email": claims.get("email", ""),
"department": claims.get("custom:department", ""),
"model_access_tier": claims.get("custom:model_access_tier", "none"),
"mfa_verified": str("mfa" in claims.get("amr", [])),
}
logger.info(
f"Authorized: user={user_id}, role={role}"
)
return policy
def _extract_token(event: Dict[str, Any]) -> Optional[str]:
"""Extract the Bearer token from request headers."""
headers = event.get("headers", {})
auth_header = headers.get("Authorization", headers.get("authorization", ""))
if auth_header.startswith("Bearer "):
return auth_header[7:]
return auth_header if auth_header else None
def _validate_jwt(token: str) -> Dict[str, Any]:
"""
Validate a Cognito JWT token.
Checks: signature, expiration, issuer, and token_use.
"""
    import jwt as pyjwt  # PyJWT; RSAAlgorithm is imported in _get_signing_key where it is used
header = pyjwt.get_unverified_header(token)
kid = header.get("kid")
signing_key = _get_signing_key(kid)
issuer = f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}"
    # Cognito ID tokens carry the app client ID in the "aud" claim,
    # so verify it against APP_CLIENT_ID instead of skipping audience
    # validation.
    claims = pyjwt.decode(
        token,
        signing_key,
        algorithms=["RS256"],
        issuer=issuer,
        audience=APP_CLIENT_ID,
        options={
            "verify_exp": True,
            "verify_iss": True,
            "verify_aud": True,
        },
    )
    # Accept only ID tokens (access tokens carry a different claim set)
    if claims.get("token_use") != "id":
        raise ValueError(
            f"Expected id token, got {claims.get('token_use')!r}"
        )
    return claims
def _get_signing_key(kid: str):
"""Retrieve signing key from Cognito JWKS with caching."""
global _jwks_cache, _jwks_cache_time
from jwt.algorithms import RSAAlgorithm
if _jwks_cache is None or time.time() - _jwks_cache_time > JWKS_CACHE_TTL:
jwks_url = (
f"https://cognito-idp.{REGION}.amazonaws.com/"
f"{USER_POOL_ID}/.well-known/jwks.json"
)
        with urllib.request.urlopen(jwks_url, timeout=5) as resp:
_jwks_cache = json.loads(resp.read())
_jwks_cache_time = time.time()
for key in _jwks_cache.get("keys", []):
if key["kid"] == kid:
return RSAAlgorithm.from_jwk(json.dumps(key))
raise ValueError(f"Key {kid} not found in JWKS")
def _build_policy(
principal_id: str, role: str, method_arn: str
) -> Dict[str, Any]:
"""
Build an IAM policy document for the API Gateway authorizer.
Different roles get different route access:
- admin: all routes
- developer: chat, search, admin-dashboard
- operator: chat, health, metrics
- readonly: health, metrics only
"""
arn_parts = method_arn.split(":")
region = arn_parts[3]
account_id = arn_parts[4]
api_gateway_arn = arn_parts[5].split("/")
api_id = api_gateway_arn[0]
stage = api_gateway_arn[1]
base_arn = f"arn:aws:execute-api:{region}:{account_id}:{api_id}/{stage}"
route_permissions = {
"admin": [
f"{base_arn}/*",
],
"developer": [
f"{base_arn}/chat/*",
f"{base_arn}/search/*",
f"{base_arn}/admin-dashboard/*",
],
"operator": [
f"{base_arn}/chat/*",
f"{base_arn}/health/*",
f"{base_arn}/metrics/*",
],
"readonly": [
f"{base_arn}/health/*",
f"{base_arn}/metrics/*",
],
}
allowed_resources = route_permissions.get(role, route_permissions["readonly"])
return {
"principalId": principal_id,
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Allow",
"Resource": allowed_resources,
},
],
},
}
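Downstream, the orchestrator receives the authorizer's `context` via `event["requestContext"]["authorizer"]` and can enforce the model tier per request. A hedged sketch (the tier names follow the `model_access_tier` claim above; the tier-to-model mapping is illustrative):

```python
from typing import Any, Dict, List

# Illustrative mapping from the model_access_tier claim to model families
TIER_MODELS: Dict[str, List[str]] = {
    "premium": ["anthropic.claude-3-sonnet", "anthropic.claude-3-haiku"],
    "standard": ["anthropic.claude-3-haiku"],
    "none": [],
}


def allowed_models(event: Dict[str, Any]) -> List[str]:
    """Read the authorizer context and return invocable model families."""
    ctx = event.get("requestContext", {}).get("authorizer", {})
    tier = ctx.get("model_access_tier", "none")
    return TIER_MODELS.get(tier, [])


event = {
    "requestContext": {
        "authorizer": {
            "user_id": "u-123",
            "role": "operator",
            "model_access_tier": "standard",
        },
    },
}
print(allowed_models(event))  # ['anthropic.claude-3-haiku']
```

This keeps Layer 4 (application RBAC) decoupled from Layer 3: even if an IAM policy were misconfigured, the request-level check still denies premium-tier invocations.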
7. Cross-Account FM Access Patterns
7.1 When Cross-Account Access is Needed
MangaAssist may use separate AWS accounts for different environments (dev, staging, production) or for separating the ML platform team from the application team. Cross-account access to Bedrock must be carefully controlled.
7.2 Cross-Account Role Assumption
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCrossAccountBedrockAccess",
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": [
"arn:aws:iam::222233334444:role/manga-bedrock-cross-account"
],
"Condition": {
"StringEquals": {
"sts:ExternalId": "MangaAssist-Prod-2024",
"aws:PrincipalTag/Team": "manga-platform"
},
"IpAddress": {
"aws:SourceIp": "10.0.0.0/8"
}
}
}
]
}
7.3 Trust Policy on the Target Account Role
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "TrustMangaAssistProdAccount",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::111122223333:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "MangaAssist-Prod-2024"
},
"Bool": {
"aws:MultiFactorAuthPresent": "true"
}
}
}
]
}
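Putting the two policies together, the application-account orchestrator assumes the role roughly as follows. This is a sketch: the role ARN and external ID match the example policies above, the session name is arbitrary, and the STS call is not executed here.

```python
from typing import Any, Dict

TARGET_ROLE_ARN = (
    "arn:aws:iam::222233334444:role/manga-bedrock-cross-account"
)
EXTERNAL_ID = "MangaAssist-Prod-2024"


def assume_role_request() -> Dict[str, Any]:
    """Build AssumeRole parameters; ExternalId must match the trust
    policy's sts:ExternalId condition."""
    return {
        "RoleArn": TARGET_ROLE_ARN,
        "RoleSessionName": "manga-orchestrator",
        "ExternalId": EXTERNAL_ID,
        "DurationSeconds": 3600,  # short-lived sessions limit blast radius
    }


def bedrock_client_for_target_account():
    """Assume the cross-account role, then build a Bedrock runtime
    client from the temporary credentials."""
    import boto3  # imported lazily so the builder above works offline

    sts = boto3.client("sts")
    creds = sts.assume_role(**assume_role_request())["Credentials"]
    return boto3.client(
        "bedrock-runtime",
        region_name="ap-northeast-1",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
```

The returned credentials expire after an hour, so a production orchestrator would cache the client and re-assume the role shortly before expiry.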
8. IAM Access Analyzer Integration
8.1 Automated Policy Validation
IAM Access Analyzer validates IAM policies before deployment, so overly permissive statements are caught in the CI/CD pipeline rather than discovered in production.
"""
iam_policy_validator.py
Validates IAM policies using IAM Access Analyzer before deployment.
"""
import json
import logging
import boto3
from typing import Dict, List, Any
logger = logging.getLogger(__name__)
class IAMPolicyValidator:
"""
Validates IAM policies for MangaAssist using IAM Access Analyzer.
Checks for:
- Overly permissive actions (e.g., bedrock:*)
- Missing resource constraints (Resource: *)
- Missing condition keys (no VPC endpoint restriction)
- Security anti-patterns (allowing PassRole without constraints)
"""
BEDROCK_RESTRICTED_ACTIONS = [
"bedrock:CreateModelCustomizationJob",
"bedrock:DeleteCustomModel",
"bedrock:CreateProvisionedModelThroughput",
"bedrock:DeleteProvisionedModelThroughput",
]
def __init__(self, region: str = "ap-northeast-1"):
self.access_analyzer = boto3.client(
"accessanalyzer", region_name=region
)
self.iam_client = boto3.client("iam")
def validate_policy(
self, policy_document: Dict
) -> Dict[str, Any]:
"""
Validate an IAM policy document using Access Analyzer.
Returns findings with severity and recommendations.
"""
        response = self.access_analyzer.validate_policy(
            policyDocument=json.dumps(policy_document),
            policyType="IDENTITY_POLICY",
        )
        findings = response.get("findings", [])
        # Access Analyzer findingType values are ERROR, SECURITY_WARNING,
        # WARNING, and SUGGESTION; treat ERROR and SECURITY_WARNING as
        # blocking.
        result = {
            "valid": all(
                f["findingType"] not in ("ERROR", "SECURITY_WARNING")
                for f in findings
            ),
            "errors": [
                f for f in findings if f["findingType"] == "ERROR"
            ],
            "security_warnings": [
                f for f in findings
                if f["findingType"] == "SECURITY_WARNING"
            ],
            "warnings": [
                f for f in findings if f["findingType"] == "WARNING"
            ],
            "suggestions": [
                f for f in findings if f["findingType"] == "SUGGESTION"
            ],
        }
# Custom MangaAssist checks
custom_findings = self._check_manga_specific_rules(policy_document)
result["custom_findings"] = custom_findings
if custom_findings:
result["valid"] = False
return result
def _check_manga_specific_rules(
self, policy_document: Dict
) -> List[Dict[str, str]]:
"""
Apply MangaAssist-specific policy validation rules.
These go beyond standard IAM analysis to enforce the project's
security requirements.
"""
findings = []
statements = policy_document.get("Statement", [])
for stmt in statements:
actions = stmt.get("Action", [])
if isinstance(actions, str):
actions = [actions]
resources = stmt.get("Resource", [])
if isinstance(resources, str):
resources = [resources]
conditions = stmt.get("Condition", {})
# Check 1: No wildcard Bedrock actions
if stmt.get("Effect") == "Allow":
for action in actions:
if action == "bedrock:*":
findings.append({
"rule": "no_wildcard_bedrock",
"severity": "HIGH",
"message": (
"Wildcard bedrock:* action found. Use "
"specific actions like bedrock:InvokeModel."
),
})
# Check 2: InvokeModel must have resource constraint
if stmt.get("Effect") == "Allow":
invoke_actions = [
a for a in actions
if "InvokeModel" in a
]
if invoke_actions and "*" in resources:
findings.append({
"rule": "invoke_needs_resource",
"severity": "HIGH",
"message": (
"InvokeModel with Resource: * allows access to "
"all models. Specify model ARNs explicitly."
),
})
            # Check 3: InvokeModel should require VPC endpoint
            if stmt.get("Effect") == "Allow":
                invoke_actions = [
                    a for a in actions
                    if "InvokeModel" in a
                ]
                # IAM condition key names are case-insensitive
                # (aws:SourceVpce vs aws:sourceVpce), so normalize the
                # keys before the dict lookup.
                string_equals = {
                    k.lower(): v
                    for k, v in conditions.get("StringEquals", {}).items()
                }
                if invoke_actions and not string_equals.get("aws:sourcevpce"):
                    findings.append({
                        "rule": "invoke_needs_vpce",
                        "severity": "MEDIUM",
                        "message": (
                            "InvokeModel without a VPC endpoint condition. "
                            "Add an aws:SourceVpce condition for network "
                            "security."
                        ),
                    })
# Check 4: Restricted actions should only be in deny statements
if stmt.get("Effect") == "Allow":
for action in actions:
if action in self.BEDROCK_RESTRICTED_ACTIONS:
findings.append({
"rule": "restricted_action_in_allow",
"severity": "CRITICAL",
"message": (
f"Restricted action '{action}' found in "
f"Allow statement. This should only appear "
f"in admin policies with strict conditions."
),
})
return findings
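Because the custom rules are pure checks on a policy dict, they can also run locally (for example, in a pre-commit hook) without calling Access Analyzer. A standalone sketch of the two HIGH-severity rules, assuming the same policy document shape (the `quick_policy_lint` helper is illustrative):

```python
from typing import Dict, List


def quick_policy_lint(policy: Dict) -> List[str]:
    """Flag wildcard Bedrock actions and unscoped InvokeModel."""
    problems = []
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        # Action and Resource may each be a string or a list
        actions = stmt.get("Action", [])
        actions = [actions] if isinstance(actions, str) else actions
        resources = stmt.get("Resource", [])
        resources = [resources] if isinstance(resources, str) else resources
        if "bedrock:*" in actions:
            problems.append("wildcard bedrock:* action")
        if any("InvokeModel" in a for a in actions) and "*" in resources:
            problems.append("InvokeModel with Resource: *")
    return problems


bad_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "bedrock:*",
        "Resource": "*",
    }],
}
print(quick_policy_lint(bad_policy))  # ['wildcard bedrock:* action']
```

Running this locally gives fast feedback; the full validator still runs in CI, where Access Analyzer adds the grammar- and semantics-level checks a hand-rolled linter cannot.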
9. Best Practices Summary
9.1 RBAC Best Practices for FM Access
| Practice | Implementation | Benefit |
|---|---|---|
| Principle of least privilege | Each role can only invoke the minimum models needed | Prevents cost overrun and data exposure |
| Environment separation | Developers cannot invoke Sonnet in production | Staging mistakes do not hit production costs |
| Permission boundaries | Teams cannot exceed their ceiling even with new roles | Prevents privilege escalation via new role creation |
| SCPs at org level | Only approved models and regions allowed | Organization-wide guardrail that individual accounts cannot override |
| Time-limited credentials | 1-hour tokens, 7-day refresh | Limits blast radius of credential theft |
| Automated rotation | 30-day API key rotation via Secrets Manager | Reduces window of exposure for compromised keys |
| Continuous audit | FMAccessAuditor queries CloudTrail hourly | Detects anomalies before they become incidents |
| Policy validation | IAM Access Analyzer checks before deployment | Catches misconfigurations in CI/CD pipeline |
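The dual-key rotation row deserves a concrete illustration: during the rotation window the service accepts both the current and the previous key, so clients can migrate without downtime. A minimal sketch (key names and storage layout are illustrative; in practice both keys live in a single Secrets Manager secret):

```python
import hmac
from typing import Dict


def is_valid_api_key(presented: str, keys: Dict[str, str]) -> bool:
    """Accept the current key, or the previous key during the rotation
    window. hmac.compare_digest avoids timing side channels."""
    return any(
        hmac.compare_digest(presented, k)
        for k in (keys.get("current", ""), keys.get("previous", ""))
        if k
    )


keys = {"current": "key-v2", "previous": "key-v1"}
print(is_valid_api_key("key-v2", keys))  # True  (migrated clients)
print(is_valid_api_key("key-v1", keys))  # True  (rotation window)
print(is_valid_api_key("key-v0", keys))  # False (retired key)
```

Once all callers report using the new key, the rotation job drops `previous`, closing the window.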
9.2 Common Anti-Patterns to Avoid
| Anti-Pattern | Risk | Correct Approach |
|---|---|---|
| `bedrock:*` in any Allow statement | Allows all Bedrock actions, including model deletion | Specify exact actions such as `bedrock:InvokeModel` |
| `Resource: *` for InvokeModel | Allows invoking any model, including expensive ones | Specify model ARNs explicitly |
| Long-lived IAM access keys | Keys can be exfiltrated and used indefinitely | Use temporary credentials via STS/Cognito |
| Same IAM role for all environments | Dev mistakes affect production | Separate roles per environment with tag conditions |
| No rate limiting on FM calls | Runaway automation can consume entire budget | Application-layer rate limits + WAF rate rules |
| Shared API keys across services | Cannot attribute costs or detect specific compromises | Per-service keys with Secrets Manager rotation |
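The "no rate limiting" anti-pattern is cheap to fix at the application layer. A minimal fixed-window sketch (in-memory for illustration; at MangaAssist's 1M messages/day scale this state would live in ElastiCache Redis, using `INCR` plus `EXPIRE` on the same window key):

```python
import time
from collections import defaultdict
from typing import Optional


class FixedWindowLimiter:
    """Per-identity fixed-window limiter: at most `limit` FM calls
    per `window_seconds`."""

    def __init__(self, limit: int = 60, window_seconds: int = 60):
        self.limit = limit
        self.window = window_seconds
        self._counts = defaultdict(int)

    def allow(self, identity: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # All calls in the same window share one counter
        window_key = (identity, int(now // self.window))
        self._counts[window_key] += 1
        return self._counts[window_key] <= self.limit


limiter = FixedWindowLimiter(limit=3, window_seconds=60)
print([limiter.allow("user-1", now=100.0) for _ in range(4)])
# [True, True, True, False]
```

A fixed window is the simplest variant; sliding-window or token-bucket limiters smooth out the burst allowed at each window boundary, at the cost of slightly more state per identity.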
Key Takeaways
- Four layers of access control (SCPs, permission boundaries, IAM policies, application RBAC) ensure that no single misconfiguration grants unauthorized FM access.
- Permission boundaries are the most underused but most powerful tool: they set an absolute ceiling that cannot be exceeded even by creating new roles with broader policies.
- API key rotation through Secrets Manager with a dual-key window enables zero-downtime credential rotation, critical for a 24/7 chatbot serving 1M messages/day.
- The FM Access Auditor provides continuous compliance monitoring, detecting anomalies such as off-hours access, rate spikes, and unauthorized model access attempts before they become incidents.
- Policy validation in CI/CD with IAM Access Analyzer catches overly permissive policies before they reach production, shifting security left in the development lifecycle.