
RBAC and Least Privilege Patterns for FM Access

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Value |
| --- | --- |
| Certification | AWS Certified Generative AI Developer - Professional (AIP-C01) |
| Domain | 2: Implementation and Integration of Foundation Models |
| Task | 2.3: Describe methods to integrate FM-powered applications into enterprise systems |
| Skill | 2.3.3: Create secure access frameworks to ensure appropriate security controls (identity federation between FM services and enterprise systems, role-based access control for model and data access, least privilege API access to FMs) |

1. Fine-Grained Access Control Architecture

1.1 Access Control Layers

MangaAssist implements access control at four distinct layers, each enforced independently to provide defense-in-depth:

Layer 1: Organization Level     — SCPs restrict entire accounts
Layer 2: Account Level          — Permission boundaries limit teams
Layer 3: Role Level             — IAM policies scope individual actions
Layer 4: Request Level          — Application-layer RBAC per API call

flowchart TB
    subgraph OrgLayer["Layer 1: Organization (SCPs)"]
        SCP["Service Control Policies<br/>Block unapproved models org-wide"]
    end

    subgraph AccountLayer["Layer 2: Account (Permission Boundaries)"]
        PB_Dev["Dev Team Boundary<br/>No production Bedrock access"]
        PB_Ops["Ops Team Boundary<br/>No model customization"]
        PB_Admin["Admin Team Boundary<br/>Full Bedrock scope"]
    end

    subgraph RoleLayer["Layer 3: Role (IAM Policies)"]
        Role_ECS["ECS Task Role<br/>InvokeModel Haiku only"]
        Role_Dev["Developer Role<br/>InvokeModel Sonnet + Haiku (staging)"]
        Role_CI["CI/CD Role<br/>ListModels + GetModel only"]
    end

    subgraph AppLayer["Layer 4: Application (RBAC Engine)"]
        RBAC["FMAccessController<br/>Per-request token limits, rate limits, cost guards"]
    end

    SCP --> PB_Dev
    SCP --> PB_Ops
    SCP --> PB_Admin
    PB_Dev --> Role_Dev
    PB_Ops --> Role_ECS
    PB_Admin --> Role_ECS
    PB_Admin --> Role_Dev
    PB_Admin --> Role_CI
    Role_ECS --> RBAC
    Role_Dev --> RBAC
    Role_CI --> RBAC

    style OrgLayer fill:#ffebee,stroke:#c62828,stroke-width:2px
    style AccountLayer fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style RoleLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style AppLayer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px

1.2 Why Four Layers?

A single layer of access control is brittle. If the IAM policy has a misconfiguration (e.g., a wildcard resource), the SCP still blocks unauthorized models. If the SCP is too broad, the permission boundary still constrains the team. If both fail, the application-layer RBAC engine catches the violation and logs it.
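The fallback behavior described above can be sketched as a chain of independent checks, where a request passes only if every layer allows it. This is an illustrative sketch, not MangaAssist code: the `Request` fields, the four layer functions, the role names, and the 1024-token limit are hypothetical stand-ins for the SCP, permission boundary, IAM policy, and RBAC engine.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
APPROVED_MODELS = {SONNET, HAIKU}


@dataclass(frozen=True)
class Request:
    """Hypothetical view of one FM call as each layer would see it."""
    model_id: str
    environment: str
    role: str
    max_tokens: int


def scp_layer(req: Request) -> bool:
    # Layer 1: only approved models may be invoked anywhere in the org.
    return req.model_id in APPROVED_MODELS


def boundary_layer(req: Request) -> bool:
    # Layer 2: developer roles (assumed "dev-" prefix) never reach production.
    return not (req.role.startswith("dev-") and req.environment == "production")


def iam_layer(req: Request) -> bool:
    # Layer 3: the ECS task role (assumed name "ecs-task") may call Haiku only.
    return req.role != "ecs-task" or req.model_id == HAIKU


def rbac_layer(req: Request) -> bool:
    # Layer 4: application-level per-request token limit (assumed value).
    return req.max_tokens <= 1024


LAYERS: List[Tuple[str, Callable[[Request], bool]]] = [
    ("scp", scp_layer),
    ("boundary", boundary_layer),
    ("iam", iam_layer),
    ("rbac", rbac_layer),
]


def first_denying_layer(req: Request) -> Optional[str]:
    """Return the name of the first layer that denies, or None if all allow."""
    for name, check in LAYERS:
        if not check(req):
            return name
    return None
```

Because each check is independent, loosening one of them (for example, making `iam_layer` always return `True`) still leaves the other three in place.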

At MangaAssist's scale of 1M messages/day, a single misconfigured wildcard could expose the Sonnet endpoint (at $3/$15 per 1M tokens) to all traffic, costing an additional ~$4,125/day. With four independent layers, one misconfiguration alone is not enough to route production traffic to Sonnet.
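The ~$4,125/day figure can be reproduced with a short calculation. The per-message token counts below (1,000 input and 100 output tokens) are an assumption chosen because they match the stated figure; the source does not give them.

```python
MESSAGES_PER_DAY = 1_000_000

# Assumed average tokens per message (not stated in the source).
INPUT_TOKENS_PER_MESSAGE = 1_000
OUTPUT_TOKENS_PER_MESSAGE = 100

# (input price, output price) in USD per 1M tokens, from the MangaAssist context.
PRICES_PER_1M = {
    "sonnet": (3.00, 15.00),
    "haiku": (0.25, 1.25),
}


def daily_cost(model: str) -> float:
    """Daily spend in USD if every message is served by the given model."""
    in_price, out_price = PRICES_PER_1M[model]
    tokens_in = MESSAGES_PER_DAY * INPUT_TOKENS_PER_MESSAGE
    tokens_out = MESSAGES_PER_DAY * OUTPUT_TOKENS_PER_MESSAGE
    return (tokens_in * in_price + tokens_out * out_price) / 1_000_000


extra_per_day = daily_cost("sonnet") - daily_cost("haiku")
print(f"Sonnet: ${daily_cost('sonnet'):,.0f}/day")
print(f"Haiku:  ${daily_cost('haiku'):,.0f}/day")
print(f"Extra:  ${extra_per_day:,.0f}/day")
```

Under these assumptions Sonnet costs $4,500/day and Haiku $375/day, a difference of $4,125/day.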


2. Service Control Policies (SCPs)

2.1 SCP: Restrict Bedrock Models Organization-Wide

This SCP ensures that across the entire AWS Organization, only approved Bedrock models can be invoked. Any new model must be explicitly added here before any account can use it.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyUnapprovedBedrockModels",
            "Effect": "Deny",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": "*",
            "Condition": {
                "ForAnyValue:StringNotEquals": {
                    "bedrock:FoundationModelId": [
                        "anthropic.claude-3-sonnet-20240229-v1:0",
                        "anthropic.claude-3-haiku-20240307-v1:0"
                    ]
                }
            }
        },
        {
            "Sid": "DenyBedrockModelCustomization",
            "Effect": "Deny",
            "Action": [
                "bedrock:CreateModelCustomizationJob",
                "bedrock:CreateProvisionedModelThroughput",
                "bedrock:DeleteProvisionedModelThroughput"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotLike": {
                    "aws:PrincipalArn": [
                        "arn:aws:iam::*:role/manga-admin-role"
                    ]
                }
            }
        },
        {
            "Sid": "DenyBedrockOutsideApprovedRegions",
            "Effect": "Deny",
            "Action": "bedrock:*",
            "Resource": "*",
            "Condition": {
                "StringNotEquals": {
                    "aws:RequestedRegion": [
                        "ap-northeast-1",
                        "us-east-1"
                    ]
                }
            }
        },
        {
            "Sid": "RequireBedrockVPCEndpoint",
            "Effect": "Deny",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEquals": {
                    "aws:sourceVpce": "vpce-manga-bedrock-01"
                }
            }
        }
    ]
}

2.2 SCP: Protect Security Infrastructure

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyCloudTrailModification",
            "Effect": "Deny",
            "Action": [
                "cloudtrail:StopLogging",
                "cloudtrail:DeleteTrail",
                "cloudtrail:UpdateTrail"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotLike": {
                    "aws:PrincipalArn": [
                        "arn:aws:iam::*:role/manga-security-admin"
                    ]
                }
            }
        },
        {
            "Sid": "DenyGuardDutyDisable",
            "Effect": "Deny",
            "Action": [
                "guardduty:DeleteDetector",
                "guardduty:DisassociateFromMasterAccount",
                "guardduty:UpdateDetector"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DenySecurityHubDisable",
            "Effect": "Deny",
            "Action": [
                "securityhub:DisableSecurityHub",
                "securityhub:DeleteInsight"
            ],
            "Resource": "*"
        }
    ]
}

2.3 SCP Design Principles for FM Access

| Principle | Implementation | Why It Matters |
| --- | --- | --- |
| Approved models only | Deny invoke on non-whitelisted model IDs | Prevents cost surprise from expensive models |
| Approved regions only | Deny Bedrock actions outside ap-northeast-1 / us-east-1 | Data residency compliance for JP customers |
| VPC endpoint required | Deny invoke without sourceVpce condition | All FM traffic stays private |
| Protect audit trail | Deny CloudTrail/GuardDuty modification | Attackers cannot cover tracks |
| Admin-only customization | Deny model customization except admin role | Fine-tuning is a privileged operation |
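For unit tests, the first three principles can be approximated locally with a small function that reports which deny statements a request would match. This is a simplified sketch: it does not reproduce AWS policy evaluation, and `matching_deny_sids` and its parameters are hypothetical names. The Sids, model IDs, regions, and endpoint ID are copied from the SCP in section 2.1.

```python
from typing import List, Optional

APPROVED_MODELS = {
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "anthropic.claude-3-haiku-20240307-v1:0",
}
APPROVED_REGIONS = {"ap-northeast-1", "us-east-1"}
REQUIRED_VPCE = "vpce-manga-bedrock-01"
INVOKE_ACTIONS = {
    "bedrock:InvokeModel",
    "bedrock:InvokeModelWithResponseStream",
}


def matching_deny_sids(
    action: str,
    model_id: Optional[str],
    region: str,
    source_vpce: Optional[str],
) -> List[str]:
    """Return the Sids of the deny statements this request would match."""
    sids: List[str] = []
    is_invoke = action in INVOKE_ACTIONS

    # Approved models only: applies to invoke actions.
    if is_invoke and model_id not in APPROVED_MODELS:
        sids.append("DenyUnapprovedBedrockModels")

    # Approved regions only: applies to every Bedrock action.
    if action.startswith("bedrock:") and region not in APPROVED_REGIONS:
        sids.append("DenyBedrockOutsideApprovedRegions")

    # VPC endpoint required: applies to invoke actions.
    if is_invoke and source_vpce != REQUIRED_VPCE:
        sids.append("RequireBedrockVPCEndpoint")

    return sids
```

An empty list means none of the three modeled deny statements apply; the request still needs an explicit allow from the lower layers.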

3. Permission Boundaries for FM Teams

3.1 What Permission Boundaries Do

Permission boundaries set the maximum permissions that any IAM role within a team can have. Even if someone creates a new role with bedrock:* in the policy, the permission boundary ensures it can never exceed the defined ceiling.
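The ceiling works because an action is effectively allowed only when both the role's identity policy and its permission boundary allow it. The sketch below illustrates that intersection for action names only. It ignores resources, conditions, and explicit denies, and the helper names are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def _matches(action: str, patterns: Iterable[str]) -> bool:
    # IAM action names are case-insensitive and support * wildcards.
    return any(fnmatchcase(action.lower(), p.lower()) for p in patterns)


def effectively_allowed(
    action: str,
    identity_allows: Iterable[str],
    boundary_allows: Iterable[str],
) -> bool:
    """An action is allowed only if the policy AND the boundary allow it."""
    return _matches(action, identity_allows) and _matches(action, boundary_allows)


# A role that was (mis)configured with bedrock:* in its identity policy.
identity = ["bedrock:*"]
# A boundary that permits only invoke and list actions.
boundary = ["bedrock:InvokeModel", "bedrock:ListFoundationModels"]

print(effectively_allowed("bedrock:InvokeModel", identity, boundary))
print(effectively_allowed("bedrock:CreateModelCustomizationJob", identity, boundary))
```

Note that a boundary grants nothing by itself: with an empty identity policy, `effectively_allowed` returns `False` for every action.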

3.2 Developer Team Permission Boundary

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowBedrockInvokeNonProduction",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream",
                "bedrock:ListFoundationModels",
                "bedrock:GetFoundationModel"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEquals": {
                    "aws:ResourceTag/Environment": "production"
                }
            }
        },
        {
            "Sid": "AllowDynamoDBDevelopment",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:DeleteItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions-dev",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions-dev/index/*",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products-dev",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products-dev/index/*"
            ]
        },
        {
            "Sid": "AllowDynamoDBProductionReadOnly",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products"
            ]
        },
        {
            "Sid": "AllowOpenSearchDevelopment",
            "Effect": "Allow",
            "Action": [
                "aoss:APIAccessAll"
            ],
            "Resource": [
                "arn:aws:aoss:ap-northeast-1:123456789012:collection/manga-vectors-dev"
            ]
        },
        {
            "Sid": "AllowCloudWatchMonitoring",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:GetMetricData",
                "cloudwatch:ListMetrics",
                "logs:GetLogEvents",
                "logs:FilterLogEvents",
                "logs:DescribeLogGroups"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DenyProductionModification",
            "Effect": "Deny",
            "Action": [
                "dynamodb:DeleteTable",
                "dynamodb:CreateTable",
                "dynamodb:UpdateTable",
                "aoss:DeleteCollection",
                "aoss:CreateCollection"
            ],
            "Resource": "*"
        }
    ]
}

3.3 Operations Team Permission Boundary

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowBedrockHaikuProductionOnly",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
            ]
        },
        {
            "Sid": "AllowBedrockReadOnly",
            "Effect": "Allow",
            "Action": [
                "bedrock:ListFoundationModels",
                "bedrock:GetFoundationModel",
                "bedrock:GetModelInvocationLoggingConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowFullMonitoring",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:*",
                "logs:*",
                "xray:*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowECSOperations",
            "Effect": "Allow",
            "Action": [
                "ecs:DescribeServices",
                "ecs:DescribeTasks",
                "ecs:ListTasks",
                "ecs:UpdateService",
                "ecs:DescribeClusters"
            ],
            "Resource": [
                "arn:aws:ecs:ap-northeast-1:123456789012:cluster/manga-prod",
                "arn:aws:ecs:ap-northeast-1:123456789012:service/manga-prod/*",
                "arn:aws:ecs:ap-northeast-1:123456789012:task/manga-prod/*"
            ]
        },
        {
            "Sid": "AllowDataReadOnly",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:Query",
                "dynamodb:DescribeTable"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-*"
            ]
        },
        {
            "Sid": "DenyModelCustomization",
            "Effect": "Deny",
            "Action": [
                "bedrock:CreateModelCustomizationJob",
                "bedrock:DeleteCustomModel",
                "bedrock:CreateProvisionedModelThroughput",
                "bedrock:DeleteProvisionedModelThroughput"
            ],
            "Resource": "*"
        }
    ]
}

3.4 Permission Boundary Manager

"""
permission_boundary_manager.py
Manages IAM permission boundaries for MangaAssist FM teams.
Ensures that no role within a team can exceed the defined ceiling,
regardless of what policies are attached to it.
"""

import json
import logging
import boto3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@dataclass
class TeamBoundaryConfig:
    """Configuration for a team's permission boundary."""
    team_name: str
    boundary_policy_name: str
    boundary_policy_document: Dict
    applicable_role_prefix: str
    environment_scope: List[str]
    max_session_duration: int = 3600
    require_mfa: bool = True
    tags: Dict[str, str] = field(default_factory=dict)


class PermissionBoundaryManager:
    """
    Creates and manages IAM permission boundaries for MangaAssist teams.

    Permission boundaries are the second layer of defense (after SCPs).
    They set an absolute ceiling on what any IAM entity within a team
    can do, regardless of what policies are attached.

    Key behaviors:
    - Creates managed policies for each team boundary
    - Attaches boundaries to all roles matching the team prefix
    - Validates that no role exceeds its boundary
    - Reports compliance drift to Security Hub

    Teams and their boundaries:
        dev-team    -> Can invoke Bedrock in staging/dev, no production
        ops-team    -> Can invoke Haiku in production, full monitoring
        admin-team  -> Full Bedrock access, model customization
        ci-cd       -> List/Get models only, deploy ECS services
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.iam_client = boto3.client("iam")
        self.region = region
        self.account_id = boto3.client("sts").get_caller_identity()["Account"]

    def create_boundary(self, config: TeamBoundaryConfig) -> str:
        """
        Create or update a managed IAM policy for a team's permission boundary.

        Returns the policy ARN.
        """
        policy_arn = (
            f"arn:aws:iam::{self.account_id}:policy/"
            f"{config.boundary_policy_name}"
        )

        try:
            # Check if policy already exists
            self.iam_client.get_policy(PolicyArn=policy_arn)

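            # Policy exists: prune old versions first (IAM allows at most
            # 5 versions per managed policy), then create a new default version
            self.cleanup_old_policy_versions(policy_arn)
            policy_doc = json.dumps(config.boundary_policy_document)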
            self.iam_client.create_policy_version(
                PolicyArn=policy_arn,
                PolicyDocument=policy_doc,
                SetAsDefault=True,
            )
            logger.info(
                f"Updated permission boundary: {config.boundary_policy_name}"
            )

        except self.iam_client.exceptions.NoSuchEntityException:
            # Policy does not exist: create it
            policy_doc = json.dumps(config.boundary_policy_document)
            response = self.iam_client.create_policy(
                PolicyName=config.boundary_policy_name,
                PolicyDocument=policy_doc,
                Description=(
                    f"Permission boundary for {config.team_name} team "
                    f"in MangaAssist"
                ),
                Tags=[
                    {"Key": "Team", "Value": config.team_name},
                    {"Key": "Project", "Value": "MangaAssist"},
                    {"Key": "ManagedBy", "Value": "PermissionBoundaryManager"},
                ],
            )
            policy_arn = response["Policy"]["Arn"]
            logger.info(
                f"Created permission boundary: {config.boundary_policy_name} "
                f"({policy_arn})"
            )

        return policy_arn

    def attach_boundary_to_team_roles(
        self, config: TeamBoundaryConfig, policy_arn: str
    ) -> List[str]:
        """
        Attach the permission boundary to all IAM roles matching the
        team's role prefix.

        Returns a list of role names that were updated.
        """
        updated_roles = []
        paginator = self.iam_client.get_paginator("list_roles")

        for page in paginator.paginate():
            for role in page["Roles"]:
                role_name = role["RoleName"]
                if role_name.startswith(config.applicable_role_prefix):
                    self.iam_client.put_role_permissions_boundary(
                        RoleName=role_name,
                        PermissionsBoundary=policy_arn,
                    )
                    updated_roles.append(role_name)
                    logger.info(
                        f"Attached boundary {config.boundary_policy_name} "
                        f"to role {role_name}"
                    )

        logger.info(
            f"Updated {len(updated_roles)} roles for team "
            f"{config.team_name}"
        )
        return updated_roles

    def validate_boundary_compliance(
        self, config: TeamBoundaryConfig, policy_arn: str
    ) -> List[Dict[str, Any]]:
        """
        Validate that all roles matching the team prefix have the
        correct permission boundary attached.

        Returns a list of non-compliant roles.
        """
        non_compliant = []
        paginator = self.iam_client.get_paginator("list_roles")

        for page in paginator.paginate():
            for role in page["Roles"]:
                role_name = role["RoleName"]
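                if role_name.startswith(config.applicable_role_prefix):
                    # list_roles omits PermissionsBoundary from its response,
                    # so fetch the full role with get_role before comparing.
                    role_detail = self.iam_client.get_role(
                        RoleName=role_name
                    )["Role"]
                    current_boundary = role_detail.get(
                        "PermissionsBoundary", {}
                    ).get("PermissionsBoundaryArn", "")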

                    if current_boundary != policy_arn:
                        non_compliant.append({
                            "role_name": role_name,
                            "expected_boundary": policy_arn,
                            "actual_boundary": current_boundary or "NONE",
                            "team": config.team_name,
                            "severity": "HIGH",
                        })

        if non_compliant:
            logger.warning(
                f"Found {len(non_compliant)} non-compliant roles for "
                f"team {config.team_name}"
            )
        else:
            logger.info(
                f"All roles for team {config.team_name} are compliant"
            )

        return non_compliant

    def cleanup_old_policy_versions(self, policy_arn: str) -> int:
        """
        Remove non-default policy versions to stay within the
        AWS limit of 5 versions per managed policy.
        """
        versions = self.iam_client.list_policy_versions(
            PolicyArn=policy_arn
        )["Versions"]

        deleted_count = 0
        for version in versions:
            if not version["IsDefaultVersion"]:
                self.iam_client.delete_policy_version(
                    PolicyArn=policy_arn,
                    VersionId=version["VersionId"],
                )
                deleted_count += 1

        if deleted_count:
            logger.info(
                f"Cleaned up {deleted_count} old policy versions for "
                f"{policy_arn}"
            )

        return deleted_count

    def generate_compliance_report(
        self, team_configs: List[TeamBoundaryConfig]
    ) -> Dict[str, Any]:
        """
        Generate a compliance report across all teams showing boundary
        attachment status, non-compliant roles, and remediation actions.
        """
        report = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "account_id": self.account_id,
            "region": self.region,
            "teams": [],
            "total_roles_checked": 0,
            "total_non_compliant": 0,
        }

        for config in team_configs:
            policy_arn = (
                f"arn:aws:iam::{self.account_id}:policy/"
                f"{config.boundary_policy_name}"
            )
            violations = self.validate_boundary_compliance(
                config, policy_arn
            )

            team_entry = {
                "team_name": config.team_name,
                "boundary_policy": config.boundary_policy_name,
                "role_prefix": config.applicable_role_prefix,
                "violations": violations,
                "compliant": len(violations) == 0,
            }
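            # Count the roles examined for this team so that
            # total_roles_checked reflects the scope of the report.
            paginator = self.iam_client.get_paginator("list_roles")
            report["total_roles_checked"] += sum(
                1
                for page in paginator.paginate()
                for role in page["Roles"]
                if role["RoleName"].startswith(config.applicable_role_prefix)
            )
            report["teams"].append(team_entry)
            report["total_non_compliant"] += len(violations)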

        return report
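`cleanup_old_policy_versions` deletes every non-default version, which also removes the option of rolling back to the previous boundary. A gentler variant, sketched below as a pure function, keeps the most recent non-default versions and returns only the IDs that are safe to delete. The function name and the `keep_non_default` parameter are hypothetical; the dictionary keys match the `Versions` entries returned by `list_policy_versions`.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def versions_to_delete(
    versions: List[Dict[str, Any]], keep_non_default: int = 1
) -> List[str]:
    """Return VersionIds to delete, keeping the default version and the
    newest `keep_non_default` non-default versions for rollback."""
    non_default = [v for v in versions if not v["IsDefaultVersion"]]
    # Newest first, so the slice below drops only the oldest versions.
    non_default.sort(key=lambda v: v["CreateDate"], reverse=True)
    return [v["VersionId"] for v in non_default[keep_non_default:]]


# Example input shaped like list_policy_versions()["Versions"].
example_versions = [
    {"VersionId": "v4", "IsDefaultVersion": True,
     "CreateDate": datetime(2024, 4, 1, tzinfo=timezone.utc)},
    {"VersionId": "v3", "IsDefaultVersion": False,
     "CreateDate": datetime(2024, 3, 1, tzinfo=timezone.utc)},
    {"VersionId": "v2", "IsDefaultVersion": False,
     "CreateDate": datetime(2024, 2, 1, tzinfo=timezone.utc)},
    {"VersionId": "v1", "IsDefaultVersion": False,
     "CreateDate": datetime(2024, 1, 1, tzinfo=timezone.utc)},
]

print(versions_to_delete(example_versions))
```

With `keep_non_default=1`, the example keeps `v4` (default) and `v3` (rollback target) and marks `v2` and `v1` for deletion.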

4. API Key Rotation and Token-Based Authentication

4.1 Why API Key Rotation Matters

MangaAssist uses API keys for service-to-service authentication (e.g., the ECS orchestrator calling internal microservices). These keys must be rotated regularly to limit the blast radius of a compromise.

Rotation strategy:

- API keys are stored in AWS Secrets Manager
- Secrets Manager triggers automatic rotation every 30 days
- A Lambda function generates the new key, tests it, and promotes it
- During rotation, both old and new keys are valid (dual-key window)
- The old key is revoked after the new key is confirmed working
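The rotation Lambda mentioned above is invoked by Secrets Manager once per step, with an event that carries `SecretId`, `ClientRequestToken`, and `Step`. A minimal dispatch skeleton might look like the sketch below. The step implementations are injected so the example runs without AWS access; `make_rotation_handler` and the recording stubs are hypothetical, and a real handler would call Secrets Manager inside each step.

```python
from typing import Any, Callable, Dict, List, Tuple

# The four steps Secrets Manager sends to a rotation Lambda, in order.
ROTATION_STEPS = ("createSecret", "setSecret", "testSecret", "finishSecret")

StepFn = Callable[[str, str], None]


def make_rotation_handler(steps: Dict[str, StepFn]) -> Callable[..., None]:
    """Build a Lambda-style handler that dispatches on event["Step"]."""

    def handler(event: Dict[str, Any], context: Any = None) -> None:
        step = event["Step"]
        if step not in ROTATION_STEPS:
            raise ValueError(f"Unknown rotation step: {step}")
        # Each step receives the secret ID and the token that names
        # the pending secret version.
        steps[step](event["SecretId"], event["ClientRequestToken"])

    return handler


# Stub steps that only record what was called (stand-ins for real logic).
calls: List[Tuple[str, str, str]] = []
stub_steps: Dict[str, StepFn] = {
    name: (lambda sid, token, _name=name: calls.append((_name, sid, token)))
    for name in ROTATION_STEPS
}

handler = make_rotation_handler(stub_steps)
for step_name in ROTATION_STEPS:
    handler({
        "SecretId": "manga/orchestrator-key",  # hypothetical secret name
        "ClientRequestToken": "token-123",      # hypothetical token
        "Step": step_name,
    })
```

After the loop, `calls` holds the four steps in the order Secrets Manager would run them.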

4.2 API Key Rotator

"""
api_key_rotator.py
Automated API key rotation for MangaAssist service-to-service authentication.
Integrates with AWS Secrets Manager for secure key lifecycle management.
"""

import json
import logging
import secrets
import string
import boto3
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta

logger = logging.getLogger(__name__)


@dataclass
class APIKeyConfig:
    """Configuration for an API key managed by the rotator."""
    secret_name: str
    description: str
    key_length: int = 64
    rotation_days: int = 30
    dual_key_window_hours: int = 24
    allowed_services: list = None
    rate_limit_per_minute: int = 1000
    tags: Dict[str, str] = None

    def __post_init__(self):
        if self.allowed_services is None:
            self.allowed_services = []
        if self.tags is None:
            self.tags = {}


class APIKeyRotator:
    """
    Manages the lifecycle of API keys for MangaAssist FM endpoints.

    Handles:
    - Secure key generation with cryptographic randomness
    - Storage in AWS Secrets Manager with encryption
    - Automatic rotation via Secrets Manager rotation Lambda
    - Dual-key window for zero-downtime rotation
    - Revocation and cleanup of expired keys
    - Audit logging of all key operations

    Rotation flow:
        1. Secrets Manager triggers rotation Lambda
        2. Lambda calls create_secret (generate new key)
        3. Lambda calls set_secret (store in pending state)
        4. Lambda calls test_secret (validate new key works)
        5. Lambda calls finish_secret (promote new key, mark old as previous)
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.secrets_client = boto3.client(
            "secretsmanager", region_name=region
        )
        self.apigw_client = boto3.client("apigatewayv2", region_name=region)
        self.region = region

    def create_api_key(self, config: APIKeyConfig) -> Dict[str, Any]:
        """
        Create a new API key and store it in Secrets Manager.

        The key is generated using Python's secrets module for
        cryptographic randomness, ensuring it cannot be predicted.
        """
        # Generate a cryptographically secure API key
        alphabet = string.ascii_letters + string.digits
        api_key = "manga_" + "".join(
            secrets.choice(alphabet) for _ in range(config.key_length)
        )

        # Create the secret value
        secret_value = {
            "api_key": api_key,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "rotation_days": config.rotation_days,
            "allowed_services": config.allowed_services,
            "rate_limit_per_minute": config.rate_limit_per_minute,
            "version": "v1",
        }

        # Store in Secrets Manager
        try:
            response = self.secrets_client.create_secret(
                Name=config.secret_name,
                Description=config.description,
                SecretString=json.dumps(secret_value),
                Tags=[
                    {"Key": k, "Value": v}
                    for k, v in {
                        **config.tags,
                        "Project": "MangaAssist",
                        "ManagedBy": "APIKeyRotator",
                        "RotationDays": str(config.rotation_days),
                    }.items()
                ],
            )
        except self.secrets_client.exceptions.ResourceExistsException:
            response = self.secrets_client.update_secret(
                SecretId=config.secret_name,
                SecretString=json.dumps(secret_value),
            )

        logger.info(f"Created API key secret: {config.secret_name}")

        return {
            "secret_name": config.secret_name,
            "secret_arn": response.get("ARN", ""),
            "key_prefix": api_key[:12] + "...",  # Log prefix only
        }

    def enable_automatic_rotation(
        self,
        secret_name: str,
        rotation_lambda_arn: str,
        rotation_days: int = 30,
    ) -> Dict[str, Any]:
        """
        Enable automatic rotation for an API key secret.

        The rotation Lambda function handles the four-step rotation
        process: createSecret, setSecret, testSecret, finishSecret.
        """
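        # RotationRules accepts either AutomaticallyAfterDays or
        # ScheduleExpression, not both. Duration only sets the window in
        # which rotation may start; it is not the dual-key window, which
        # validate_key implements through the AWSPREVIOUS stage.
        response = self.secrets_client.rotate_secret(
            SecretId=secret_name,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={
                "AutomaticallyAfterDays": rotation_days,
            },
        )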

        logger.info(
            f"Enabled automatic rotation for {secret_name} "
            f"every {rotation_days} days"
        )

        return {
            "secret_name": secret_name,
            "rotation_enabled": True,
            "rotation_days": rotation_days,
            # rotate_secret returns the ID of the new secret version,
            # not the date of the next rotation.
            "version_id": response.get("VersionId", ""),
        }

    def rotate_key_now(self, secret_name: str) -> Dict[str, Any]:
        """
        Trigger an immediate key rotation outside the normal schedule.

        Use this when a key is suspected of being compromised.
        """
        # Get the current secret for comparison
        current = self.secrets_client.get_secret_value(SecretId=secret_name)
        current_value = json.loads(current["SecretString"])
        current_version = current.get("VersionId", "unknown")

        # Generate a new key
        alphabet = string.ascii_letters + string.digits
        key_length = len(current_value.get("api_key", "")) - 6  # minus prefix
        new_key = "manga_" + "".join(
            secrets.choice(alphabet) for _ in range(max(key_length, 64))
        )

        # Update the secret
        new_value = {
            **current_value,
            "api_key": new_key,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "previous_version": current_version,
            "rotation_reason": "manual_emergency",
            "version": f"v{int(current_value.get('version', 'v1')[1:]) + 1}",
        }

        self.secrets_client.update_secret(
            SecretId=secret_name,
            SecretString=json.dumps(new_value),
        )

        logger.warning(
            f"Emergency key rotation completed for {secret_name} "
            f"(old version: {current_version})"
        )

        return {
            "secret_name": secret_name,
            "old_version": current_version,
            "new_key_prefix": new_key[:12] + "...",
            "rotated_at": datetime.now(timezone.utc).isoformat(),
            "reason": "manual_emergency",
        }

    def validate_key(
        self, secret_name: str, provided_key: str
    ) -> Dict[str, Any]:
        """
        Validate a provided API key against the stored secret.

        Checks both the current and previous versions during the
        dual-key rotation window.
        """
        try:
            # Check current version
            current = self.secrets_client.get_secret_value(
                SecretId=secret_name, VersionStage="AWSCURRENT"
            )
            current_value = json.loads(current["SecretString"])

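            # Compare as bytes: compare_digest raises TypeError on
            # non-ASCII str input, which a caller could otherwise trigger.
            if secrets.compare_digest(
                provided_key.encode("utf-8"),
                current_value.get("api_key", "").encode("utf-8"),
            ):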
                return {
                    "valid": True,
                    "version": "current",
                    "allowed_services": current_value.get(
                        "allowed_services", []
                    ),
                    "rate_limit": current_value.get(
                        "rate_limit_per_minute", 1000
                    ),
                }

            # Check previous version (dual-key window)
            try:
                previous = self.secrets_client.get_secret_value(
                    SecretId=secret_name, VersionStage="AWSPREVIOUS"
                )
                previous_value = json.loads(previous["SecretString"])

                # Compare as bytes so non-ASCII input cannot raise TypeError
                if secrets.compare_digest(
                    provided_key.encode("utf-8"),
                    previous_value.get("api_key", "").encode("utf-8"),
                ):
                    logger.info(
                        f"Key validated against previous version for "
                        f"{secret_name} (dual-key window)"
                    )
                    return {
                        "valid": True,
                        "version": "previous",
                        "allowed_services": previous_value.get(
                            "allowed_services", []
                        ),
                        "rate_limit": previous_value.get(
                            "rate_limit_per_minute", 1000
                        ),
                        "warning": "Using previous key version; update to current",
                    }
            except self.secrets_client.exceptions.ResourceNotFoundException:
                pass  # No AWSPREVIOUS version exists yet

            return {"valid": False, "reason": "Key does not match any version"}

        except self.secrets_client.exceptions.ResourceNotFoundException:
            return {"valid": False, "reason": f"Secret {secret_name} not found"}

    def list_managed_keys(self) -> List[Dict[str, Any]]:
        """
        List all API keys managed by this rotator.

        Filters secrets by the ManagedBy tag.
        """
        keys = []
        paginator = self.secrets_client.get_paginator("list_secrets")

        for page in paginator.paginate(
            Filters=[
                {
                    "Key": "tag-key",
                    "Values": ["ManagedBy"],
                },
                {
                    "Key": "tag-value",
                    "Values": ["APIKeyRotator"],
                },
            ]
        ):
            for secret in page.get("SecretList", []):
                keys.append({
                    "name": secret["Name"],
                    "arn": secret["ARN"],
                    "last_rotated": (
                        secret.get("LastRotatedDate", "").isoformat()
                        if secret.get("LastRotatedDate")
                        else "never"
                    ),
                    "rotation_enabled": secret.get(
                        "RotationEnabled", False
                    ),
                    "next_rotation": (
                        secret.get("NextRotationDate", "").isoformat()
                        if secret.get("NextRotationDate")
                        else "N/A"
                    ),
                })

        return keys
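
The dual-key window in `validate` accepts either the current or the previous key version while a rotation is in flight. The comparison can be sketched in isolation as below, assuming both key values have already been fetched from Secrets Manager; `match_key_version` is a hypothetical helper name, not part of the rotator.

```python
import secrets
from typing import Optional


def match_key_version(
    provided_key: str,
    current_key: str,
    previous_key: Optional[str] = None,
) -> Optional[str]:
    """Return "current", "previous", or None for a provided API key.

    Hypothetical helper: comparisons run in constant time so response
    timing does not reveal how many leading characters matched.
    """
    provided = provided_key.encode("utf-8")
    if secrets.compare_digest(provided, current_key.encode("utf-8")):
        return "current"
    # An empty or missing previous key never matches.
    if previous_key and secrets.compare_digest(
        provided, previous_key.encode("utf-8")
    ):
        return "previous"
    return None
```

A caller that receives `"previous"` would typically still serve the request but surface a warning, as the rotator does with its `warning` field.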

5. FM Access Auditor

5.1 Audit Architecture

Every FM access decision in MangaAssist is logged: Bedrock API calls go to CloudTrail, while application-level decisions, Cognito auth events, and API Gateway access logs go to CloudWatch Logs. Together these form the audit trail that the FM Access Auditor queries to detect anomalies, generate compliance reports, and trigger alerts.

flowchart LR
    subgraph Sources["Audit Event Sources"]
        Bedrock["Bedrock API Calls"]
        Controller["FMAccessController Decisions"]
        Cognito["Cognito Auth Events"]
        APIGW["API Gateway Access Logs"]
    end

    subgraph Collection["Log Collection"]
        CloudTrail["CloudTrail<br/>(Management + Data Events)"]
        CWLogs["CloudWatch Logs<br/>(Application Logs)"]
    end

    subgraph Analysis["Audit Analysis"]
        Auditor["FMAccessAuditor<br/>(Query + Analyze)"]
        Athena["Athena<br/>(CloudTrail Queries)"]
        Insights["CloudWatch Insights<br/>(Log Queries)"]
    end

    subgraph Output["Audit Outputs"]
        Report["Compliance Report"]
        Alert["Security Alerts"]
        Dashboard["Security Dashboard"]
    end

    Bedrock --> CloudTrail
    Controller --> CWLogs
    Cognito --> CWLogs
    APIGW --> CWLogs
    CloudTrail --> Auditor
    CWLogs --> Auditor
    Auditor --> Athena
    Auditor --> Insights
    Auditor --> Report
    Auditor --> Alert
    Auditor --> Dashboard

    style Sources fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style Collection fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style Analysis fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style Output fill:#fce4ec,stroke:#b71c1c,stroke-width:2px
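
For the `FMAccessController Decisions` path into CloudWatch Logs, one structured JSON line per decision keeps the logs queryable. The sketch below is an assumption about what such a record could look like; the function name and every field name are illustrative and not taken from the controller itself.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_audit_record(
    user_id: str,
    role: str,
    model_id: str,
    allowed: bool,
    reason: str = "",
    now: Optional[datetime] = None,
) -> str:
    """Serialize one access decision as a single JSON line.

    All field names here are illustrative assumptions.
    """
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    record: Dict[str, Any] = {
        "event_type": "fm_access_decision",
        "timestamp": timestamp,
        "user_id": user_id,
        "role": role,
        "model_id": model_id,
        "decision": "ALLOW" if allowed else "DENY",
        "reason": reason,
    }
    # Sorted keys keep the output stable, which helps diffing and tests.
    return json.dumps(record, sort_keys=True)
```

Writing each record through the standard `logging` module as one line lets CloudWatch Logs Insights filter on fields such as `decision` or `role` without custom parsing.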

5.2 FM Access Auditor Implementation

"""
fm_access_auditor.py
Audits all Foundation Model access in MangaAssist for compliance,
anomaly detection, and security reporting.
"""

import json
import logging
import boto3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from collections import defaultdict

logger = logging.getLogger(__name__)


@dataclass
class AuditFinding:
    """Represents a security finding from the audit process."""
    finding_id: str
    severity: str           # CRITICAL, HIGH, MEDIUM, LOW, INFO
    category: str           # access_violation, anomaly, compliance, cost
    title: str
    description: str
    affected_resource: str
    recommended_action: str
    evidence: Dict[str, Any] = field(default_factory=dict)
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)


class FMAccessAuditor:
    """
    Audits Foundation Model access patterns in MangaAssist.

    Capabilities:
    - Query CloudTrail for Bedrock API call history
    - Analyze access patterns for anomalies (unusual hours, spikes, etc.)
    - Detect permission violations (denied requests, escalation attempts)
    - Generate compliance reports for security reviews
    - Trigger SNS alerts for critical findings
    - Track cost attribution by role and model

    Anomaly detection rules (each evaluated over the audited time window):
        1. Request count above 2x the per-identity average
        2. Access attempts outside normal business hours (JST 22:00-06:00)
        3. More than 5 denied access attempts from a single identity
        4. Sudden model tier upgrade (operator trying to invoke Sonnet)
        5. Access from unexpected IP ranges (not implemented in this class)
    """

    ANOMALY_THRESHOLDS = {
        "rpm_multiplier": 2.0,
        "off_hours_start": 22,      # 10 PM JST
        "off_hours_end": 6,         # 6 AM JST
        "max_denials_per_hour": 5,
        "cost_spike_multiplier": 3.0,
    }

    def __init__(
        self,
        region: str = "ap-northeast-1",
        alert_topic_arn: Optional[str] = None,
    ):
        self.region = region
        self.cloudtrail_client = boto3.client(
            "cloudtrail", region_name=region
        )
        self.logs_client = boto3.client("logs", region_name=region)
        self.sns_client = boto3.client("sns", region_name=region)
        self.cloudwatch = boto3.client("cloudwatch", region_name=region)
        self.alert_topic_arn = alert_topic_arn

    def audit_bedrock_access(
        self, hours: int = 24
    ) -> List[AuditFinding]:
        """
        Audit all Bedrock API calls in the specified time window.

        Queries CloudTrail for InvokeModel events and analyzes them
        for violations and anomalies.
        """
        findings = []
        start_time = datetime.now(timezone.utc) - timedelta(hours=hours)

        events = self._query_cloudtrail_events(
            event_source="bedrock.amazonaws.com",
            start_time=start_time,
        )

        # Group events by identity for pattern analysis
        events_by_identity = defaultdict(list)
        events_by_model = defaultdict(list)
        denied_events = []

        for event in events:
            identity = event.get("userIdentity", {}).get("arn", "unknown")
            model = self._extract_model_from_event(event)

            events_by_identity[identity].append(event)
            events_by_model[model].append(event)

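            # Only authorization failures count as denials; throttling and
            # validation errors also set errorCode but are not access issues.
            if event.get("errorCode") in (
                "AccessDenied", "AccessDeniedException"
            ):
                denied_events.append(event)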

        # Check 1: Excessive denied requests
        denial_findings = self._check_denial_patterns(denied_events)
        findings.extend(denial_findings)

        # Check 2: Off-hours access
        off_hours_findings = self._check_off_hours_access(events)
        findings.extend(off_hours_findings)

        # Check 3: Rate anomalies
        rate_findings = self._check_rate_anomalies(events_by_identity)
        findings.extend(rate_findings)

        # Check 4: Unauthorized model access attempts
        model_findings = self._check_model_access_violations(events)
        findings.extend(model_findings)

        # Alert on critical findings
        critical_findings = [
            f for f in findings if f.severity in ("CRITICAL", "HIGH")
        ]
        if critical_findings and self.alert_topic_arn:
            self._send_alert(critical_findings)

        logger.info(
            f"Audit complete: {len(events)} events, "
            f"{len(findings)} findings "
            f"({len(critical_findings)} critical/high)"
        )

        return findings

    def _check_denial_patterns(
        self, denied_events: List[Dict]
    ) -> List[AuditFinding]:
        """
        Detect identities with excessive access denials,
        which may indicate credential compromise or escalation attempts.
        """
        findings = []
        denials_by_identity = defaultdict(list)

        for event in denied_events:
            identity = event.get("userIdentity", {}).get("arn", "unknown")
            denials_by_identity[identity].append(event)

        threshold = self.ANOMALY_THRESHOLDS["max_denials_per_hour"]

        for identity, events in denials_by_identity.items():
            if len(events) > threshold:
                findings.append(AuditFinding(
                    finding_id=f"DENY-{hash(identity) % 100000:05d}",
                    severity="HIGH",
                    category="access_violation",
                    title="Excessive Bedrock Access Denials",
                    description=(
                        f"Identity '{identity}' had {len(events)} denied "
                        f"Bedrock access attempts (threshold: {threshold}). "
                        f"This may indicate a compromised credential or "
                        f"privilege escalation attempt."
                    ),
                    affected_resource=identity,
                    recommended_action=(
                        "Review the identity's recent activity. If the "
                        "denials are unexpected, rotate credentials and "
                        "investigate the source IP addresses."
                    ),
                    evidence={
                        "denial_count": len(events),
                        "threshold": threshold,
                        "error_codes": list(set(
                            e.get("errorCode", "") for e in events
                        )),
                        "source_ips": list(set(
                            e.get("sourceIPAddress", "") for e in events
                        )),
                    },
                ))

        return findings

    def _check_off_hours_access(
        self, events: List[Dict]
    ) -> List[AuditFinding]:
        """
        Detect Bedrock access outside normal business hours (JST).
        Off-hours access may indicate automated abuse or compromised creds.
        """
        findings = []
        off_hours_events = []

        for event in events:
            event_time = event.get("eventTime", "")
            if event_time:
                try:
                    dt = datetime.fromisoformat(
                        event_time.replace("Z", "+00:00")
                    )
                    jst_hour = (dt.hour + 9) % 24  # UTC to JST
                    start = self.ANOMALY_THRESHOLDS["off_hours_start"]
                    end = self.ANOMALY_THRESHOLDS["off_hours_end"]
                    if jst_hour >= start or jst_hour < end:
                        off_hours_events.append(event)
                except (ValueError, TypeError):
                    pass

        if len(off_hours_events) > 10:
            identities = set(
                e.get("userIdentity", {}).get("arn", "unknown")
                for e in off_hours_events
            )
            findings.append(AuditFinding(
                finding_id=f"OFFHRS-{len(off_hours_events):05d}",
                severity="MEDIUM",
                category="anomaly",
                title="Significant Off-Hours Bedrock Access",
                description=(
                    f"{len(off_hours_events)} Bedrock API calls detected "
                    f"outside business hours (JST 22:00-06:00) from "
                    f"{len(identities)} unique identities."
                ),
                affected_resource="bedrock.amazonaws.com",
                recommended_action=(
                    "Verify these calls are from legitimate automated "
                    "processes. If unexpected, investigate the source "
                    "identities and restrict off-hours access."
                ),
                evidence={
                    "off_hours_count": len(off_hours_events),
                    "unique_identities": list(identities)[:10],
                },
            ))

        return findings

    def _check_rate_anomalies(
        self, events_by_identity: Dict[str, List]
    ) -> List[AuditFinding]:
        """
        Detect identities with request rates significantly above baseline.
        """
        findings = []
        multiplier = self.ANOMALY_THRESHOLDS["rpm_multiplier"]

        # Calculate average rate across all identities
        all_counts = [len(evts) for evts in events_by_identity.values()]
        if not all_counts:
            return findings

        avg_count = sum(all_counts) / len(all_counts)

        for identity, events in events_by_identity.items():
            if len(events) > avg_count * multiplier and len(events) > 50:
                findings.append(AuditFinding(
                    finding_id=f"RATE-{hash(identity) % 100000:05d}",
                    severity="MEDIUM",
                    category="anomaly",
                    title="Abnormal Bedrock Request Rate",
                    description=(
                        f"Identity '{identity}' made {len(events)} "
                        f"requests (more than {multiplier}x the average of "
                        f"{avg_count:.0f} per identity)."
                    ),
                    affected_resource=identity,
                    recommended_action=(
                        "Review whether this rate is expected. Consider "
                        "adding or tightening rate limits for this identity."
                    ),
                    evidence={
                        "request_count": len(events),
                        "average_count": round(avg_count),
                        "multiplier": multiplier,
                    },
                ))

        return findings

    def _check_model_access_violations(
        self, events: List[Dict]
    ) -> List[AuditFinding]:
        """
        Detect attempts to access unauthorized models (e.g., an operator
        trying to invoke Sonnet).
        """
        findings = []
        sonnet_model_id = "anthropic.claude-3-sonnet"

        for event in events:
            if event.get("errorCode") in (
                "AccessDenied", "AccessDeniedException"
            ):
                model = self._extract_model_from_event(event)
                if sonnet_model_id in model:
                    identity = event.get("userIdentity", {}).get(
                        "arn", "unknown"
                    )
                    findings.append(AuditFinding(
                        finding_id=f"MODEL-{hash(identity) % 100000:05d}",
                        severity="HIGH",
                        category="access_violation",
                        title="Unauthorized Premium Model Access Attempt",
                        description=(
                            f"Identity '{identity}' attempted to invoke "
                            f"premium model '{model}' but was denied. "
                            f"This role does not have permission for "
                            f"premium-tier models."
                        ),
                        affected_resource=model,
                        recommended_action=(
                            "Verify this was not an intentional escalation "
                            "attempt. If the identity legitimately needs "
                            "Sonnet access, submit an access request through "
                            "the standard approval process."
                        ),
                        evidence={
                            "model_id": model,
                            "identity": identity,
                            "error_code": event.get("errorCode", ""),
                            "source_ip": event.get("sourceIPAddress", ""),
                            "event_time": event.get("eventTime", ""),
                        },
                    ))

        return findings

    def _query_cloudtrail_events(
        self,
        event_source: str,
        start_time: datetime,
    ) -> List[Dict]:
        """Query CloudTrail for events from a specific service."""
        events = []
        try:
            paginator = self.cloudtrail_client.get_paginator("lookup_events")
            for page in paginator.paginate(
                LookupAttributes=[
                    {
                        "AttributeKey": "EventSource",
                        "AttributeValue": event_source,
                    }
                ],
                StartTime=start_time,
                EndTime=datetime.now(timezone.utc),
            ):
                for event in page.get("Events", []):
                    try:
                        cloud_trail_event = json.loads(
                            event.get("CloudTrailEvent", "{}")
                        )
                        events.append(cloud_trail_event)
                    except json.JSONDecodeError:
                        continue
        except Exception as e:
            logger.error(f"Failed to query CloudTrail: {e}")

        return events

    def _extract_model_from_event(self, event: Dict) -> str:
        """Extract the model ID from a Bedrock CloudTrail event."""
        # CloudTrail may record these fields as null, so fall back to
        # empty containers instead of relying on .get() defaults.
        resources = event.get("resources") or []
        for resource in resources:
            arn = resource.get("ARN", "")
            if "foundation-model" in arn:
                return arn.split("/")[-1]
        request_params = event.get("requestParameters") or {}
        return request_params.get("modelId", "unknown")

    def _send_alert(self, findings: List[AuditFinding]) -> None:
        """Send SNS alert for critical/high severity findings."""
        if not self.alert_topic_arn:
            return

        message = {
            "source": "MangaAssist-FMAccessAuditor",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "finding_count": len(findings),
            "findings": [
                {
                    "id": f.finding_id,
                    "severity": f.severity,
                    "title": f.title,
                    "description": f.description,
                    "action": f.recommended_action,
                }
                for f in findings
            ],
        }

        try:
            self.sns_client.publish(
                TopicArn=self.alert_topic_arn,
                Subject=f"MangaAssist Security Alert: {len(findings)} findings",
                Message=json.dumps(message, indent=2),
            )
            logger.info(
                f"Sent alert for {len(findings)} critical/high findings"
            )
        except Exception as e:
            logger.error(f"Failed to send SNS alert: {e}")

    def generate_compliance_report(
        self, hours: int = 168
    ) -> Dict[str, Any]:
        """
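        Generate a compliance report summarizing FM access findings.

        Defaults to a weekly window (168 hours). Includes:
        - Finding counts by severity
        - Per-finding details and recommended actions
        - Overall compliance status (NON_COMPLIANT if any CRITICAL or
          HIGH finding exists)

        Per-model call totals and cost attribution are not computed here.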
        """
        findings = self.audit_bedrock_access(hours=hours)

        report = {
            "report_type": "weekly_fm_access_compliance",
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "period_hours": hours,
            "findings_summary": {
                "total": len(findings),
                "critical": len(
                    [f for f in findings if f.severity == "CRITICAL"]
                ),
                "high": len(
                    [f for f in findings if f.severity == "HIGH"]
                ),
                "medium": len(
                    [f for f in findings if f.severity == "MEDIUM"]
                ),
                "low": len(
                    [f for f in findings if f.severity == "LOW"]
                ),
            },
            "findings": [
                {
                    "id": f.finding_id,
                    "severity": f.severity,
                    "category": f.category,
                    "title": f.title,
                    "description": f.description,
                    "action": f.recommended_action,
                }
                for f in findings
            ],
            "compliance_status": (
                "COMPLIANT" if not any(
                    f.severity in ("CRITICAL", "HIGH") for f in findings
                )
                else "NON_COMPLIANT"
            ),
        }

        return report
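
Two details of the auditor are worth a closer look. Finding IDs built with Python's built-in `hash()` differ between processes, because string hashes are salted per process by default, and the denial rule is worded per hour while the check counts denials over the whole audit window. The sketch below shows one way both could be handled; `stable_finding_id` and `max_denials_in_window` are hypothetical helpers, not part of `FMAccessAuditor`.

```python
import hashlib
from datetime import datetime, timedelta
from typing import List


def stable_finding_id(prefix: str, identity: str) -> str:
    """Derive a finding ID that is identical across processes and runs."""
    digest = hashlib.sha256(identity.encode("utf-8")).hexdigest()
    return f"{prefix}-{int(digest[:8], 16) % 100000:05d}"


def max_denials_in_window(
    timestamps: List[datetime],
    window: timedelta = timedelta(hours=1),
) -> int:
    """Return the largest number of events inside any sliding window."""
    ordered = sorted(timestamps)
    best = 0
    start = 0
    for end, current in enumerate(ordered):
        # Advance the left edge until the span is shorter than `window`.
        while current - ordered[start] >= window:
            start += 1
        best = max(best, end - start + 1)
    return best
```

With these helpers, an identity would be flagged when `max_denials_in_window(times)` exceeds `max_denials_per_hour`, and the same identity would map to the same finding ID in every report.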

6. Token-Based Authentication Patterns

6.1 JWT Token Lifecycle for FM Access

stateDiagram-v2
    [*] --> Unauthenticated: User opens app
    Unauthenticated --> Authenticating: Login initiated
    Authenticating --> TokenIssued: Cognito returns tokens
    TokenIssued --> Active: JWT validated by Lambda authorizer

    Active --> Active: API calls with valid token
    Active --> Refreshing: Token nearing expiry (< 5 min)
    Refreshing --> Active: Refresh token exchange succeeds
    Refreshing --> Expired: Refresh token also expired

    Active --> Expired: Token expires (60 min)
    Expired --> Unauthenticated: Must re-authenticate

    Active --> Revoked: Admin revokes session
    Revoked --> Unauthenticated: Must re-authenticate
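
The time-based transitions in this diagram can be sketched as a small classifier over the token's `exp` claim. This is a minimal illustration assuming the 5-minute refresh threshold shown above; `token_state` and `REFRESH_THRESHOLD_SECONDS` are hypothetical names, and revocation is not modeled because it needs server-side state.

```python
import time
from typing import Optional

REFRESH_THRESHOLD_SECONDS = 5 * 60  # "nearing expiry" in the diagram


def token_state(exp: int, now: Optional[float] = None) -> str:
    """Classify a token by its `exp` claim (seconds since the epoch)."""
    current = time.time() if now is None else now
    remaining = exp - current
    if remaining <= 0:
        return "expired"
    if remaining < REFRESH_THRESHOLD_SECONDS:
        return "refreshing"
    return "active"
```

A client would call this before each request and start the refresh token exchange as soon as the state becomes `"refreshing"`.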

6.2 Token Validation in Lambda Authorizer

"""
lambda_authorizer.py
WebSocket API Gateway Lambda authorizer for MangaAssist.
Validates Cognito JWT tokens and generates IAM policies.
"""

import json
import logging
import time
import urllib.request
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)

# Configuration
USER_POOL_ID = "ap-northeast-1_XXXXXXXXX"
REGION = "ap-northeast-1"
APP_CLIENT_ID = "xxxxxxxxxxxxxxxxxxxxxxxxxx"

# JWKS cache
_jwks_cache = None
_jwks_cache_time = 0
JWKS_CACHE_TTL = 3600  # 1 hour


def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda authorizer for WebSocket API Gateway.

    Validates the JWT token from the Authorization header and
    returns an IAM policy that allows or denies the WebSocket
    connection based on the user's role.
    """
    token = _extract_token(event)
    if not token:
        logger.warning("No token found in request")
        raise Exception("Unauthorized")

    try:
        claims = _validate_jwt(token)
    except Exception as e:
        logger.warning(f"Token validation failed: {e}")
        raise Exception("Unauthorized")

    # Extract role and build policy
    role = claims.get("custom:role", "readonly")
    user_id = claims.get("sub", "unknown")
    method_arn = event.get("methodArn", "")

    policy = _build_policy(user_id, role, method_arn)

    # Add context for downstream services
    policy["context"] = {
        "user_id": user_id,
        "role": role,
        "email": claims.get("email", ""),
        "department": claims.get("custom:department", ""),
        "model_access_tier": claims.get("custom:model_access_tier", "none"),
        "mfa_verified": str("mfa" in claims.get("amr", [])),
    }

    logger.info(
        f"Authorized: user={user_id}, role={role}"
    )

    return policy


def _extract_token(event: Dict[str, Any]) -> Optional[str]:
    """Extract the Bearer token from request headers."""
    headers = event.get("headers") or {}
    auth_header = headers.get("Authorization", headers.get("authorization", ""))

    if auth_header.startswith("Bearer "):
        return auth_header[7:]
    return auth_header if auth_header else None


def _validate_jwt(token: str) -> Dict[str, Any]:
    """
    Validate a Cognito JWT token.

    Checks: signature, expiration, issuer, audience, and token_use.
    """
    import jwt as pyjwt

    header = pyjwt.get_unverified_header(token)
    kid = header.get("kid")

    signing_key = _get_signing_key(kid)
    issuer = f"https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}"

    # Cognito ID tokens carry the app client ID in the `aud` claim, so
    # verifying it rejects tokens issued to other app clients.
    claims = pyjwt.decode(
        token,
        signing_key,
        algorithms=["RS256"],
        issuer=issuer,
        audience=APP_CLIENT_ID,
        options={
            "verify_exp": True,
            "verify_iss": True,
            "verify_aud": True,
        },
    )

    # Verify token_use is "id" (not "access")
    token_use = claims.get("token_use")
    if token_use != "id":
        raise ValueError(f"Expected id token, got token_use={token_use!r}")

    return claims


def _get_signing_key(kid: str):
    """Retrieve signing key from Cognito JWKS with caching."""
    global _jwks_cache, _jwks_cache_time
    from jwt.algorithms import RSAAlgorithm

    if _jwks_cache is None or time.time() - _jwks_cache_time > JWKS_CACHE_TTL:
        jwks_url = (
            f"https://cognito-idp.{REGION}.amazonaws.com/"
            f"{USER_POOL_ID}/.well-known/jwks.json"
        )
        # Bound the fetch so a slow JWKS endpoint cannot stall the authorizer
        with urllib.request.urlopen(jwks_url, timeout=5) as resp:
            _jwks_cache = json.loads(resp.read())
            _jwks_cache_time = time.time()

    for key in _jwks_cache.get("keys", []):
        if key["kid"] == kid:
            return RSAAlgorithm.from_jwk(json.dumps(key))

    raise ValueError(f"Key {kid} not found in JWKS")


def _build_policy(
    principal_id: str, role: str, method_arn: str
) -> Dict[str, Any]:
    """
    Build an IAM policy document for the API Gateway authorizer.

    Different roles get different route access:
    - admin:     all routes
    - developer: chat, search, admin-dashboard
    - operator:  chat, health, metrics
    - readonly:  health, metrics only
    """
    arn_parts = method_arn.split(":")
    region = arn_parts[3]
    account_id = arn_parts[4]
    api_gateway_arn = arn_parts[5].split("/")
    api_id = api_gateway_arn[0]
    stage = api_gateway_arn[1]

    base_arn = f"arn:aws:execute-api:{region}:{account_id}:{api_id}/{stage}"

    route_permissions = {
        "admin": [
            f"{base_arn}/*",
        ],
        "developer": [
            f"{base_arn}/chat/*",
            f"{base_arn}/search/*",
            f"{base_arn}/admin-dashboard/*",
        ],
        "operator": [
            f"{base_arn}/chat/*",
            f"{base_arn}/health/*",
            f"{base_arn}/metrics/*",
        ],
        "readonly": [
            f"{base_arn}/health/*",
            f"{base_arn}/metrics/*",
        ],
    }

    allowed_resources = route_permissions.get(role, route_permissions["readonly"])

    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": "Allow",
                    "Resource": allowed_resources,
                },
            ],
        },
    }
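
The `context` map returned by the authorizer is forwarded to the backend integration. Because API Gateway invokes a WebSocket Lambda authorizer when the connection is established, per-message route checks are often repeated in the backend using that context. The sketch below assumes a Lambda proxy integration where the values appear under `requestContext.authorizer`; `ROUTES_BY_ROLE` and `is_route_allowed` are hypothetical names that mirror the role list in `_build_policy`.

```python
from typing import Any, Dict, Set

# Hypothetical role-to-route map mirroring the docstring of _build_policy.
ROUTES_BY_ROLE: Dict[str, Set[str]] = {
    "developer": {"chat", "search", "admin-dashboard"},
    "operator": {"chat", "health", "metrics"},
    "readonly": {"health", "metrics"},
}


def is_route_allowed(event: Dict[str, Any]) -> bool:
    """Check the current route key against the caller's role."""
    request_context = event.get("requestContext") or {}
    authorizer = request_context.get("authorizer") or {}
    role = authorizer.get("role", "readonly")
    route_key = request_context.get("routeKey", "")
    if role == "admin":
        return True  # admin may use every route
    # Unknown roles fall back to the most restrictive set.
    allowed = ROUTES_BY_ROLE.get(role, ROUTES_BY_ROLE["readonly"])
    return route_key in allowed
```

Keeping this check next to the message handler means a role change takes effect on the next message rather than only on the next connection.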

7. Cross-Account FM Access Patterns

7.1 When Cross-Account Access is Needed

MangaAssist may use separate AWS accounts for different environments (dev, staging, production) or for separating the ML platform team from the application team. Cross-account access to Bedrock must be carefully controlled.

7.2 Cross-Account Role Assumption

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowCrossAccountBedrockAccess",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": [
                "arn:aws:iam::222233334444:role/manga-bedrock-cross-account"
            ],
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "MangaAssist-Prod-2024",
                    "aws:PrincipalTag/Team": "manga-platform"
                },
                "IpAddress": {
                    "aws:VpcSourceIp": "10.0.0.0/8"
                }
            }
        }
    ]
}

7.3 Trust Policy on the Target Account Role

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TrustMangaAssistProdAccount",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::111122223333:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "MangaAssist-Prod-2024"
                },
                "Bool": {
                    "aws:MultiFactorAuthPresent": "true"
                }
            }
        }
    ]
}
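
On the calling side, the two policies above come together in a single `sts:AssumeRole` call. The sketch below takes the STS client as a parameter so it can be exercised without AWS access; `assume_bedrock_role`, the session name, and the 15-minute duration are assumptions for illustration. The MFA arguments are optional here, although the trust policy shown above would reject a call made without them.

```python
from typing import Any, Dict, Optional


def assume_bedrock_role(
    sts_client: Any,
    role_arn: str,
    external_id: str,
    session_name: str = "manga-bedrock-session",
    duration_seconds: int = 900,
    mfa_serial: Optional[str] = None,
    mfa_code: Optional[str] = None,
) -> Dict[str, str]:
    """Assume the cross-account role and return boto3-style credentials."""
    params: Dict[str, Any] = {
        "RoleArn": role_arn,
        "RoleSessionName": session_name,
        "ExternalId": external_id,
        "DurationSeconds": duration_seconds,
    }
    # Only send MFA fields when both are supplied.
    if mfa_serial and mfa_code:
        params["SerialNumber"] = mfa_serial
        params["TokenCode"] = mfa_code
    creds = sts_client.assume_role(**params)["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }
```

With `sts_client = boto3.client("sts")`, the returned dictionary can be unpacked into `boto3.client("bedrock-runtime", region_name="ap-northeast-1", **creds)` to call Bedrock in the target account with short-lived credentials.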

8. IAM Access Analyzer Integration

8.1 Automated Policy Validation

IAM Access Analyzer can validate IAM policies before they are deployed, catching overly permissive policies before they reach production.

"""
iam_policy_validator.py
Validates IAM policies using IAM Access Analyzer before deployment.
"""

import json
import logging
import boto3
from typing import Dict, List, Any

logger = logging.getLogger(__name__)


class IAMPolicyValidator:
    """
    Validates IAM policies for MangaAssist using IAM Access Analyzer.

    Checks for:
    - Overly permissive actions (e.g., bedrock:*)
    - Missing resource constraints (Resource: *)
    - Missing condition keys (no VPC endpoint restriction)
    - Security anti-patterns (allowing PassRole without constraints)
    """

    BEDROCK_RESTRICTED_ACTIONS = [
        "bedrock:CreateModelCustomizationJob",
        "bedrock:DeleteCustomModel",
        "bedrock:CreateProvisionedModelThroughput",
        "bedrock:DeleteProvisionedModelThroughput",
    ]

    def __init__(self, region: str = "ap-northeast-1"):
        self.access_analyzer = boto3.client(
            "accessanalyzer", region_name=region
        )
        self.iam_client = boto3.client("iam")

    def validate_policy(
        self, policy_document: Dict
    ) -> Dict[str, Any]:
        """
        Validate an IAM policy document using Access Analyzer.

        Returns findings with severity and recommendations.
        """
        response = self.access_analyzer.validate_policy(
            policyDocument=json.dumps(policy_document),
            policyType="IDENTITY_POLICY",
        )

        findings = response.get("findings", [])

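        result = {
            "valid": all(
                f["findingType"] != "ERROR" for f in findings
            ),
            "errors": [
                f for f in findings if f["findingType"] == "ERROR"
            ],
            # SECURITY_WARNING is the finding type Access Analyzer uses
            # for overly permissive access, so surface it separately.
            "security_warnings": [
                f for f in findings
                if f["findingType"] == "SECURITY_WARNING"
            ],
            "warnings": [
                f for f in findings if f["findingType"] == "WARNING"
            ],
            "suggestions": [
                f for f in findings if f["findingType"] == "SUGGESTION"
            ],
        }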

        # Custom MangaAssist checks
        custom_findings = self._check_manga_specific_rules(policy_document)
        result["custom_findings"] = custom_findings

        if custom_findings:
            result["valid"] = False

        return result

    def _check_manga_specific_rules(
        self, policy_document: Dict
    ) -> List[Dict[str, str]]:
        """
        Apply MangaAssist-specific policy validation rules.

        These go beyond standard IAM analysis to enforce the project's
        security requirements.
        """
        findings = []
        statements = policy_document.get("Statement", [])

        for stmt in statements:
            actions = stmt.get("Action", [])
            if isinstance(actions, str):
                actions = [actions]
            resources = stmt.get("Resource", [])
            if isinstance(resources, str):
                resources = [resources]
            conditions = stmt.get("Condition", {})

            # Check 1: No wildcard Bedrock actions
            if stmt.get("Effect") == "Allow":
                for action in actions:
                    if action == "bedrock:*":
                        findings.append({
                            "rule": "no_wildcard_bedrock",
                            "severity": "HIGH",
                            "message": (
                                "Wildcard bedrock:* action found. Use "
                                "specific actions like bedrock:InvokeModel."
                            ),
                        })

            # Check 2: InvokeModel must have resource constraint
            if stmt.get("Effect") == "Allow":
                invoke_actions = [
                    a for a in actions
                    if "InvokeModel" in a
                ]
                if invoke_actions and "*" in resources:
                    findings.append({
                        "rule": "invoke_needs_resource",
                        "severity": "HIGH",
                        "message": (
                            "InvokeModel with Resource: * allows access to "
                            "all models. Specify model ARNs explicitly."
                        ),
                    })

            # Check 3: InvokeModel should require VPC endpoint
            # (condition keys are case-insensitive, so compare lowercased)
            if stmt.get("Effect") == "Allow":
                invoke_actions = [
                    a for a in actions
                    if "InvokeModel" in a
                ]
                vpce_keys = [
                    key for key in conditions.get("StringEquals", {})
                    if key.lower() == "aws:sourcevpce"
                ]
                if invoke_actions and not vpce_keys:
                    findings.append({
                        "rule": "invoke_needs_vpce",
                        "severity": "MEDIUM",
                        "message": (
                            "InvokeModel without VPC endpoint condition. "
                            "Add aws:SourceVpce condition for network "
                            "security."
                        ),
                    })

            # Check 4: Restricted actions should not appear in Allow
            # statements, except in admin policies with strict conditions
            if stmt.get("Effect") == "Allow":
                for action in actions:
                    if action in self.BEDROCK_RESTRICTED_ACTIONS:
                        findings.append({
                            "rule": "restricted_action_in_allow",
                            "severity": "CRITICAL",
                            "message": (
                                f"Restricted action '{action}' found in "
                                "Allow statement. This should only appear "
                                "in admin policies with strict conditions."
                            ),
                        })

        return findings
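
The checks above compare action names as literal strings, so a pattern such as `bedrock:Invoke*` would not be recognized as granting `InvokeModel`. The following is a minimal sketch of wildcard-aware matching, not part of the validator itself. The helper names `action_grants` and `grants_invoke` are hypothetical, and the sketch assumes that Python's `fnmatch` globbing is a close enough stand-in for IAM's `*` and `?` wildcards in action names.

```python
from fnmatch import fnmatchcase

# Actions that invoke a model. Both are real Bedrock IAM actions.
INVOKE_ACTIONS = (
    "bedrock:InvokeModel",
    "bedrock:InvokeModelWithResponseStream",
)


def action_grants(pattern: str, action: str) -> bool:
    """Return True if an IAM action pattern would cover the given action.

    IAM action names are case-insensitive, so both sides are lowercased
    before glob matching. Assumption: the pattern contains no '[' characters,
    which fnmatch would treat as a character class but IAM would not.
    """
    return fnmatchcase(action.lower(), pattern.lower())


def grants_invoke(actions) -> bool:
    """Return True if any pattern in `actions` covers a model-invoke action."""
    if isinstance(actions, str):
        actions = [actions]
    return any(
        action_grants(pattern, target)
        for pattern in actions
        for target in INVOKE_ACTIONS
    )


if __name__ == "__main__":
    for candidate in (["bedrock:Invoke*"], ["bedrock:ListFoundationModels"]):
        print(candidate, grants_invoke(candidate))
```

A helper like this could replace the `"InvokeModel" in a` test in Checks 2 and 3 so that broad patterns are treated as invoke permissions too.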

9. Best Practices Summary

9.1 RBAC Best Practices for FM Access

| Practice | Implementation | Benefit |
|---|---|---|
| Principle of least privilege | Each role can only invoke the minimum models needed | Prevents cost overrun and data exposure |
| Environment separation | Developers cannot invoke Sonnet in production | Staging mistakes do not hit production costs |
| Permission boundaries | Teams cannot exceed their ceiling even with new roles | Prevents privilege escalation via new role creation |
| SCPs at org level | Only approved models and regions allowed | Organization-wide guardrail that individual accounts cannot override |
| Time-limited credentials | 1-hour tokens, 7-day refresh | Limits blast radius of credential theft |
| Automated rotation | 30-day API key rotation via Secrets Manager | Reduces window of exposure for compromised keys |
| Continuous audit | FMAccessAuditor queries CloudTrail hourly | Detects anomalies before they become incidents |
| Policy validation | IAM Access Analyzer checks before deployment | Catches misconfigurations in CI/CD pipeline |

9.2 Common Anti-Patterns to Avoid

| Anti-Pattern | Risk | Correct Approach |
|---|---|---|
| `bedrock:*` in any Allow statement | Allows all Bedrock actions including model deletion | Specify exact actions: `bedrock:InvokeModel` |
| `Resource: *` for InvokeModel | Allows invoking any model, including expensive ones | Specify model ARNs explicitly |
| Long-lived IAM access keys | Keys can be exfiltrated and used indefinitely | Use temporary credentials via STS/Cognito |
| Same IAM role for all environments | Dev mistakes affect production | Separate roles per environment with tag conditions |
| No rate limiting on FM calls | Runaway automation can consume entire budget | Application-layer rate limits + WAF rate rules |
| Shared API keys across services | Cannot attribute costs or detect specific compromises | Per-service keys with Secrets Manager rotation |
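
To make the first two rows concrete, here is a minimal sketch of a policy statement that names one action, one model ARN, and a VPC endpoint condition. The region, the VPC endpoint ID, and the function name `build_invoke_statement` are assumptions chosen for illustration; substitute the values from your own account.

```python
import json

# Hypothetical values for illustration only.
REGION = "ap-northeast-1"
VPC_ENDPOINT_ID = "vpce-0123456789abcdef0"
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"


def build_invoke_statement(region: str, model_id: str, vpce_id: str) -> dict:
    """Build an Allow statement scoped to one action, one model, one endpoint."""
    return {
        "Sid": "InvokeHaikuViaVpce",
        "Effect": "Allow",
        # Exact action instead of bedrock:*
        "Action": ["bedrock:InvokeModel"],
        # Explicit foundation-model ARN instead of Resource: *
        "Resource": [
            f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
        ],
        # Only allow calls that arrive through the named VPC endpoint
        "Condition": {"StringEquals": {"aws:SourceVpce": vpce_id}},
    }


policy = {
    "Version": "2012-10-17",
    "Statement": [
        build_invoke_statement(REGION, HAIKU_MODEL_ID, VPC_ENDPOINT_ID)
    ],
}

if __name__ == "__main__":
    print(json.dumps(policy, indent=2))
```

Building the statement from parameters keeps the per-environment differences (region, model, endpoint) in one place, which also supports the separate-roles-per-environment row.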

Key Takeaways

  1. Four layers of access control (SCPs, permission boundaries, IAM policies, application RBAC) ensure that no single misconfiguration grants unauthorized FM access.

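  2. Permission boundaries are underused but powerful: they cap the effective permissions of every role they are attached to, so a new role with a broader policy still cannot exceed the ceiling. This holds only when role creation itself requires attaching the boundary, for example through the iam:PermissionsBoundary condition key.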

  3. API key rotation through Secrets Manager with a dual-key window enables zero-downtime credential rotation, critical for a 24/7 chatbot serving 1M messages/day.

  4. The FM Access Auditor provides continuous compliance monitoring, detecting anomalies like off-hours access, rate spikes, and unauthorized model access attempts before they become incidents.

  5. Policy validation in CI/CD using IAM Access Analyzer catches overly permissive policies before they reach production, shifting security left in the development lifecycle.