
Secure Access Architecture for FM Services

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Attribute       Value
Certification   AWS Certified Generative AI Developer - Professional (AIP-C01)
Domain          2: Implementation and Integration of Foundation Models
Task            2.3: Describe methods to integrate FM-powered applications into enterprise systems
Skill           2.3.3: Create secure access frameworks to ensure appropriate security controls (identity federation between FM services and enterprise systems, role-based access control for model and data access, least privilege API access to FMs)

1. Security Architecture Mindmap

mindmap
  root((Secure Access<br/>Architecture))
    Identity Federation
      Cognito User Pools
        SAML 2.0 Providers
        OIDC Providers
        Social Identity Providers
      Cognito Identity Pools
        Federated Identities
        Temporary AWS Credentials
        Cross-Account Access
      Enterprise IdP Integration
        Active Directory
        Okta and Auth0
        Custom SAML Providers
      Token Management
        JWT Validation
        Token Refresh Flows
        Session Duration Controls
    RBAC — Role-Based Access Control
      Model-Level Permissions
        Admin Full Access
        Developer Invoke Only
        Read-Only Monitoring
        Per-Model Granularity
      Data Access Boundaries
        Vector Store Isolation
        Session Data Scoping
        Product Catalog Access
      Environment Separation
        Production Lockdown
        Staging Limited Access
        Development Sandbox
      Audit and Compliance
        CloudTrail Logging
        Access Reviews
        Compliance Reports
    Least Privilege
      IAM Policy Design
        Action-Level Restrictions
        Resource-Level Conditions
        Tag-Based Access Control
      Permission Boundaries
        Team-Level Boundaries
        Service-Level Boundaries
        Account-Level SCPs
      Temporary Credentials
        STS AssumeRole
        Session Policies
        Time-Limited Tokens
      Automated Enforcement
        IAM Access Analyzer
        Policy Validation
        Drift Detection
    API Security
      API Gateway Controls
        WAF Integration
        Rate Limiting
        Request Validation
      Authentication Layers
        Cognito Authorizers
        Lambda Authorizers
        IAM Authorization
      Encryption
        TLS 1.3 In-Transit
        KMS At-Rest
        VPC Endpoints
      Monitoring
        CloudWatch Alarms
        GuardDuty Threats
        Security Hub Findings

2. MangaAssist Security Architecture Flowchart

flowchart TB
    subgraph ClientLayer["Client Layer"]
        WebApp["Web Application<br/>(Manga Store Frontend)"]
        MobileApp["Mobile Application"]
        AdminConsole["Admin Console"]
    end

    subgraph AuthLayer["Authentication and Authorization Layer"]
        Cognito["Amazon Cognito<br/>User Pool + Identity Pool"]
        SAML["Enterprise SAML IdP<br/>(Active Directory)"]
        OIDC["OIDC Provider<br/>(Social Login / LINE)"]
        LambdaAuth["Lambda Authorizer<br/>(Custom Token Validation)"]
    end

    subgraph APILayer["API Security Layer"]
        APIGW["API Gateway WebSocket<br/>+ WAF + Rate Limiting"]
        VPCEndpoint["VPC Endpoint<br/>(AWS PrivateLink)"]
    end

    subgraph ComputeLayer["Compute and Orchestration Layer"]
        ECS["ECS Fargate<br/>(Task Role: manga-ecs-task-role)"]
        IAMRole["IAM Role<br/>(Least Privilege Bedrock Access)"]
        PermBoundary["Permission Boundary<br/>(Team-Level Restrictions)"]
    end

    subgraph FMLayer["Foundation Model Layer"]
        Bedrock["Amazon Bedrock<br/>(Claude 3 Sonnet / Haiku)"]
        ModelAccess["Model Access Policy<br/>(Per-Model Permissions)"]
        Guardrails["Bedrock Guardrails<br/>(Content Filtering)"]
    end

    subgraph DataLayer["Data Layer"]
        OpenSearch["OpenSearch Serverless<br/>(Vector Store — AOSS Policy)"]
        DynamoDB["DynamoDB<br/>(Sessions/Products — Table Policy)"]
        Redis["ElastiCache Redis<br/>(VPC-Only Access)"]
    end

    subgraph MonitorLayer["Monitoring and Audit Layer"]
        CloudTrail["CloudTrail<br/>(API Call Logging)"]
        CloudWatch["CloudWatch<br/>(Metrics and Alarms)"]
        GuardDuty["GuardDuty<br/>(Threat Detection)"]
        SecurityHub["Security Hub<br/>(Compliance Dashboard)"]
    end

    WebApp -->|"HTTPS + JWT"| Cognito
    MobileApp -->|"HTTPS + JWT"| Cognito
    AdminConsole -->|"SAML Assertion"| SAML
    SAML -->|"Federation"| Cognito
    OIDC -->|"OIDC Token"| Cognito
    Cognito -->|"ID Token + Access Token"| LambdaAuth
    LambdaAuth -->|"Authorize"| APIGW
    APIGW -->|"VPC Link"| ECS
    ECS -->|"AssumeRole"| IAMRole
    IAMRole -->|"Scoped Invoke"| Bedrock
    PermBoundary -.->|"Constrains"| IAMRole
    Bedrock --> ModelAccess
    Bedrock --> Guardrails
    ECS -->|"VPC Endpoint"| OpenSearch
    ECS -->|"VPC Endpoint"| DynamoDB
    ECS -->|"VPC Only"| Redis
    ECS -.->|"Logs"| CloudWatch
    IAMRole -.->|"API Calls"| CloudTrail
    CloudTrail -.->|"Findings"| SecurityHub
    GuardDuty -.->|"Alerts"| SecurityHub

    style ClientLayer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style AuthLayer fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style APILayer fill:#f3e5f5,stroke:#6a1b9a,stroke-width:2px
    style ComputeLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style FMLayer fill:#fce4ec,stroke:#b71c1c,stroke-width:2px
    style DataLayer fill:#e0f7fa,stroke:#00695c,stroke-width:2px
    style MonitorLayer fill:#fff8e1,stroke:#f57f17,stroke-width:2px

3. Identity Federation for FM Access

3.1 Federation Architecture Overview

MangaAssist uses a multi-layer identity federation strategy that bridges enterprise identity providers with AWS services, ultimately enabling authenticated and authorized access to Foundation Models through Amazon Bedrock.

Federation Flow:

  1. End Users authenticate via Cognito User Pool (email/password, social login, or enterprise SAML)
  2. Enterprise Staff authenticate via corporate Active Directory federated through SAML 2.0
  3. Cognito Identity Pool exchanges authenticated tokens for temporary AWS credentials
  4. STS issues time-limited credentials scoped to the user's role
  5. ECS Task Role assumes a Bedrock-access role with least privilege permissions
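
Steps 3 and 4 of the flow can be sketched as a small helper. This is a minimal sketch, not the project's implementation: the function name and the returned dictionary shape are assumptions, and `identity_client` is expected to behave like `boto3.client("cognito-identity")`. Passing the client in keeps the helper easy to exercise with a stub.

```python
from typing import Any, Dict


def exchange_id_token_for_credentials(
    identity_client: Any,
    identity_pool_id: str,
    user_pool_provider: str,
    id_token: str,
) -> Dict[str, Any]:
    """Swap a Cognito User Pool ID token for temporary AWS credentials.

    user_pool_provider has the form
    "cognito-idp.<region>.amazonaws.com/<user_pool_id>".
    """
    logins = {user_pool_provider: id_token}

    # Step 3: resolve (or create) the federated identity for this token.
    identity_id = identity_client.get_id(
        IdentityPoolId=identity_pool_id, Logins=logins
    )["IdentityId"]

    # Step 4: the identity pool obtains short-lived credentials for the IAM
    # role selected by the pool's role mapping.
    response = identity_client.get_credentials_for_identity(
        IdentityId=identity_id, Logins=logins
    )
    creds = response["Credentials"]
    return {
        "identity_id": identity_id,
        "access_key_id": creds["AccessKeyId"],
        "secret_key": creds["SecretKey"],
        "session_token": creds["SessionToken"],
        "expiration": creds["Expiration"],
    }
```

In a real deployment the client would be created with `boto3.client("cognito-identity", region_name="ap-northeast-1")`, and the returned credentials expire at `expiration`, so callers need to refresh them rather than cache them indefinitely.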

3.2 Why Identity Federation Matters for FM Services

Without federation, every team that needs FM access would need dedicated IAM users with long-lived credentials. At MangaAssist's scale of 1M messages/day, this creates:

  • Credential sprawl: Hundreds of access keys across teams
  • Audit gaps: No link between the human identity and the API call
  • Stale permissions: Departed employees retain FM access
  • Compliance violations: No centralized identity governance

Federation solves all four problems by routing every FM access request through a single identity provider, issuing temporary credentials, and logging every access decision.

3.3 Cognito User Pool Configuration

"""
cognito_pool_config.py
Amazon Cognito User Pool configuration for MangaAssist identity federation.
Supports SAML 2.0, OIDC, and native authentication flows.
"""

import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import boto3

logger = logging.getLogger(__name__)


@dataclass
class SAMLProviderConfig:
    """Configuration for a SAML 2.0 identity provider."""
    provider_name: str
    metadata_url: Optional[str] = None
    metadata_file: Optional[str] = None
    idp_sign_out: bool = True
    attribute_mapping: Dict[str, str] = field(default_factory=lambda: {
        "email": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress",
        "given_name": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname",
        "family_name": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname",
        "custom:department": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/department",
        "custom:role": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/role",
    })


@dataclass
class OIDCProviderConfig:
    """Configuration for an OIDC identity provider."""
    provider_name: str
    client_id: str
    client_secret: str
    issuer_url: str
    authorize_scopes: str = "openid email profile"
    attribute_mapping: Dict[str, str] = field(default_factory=lambda: {
        "email": "email",
        "given_name": "given_name",
        "family_name": "family_name",
        "username": "sub",
    })


class CognitoPoolConfigurator:
    """
    Manages the Amazon Cognito User Pool configuration for MangaAssist.

    Creates and configures user pools with:
    - SAML 2.0 federation for enterprise staff (Active Directory)
    - OIDC federation for social login providers (LINE for JP market)
    - Custom attributes for role-based access control
    - Password policies and MFA enforcement
    - Token expiration and refresh policies aligned to session duration
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.cognito_client = boto3.client("cognito-idp", region_name=region)
        self.region = region

    def create_user_pool(self, pool_name: str = "MangaAssistUserPool") -> Dict[str, Any]:
        """
        Create a Cognito User Pool with security-hardened settings.

        Returns the User Pool ID and ARN for downstream configuration.
        """
        response = self.cognito_client.create_user_pool(
            PoolName=pool_name,
            Policies={
                "PasswordPolicy": {
                    "MinimumLength": 12,
                    "RequireUppercase": True,
                    "RequireLowercase": True,
                    "RequireNumbers": True,
                    "RequireSymbols": True,
                    "TemporaryPasswordValidityDays": 1,
                }
            },
            AutoVerifiedAttributes=["email"],
            UsernameAttributes=["email"],
            # MFA is switched on after creation (see below): create_user_pool
            # does not accept a SoftwareTokenMfaConfiguration parameter.
            UserPoolAddOns={"AdvancedSecurityMode": "ENFORCED"},
            Schema=[
                {
                    "Name": "email",
                    "AttributeDataType": "String",
                    "Required": True,
                    "Mutable": True,
                },
                {
                    "Name": "department",
                    "AttributeDataType": "String",
                    "Required": False,
                    "Mutable": True,
                    "StringAttributeConstraints": {"MaxLength": "64"},
                },
                {
                    "Name": "role",
                    "AttributeDataType": "String",
                    "Required": False,
                    "Mutable": True,
                    "StringAttributeConstraints": {"MaxLength": "32"},
                },
                {
                    "Name": "model_access_tier",
                    "AttributeDataType": "String",
                    "Required": False,
                    "Mutable": True,
                    "StringAttributeConstraints": {"MaxLength": "16"},
                },
            ],
            AccountRecoverySetting={
                "RecoveryMechanisms": [
                    {"Priority": 1, "Name": "verified_email"},
                ]
            },
            UserPoolTags={
                "Project": "MangaAssist",
                "SecurityLevel": "High",
                "Environment": "Production",
            },
        )

        pool_id = response["UserPool"]["Id"]
        pool_arn = response["UserPool"]["Arn"]

        # Require TOTP (software token) MFA for native user pool sign-ins.
        self.cognito_client.set_user_pool_mfa_config(
            UserPoolId=pool_id,
            SoftwareTokenMfaConfiguration={"Enabled": True},
            MfaConfiguration="ON",
        )
        logger.info(f"Created Cognito User Pool: {pool_id} (TOTP MFA required)")

        return {"pool_id": pool_id, "pool_arn": pool_arn}

    def configure_saml_provider(
        self, pool_id: str, config: SAMLProviderConfig
    ) -> Dict[str, Any]:
        """
        Register a SAML 2.0 identity provider for enterprise federation.

        Used for corporate Active Directory integration, allowing
        enterprise staff to authenticate with their corporate credentials
        and receive appropriate FM access permissions based on AD group
        membership.
        """
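        provider_details: Dict[str, str] = {}
        if config.metadata_url:
            provider_details["MetadataURL"] = config.metadata_url
        elif config.metadata_file:
            provider_details["MetadataFile"] = config.metadata_file
        else:
            # Cognito needs the IdP metadata from one of these two sources.
            raise ValueError(
                "SAMLProviderConfig requires metadata_url or metadata_file"
            )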

        provider_details["IDPSignout"] = str(config.idp_sign_out).lower()

        response = self.cognito_client.create_identity_provider(
            UserPoolId=pool_id,
            ProviderName=config.provider_name,
            ProviderType="SAML",
            ProviderDetails=provider_details,
            AttributeMapping=config.attribute_mapping,
            IdpIdentifiers=[config.provider_name.lower()],
        )

        logger.info(
            f"Configured SAML provider '{config.provider_name}' for pool {pool_id}"
        )
        return response

    def configure_oidc_provider(
        self, pool_id: str, config: OIDCProviderConfig
    ) -> Dict[str, Any]:
        """
        Register an OIDC identity provider for social or external login.

        Supports Google, LINE (popular in Japan for Manga users), and
        other custom OIDC providers.
        """
        response = self.cognito_client.create_identity_provider(
            UserPoolId=pool_id,
            ProviderName=config.provider_name,
            ProviderType="OIDC",
            ProviderDetails={
                "client_id": config.client_id,
                "client_secret": config.client_secret,
                "oidc_issuer": config.issuer_url,
                "authorize_scopes": config.authorize_scopes,
                "attributes_request_method": "GET",
            },
            AttributeMapping=config.attribute_mapping,
            IdpIdentifiers=[config.provider_name.lower()],
        )

        logger.info(
            f"Configured OIDC provider '{config.provider_name}' for pool {pool_id}"
        )
        return response

    def create_app_client(
        self, pool_id: str, client_name: str = "MangaAssistWebApp"
    ) -> Dict[str, Any]:
        """
        Create a Cognito App Client with appropriate OAuth flows.

        Configures the app client for the MangaAssist web application
        with PKCE-based authorization code flow (no client secret for
        public clients like SPAs).
        """
        response = self.cognito_client.create_user_pool_client(
            UserPoolId=pool_id,
            ClientName=client_name,
            GenerateSecret=False,
            ExplicitAuthFlows=[
                "ALLOW_USER_SRP_AUTH",
                "ALLOW_REFRESH_TOKEN_AUTH",
            ],
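            # "CorporateAD" and "LINE" must match the ProviderName values
            # registered via configure_saml_provider / configure_oidc_provider.
            SupportedIdentityProviders=["COGNITO", "CorporateAD", "LINE"],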
            AllowedOAuthFlows=["code"],
            AllowedOAuthScopes=["openid", "email", "profile"],
            AllowedOAuthFlowsUserPoolClient=True,
            CallbackURLs=[
                "https://manga-assist.example.com/callback",
                "https://manga-assist.example.com/admin/callback",
            ],
            LogoutURLs=[
                "https://manga-assist.example.com/logout",
            ],
            TokenValidityUnits={
                "AccessToken": "minutes",
                "IdToken": "minutes",
                "RefreshToken": "days",
            },
            AccessTokenValidity=60,
            IdTokenValidity=60,
            RefreshTokenValidity=7,
            PreventUserExistenceErrors="ENABLED",
        )

        logger.info(f"Created app client '{client_name}' for pool {pool_id}")
        return {
            "client_id": response["UserPoolClient"]["ClientId"],
            "client_name": client_name,
        }

    def create_identity_pool(
        self,
        user_pool_id: str,
        app_client_id: str,
        identity_pool_name: str = "MangaAssistIdentityPool",
    ) -> Dict[str, Any]:
        """
        Create a Cognito Identity Pool for federated AWS credential exchange.

        Maps authenticated users from the User Pool into IAM roles
        based on their custom:role attribute, enabling RBAC for
        Bedrock FM access.
        """
        identity_client = boto3.client(
            "cognito-identity", region_name=self.region
        )

        provider_name = (
            f"cognito-idp.{self.region}.amazonaws.com/{user_pool_id}"
        )

        response = identity_client.create_identity_pool(
            IdentityPoolName=identity_pool_name,
            AllowUnauthenticatedIdentities=False,
            CognitoIdentityProviders=[
                {
                    "ProviderName": provider_name,
                    "ClientId": app_client_id,
                    "ServerSideTokenCheck": True,
                },
            ],
            IdentityPoolTags={
                "Project": "MangaAssist",
                "SecurityLevel": "High",
            },
        )

        identity_pool_id = response["IdentityPoolId"]
        logger.info(f"Created Identity Pool: {identity_pool_id}")

        return {"identity_pool_id": identity_pool_id}

    def configure_role_mapping(
        self,
        identity_pool_id: str,
        user_pool_id: str,
        app_client_id: str,
        admin_role_arn: str,
        developer_role_arn: str,
        readonly_role_arn: str,
        default_role_arn: str,
    ) -> None:
        """
        Configure role mapping rules for the Identity Pool.

        Maps the custom:role attribute from Cognito tokens to
        specific IAM roles with varying levels of Bedrock access.

        This is the critical bridge between enterprise identities and
        AWS IAM — it determines which Bedrock models each user can invoke.
        """
        identity_client = boto3.client(
            "cognito-identity", region_name=self.region
        )

        provider_name = (
            f"cognito-idp.{self.region}.amazonaws.com/"
            f"{user_pool_id}:{app_client_id}"
        )

        identity_client.set_identity_pool_roles(
            IdentityPoolId=identity_pool_id,
            Roles={
                "authenticated": default_role_arn,
            },
            RoleMappings={
                provider_name: {
                    "Type": "Rules",
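                    # With "Deny", a token whose custom:role matches none of
                    # the rules below (including "operator", which has no
                    # rule here) receives no credentials; it does not fall
                    # back to default_role_arn.
                    "AmbiguousRoleResolution": "Deny",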
                    "RulesConfiguration": {
                        "Rules": [
                            {
                                "Claim": "custom:role",
                                "MatchType": "Equals",
                                "Value": "admin",
                                "RoleARN": admin_role_arn,
                            },
                            {
                                "Claim": "custom:role",
                                "MatchType": "Equals",
                                "Value": "developer",
                                "RoleARN": developer_role_arn,
                            },
                            {
                                "Claim": "custom:role",
                                "MatchType": "Equals",
                                "Value": "readonly",
                                "RoleARN": readonly_role_arn,
                            },
                        ]
                    },
                }
            },
        )

        logger.info(
            f"Configured role mapping for Identity Pool {identity_pool_id}"
        )

3.4 Identity Federation Sequence

sequenceDiagram
    participant User as Manga Store User
    participant App as MangaAssist Frontend
    participant Cognito as Cognito User Pool
    participant IdP as Enterprise IdP (AD)
    participant IdentityPool as Cognito Identity Pool
    participant STS as AWS STS
    participant ECS as ECS Fargate
    participant Bedrock as Amazon Bedrock

    User->>App: Open chat interface
    App->>Cognito: Redirect to hosted UI
    alt Enterprise User (SAML)
        Cognito->>IdP: SAML AuthnRequest
        IdP->>User: Corporate login page
        User->>IdP: Enter AD credentials + MFA
        IdP->>Cognito: SAML Response (assertions with role claim)
    else Social User (OIDC / LINE)
        Cognito->>User: Show login options
        User->>Cognito: Authenticate (email or LINE)
    end
    Cognito->>App: Authorization Code (PKCE)
    App->>Cognito: Exchange code for tokens
    Cognito->>App: ID Token + Access Token + Refresh Token
    App->>IdentityPool: GetId (ID Token)
    IdentityPool->>STS: AssumeRoleWithWebIdentity (role mapping)
    STS->>IdentityPool: Temporary credentials (scoped by role)
    IdentityPool->>App: AWS Credentials (time-limited)
    App->>ECS: WebSocket connect (JWT in Authorization header)
    ECS->>ECS: Validate JWT, extract role claims
    ECS->>Bedrock: InvokeModel (using task role with least privilege)
    Bedrock->>ECS: Model response
    ECS->>App: Chat response via WebSocket
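
The "Validate JWT, extract role claims" step in the diagram involves two parts: verifying the token signature against the user pool's published keys, and checking the claims. The sketch below covers only the second part and assumes the signature has already been verified by a JWT library. The function name is hypothetical; the claim names (`iss`, `token_use`, `aud`, `exp`, `custom:role`) are those found in a Cognito ID token.

```python
import time
from typing import Any, Dict, Optional


def check_id_token_claims(
    claims: Dict[str, Any],
    region: str,
    user_pool_id: str,
    app_client_id: str,
    now: Optional[float] = None,
) -> str:
    """Check the claims of an already signature-verified Cognito ID token.

    Returns the custom:role claim, or raises PermissionError.
    """
    now = time.time() if now is None else now
    expected_issuer = (
        f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}"
    )

    # The token must come from our user pool...
    if claims.get("iss") != expected_issuer:
        raise PermissionError("unexpected issuer")
    # ...be an ID token rather than an access token...
    if claims.get("token_use") != "id":
        raise PermissionError("not an ID token")
    # ...be issued to our app client...
    if claims.get("aud") != app_client_id:
        raise PermissionError("token issued for a different app client")
    # ...and still be within its validity window.
    if float(claims.get("exp", 0)) <= now:
        raise PermissionError("token expired")

    role = claims.get("custom:role")
    if not role:
        raise PermissionError("missing custom:role claim")
    return role
```

Rejecting tokens without a `custom:role` claim is a design choice in this sketch; a deployment that serves ordinary store customers would more likely map a missing claim to a default end-user role.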

4. RBAC Design for Model Access

4.1 Role Hierarchy

MangaAssist defines four primary roles for FM access, each with progressively restricted permissions:

Role        Description                     Bedrock Access                       Data Access                      Use Case
Admin       Full platform administration    All models, all actions              Full read/write all stores       Platform team leads
Developer   Build and test integrations     Invoke Sonnet + Haiku, list models   Read/write dev data, read prod   Engineering team
Operator    Monitor and manage production   Invoke Haiku only, read metrics      Read-only all stores             SRE / operations
ReadOnly    View dashboards and logs        No invoke, list/get only             Read-only sessions               Business analysts

4.2 Per-Model Access Matrix

                       Claude 3 Sonnet      Claude 3 Haiku       Custom Fine-tuned
Admin                  Invoke/Manage        Invoke/Manage        Invoke/Manage
Developer              Invoke               Invoke               Invoke (staging only)
Operator               ---                  Invoke               ---
ReadOnly               ---                  ---                  ---
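
One way to turn the invoke column of this matrix into IAM is to generate an identity policy per role. The following is a sketch under stated assumptions: `INVOKE_MATRIX` and `build_invoke_policy` are hypothetical names, the region is taken from the Cognito configuration earlier in this document, only the two base models are covered (the fine-tuned model has no identifier in this document), and the "Manage" permissions for Admin are out of scope.

```python
import json
from typing import Any, Dict, List

REGION = "ap-northeast-1"
SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"

INVOKE_ACTIONS = [
    "bedrock:InvokeModel",
    "bedrock:InvokeModelWithResponseStream",
]

# Which base models each role may invoke, following the matrix above.
INVOKE_MATRIX: Dict[str, List[str]] = {
    "admin": [SONNET, HAIKU],
    "developer": [SONNET, HAIKU],
    "operator": [HAIKU],
    "readonly": [],
}


def build_invoke_policy(role: str, region: str = REGION) -> Dict[str, Any]:
    """Build an IAM policy document limiting model invocation for a role."""
    model_ids = INVOKE_MATRIX[role]
    if not model_ids:
        # Roles with no invoke rights get an explicit deny on all models.
        statement = {
            "Sid": "DenyAllModelInvocation",
            "Effect": "Deny",
            "Action": INVOKE_ACTIONS,
            "Resource": "*",
        }
    else:
        statement = {
            "Sid": "AllowScopedModelInvocation",
            "Effect": "Allow",
            "Action": INVOKE_ACTIONS,
            # Foundation model ARNs have no account ID segment.
            "Resource": [
                f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
                for model_id in model_ids
            ],
        }
    return {"Version": "2012-10-17", "Statement": [statement]}


if __name__ == "__main__":
    print(json.dumps(build_invoke_policy("operator"), indent=2))
```

The environment restriction for developers (Sonnet outside production only) is not expressed here; it would typically be handled by attaching different policies per account or environment.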

4.3 Cost Implications of Per-Model RBAC

At MangaAssist scale (1M messages/day), model choice changes the estimated daily spend by roughly 12x:

Model             Cost per 1M input tokens   Cost per 1M output tokens   Est. daily cost at 1M msgs
Claude 3 Sonnet   $3.00                      $15.00                      ~$4,500
Claude 3 Haiku    $0.25                      $1.25                       ~$375

RBAC ensures operators and automated systems use Haiku (cost-effective), while Sonnet is reserved for admins and for developers working in staging and development. If all 1M daily messages ran on Haiku instead of Sonnet, the difference would be roughly $4,125/day ($4,500 - $375).
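
The daily estimates can be reproduced with a short calculation. The per-message token counts are not stated in this document; the sketch assumes 500 input and 200 output tokens per message, which is one mix that yields the figures in the table. Other mixes with the same weighted total would give the same result.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """USD price per 1M tokens."""
    input_per_million: float
    output_per_million: float


PRICES = {
    "sonnet": ModelPrice(3.00, 15.00),
    "haiku": ModelPrice(0.25, 1.25),
}


def daily_cost(
    price: ModelPrice,
    messages_per_day: int = 1_000_000,
    input_tokens: int = 500,    # assumed average per message
    output_tokens: int = 200,   # assumed average per message
) -> float:
    """Estimated daily spend in USD for one model."""
    input_cost = messages_per_day * input_tokens * price.input_per_million
    output_cost = messages_per_day * output_tokens * price.output_per_million
    return (input_cost + output_cost) / 1_000_000


if __name__ == "__main__":
    sonnet = daily_cost(PRICES["sonnet"])
    haiku = daily_cost(PRICES["haiku"])
    print(f"Sonnet: ${sonnet:,.0f}/day")
    print(f"Haiku:  ${haiku:,.0f}/day")
    print(f"Difference: ${sonnet - haiku:,.0f}/day")
```

Because both models are priced at the same 1:5 input-to-output ratio, the Sonnet-to-Haiku cost ratio stays at 12x regardless of the token mix; only the absolute dollar figures depend on the assumed counts.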

4.4 RBAC Policy Engine

"""
rbac_policy_engine.py
Role-Based Access Control engine for MangaAssist FM access.
Evaluates access requests against role definitions, model permissions,
and data access boundaries with full audit trail support.
"""

import json
import logging
from typing import Dict, List, Optional, Set, Any
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class AccessLevel(Enum):
    """Defines the level of access a role has to a resource."""
    NONE = "none"
    READ = "read"
    INVOKE = "invoke"
    MANAGE = "manage"       # invoke + configure + delete
    FULL = "full"           # manage + grant/revoke permissions


class ModelTier(Enum):
    """Bedrock model tiers with cost implications."""
    PREMIUM = "premium"     # Claude 3 Sonnet ($3/$15 per 1M tokens)
    STANDARD = "standard"   # Claude 3 Haiku ($0.25/$1.25 per 1M tokens)
    CUSTOM = "custom"       # Fine-tuned models


class Environment(Enum):
    """Deployment environments with different access requirements."""
    PRODUCTION = "production"
    STAGING = "staging"
    DEVELOPMENT = "development"


@dataclass
class ModelPermission:
    """Permission definition for a specific Bedrock model."""
    model_id: str
    model_tier: ModelTier
    access_level: AccessLevel
    max_tokens_per_request: int = 4096
    max_requests_per_minute: int = 100
    allowed_environments: List[Environment] = field(
        default_factory=lambda: [Environment.PRODUCTION]
    )


@dataclass
class DataPermission:
    """Permission definition for a data resource (DynamoDB, OpenSearch, Redis)."""
    resource_type: str      # "opensearch", "dynamodb", "redis"
    resource_name: str
    access_level: AccessLevel
    allowed_operations: List[str] = field(default_factory=list)
    row_filter: Optional[Dict] = None


@dataclass
class RoleDefinition:
    """Complete role definition including model and data permissions."""
    role_name: str
    description: str
    model_permissions: List[ModelPermission] = field(default_factory=list)
    data_permissions: List[DataPermission] = field(default_factory=list)
    max_concurrent_sessions: int = 10
    session_duration_minutes: int = 60
    require_mfa: bool = True
    allowed_ip_ranges: List[str] = field(default_factory=list)
    tags: Dict[str, str] = field(default_factory=dict)


class RBACPolicyEngine:
    """
    Evaluates access control decisions for MangaAssist FM operations.

    Maintains role definitions and evaluates incoming access requests
    against them, returning allow/deny decisions with detailed reasoning
    for audit purposes.

    Role hierarchy:  Admin > Developer > Operator > ReadOnly

    Decision logic:
        1. Validate the requesting identity and extract role claims
        2. Look up the role definition
        3. Check model-level permissions (tier, access level, environment)
        4. Check data-level permissions (resource, operations)
        5. Apply rate limits and session constraints
        6. Log the decision to CloudWatch for audit
    """

    def __init__(self):
        self.roles: Dict[str, RoleDefinition] = {}
        self._initialize_default_roles()

    def _initialize_default_roles(self) -> None:
        """Set up the four default MangaAssist roles."""

        # --- Admin role: full access to all models and data ---
        self.roles["admin"] = RoleDefinition(
            role_name="admin",
            description="Full platform administration with all model and data access",
            model_permissions=[
                ModelPermission(
                    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
                    model_tier=ModelTier.PREMIUM,
                    access_level=AccessLevel.FULL,
                    max_tokens_per_request=8192,
                    max_requests_per_minute=500,
                    allowed_environments=[
                        Environment.PRODUCTION,
                        Environment.STAGING,
                        Environment.DEVELOPMENT,
                    ],
                ),
                ModelPermission(
                    model_id="anthropic.claude-3-haiku-20240307-v1:0",
                    model_tier=ModelTier.STANDARD,
                    access_level=AccessLevel.FULL,
                    max_tokens_per_request=4096,
                    max_requests_per_minute=1000,
                    allowed_environments=[
                        Environment.PRODUCTION,
                        Environment.STAGING,
                        Environment.DEVELOPMENT,
                    ],
                ),
            ],
            data_permissions=[
                DataPermission(
                    resource_type="opensearch",
                    resource_name="manga-vectors",
                    access_level=AccessLevel.FULL,
                    allowed_operations=["search", "index", "delete", "admin"],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-sessions",
                    access_level=AccessLevel.FULL,
                    allowed_operations=[
                        "GetItem", "PutItem", "DeleteItem", "Query", "Scan",
                    ],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-products",
                    access_level=AccessLevel.FULL,
                    allowed_operations=[
                        "GetItem", "PutItem", "DeleteItem", "Query", "Scan",
                    ],
                ),
                DataPermission(
                    resource_type="redis",
                    resource_name="manga-cache",
                    access_level=AccessLevel.FULL,
                    allowed_operations=["GET", "SET", "DEL", "KEYS", "FLUSHDB"],
                ),
            ],
            max_concurrent_sessions=50,
            session_duration_minutes=480,
            require_mfa=True,
            tags={"tier": "admin", "cost_center": "platform"},
        )

        # --- Developer role: invoke models, limited data write ---
        self.roles["developer"] = RoleDefinition(
            role_name="developer",
            description="Development access with model invoke and limited data write",
            model_permissions=[
                ModelPermission(
                    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
                    model_tier=ModelTier.PREMIUM,
                    access_level=AccessLevel.INVOKE,
                    max_tokens_per_request=4096,
                    max_requests_per_minute=100,
                    allowed_environments=[
                        Environment.STAGING,
                        Environment.DEVELOPMENT,
                    ],
                ),
                ModelPermission(
                    model_id="anthropic.claude-3-haiku-20240307-v1:0",
                    model_tier=ModelTier.STANDARD,
                    access_level=AccessLevel.INVOKE,
                    max_tokens_per_request=4096,
                    max_requests_per_minute=200,
                    allowed_environments=[
                        Environment.PRODUCTION,
                        Environment.STAGING,
                        Environment.DEVELOPMENT,
                    ],
                ),
            ],
            data_permissions=[
                DataPermission(
                    resource_type="opensearch",
                    resource_name="manga-vectors",
                    access_level=AccessLevel.READ,
                    allowed_operations=["search"],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-sessions",
                    access_level=AccessLevel.INVOKE,
                    allowed_operations=["GetItem", "PutItem", "Query"],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-products",
                    access_level=AccessLevel.READ,
                    allowed_operations=["GetItem", "Query"],
                ),
                DataPermission(
                    resource_type="redis",
                    resource_name="manga-cache",
                    access_level=AccessLevel.INVOKE,
                    allowed_operations=["GET", "SET"],
                ),
            ],
            max_concurrent_sessions=20,
            session_duration_minutes=120,
            require_mfa=True,
            tags={"tier": "developer", "cost_center": "engineering"},
        )

        # --- Operator role: Haiku invoke only, read-only data ---
        self.roles["operator"] = RoleDefinition(
            role_name="operator",
            description="Operations access with Haiku invoke and read-only data",
            model_permissions=[
                ModelPermission(
                    model_id="anthropic.claude-3-haiku-20240307-v1:0",
                    model_tier=ModelTier.STANDARD,
                    access_level=AccessLevel.INVOKE,
                    max_tokens_per_request=2048,
                    max_requests_per_minute=50,
                    allowed_environments=[Environment.PRODUCTION],
                ),
            ],
            data_permissions=[
                DataPermission(
                    resource_type="opensearch",
                    resource_name="manga-vectors",
                    access_level=AccessLevel.READ,
                    allowed_operations=["search"],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-sessions",
                    access_level=AccessLevel.READ,
                    allowed_operations=["GetItem", "Query"],
                ),
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-products",
                    access_level=AccessLevel.READ,
                    allowed_operations=["GetItem", "Query"],
                ),
            ],
            max_concurrent_sessions=10,
            session_duration_minutes=60,
            require_mfa=True,
            tags={"tier": "operator", "cost_center": "operations"},
        )

        # --- ReadOnly role: no model invoke, read-only data ---
        self.roles["readonly"] = RoleDefinition(
            role_name="readonly",
            description="Read-only access to dashboards and session data",
            model_permissions=[],
            data_permissions=[
                DataPermission(
                    resource_type="dynamodb",
                    resource_name="manga-sessions",
                    access_level=AccessLevel.READ,
                    allowed_operations=["GetItem", "Query"],
                ),
            ],
            max_concurrent_sessions=5,
            session_duration_minutes=30,
            require_mfa=False,
            tags={"tier": "readonly", "cost_center": "analytics"},
        )

    def evaluate_model_access(
        self,
        role_name: str,
        model_id: str,
        environment: str,
        requested_tokens: int = 1024,
    ) -> Dict[str, Any]:
        """
        Evaluate whether a role can access a specific model in a given environment.

        Returns a decision object with allow/deny, reasoning, and constraints.
        """
        decision = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "role": role_name,
            "model_id": model_id,
            "environment": environment,
            "requested_tokens": requested_tokens,
            "allowed": False,
            "reason": "",
            "constraints": {},
        }

        # Step 1: validate role exists
        role = self.roles.get(role_name)
        if not role:
            decision["reason"] = f"Unknown role: {role_name}"
            logger.warning(f"Access denied: unknown role '{role_name}'")
            return decision

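        # Step 2: find matching model permission for environment
        try:
            env = Environment(environment)
        except ValueError:
            # An unrecognized environment string is denied, not raised
            decision["reason"] = f"Unknown environment: {environment}"
            logger.warning(
                f"Access denied: unknown environment '{environment}'"
            )
            return decision
        matching_permission = None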
        for perm in role.model_permissions:
            if perm.model_id == model_id and env in perm.allowed_environments:
                matching_permission = perm
                break

        if not matching_permission:
            decision["reason"] = (
                f"Role '{role_name}' has no permission for model "
                f"'{model_id}' in environment '{environment}'"
            )
            logger.info(
                f"Access denied: {role_name} cannot access {model_id} "
                f"in {environment}"
            )
            return decision

        # Step 3: check access level is not NONE
        if matching_permission.access_level == AccessLevel.NONE:
            decision["reason"] = "Access level is NONE for this model"
            return decision

        # Step 4: check token limit
        if requested_tokens > matching_permission.max_tokens_per_request:
            decision["reason"] = (
                f"Requested {requested_tokens} tokens exceeds limit of "
                f"{matching_permission.max_tokens_per_request}"
            )
            return decision

        # All checks passed
        decision["allowed"] = True
        decision["reason"] = "Access granted"
        decision["constraints"] = {
            "max_tokens": matching_permission.max_tokens_per_request,
            "max_rpm": matching_permission.max_requests_per_minute,
            "access_level": matching_permission.access_level.value,
            "model_tier": matching_permission.model_tier.value,
        }

        logger.info(
            f"Access granted: {role_name} -> {model_id} in {environment}"
        )
        return decision

    def evaluate_data_access(
        self,
        role_name: str,
        resource_type: str,
        resource_name: str,
        operation: str,
    ) -> Dict[str, Any]:
        """
        Evaluate whether a role can perform an operation on a data resource.

        Checks the role's data permissions for a matching resource type,
        resource name, and operation. Returns a decision with reasoning.
        """
        decision = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "role": role_name,
            "resource_type": resource_type,
            "resource_name": resource_name,
            "operation": operation,
            "allowed": False,
            "reason": "",
        }

        role = self.roles.get(role_name)
        if not role:
            decision["reason"] = f"Unknown role: {role_name}"
            return decision

        for perm in role.data_permissions:
            if (
                perm.resource_type == resource_type
                and perm.resource_name == resource_name
                and operation in perm.allowed_operations
            ):
                decision["allowed"] = True
                decision["reason"] = "Access granted"
                return decision

        decision["reason"] = (
            f"Role '{role_name}' has no permission for "
            f"{operation} on {resource_type}/{resource_name}"
        )
        return decision

    def get_role_summary(self, role_name: str) -> Optional[Dict[str, Any]]:
        """Return a human-readable summary of a role's permissions."""
        role = self.roles.get(role_name)
        if not role:
            return None

        return {
            "role_name": role.role_name,
            "description": role.description,
            "models": [
                {
                    "model_id": p.model_id,
                    "tier": p.model_tier.value,
                    "access": p.access_level.value,
                    "environments": [e.value for e in p.allowed_environments],
                }
                for p in role.model_permissions
            ],
            "data_resources": [
                {
                    "type": p.resource_type,
                    "name": p.resource_name,
                    "access": p.access_level.value,
                    "operations": p.allowed_operations,
                }
                for p in role.data_permissions
            ],
            "session_limits": {
                "max_concurrent": role.max_concurrent_sessions,
                "duration_minutes": role.session_duration_minutes,
                "mfa_required": role.require_mfa,
            },
        }

5. Least Privilege IAM Policies for Bedrock API Calls

5.1 ECS Task Role — Production (Haiku Only)

This policy demonstrates strict least privilege: the production ECS task can invoke only Claude 3 Haiku (the cost-effective model) and apply the content guardrail, both in ap-northeast-1. It cannot list, create, or delete any Bedrock resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowBedrockHaikuInvoke",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": "ap-northeast-1"
                }
            }
        },
        {
            "Sid": "AllowBedrockGuardrails",
            "Effect": "Allow",
            "Action": [
                "bedrock:ApplyGuardrail"
            ],
            "Resource": [
                "arn:aws:bedrock:ap-northeast-1:123456789012:guardrail/manga-content-filter"
            ]
        },
        {
            "Sid": "DenyDestructiveBedrockActions",
            "Effect": "Deny",
            "Action": [
                "bedrock:CreateModelCustomizationJob",
                "bedrock:DeleteModelInvocationLoggingConfiguration",
                "bedrock:DeleteCustomModel",
                "bedrock:CreateProvisionedModelThroughput",
                "bedrock:DeleteProvisionedModelThroughput",
                "bedrock:UpdateProvisionedModelThroughput"
            ],
            "Resource": "*"
        }
    ]
}

5.2 ECS Task Role — Staging (Sonnet + Haiku)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowBedrockInvokeStaging",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalTag/Environment": "staging"
                }
            }
        },
        {
            "Sid": "AllowBedrockListModels",
            "Effect": "Allow",
            "Action": [
                "bedrock:ListFoundationModels",
                "bedrock:GetFoundationModel"
            ],
            "Resource": "*"
        }
    ]
}

5.3 Data Layer Least Privilege Policies

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowDynamoDBSessionAccess",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions/index/*"
            ],
            "Condition": {
                "ForAllValues:StringEquals": {
                    "dynamodb:LeadingKeys": [
                        "${cognito-identity.amazonaws.com:sub}"
                    ]
                }
            }
        },
        {
            "Sid": "AllowDynamoDBProductReadOnly",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products",
                "arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products/index/*"
            ]
        },
        {
            "Sid": "AllowOpenSearchServerlessVectorAccess",
            "Effect": "Allow",
            "Action": [
                "aoss:APIAccessAll"
            ],
            "Resource": [
                "arn:aws:aoss:ap-northeast-1:123456789012:collection/manga-vectors"
            ]
        },
        {
            "Sid": "DenyDynamoDBTableManagement",
            "Effect": "Deny",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteTable",
                "dynamodb:UpdateTable"
            ],
            "Resource": "*"
        }
    ]
}
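The `dynamodb:LeadingKeys` condition is the piece that scopes each caller to their own session rows, so it helps to see how it evaluates. The sketch below models the documented `ForAllValues:StringEquals` behaviour in plain Python. The function names and identity IDs are hypothetical, and this is a simplified model rather than IAM's actual evaluation engine.

```python
from typing import Iterable, List

POLICY_VARIABLE = "${cognito-identity.amazonaws.com:sub}"


def resolve_policy_variable(template: str, identity_id: str) -> str:
    """Substitute the caller's Cognito identity ID for the policy variable."""
    return template.replace(POLICY_VARIABLE, identity_id)


def for_all_values_string_equals(
    request_values: Iterable[str], policy_values: List[str]
) -> bool:
    """True when every request value equals one of the policy values.

    An empty request set also yields True, mirroring the documented
    ForAllValues behaviour when the key is absent from the request.
    """
    allowed = set(policy_values)
    return all(value in allowed for value in request_values)


def session_access_allowed(identity_id: str, leading_keys: List[str]) -> bool:
    """Model the condition on the manga-sessions statement."""
    policy_values = [resolve_policy_variable(POLICY_VARIABLE, identity_id)]
    return for_all_values_string_equals(leading_keys, policy_values)


if __name__ == "__main__":
    caller = "ap-northeast-1:user-a"  # hypothetical identity ID
    print(session_access_allowed(caller, ["ap-northeast-1:user-a"]))
    print(session_access_allowed(caller, ["ap-northeast-1:user-b"]))
    print(session_access_allowed(caller, []))
```

The last case is the one to watch: a request that carries no partition key at all satisfies the condition. Keep that in mind before adding actions such as `dynamodb:Scan` to the statement. Note also that the `${cognito-identity.amazonaws.com:sub}` variable only resolves for credentials issued through a Cognito identity pool.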

6. FM Access Controller

"""
fm_access_controller.py
Central access controller for MangaAssist Foundation Model requests.
Combines identity validation, RBAC evaluation, rate limiting, and
audit logging into a single enforcement point — the gateway through
which every FM call must pass.
"""

import json
import time
import logging
import boto3
from typing import Any, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@dataclass
class FMRequest:
    """Represents a request to invoke a Foundation Model."""
    request_id: str
    user_id: str
    role: str
    model_id: str
    environment: str
    input_tokens: int
    max_output_tokens: int
    source_ip: str
    session_id: str
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)


@dataclass
class FMAccessDecision:
    """Result of an access control evaluation."""
    allowed: bool
    reason: str
    request_id: str
    evaluated_at: datetime
    constraints: Optional[Dict[str, Any]] = None
    audit_trail: Optional[Dict[str, Any]] = None


class FMAccessController:
    """
    Central enforcement point for all Foundation Model access in MangaAssist.

    Implements defense-in-depth by layering multiple security checks:
    1. Role validation      -- verify the role claim maps to a known RBAC role
    2. Model permission     -- verify the role can access the requested model
    3. Rate limit check     -- ensure the request does not exceed per-role limits
    4. Cost guard           -- block requests that would exceed budget thresholds
    5. Audit logging        -- record every decision for compliance review

    JWT validation and role extraction happen upstream (see
    IdentityFederationManager); this controller expects the request to
    carry an already authenticated user and role.

    Usage:
        controller = FMAccessController(rbac_engine, rate_limiter)
        decision = controller.evaluate(fm_request)
        if decision.allowed:
            response = bedrock_client.invoke_model(...)
        else:
            raise AccessDeniedException(decision.reason)
    """

    # Cost per 1M tokens (input/output) by model
    MODEL_COSTS = {
        "anthropic.claude-3-sonnet-20240229-v1:0": {
            "input": 3.00,
            "output": 15.00,
        },
        "anthropic.claude-3-haiku-20240307-v1:0": {
            "input": 0.25,
            "output": 1.25,
        },
    }

    # Daily budget limits per role (in USD)
    ROLE_BUDGET_LIMITS = {
        "admin": 500.00,
        "developer": 50.00,
        "operator": 20.00,
        "readonly": 0.00,
    }

    def __init__(self, rbac_engine, rate_limiter=None, redis_client=None):
        self.rbac_engine = rbac_engine
        self.rate_limiter = rate_limiter
        self.redis_client = redis_client
        self.cloudwatch = boto3.client(
            "cloudwatch", region_name="ap-northeast-1"
        )
        self.cloudtrail_logs = boto3.client(
            "logs", region_name="ap-northeast-1"
        )

    def evaluate(self, request: FMRequest) -> FMAccessDecision:
        """
        Evaluate an FM access request through all security layers.

        Returns an FMAccessDecision with the result and full audit trail.
        """
        audit_trail = {
            "request_id": request.request_id,
            "user_id": request.user_id,
            "role": request.role,
            "model_id": request.model_id,
            "environment": request.environment,
            "source_ip": request.source_ip,
            "checks": [],
        }

        # Layer 1: Role validation
        role_check = self._validate_role(request)
        audit_trail["checks"].append(role_check)
        if not role_check["passed"]:
            return self._deny(request, role_check["reason"], audit_trail)

        # Layer 2: Model permission check (RBAC)
        model_check = self._check_model_permission(request)
        audit_trail["checks"].append(model_check)
        if not model_check["passed"]:
            return self._deny(request, model_check["reason"], audit_trail)

        # Layer 3: Rate limit check (Redis fixed one-minute window)
        rate_check = self._check_rate_limit(request)
        audit_trail["checks"].append(rate_check)
        if not rate_check["passed"]:
            return self._deny(request, rate_check["reason"], audit_trail)

        # Layer 4: Cost guard check (daily budget)
        cost_check = self._check_cost_guard(request)
        audit_trail["checks"].append(cost_check)
        if not cost_check["passed"]:
            return self._deny(request, cost_check["reason"], audit_trail)

        # All checks passed
        decision = FMAccessDecision(
            allowed=True,
            reason="All security checks passed",
            request_id=request.request_id,
            evaluated_at=datetime.now(timezone.utc),
            constraints=model_check.get("constraints", {}),
            audit_trail=audit_trail,
        )

        self._log_decision(decision)
        self._emit_metrics(decision, request)
        return decision

    def _validate_role(self, request: FMRequest) -> Dict[str, Any]:
        """Validate that the role claim is recognized and active."""
        check = {"name": "role_validation", "passed": False, "reason": ""}

        valid_roles = self.rbac_engine.roles.keys()
        if request.role not in valid_roles:
            check["reason"] = f"Unrecognized role: {request.role}"
            return check

        check["passed"] = True
        check["reason"] = f"Role '{request.role}' is valid"
        return check

    def _check_model_permission(self, request: FMRequest) -> Dict[str, Any]:
        """Check RBAC permissions for the requested model and environment."""
        check = {
            "name": "model_permission",
            "passed": False,
            "reason": "",
            "constraints": {},
        }

        rbac_decision = self.rbac_engine.evaluate_model_access(
            role_name=request.role,
            model_id=request.model_id,
            environment=request.environment,
            requested_tokens=request.max_output_tokens,
        )

        check["passed"] = rbac_decision["allowed"]
        check["reason"] = rbac_decision["reason"]
        check["constraints"] = rbac_decision.get("constraints", {})
        return check

    def _check_rate_limit(self, request: FMRequest) -> Dict[str, Any]:
        """Check per-role rate limits using a Redis fixed-window counter."""
        check = {
            "name": "rate_limit",
            "passed": True,
            "reason": "Rate limit check passed",
        }

        if not self.redis_client:
            return check

        # Fixed-window counter keyed by role + model + minute bucket
        window_key = (
            f"rate:{request.role}:{request.model_id}:"
            f"{int(time.time()) // 60}"
        )
        current_count = self.redis_client.incr(window_key)
        if current_count == 1:
            self.redis_client.expire(window_key, 60)

        # Look up the RPM limit from the role definition
        role_def = self.rbac_engine.roles.get(request.role)
        if role_def:
            for perm in role_def.model_permissions:
                if perm.model_id == request.model_id:
                    if current_count > perm.max_requests_per_minute:
                        check["passed"] = False
                        check["reason"] = (
                            f"Rate limit exceeded: {current_count}/"
                            f"{perm.max_requests_per_minute} RPM for "
                            f"role '{request.role}'"
                        )
                    break

        return check

    def _check_cost_guard(self, request: FMRequest) -> Dict[str, Any]:
        """Check that the request will not exceed daily budget limits."""
        check = {
            "name": "cost_guard",
            "passed": True,
            "reason": "Cost guard check passed",
        }

        budget_limit = self.ROLE_BUDGET_LIMITS.get(request.role, 0.0)
        if budget_limit <= 0.0:
            # Roles with no budget (readonly, or any role missing from the
            # table) are denied here instead of falling through.
            check["passed"] = False
            check["reason"] = (
                f"Role '{request.role}' has no FM invocation budget"
            )
            return check

        model_cost = self.MODEL_COSTS.get(request.model_id)
        if not model_cost:
            return check  # Unknown model; let IAM handle it

        estimated_cost = (
            (request.input_tokens / 1_000_000) * model_cost["input"]
            + (request.max_output_tokens / 1_000_000) * model_cost["output"]
        )

        if self.redis_client:
            today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            cost_key = f"cost:{request.role}:{today}"

            # Reserve the estimated spend first. INCRBYFLOAT is atomic, so
            # concurrent requests cannot both pass on a stale read of the
            # running total.
            new_total = float(
                self.redis_client.incrbyfloat(cost_key, estimated_cost)
            )
            self.redis_client.expire(cost_key, 86400)

            if new_total > budget_limit:
                # Over budget: release the reservation and deny
                self.redis_client.incrbyfloat(cost_key, -estimated_cost)
                current_spend = new_total - estimated_cost
                check["passed"] = False
                check["reason"] = (
                    f"Daily budget exceeded: ${current_spend:.2f} + "
                    f"${estimated_cost:.4f} > ${budget_limit:.2f} limit"
                )
                return check

        return check

    def _deny(
        self,
        request: FMRequest,
        reason: str,
        audit_trail: Dict,
    ) -> FMAccessDecision:
        """Create a denial decision and log it."""
        decision = FMAccessDecision(
            allowed=False,
            reason=reason,
            request_id=request.request_id,
            evaluated_at=datetime.now(timezone.utc),
            audit_trail=audit_trail,
        )
        self._log_decision(decision)
        self._emit_metrics(decision, request)
        return decision

    def _log_decision(self, decision: FMAccessDecision) -> None:
        """Log the access decision to CloudWatch Logs for audit."""
        log_entry = {
            "request_id": decision.request_id,
            "allowed": decision.allowed,
            "reason": decision.reason,
            "evaluated_at": decision.evaluated_at.isoformat(),
            "audit_trail": decision.audit_trail,
        }
        logger.info(f"FM Access Decision: {json.dumps(log_entry)}")

    def _emit_metrics(
        self, decision: FMAccessDecision, request: FMRequest
    ) -> None:
        """Emit CloudWatch metrics for access control decisions."""
        try:
            self.cloudwatch.put_metric_data(
                Namespace="MangaAssist/Security",
                MetricData=[
                    {
                        "MetricName": "FMAccessDecision",
                        "Dimensions": [
                            {"Name": "Role", "Value": request.role},
                            {"Name": "Model", "Value": request.model_id},
                            {
                                "Name": "Decision",
                                "Value": (
                                    "Allow" if decision.allowed else "Deny"
                                ),
                            },
                        ],
                        "Value": 1,
                        "Unit": "Count",
                        "Timestamp": decision.evaluated_at,
                    },
                ],
            )
        except Exception as e:
            logger.error(f"Failed to emit security metric: {e}")
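The per-minute counter in `_check_rate_limit` resets at each minute boundary, so a burst that straddles two buckets can briefly exceed the configured RPM. A sliding-window log is one common alternative. The sketch below is an in-memory illustration only: `SlidingWindowLimiter` and its parameters are hypothetical names, not part of the controller above.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` events per key in any trailing window."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limit = limit
        self._window = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Record the event and return True if the key is within its limit."""
        now = self._clock()
        events = self._events[key]
        cutoff = now - self._window
        # Drop timestamps that have aged out of the trailing window
        while events and events[0] <= cutoff:
            events.popleft()
        if len(events) >= self._limit:
            return False
        events.append(now)
        return True


if __name__ == "__main__":
    fake_now = [59.0]
    limiter = SlidingWindowLimiter(
        limit=2, window_seconds=60.0, clock=lambda: fake_now[0]
    )
    key = "operator:anthropic.claude-3-haiku-20240307-v1:0"
    print(limiter.allow(key), limiter.allow(key))
    fake_now[0] = 61.0  # new calendar minute, same trailing window
    print(limiter.allow(key))
```

In the demo, the third call lands in a new calendar minute but is still rejected, because both earlier calls fall inside the trailing 60 seconds. A fixed-window counter would have let it through. A shared version could keep the timestamps in a Redis sorted set (`ZADD`, `ZREMRANGEBYSCORE`, `ZCARD`), at the cost of more memory per key than a single counter.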

7. Identity Federation Manager

"""
identity_federation_manager.py
Manages the complete identity federation lifecycle for MangaAssist,
including token validation, credential exchange, and session management.
"""

import json
import time
import logging
import boto3
from typing import Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta

logger = logging.getLogger(__name__)


@dataclass
class FederatedIdentity:
    """Represents a validated federated identity with role claims."""
    user_id: str
    email: str
    role: str
    department: str
    model_access_tier: str
    identity_provider: str
    session_expiry: datetime
    mfa_verified: bool
    groups: list


class AuthenticationError(Exception):
    """Raised when authentication or token validation fails."""
    pass


class IdentityFederationManager:
    """
    Manages identity federation between enterprise identity providers
    and AWS services for MangaAssist FM access.

    Responsibilities:
    - Validate JWT tokens from Cognito
    - Extract role claims and map to RBAC roles
    - Exchange tokens for temporary AWS credentials
    - Manage federated session lifecycle
    - Handle token refresh and session extension

    Federation flow:
        Enterprise IdP -> Cognito User Pool -> Cognito Identity Pool
        -> STS AssumeRoleWithWebIdentity -> Scoped AWS Credentials
    """

    # Maps enterprise group names to MangaAssist RBAC roles
    ROLE_CLAIM_MAPPING = {
        "platform-admin": "admin",
        "engineering": "developer",
        "sre-team": "operator",
        "analytics": "readonly",
        "manga-ops": "operator",
    }

    # Maps RBAC roles to model access tiers
    MODEL_TIER_MAPPING = {
        "admin": "premium",
        "developer": "standard",
        "operator": "standard",
        "readonly": "none",
    }

    def __init__(
        self,
        user_pool_id: str,
        identity_pool_id: str,
        region: str = "ap-northeast-1",
    ):
        self.user_pool_id = user_pool_id
        self.identity_pool_id = identity_pool_id
        self.region = region
        self.cognito_client = boto3.client("cognito-idp", region_name=region)
        self.identity_client = boto3.client(
            "cognito-identity", region_name=region
        )
        self.sts_client = boto3.client("sts", region_name=region)

        self._jwks_cache = None
        self._jwks_cache_time = 0

    def validate_token(
        self, id_token: str, client_id: Optional[str] = None
    ) -> FederatedIdentity:
        """
        Validate a Cognito ID token and extract federated identity claims.

        Performs:
        1. JWT signature verification against Cognito JWKS
        2. Token expiration check
        3. Audience (client_id) validation, when client_id is supplied
        4. Issuer validation
        5. Role claim extraction and mapping
        """
        try:
            import jwt as pyjwt

            # Decode header to get key ID
            unverified_header = pyjwt.get_unverified_header(id_token)
            kid = unverified_header.get("kid")

            # Retrieve signing key from Cognito JWKS
            signing_key = self._get_signing_key(kid)

            issuer = (
                f"https://cognito-idp.{self.region}.amazonaws.com/"
                f"{self.user_pool_id}"
            )

            # Full verification with signature, expiry, issuer, and audience.
            # Cognito ID tokens carry the app client ID in `aud` (access
            # tokens use `client_id` instead), so callers should pass
            # client_id to have the audience enforced.
            claims = pyjwt.decode(
                id_token,
                signing_key,
                algorithms=["RS256"],
                issuer=issuer,
                audience=client_id,
                options={
                    "verify_exp": True,
                    "verify_aud": client_id is not None,
                    "verify_iss": True,
                },
            )

            # Reject access tokens presented where an ID token is expected
            if claims.get("token_use") != "id":
                raise AuthenticationError("Expected a Cognito ID token")

            # Map custom:role to RBAC role, with group-based override
            raw_role = claims.get("custom:role", "")
            mapped_role = self.ROLE_CLAIM_MAPPING.get(raw_role, "readonly")

            groups = claims.get("cognito:groups", [])
            for group in groups:
                if group in self.ROLE_CLAIM_MAPPING:
                    candidate = self.ROLE_CLAIM_MAPPING[group]
                    if self._role_priority(candidate) > self._role_priority(
                        mapped_role
                    ):
                        mapped_role = candidate

            identity = FederatedIdentity(
                user_id=claims.get("sub", ""),
                email=claims.get("email", ""),
                role=mapped_role,
                department=claims.get("custom:department", "unknown"),
                model_access_tier=self.MODEL_TIER_MAPPING.get(
                    mapped_role, "none"
                ),
                identity_provider=(
                    claims.get("identities", [{}])[0].get(
                        "providerName", "Cognito"
                    )
                    if claims.get("identities")
                    else "Cognito"
                ),
                session_expiry=datetime.fromtimestamp(
                    claims.get("exp", 0), tz=timezone.utc
                ),
                mfa_verified="mfa" in claims.get("amr", []),
                groups=groups,
            )

            logger.info(
                f"Validated identity: user={identity.user_id}, "
                f"role={identity.role}, provider={identity.identity_provider}"
            )
            return identity

        except AuthenticationError:
            # Already a domain error (e.g. unknown signing key); pass through
            raise
        except Exception as e:
            if type(e).__name__ == "ExpiredSignatureError":
                logger.warning("Token validation failed: token expired")
                raise AuthenticationError("Token has expired") from e
            logger.warning(f"Token validation failed: {e}")
            raise AuthenticationError(f"Invalid token: {e}") from e

    def exchange_for_credentials(
        self, id_token: str, identity: FederatedIdentity
    ) -> Dict[str, Any]:
        """
        Exchange a validated ID token for temporary AWS credentials
        scoped to the user's RBAC role.

        The Cognito Identity Pool role mapping rules automatically
        select the correct IAM role based on the custom:role claim.
        """
        provider_key = (
            f"cognito-idp.{self.region}.amazonaws.com/{self.user_pool_id}"
        )

        # Get identity ID from the Identity Pool
        identity_response = self.identity_client.get_id(
            IdentityPoolId=self.identity_pool_id,
            Logins={provider_key: id_token},
        )

        identity_id = identity_response["IdentityId"]

        # Get temporary credentials (role mapping applies automatically)
        credentials_response = (
            self.identity_client.get_credentials_for_identity(
                IdentityId=identity_id,
                Logins={provider_key: id_token},
            )
        )

        credentials = credentials_response["Credentials"]

        logger.info(
            f"Issued temporary credentials for {identity.user_id} "
            f"(role={identity.role}, expires={credentials['Expiration']})"
        )

        return {
            "access_key_id": credentials["AccessKeyId"],
            "secret_access_key": credentials["SecretKey"],
            "session_token": credentials["SessionToken"],
            "expiration": credentials["Expiration"].isoformat(),
            "identity_id": identity_id,
            "role": identity.role,
        }

    def refresh_session(
        self, refresh_token: str, client_id: str
    ) -> Dict[str, Any]:
        """
        Refresh an expired session using the Cognito refresh token.

        Returns new ID and access tokens without requiring re-authentication.
        Refresh tokens are valid for 7 days (configured in the app client).
        """
        try:
            response = self.cognito_client.initiate_auth(
                AuthFlow="REFRESH_TOKEN_AUTH",
                AuthParameters={"REFRESH_TOKEN": refresh_token},
                ClientId=client_id,
            )

            result = response["AuthenticationResult"]
            logger.info("Session refreshed successfully")

            return {
                "id_token": result["IdToken"],
                "access_token": result["AccessToken"],
                "token_type": result["TokenType"],
                "expires_in": result["ExpiresIn"],
            }

        except self.cognito_client.exceptions.NotAuthorizedException as exc:
            logger.warning("Refresh token is invalid or expired")
            # Chain the original error so the Cognito failure stays in the traceback
            raise AuthenticationError(
                "Refresh token expired -- re-authentication required"
            ) from exc

    def _get_signing_key(self, kid: str):
        """
        Retrieve the JWT signing key from Cognito JWKS endpoint.
        Caches the JWKS for 1 hour to avoid repeated network calls.
        """
        import urllib.request

        if (
            self._jwks_cache is None
            or time.time() - self._jwks_cache_time > 3600
        ):
            jwks_url = (
                f"https://cognito-idp.{self.region}.amazonaws.com/"
                f"{self.user_pool_id}/.well-known/jwks.json"
            )
            # Bound the fetch so a slow JWKS endpoint cannot stall authentication
            with urllib.request.urlopen(jwks_url, timeout=5) as response:
                self._jwks_cache = json.loads(response.read())
                self._jwks_cache_time = time.time()

        for key in self._jwks_cache.get("keys", []):
            if key["kid"] == kid:
                from jwt.algorithms import RSAAlgorithm
                return RSAAlgorithm.from_jwk(json.dumps(key))

        raise AuthenticationError(f"Signing key not found for kid: {kid}")

    @staticmethod
    def _role_priority(role: str) -> int:
        """Return numeric priority for role comparison (higher = more privileged)."""
        priorities = {
            "readonly": 0,
            "operator": 1,
            "developer": 2,
            "admin": 3,
        }
        return priorities.get(role, -1)
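
The one-hour JWKS cache in `_get_signing_key` rejects a token signed with a newly published key until the cache expires. The following is a minimal sketch of one way to handle that, assuming a refetch-on-unknown-`kid` policy is acceptable. `JwksCache`, the injected `fetch` callable, and `min_refresh_seconds` are hypothetical names introduced for illustration; they are not part of the class above.

```python
import time
from typing import Any, Callable, Dict, Optional


class JwksCache:
    """Hypothetical helper: caches JWKS keys by kid and refetches on a miss."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        ttl_seconds: float = 3600.0,
        min_refresh_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._fetch = fetch                      # returns the parsed jwks.json document
        self._ttl = ttl_seconds                  # normal cache lifetime (1 hour, as above)
        self._min_refresh = min_refresh_seconds  # floor between forced refetches
        self._clock = clock
        self._keys: Dict[str, Dict[str, Any]] = {}
        self._fetched_at: Optional[float] = None

    def _reload(self) -> None:
        jwks = self._fetch()
        self._keys = {key["kid"]: key for key in jwks.get("keys", [])}
        self._fetched_at = self._clock()

    def get(self, kid: str) -> Dict[str, Any]:
        """Return the JWK for kid, or raise KeyError if it is not published."""
        now = self._clock()
        age = None if self._fetched_at is None else now - self._fetched_at
        expired = age is None or age > self._ttl
        # An unknown kid may mean the keys rotated; refetch, but no more often
        # than min_refresh_seconds so bogus kids cannot force a fetch per request.
        missed = kid not in self._keys and (age is None or age >= self._min_refresh)
        if expired or missed:
            self._reload()
        return self._keys[kid]


# Usage with a stubbed fetch that simulates a key rotation between calls.
responses = [
    {"keys": [{"kid": "old"}]},
    {"keys": [{"kid": "old"}, {"kid": "new"}]},
]
calls = []
fake_now = [0.0]


def fake_fetch() -> Dict[str, Any]:
    calls.append(1)
    return responses[min(len(calls), len(responses)) - 1]


cache = JwksCache(fake_fetch, clock=lambda: fake_now[0])
cache.get("old")            # first call loads the JWKS
fake_now[0] = 120.0         # two minutes later a token arrives with a new kid
rotated = cache.get("new")  # miss on a cache older than the floor: one refetch
```

Injecting `fetch` and `clock` keeps the network call and the wall clock out of the cache logic, so the rotation path can be exercised without reaching Cognito.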

8. VPC Endpoint and Network Security Configuration

8.1 Bedrock VPC Endpoint

To ensure FM API calls never traverse the public internet, MangaAssist uses VPC endpoints (AWS PrivateLink) for all Bedrock traffic.

{
    "VPCEndpoint": {
        "ServiceName": "com.amazonaws.ap-northeast-1.bedrock-runtime",
        "VpcId": "vpc-manga-prod-01",
        "SubnetIds": [
            "subnet-private-az1",
            "subnet-private-az2"
        ],
        "SecurityGroupIds": [
            "sg-bedrock-vpce"
        ],
        "PrivateDnsEnabled": true,
        "PolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowECSTasksOnly",
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": "arn:aws:iam::123456789012:role/manga-ecs-task-role"
                    },
                    "Action": [
                        "bedrock:InvokeModel",
                        "bedrock:InvokeModelWithResponseStream"
                    ],
                    "Resource": [
                        "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0",
                        "arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0"
                    ]
                }
            ]
        }
    }
}
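
The endpoint policy above repeats the same ARN pattern for each model, so it can be generated from a list of model IDs. The following is a minimal sketch under that assumption; `build_bedrock_endpoint_policy` is a hypothetical helper name, and the role ARN, region, and model IDs are the ones shown in the configuration above.

```python
import json
from typing import Any, Dict, List


def build_bedrock_endpoint_policy(
    role_arn: str, region: str, model_ids: List[str]
) -> Dict[str, Any]:
    """Build an endpoint policy allowing one role to invoke the listed models."""
    if not model_ids:
        raise ValueError("model_ids must not be empty")
    resources = [
        # Foundation-model ARNs leave the account field empty.
        f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
        for model_id in model_ids
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowECSTasksOnly",
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": [
                    "bedrock:InvokeModel",
                    "bedrock:InvokeModelWithResponseStream",
                ],
                "Resource": resources,
            }
        ],
    }


policy = build_bedrock_endpoint_policy(
    role_arn="arn:aws:iam::123456789012:role/manga-ecs-task-role",
    region="ap-northeast-1",
    model_ids=[
        "anthropic.claude-3-haiku-20240307-v1:0",
        "anthropic.claude-3-sonnet-20240229-v1:0",
    ],
)
policy_json = json.dumps(policy, indent=4)  # string form for a policy document field
```

An endpoint policy only caps what can pass through the endpoint. A call succeeds only if the caller's own IAM policy also allows it, so the task role can be narrower than this list.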

8.2 Security Group for Bedrock VPC Endpoint

{
    "SecurityGroup": {
        "GroupId": "sg-bedrock-vpce",
        "GroupName": "manga-bedrock-vpce-sg",
        "Description": "Security group for Bedrock VPC Endpoint -- allows only ECS tasks",
        "IngressRules": [
            {
                "Protocol": "tcp",
                "FromPort": 443,
                "ToPort": 443,
                "SourceSecurityGroup": "sg-manga-ecs-tasks",
                "Description": "Allow HTTPS from ECS Fargate tasks only"
            }
        ],
        "EgressRules": []
    }
}

9. API Gateway Security Configuration

9.1 WebSocket API with Lambda Authorizer and WAF

{
    "ApiGateway": {
        "Name": "MangaAssistWebSocketAPI",
        "ProtocolType": "WEBSOCKET",
        "RouteSelectionExpression": "$request.body.action",
        "Authorizer": {
            "AuthorizerType": "REQUEST",
            "AuthorizerUri": "arn:aws:lambda:ap-northeast-1:123456789012:function:manga-ws-authorizer",
            "IdentitySource": "route.request.header.Authorization",
            "AuthorizerResultTtlInSeconds": 300
        },
        "WAFAssociation": {
            "WebACLArn": "arn:aws:wafv2:ap-northeast-1:123456789012:regional/webacl/manga-api-waf",
            "Rules": [
                {
                    "Name": "RateLimit",
                    "Priority": 1,
                    "Statement": {
                        "RateBasedStatement": {
                            "Limit": 2000,
                            "AggregateKeyType": "IP"
                        }
                    },
                    "Action": "Block"
                },
                {
                    "Name": "SQLInjectionProtection",
                    "Priority": 2,
                    "Statement": {
                        "SqliMatchStatement": {
                            "FieldToMatch": {"Body": {}},
                            "TextTransformations": [
                                {"Priority": 0, "Type": "URL_DECODE"}
                            ]
                        }
                    },
                    "Action": "Block"
                },
                {
                    "Name": "GeoRestriction",
                    "Priority": 3,
                    "Statement": {
                        "GeoMatchStatement": {
                            "CountryCodes": ["JP", "US"]
                        }
                    },
                    "Action": "Allow"
                }
            ]
        },
        "ThrottlingSettings": {
            "BurstLimit": 500,
            "RateLimit": 1000
        },
        "Logging": {
            "AccessLogDestination": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/apigateway/manga-websocket",
            "Format": "$context.identity.sourceIp $context.identity.caller $context.requestTime $context.routeKey $context.status $context.requestId"
        }
    }
}
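
The configuration above points at a `manga-ws-authorizer` Lambda without showing it. The following is a minimal sketch of what a REQUEST authorizer for the `$connect` route could look like. It is not the project's actual function. `make_handler`, `build_policy`, and the injected `validate_token` callable are hypothetical names, and the default role of `readonly` is an assumption chosen to fail toward least privilege.

```python
from typing import Any, Callable, Dict, Optional


def build_policy(
    principal_id: str,
    effect: str,
    method_arn: str,
    context: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build the IAM-policy response a Lambda authorizer returns to API Gateway."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": effect,
                    "Resource": method_arn,
                }
            ],
        },
        "context": context or {},
    }


def make_handler(validate_token: Callable[[str], Dict[str, str]]):
    """Return a Lambda handler; validate_token must raise on an invalid token."""

    def handler(event: Dict[str, Any], _context: Any) -> Dict[str, Any]:
        # Header names are case-insensitive, so normalize before the lookup.
        headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
        raw = headers.get("authorization", "")
        token = raw[len("Bearer "):] if raw.startswith("Bearer ") else raw
        method_arn = event["methodArn"]
        if not token:
            return build_policy("anonymous", "Deny", method_arn)
        try:
            claims = validate_token(token)
        except Exception:
            # Fail closed: any validation error denies the connection.
            return build_policy("anonymous", "Deny", method_arn)
        role = claims.get("role", "readonly")
        return build_policy(claims["sub"], "Allow", method_arn, {"role": role})

    return handler


def fake_validate(token: str) -> Dict[str, str]:
    # Stand-in for real JWT validation (signature, expiry, audience).
    if token != "valid-token":
        raise ValueError("invalid token")
    return {"sub": "user-123", "role": "developer"}


handler = make_handler(fake_validate)
arn = "arn:aws:execute-api:ap-northeast-1:123456789012:abc123/prod/$connect"
allowed = handler(
    {"headers": {"Authorization": "Bearer valid-token"}, "methodArn": arn}, None
)
denied = handler({"headers": {}, "methodArn": arn}, None)
```

The `context` map is passed to downstream integrations, which is one way the orchestrator could receive the caller's role. Browser WebSocket clients cannot set an `Authorization` header on the handshake, so a browser-facing deployment would likely need a different identity source, such as a query string parameter.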

10. Security Layer Summary

| Security Layer | Component | Purpose | MangaAssist Implementation |
|---|---|---|---|
| Identity | Cognito User Pool | Authenticate users | SAML (AD), OIDC (LINE), email/password |
| Federation | Cognito Identity Pool | Map to AWS credentials | Role-based credential exchange via STS |
| Authorization | RBAC Policy Engine | Enforce model permissions | 4-tier role hierarchy (admin/dev/ops/readonly) |
| Least Privilege | IAM Policies | Restrict API actions | Per-model, per-environment policies |
| Network | VPC Endpoints | Private connectivity | PrivateLink for Bedrock, OpenSearch, DynamoDB |
| API Security | API Gateway + WAF | Protect entry points | Rate limiting, geo-restriction, SQLi protection |
| Encryption | KMS + TLS 1.3 | Protect data | At-rest and in-transit encryption |
| Monitoring | CloudTrail + GuardDuty | Detect threats | Centralized audit logging |
| Compliance | Security Hub | Continuous compliance | Automated security posture assessment |
| Cost Control | FM Access Controller | Prevent budget overrun | Per-role daily budget limits with Redis tracking |

Key Takeaways

  1. Identity Federation bridges enterprise and social identity providers (Active Directory over SAML, LINE over OIDC) with AWS FM services through Cognito, so users need no separate AWS credentials and access is governed centrally.

  2. RBAC for Models goes beyond simple allow/deny: it enforces per-model, per-environment, and per-token-count restrictions, so developers cannot accidentally invoke premium models in production. At 1M messages/day, keeping traffic on Haiku instead of Sonnet avoids roughly $4,125/day (~$4,500 versus ~$375).

  3. Least Privilege means every IAM policy is scoped to the minimum actions, resources, and conditions needed. The production ECS task role cannot list models, create jobs, or access Sonnet -- only invoke Haiku through the VPC endpoint.

  4. Defense in Depth layers identity, RBAC, network, and API security so that a compromise at any single layer does not grant FM access. The FMAccessController enforces all layers in a single code path.

  5. Cost Guards integrated into the access controller prevent runaway costs from unauthorized or excessive model invocations, critical at MangaAssist's 1M messages/day scale where an unrestricted Sonnet deployment would cost ~$4,500/day versus ~$375/day with Haiku.
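
The daily cost figures in the takeaways can be reproduced from the per-token prices given in the MangaAssist context. The following sketch assumes an average of 1,000 input tokens and 100 output tokens per message. That token mix is an assumption chosen because it matches the quoted totals; the source does not state it here. `ModelPrice` and `daily_cost` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """USD per 1M tokens."""
    input_per_million: float
    output_per_million: float


SONNET = ModelPrice(input_per_million=3.00, output_per_million=15.00)
HAIKU = ModelPrice(input_per_million=0.25, output_per_million=1.25)

MESSAGES_PER_DAY = 1_000_000
# Assumed average token counts per message (not stated in the source).
INPUT_TOKENS = 1_000
OUTPUT_TOKENS = 100


def daily_cost(
    price: ModelPrice, messages: int, input_tokens: int, output_tokens: int
) -> float:
    """Daily spend in USD for a uniform per-message token mix."""
    per_message_scaled = (
        input_tokens * price.input_per_million
        + output_tokens * price.output_per_million
    )
    # Prices are quoted per 1M tokens, so divide once at the end.
    return messages * per_message_scaled / 1_000_000


sonnet_daily = daily_cost(SONNET, MESSAGES_PER_DAY, INPUT_TOKENS, OUTPUT_TOKENS)
haiku_daily = daily_cost(HAIKU, MESSAGES_PER_DAY, INPUT_TOKENS, OUTPUT_TOKENS)
savings = sonnet_daily - haiku_daily
print(f"Sonnet ${sonnet_daily:,.0f}/day, Haiku ${haiku_daily:,.0f}/day, "
      f"difference ${savings:,.0f}/day")
```

Because Sonnet is priced at 12 times Haiku for both input and output, the 12:1 cost ratio holds for any token mix; only the absolute dollar amounts depend on the assumed counts.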