
Skill 2.3.3 -- Identity Federation and RBAC Patterns for FM Access

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Detail |
|---|---|
| Skill | 2.3.3 -- Secure Access Frameworks |
| Task | 2.3 -- Enterprise Integration |
| Domain | 2 -- Implementation & Integration |
| Focus | Identity federation with Cognito, cross-account access, fine-grained RBAC, custom Lambda authorizers |
| MangaAssist Relevance | Cognito user pools with JP social providers, per-tier model access, cross-account Bedrock invocation, WebSocket custom authorizer |

Mind Map -- Identity Federation and RBAC

mindmap
  root((Identity Federation\n& RBAC))
    Cognito User Pools
      User Registration
        Email + password signup
        Social identity providers
        LINE Login for JP users
        Google / Apple sign-in
      Authentication Flows
        SRP (Secure Remote Password)
        Custom auth challenge
        TOTP MFA enrollment
        Adaptive authentication
      Token Management
        ID Token with claims
        Access Token for APIs
        Refresh Token rotation
        Token revocation endpoint
      User Attributes
        Locale preference (ja-JP)
        Manga genre preferences
        Reading history consent
        Custom attribute schema
    Cross-Account Access
      AssumeRole Pattern
        Trust policy configuration
        External ID requirement
        Session duration limits
        Role chaining prevention
      Resource-Based Policies
        Bedrock model sharing
        S3 cross-account access
        KMS grant for encryption
        DynamoDB cross-account
      Organizations Integration
        Delegated admin accounts
        SCP guardrails
        Tag-based access control
        OU-scoped permissions
    Fine-Grained Permissions
      Attribute-Based Access Control
        User tier tag conditions
        IP address restrictions
        Time-of-day policies
        Request tag matching
      Resource-Level Permissions
        Per-table DynamoDB access
        Per-index OpenSearch access
        Per-model Bedrock access
        Per-bucket S3 access
      Permission Boundaries
        Maximum permission ceiling
        Developer role limits
        CI/CD role boundaries
        Break-glass procedures
    Custom Authorizer
      Lambda Authorizer Types
        Token-based (TOKEN)
        Request-based (REQUEST)
        WebSocket $connect
        Caching strategies
      Authorization Logic
        JWT claim extraction
        Group membership check
        Rate limit enforcement
        IP allowlist validation
      Response Format
        IAM policy document
        Context variables
        Deny with reason codes
        TTL for cached policies
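
The "Per-model Bedrock access" and "User tier tag conditions" branches of the mind map can be combined in one identity policy. The following is a minimal sketch under stated assumptions, not the MangaAssist policy itself: `build_tier_model_policy` and `TIER_MODELS` are hypothetical names, the `tier` principal tag is assumed to be set on the caller's session, and the region is only a default parameter.

```python
import json

# Assumed mapping from user tier to the Bedrock model IDs it may invoke.
TIER_MODELS = {
    "basic": ["anthropic.claude-3-haiku-20240307-v1:0"],
    "premium": [
        "anthropic.claude-3-haiku-20240307-v1:0",
        "anthropic.claude-3-sonnet-20240229-v1:0",
    ],
}


def build_tier_model_policy(region: str = "ap-northeast-1") -> dict:
    """Build one Allow statement per tier, gated on the `tier` principal tag."""
    statements = []
    for tier, model_ids in TIER_MODELS.items():
        statements.append({
            "Sid": f"Invoke{tier.capitalize()}Models",
            "Effect": "Allow",
            "Action": "bedrock:InvokeModel",
            # Foundation-model ARNs have an empty account field.
            "Resource": [
                f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
                for model_id in model_ids
            ],
            # The statement only applies when the caller carries this tag value.
            "Condition": {"StringEquals": {"aws:PrincipalTag/tier": tier}},
        })
    return {"Version": "2012-10-17", "Statement": statements}


if __name__ == "__main__":
    print(json.dumps(build_tier_model_policy(), indent=2))
```

Keeping the tier-to-model mapping in one table means a tier change is a data edit rather than a policy rewrite; whether the tag reaches the session depends on how the identity pool maps token claims to principal tags, which this sketch does not cover.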

Federation Architecture -- MangaAssist

graph TB
    subgraph "Social Identity Providers"
        LINE[LINE Login<br/>JP Primary]
        GOOG[Google<br/>Global Fallback]
        APPLE[Apple Sign-In<br/>iOS Users]
    end

    subgraph "Cognito User Pool"
        UP[User Pool<br/>manga-assist-users]
        IDP_LINE[LINE IdP Config<br/>OIDC Provider]
        IDP_GOOG[Google IdP Config<br/>OIDC Provider]
        IDP_APPLE[Apple IdP Config<br/>OIDC Provider]

        UP --- IDP_LINE
        UP --- IDP_GOOG
        UP --- IDP_APPLE

        GROUPS[User Groups]
        GROUPS --- G_ADMIN[manga-admins<br/>Precedence: 0]
        GROUPS --- G_PREMIUM[manga-premium<br/>Precedence: 10]
        GROUPS --- G_BASIC[manga-basic<br/>Precedence: 20]

        TRIGGERS[Lambda Triggers]
        TRIGGERS --- PRE_SIGN[Pre Sign-Up<br/>Validate JP locale]
        TRIGGERS --- POST_CONF[Post Confirmation<br/>Create DynamoDB profile]
        TRIGGERS --- PRE_TOKEN[Pre Token Generation<br/>Add custom claims]
    end

    subgraph "Cognito Identity Pool"
        IP[Identity Pool<br/>manga-assist-identity]
        AUTH_ROLE[Authenticated Role<br/>Based on Cognito Group]
        UNAUTH_ROLE[Unauthenticated Role<br/>Guest Browse Only]

        IP --- AUTH_ROLE
        IP --- UNAUTH_ROLE
    end

    LINE -->|OIDC| IDP_LINE
    GOOG -->|OIDC| IDP_GOOG
    APPLE -->|OIDC| IDP_APPLE

    UP -->|ID Token| IP
    AUTH_ROLE -->|STS Credentials| STS[AWS STS]
    UNAUTH_ROLE -->|Limited Credentials| STS

    style LINE fill:#00C300,stroke:#009900,color:#fff
    style UP fill:#E3F2FD,stroke:#1565C0
    style IP fill:#FFF3E0,stroke:#E65100
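
The diagram gives each Cognito group a precedence, where the lowest number wins. As a rough sketch of how a backend might pick the effective group from the groups listed in a token, consider the helper below. `GROUP_PRECEDENCE` copies the values from the diagram; `resolve_effective_group`, `group_to_tier`, and the `guest` fallback are illustrative assumptions.

```python
from typing import Optional

# Precedence values copied from the diagram; a lower value means higher priority.
GROUP_PRECEDENCE = {
    "manga-admins": 0,
    "manga-premium": 10,
    "manga-basic": 20,
}

# Assumed mapping from group name to the tier label used for authorization.
GROUP_TIER = {
    "manga-admins": "admin",
    "manga-premium": "premium",
    "manga-basic": "basic",
}


def resolve_effective_group(groups: list[str]) -> Optional[str]:
    """Return the known group with the lowest precedence value, or None."""
    known = [g for g in groups if g in GROUP_PRECEDENCE]
    if not known:
        return None
    return min(known, key=GROUP_PRECEDENCE.__getitem__)


def group_to_tier(group: Optional[str]) -> str:
    """Map a group to its tier; missing or unknown groups are treated as guests."""
    return GROUP_TIER.get(group, "guest")


if __name__ == "__main__":
    effective = resolve_effective_group(["manga-basic", "manga-premium"])
    print(effective, group_to_tier(effective))
```

Unknown group names are ignored rather than rejected, so adding a new group to the user pool does not break callers that have not yet learned about it.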

Production Code -- Identity Federation and RBAC

Cognito User Pool with Social Providers

"""
MangaAssist Identity Federation
Cognito User Pool management with LINE, Google, and Apple social providers.
Cross-account access patterns and fine-grained RBAC.
"""

import json
import time
import logging
from typing import Any, Optional
from dataclasses import dataclass, field
from enum import Enum

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger("manga_assist.identity")
logger.setLevel(logging.INFO)


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
REGION = "ap-northeast-1"    # Tokyo for JP users
ACCOUNT_ID = "123456789012"
PREFIX = "manga-assist"


# ---------------------------------------------------------------------------
# Social Identity Provider Configuration
# ---------------------------------------------------------------------------
class SocialProviderManager:
    """
    Configures social identity providers (LINE, Google, Apple)
    in the Cognito User Pool for JP manga store users.
    """

    def __init__(self, user_pool_id: str, region: str = REGION):
        self._cognito = boto3.client("cognito-idp", region_name=region)
        self.user_pool_id = user_pool_id

    def configure_line_provider(
        self,
        channel_id: str,
        channel_secret_arn: str,
    ) -> dict:
        """
        Configure LINE Login as an OIDC identity provider.
        LINE is the primary social login for Japanese users.
        """
        # Retrieve channel secret from Secrets Manager
        secrets = boto3.client("secretsmanager", region_name=REGION)
        secret_value = secrets.get_secret_value(SecretId=channel_secret_arn)
        channel_secret = json.loads(secret_value["SecretString"])["secret"]

        return self._cognito.create_identity_provider(
            UserPoolId=self.user_pool_id,
            ProviderName="LINE",
            ProviderType="OIDC",
            ProviderDetails={
                "client_id": channel_id,
                "client_secret": channel_secret,
                "authorize_scopes": "openid profile email",
                "oidc_issuer": "https://access.line.me",
                "attributes_request_method": "GET",
                "authorize_url": "https://access.line.me/oauth2/v2.1/authorize",
                "token_url": "https://api.line.me/oauth2/v2.1/token",
                "attributes_url": "https://api.line.me/v2/profile",
                "jwks_uri": "https://api.line.me/oauth2/v2.1/certs",
            },
            AttributeMapping={
                "email": "email",
                "name": "displayName",
                "picture": "pictureUrl",
                "username": "sub",
            },
            IdpIdentifiers=["line.me"],
        )

    def configure_google_provider(
        self,
        client_id: str,
        client_secret_arn: str,
    ) -> dict:
        """Configure Google as a social identity provider."""
        secrets = boto3.client("secretsmanager", region_name=REGION)
        secret_value = secrets.get_secret_value(SecretId=client_secret_arn)
        client_secret = json.loads(secret_value["SecretString"])["secret"]

        return self._cognito.create_identity_provider(
            UserPoolId=self.user_pool_id,
            ProviderName="Google",
            ProviderType="Google",
            ProviderDetails={
                "client_id": client_id,
                "client_secret": client_secret,
                "authorize_scopes": "openid email profile",
            },
            AttributeMapping={
                "email": "email",
                "name": "name",
                "picture": "picture",
                "username": "sub",
            },
        )

    def configure_apple_provider(
        self,
        services_id: str,
        team_id: str,
        key_id: str,
        private_key_arn: str,
    ) -> dict:
        """Configure Apple Sign-In for iOS users."""
        secrets = boto3.client("secretsmanager", region_name=REGION)
        secret_value = secrets.get_secret_value(SecretId=private_key_arn)
        private_key = secret_value["SecretString"]

        return self._cognito.create_identity_provider(
            UserPoolId=self.user_pool_id,
            ProviderName="SignInWithApple",
            ProviderType="SignInWithApple",
            ProviderDetails={
                "client_id": services_id,
                "team_id": team_id,
                "key_id": key_id,
                "private_key": private_key,
                "authorize_scopes": "email name",
            },
            AttributeMapping={
                "email": "email",
                "name": "firstName",
                "username": "sub",
            },
        )


# ---------------------------------------------------------------------------
# Lambda Triggers for Cognito
# ---------------------------------------------------------------------------
class CognitoLambdaTriggers:
    """
    Lambda trigger implementations for Cognito User Pool events.
    These customize the authentication and registration flow.
    """

    @staticmethod
    def pre_sign_up_handler(event: dict, context: Any) -> dict:
        """
        Pre Sign-Up trigger: validate user attributes before registration.
        - Auto-confirm email-verified social provider users
        - Validate locale preferences
        - Block disposable email domains
        """
        trigger_source = event.get("triggerSource", "")
        user_attrs = event["request"].get("userAttributes", {})

        # Auto-confirm federated (social provider) and admin-created users
        if trigger_source in (
            "PreSignUp_ExternalProvider",
            "PreSignUp_AdminCreateUser",
        ):
            event["response"]["autoConfirmUser"] = True
            if user_attrs.get("email"):
                event["response"]["autoVerifyEmail"] = True
            logger.info(
                "Auto-confirmed user at sign-up",
                extra={"trigger_source": trigger_source},
            )

        # Validate email domain -- block disposable domains
        email = user_attrs.get("email", "")
        blocked_domains = {
            "tempmail.com", "throwaway.email",
            "guerrillamail.com", "10minutemail.com",
        }
        domain = email.split("@")[-1].lower() if "@" in email else ""
        if domain in blocked_domains:
            raise Exception("Registration not allowed with disposable email")

        return event

    @staticmethod
    def post_confirmation_handler(event: dict, context: Any) -> dict:
        """
        Post Confirmation trigger: create user profile in DynamoDB
        after successful registration.
        """
        user_attrs = event["request"].get("userAttributes", {})
        username = event["userName"]

        dynamodb = boto3.resource("dynamodb", region_name=REGION)
        table = dynamodb.Table(f"{PREFIX}-users")

        try:
            table.put_item(
                Item={
                    "pk": f"USER#{username}",
                    "sk": "PROFILE",
                    "email": user_attrs.get("email", ""),
                    "locale": user_attrs.get("locale", "ja-JP"),
                    "display_name": user_attrs.get("name", username),
                    "tier": "basic",
                    "manga_preferences": [],
                    "created_at": int(time.time()),
                    "updated_at": int(time.time()),
                    "status": "active",
                },
                ConditionExpression="attribute_not_exists(pk)",
            )
            logger.info(f"Created profile for user: {username}")
        except ClientError as e:
            if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
                raise
            logger.info(f"Profile already exists for user: {username}")

        return event

    @staticmethod
    def pre_token_generation_handler(event: dict, context: Any) -> dict:
        """
        Pre Token Generation trigger: add custom claims to the JWT.
        Injects user tier and manga preferences into the ID token.
        """
        username = event["userName"]

        dynamodb = boto3.resource("dynamodb", region_name=REGION)
        table = dynamodb.Table(f"{PREFIX}-users")

        try:
            response = table.get_item(
                Key={"pk": f"USER#{username}", "sk": "PROFILE"}
            )
            profile = response.get("Item", {})
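        except ClientError as e:
            # Fall back to default claims rather than blocking sign-in
            logger.warning(
                f"Profile lookup failed: {e.response['Error']['Code']}"
            )
            profile = {}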

        # Add custom claims to ID token
        event["response"]["claimsOverrideDetails"] = {
            "claimsToAddOrOverride": {
                "custom:tier": profile.get("tier", "basic"),
                "custom:locale": profile.get("locale", "ja-JP"),
                "custom:preferences": json.dumps(
                    profile.get("manga_preferences", [])[:10]
                ),
            },
        }

        return event


# ---------------------------------------------------------------------------
# Cross-Account Access Manager
# ---------------------------------------------------------------------------
class CrossAccountAccessManager:
    """
    Manages cross-account access patterns for MangaAssist.
    Supports scenarios like:
    - Shared Bedrock models in a central AI account
    - Cross-account DynamoDB access for analytics
    - Centralized logging to a security account
    """

    def __init__(
        self,
        source_account: str,
        target_account: str,
        region: str = REGION,
    ):
        self.source_account = source_account
        self.target_account = target_account
        self.region = region
        self._iam = boto3.client("iam", region_name=region)
        self._sts = boto3.client("sts", region_name=region)

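    def create_cross_account_role_trust(
        self,
        role_name: str,
        external_id: str,
        max_session_duration: int = 3600,
        require_mfa: bool = False,
    ) -> dict:
        """
        Build create-role parameters with a cross-account trust policy.
        Uses an external ID to mitigate confused deputy attacks.

        Set require_mfa=True only for human operators. Role sessions such as
        the ECS task role are not MFA-authenticated, so the MFA condition
        would block the orchestrator.
        """
        conditions: dict = {
            "StringEquals": {
                "sts:ExternalId": external_id,
                "aws:PrincipalTag/Project": "manga-assist",
            },
        }
        if require_mfa:
            conditions["Bool"] = {"aws:MultiFactorAuthPresent": "true"}

        principal = {"AWS": f"arn:aws:iam::{self.source_account}:root"}
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowCrossAccountAssume",
                    "Effect": "Allow",
                    "Principal": principal,
                    "Action": "sts:AssumeRole",
                    "Condition": conditions,
                },
                {
                    # assume_role() passes session tags, which requires
                    # sts:TagSession in the trust policy.
                    "Sid": "AllowSessionTags",
                    "Effect": "Allow",
                    "Principal": principal,
                    "Action": "sts:TagSession",
                },
            ],
        }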

        return {
            "RoleName": role_name,
            "AssumeRolePolicyDocument": json.dumps(trust_policy),
            "MaxSessionDuration": max_session_duration,
            "Tags": [
                {"Key": "Project", "Value": "manga-assist"},
                {"Key": "CrossAccount", "Value": "true"},
            ],
        }

    def assume_cross_account_role(
        self,
        role_arn: str,
        external_id: str,
        session_name: str = "manga-assist-session",
        duration_seconds: int = 900,
    ) -> dict:
        """
        Assume a cross-account role and return temporary credentials.
        Used when the MangaAssist orchestrator needs to access
        resources in another AWS account.
        """
        try:
            response = self._sts.assume_role(
                RoleArn=role_arn,
                ExternalId=external_id,
                RoleSessionName=session_name,
                DurationSeconds=duration_seconds,
                Tags=[
                    {"Key": "Project", "Value": "manga-assist"},
                    {"Key": "RequestSource", "Value": "ecs-orchestrator"},
                ],
            )

            credentials = response["Credentials"]
            logger.info(
                "Cross-account role assumed successfully",
                extra={
                    "role_arn": role_arn,
                    "expiration": str(credentials["Expiration"]),
                },
            )

            return {
                "access_key": credentials["AccessKeyId"],
                "secret_key": credentials["SecretAccessKey"],
                "session_token": credentials["SessionToken"],
                "expiration": credentials["Expiration"].isoformat(),
            }

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            logger.error(
                f"Cross-account assume failed: {error_code}",
                extra={"role_arn": role_arn},
            )
            raise

    def build_bedrock_cross_account_policy(self) -> dict:
        """
        IAM policy allowing cross-account Bedrock invocation.
        Attached to the role in the source account.
        """
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowAssumeBedrockRole",
                    "Effect": "Allow",
                    "Action": "sts:AssumeRole",
                    "Resource": (
                        f"arn:aws:iam::{self.target_account}:role/"
                        f"{PREFIX}-bedrock-invoker"
                    ),
                    "Condition": {
                        "StringEquals": {
                            "sts:ExternalId": f"{PREFIX}-cross-account"
                        }
                    },
                },
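                {
                    # Deny assuming any role except the Bedrock invoker, so
                    # these credentials cannot be used to hop into other roles.
                    "Sid": "DenyOtherRoleAssumption",
                    "Effect": "Deny",
                    "Action": "sts:AssumeRole",
                    "NotResource": (
                        f"arn:aws:iam::{self.target_account}:role/"
                        f"{PREFIX}-bedrock-invoker"
                    ),
                },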
            ],
        }


# ---------------------------------------------------------------------------
# Fine-Grained Permission Engine
# ---------------------------------------------------------------------------
class FineGrainedPermissionEngine:
    """
    Evaluates fine-grained permissions based on user attributes,
    resource tags, and contextual conditions.
    """

    @dataclass
    class PermissionContext:
        user_id: str
        tier: str
        cognito_groups: list[str]
        ip_address: str
        request_time: int
        user_agent: str
        locale: str = "ja-JP"

    @dataclass
    class PermissionDecision:
        allowed: bool
        reason: str
        model_alias: Optional[str] = None
        max_tokens: int = 0
        rate_limit_remaining: int = 0

    def evaluate(
        self,
        ctx: "FineGrainedPermissionEngine.PermissionContext",
        action: str,
        resource: str,
    ) -> "FineGrainedPermissionEngine.PermissionDecision":
        """
        Evaluate whether the action is permitted given the context.
        Applies rules in order: explicit deny, conditional checks, allow.
        """
        # Rule 1: Check IP allowlist for admin actions
        if action.startswith("admin:") and not self._is_admin_ip(ctx.ip_address):
            return self.PermissionDecision(
                allowed=False,
                reason="Admin actions require allowlisted IP",
            )

        # Rule 2: Time-based restriction -- no admin actions outside JP business hours
        if action.startswith("admin:") and not self._in_business_hours(ctx.request_time):
            return self.PermissionDecision(
                allowed=False,
                reason="Admin actions restricted to JP business hours (9-18 JST)",
            )

        # Rule 3: Model access based on tier
        if action == "bedrock:InvokeModel":
            return self._evaluate_model_access(ctx, resource)

        # Rule 4: Data access based on tier and locale
        if action.startswith("dynamodb:"):
            return self._evaluate_data_access(ctx, action, resource)

        # Default: allow if group membership matches
        required_group = self._action_to_group(action)
        if required_group and required_group not in ctx.cognito_groups:
            return self.PermissionDecision(
                allowed=False,
                reason=f"Requires group membership: {required_group}",
            )

        return self.PermissionDecision(allowed=True, reason="Permitted")

    def _evaluate_model_access(
        self,
        ctx: "FineGrainedPermissionEngine.PermissionContext",
        model_alias: str,
    ) -> "FineGrainedPermissionEngine.PermissionDecision":
        """Evaluate model-specific access rules."""
        tier_rules = {
            "guest": {
                "allowed_models": ["haiku"],
                "max_tokens": 256,
                "daily_limit": 50,
            },
            "basic": {
                "allowed_models": ["haiku", "sonnet"],
                "max_tokens": 1024,
                "daily_limit": 200,
            },
            "premium": {
                "allowed_models": ["haiku", "sonnet"],
                "max_tokens": 4096,
                "daily_limit": 1000,
            },
            "admin": {
                "allowed_models": ["haiku", "sonnet"],
                "max_tokens": 8192,
                "daily_limit": -1,  # Unlimited
            },
        }

        rules = tier_rules.get(ctx.tier, tier_rules["guest"])

        if model_alias not in rules["allowed_models"]:
            return self.PermissionDecision(
                allowed=False,
                reason=f"Tier '{ctx.tier}' cannot access model '{model_alias}'",
                model_alias=model_alias,
            )

        return self.PermissionDecision(
            allowed=True,
            reason="Model access granted",
            model_alias=model_alias,
            max_tokens=rules["max_tokens"],
            rate_limit_remaining=rules["daily_limit"],
        )

    def _evaluate_data_access(
        self,
        ctx: "FineGrainedPermissionEngine.PermissionContext",
        action: str,
        resource: str,
    ) -> "FineGrainedPermissionEngine.PermissionDecision":
        """Evaluate DynamoDB data access with item-level scoping."""
        # Guests can only read public catalog data
        if ctx.tier == "guest":
            if action not in ("dynamodb:GetItem", "dynamodb:Query"):
                return self.PermissionDecision(
                    allowed=False,
                    reason="Guest tier has read-only catalog access",
                )
            if "sessions" in resource or "users" in resource:
                return self.PermissionDecision(
                    allowed=False,
                    reason="Guest tier cannot access session or user data",
                )

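        # Non-admin users can only access their own session data.
        # Compare the whole trailing segment: a bare endswith() would let
        # user "bob" read sessions owned by "alicebob".
        # Assumption: the owner id is the last "/", "#", or ":" delimited segment.
        if "sessions" in resource and ctx.tier != "admin":
            import re

            owner = re.split(r"[/#:]", resource)[-1]
            if owner != ctx.user_id:
                return self.PermissionDecision(
                    allowed=False,
                    reason="Users can only access their own session data",
                )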

        return self.PermissionDecision(allowed=True, reason="Data access granted")

    def _is_admin_ip(self, ip: str) -> bool:
        """Check if IP is in the admin allowlist (office IPs)."""
        import ipaddress

        admin_cidrs = [
            "10.0.0.0/8",       # Internal VPC
            "203.0.113.0/24",   # Tokyo office
            "198.51.100.0/24",  # Osaka office
        ]
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            # Not a parseable address, e.g. the "unknown" placeholder
            return False
        return any(addr in ipaddress.ip_network(cidr) for cidr in admin_cidrs)

    def _in_business_hours(self, epoch_time: int) -> bool:
        """Check if request is within JP business hours (9-18 JST)."""
        import datetime
        import zoneinfo
        jst = zoneinfo.ZoneInfo("Asia/Tokyo")
        dt = datetime.datetime.fromtimestamp(epoch_time, tz=jst)
        return 9 <= dt.hour < 18 and dt.weekday() < 5

    def _action_to_group(self, action: str) -> Optional[str]:
        group_map = {
            "admin:": "manga-admins",
            "manage:": "manga-admins",
            "analytics:": "manga-premium",
        }
        for prefix, group in group_map.items():
            if action.startswith(prefix):
                return group
        return None


# ---------------------------------------------------------------------------
# Custom Lambda Authorizer (Enhanced)
# ---------------------------------------------------------------------------
class EnhancedLambdaAuthorizer:
    """
    Enhanced Lambda authorizer that combines Cognito token validation
    with fine-grained permission evaluation and rate limiting.
    """

    def __init__(
        self,
        user_pool_id: str,
        client_id: str,
        region: str = REGION,
    ):
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.region = region
        self._cognito = boto3.client("cognito-idp", region_name=region)
        self._dynamodb = boto3.resource("dynamodb", region_name=region)
        self._rate_table = self._dynamodb.Table(f"{PREFIX}-rate-limits")
        self._permission_engine = FineGrainedPermissionEngine()

    def handler(self, event: dict, context: Any) -> dict:
        """Main Lambda authorizer handler."""
        request_context = event.get("requestContext", {})
        route_key = request_context.get("routeKey", "$default")
        source_ip = (
            request_context.get("identity", {}).get("sourceIp", "unknown")
        )

        # Extract token
        token = self._extract_token(event)
        if not token:
            logger.warning("Missing authentication token")
            raise Exception("Unauthorized")

        # Validate token and get user info
        user_info = self._validate_and_enrich(token)
        if not user_info:
            raise Exception("Unauthorized")

        # Check rate limit
        if not self._check_rate_limit(user_info["user_id"], user_info["tier"]):
            logger.warning(
                "Rate limit exceeded",
                extra={"user_id": user_info["user_id"]},
            )
            # Return policy that denies access with rate limit context
            return self._build_deny_policy(
                user_info["user_id"],
                event.get("methodArn", ""),
                reason="RATE_LIMIT_EXCEEDED",
            )

        # Evaluate fine-grained permissions for the route
        perm_ctx = FineGrainedPermissionEngine.PermissionContext(
            user_id=user_info["user_id"],
            tier=user_info["tier"],
            cognito_groups=user_info["groups"],
            ip_address=source_ip,
            request_time=int(time.time()),
            user_agent=(event.get("headers") or {}).get("User-Agent", ""),
            locale=user_info.get("locale", "ja-JP"),
        )

        action = self._route_to_action(route_key)
        decision = self._permission_engine.evaluate(
            perm_ctx, action, route_key
        )

        if not decision.allowed:
            logger.warning(
                f"Permission denied: {decision.reason}",
                extra={"user_id": user_info["user_id"], "route": route_key},
            )
            return self._build_deny_policy(
                user_info["user_id"],
                event.get("methodArn", ""),
                reason=decision.reason,
            )

        return self._build_allow_policy(user_info, event.get("methodArn", ""))

    def _extract_token(self, event: dict) -> Optional[str]:
        qs = event.get("queryStringParameters") or {}
        token = qs.get("token")
        if token:
            return token
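        # Header names may arrive in any case, so normalize before lookup
        headers = {
            k.lower(): v for k, v in (event.get("headers") or {}).items()
        }
        auth = headers.get("authorization", "")
        if auth.startswith("Bearer "):
            return auth[7:]
        return None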

    def _validate_and_enrich(self, token: str) -> Optional[dict]:
        """Validate the Cognito access token and enrich it with user attributes and groups."""
        try:
            user = self._cognito.get_user(AccessToken=token)
            username = user["Username"]

            attrs = {
                a["Name"]: a["Value"]
                for a in user.get("UserAttributes", [])
            }

            # Fetch groups
            groups_resp = self._cognito.admin_list_groups_for_user(
                Username=username,
                UserPoolId=self.user_pool_id,
            )
            groups = [g["GroupName"] for g in groups_resp.get("Groups", [])]

            # Determine tier from groups
            tier = "guest"
            for group, t in [
                ("manga-admins", "admin"),
                ("manga-premium", "premium"),
                ("manga-basic", "basic"),
            ]:
                if group in groups:
                    tier = t
                    break

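            return {
                "user_id": username,
                "email": attrs.get("email", ""),
                "tier": tier,
                "groups": groups,
                "locale": attrs.get("locale", "ja-JP"),
                # True when the user has at least one MFA method enabled.
                # phone_number_verified only confirms the phone number itself.
                "mfa_verified": bool(user.get("UserMFASettingList")),
            }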

        except ClientError as e:
            logger.warning(f"Token validation failed: {e.response['Error']['Code']}")
            return None

    def _check_rate_limit(self, user_id: str, tier: str) -> bool:
        """Check and increment rate limit counter in DynamoDB."""
        limits = {
            "guest": 10,       # 10 requests per minute
            "basic": 30,       # 30 requests per minute
            "premium": 100,    # 100 requests per minute
            "admin": 500,      # 500 requests per minute
        }
        limit = limits.get(tier, 10)
        window = int(time.time() // 60)  # 1-minute window

        try:
            response = self._rate_table.update_item(
                Key={
                    "pk": f"RATE#{user_id}",
                    "sk": f"WINDOW#{window}",
                },
                UpdateExpression=(
                    "SET #count = if_not_exists(#count, :zero) + :one, "
                    "#ttl = :ttl"
                ),
                ExpressionAttributeNames={
                    "#count": "request_count",
                    "#ttl": "ttl",
                },
                ExpressionAttributeValues={
                    ":zero": 0,
                    ":one": 1,
                    ":ttl": int(time.time()) + 120,  # 2 minutes TTL
                },
                ReturnValues="UPDATED_NEW",
            )
            count = int(response["Attributes"]["request_count"])
            return count <= limit

        except ClientError:
            logger.exception("Rate limit check failed -- allowing request")
            return True  # Fail open to avoid blocking legitimate users

    def _route_to_action(self, route_key: str) -> str:
        """Map WebSocket route to permission action."""
        route_actions = {
            "$connect": "connect",
            "$disconnect": "disconnect",
            "$default": "chat:message",
            "chat/browse": "chat:browse",
            "chat/search": "chat:search",
            "chat/recommend": "chat:recommend",
            "chat/history": "chat:history",
            "chat/admin": "admin:chat",
            "analytics/usage": "analytics:usage",
        }
        return route_actions.get(route_key, "chat:message")

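    def _build_allow_policy(self, user_info: dict, method_arn: str) -> dict:
        """
        Build an Allow policy for the requested route. Authorizer context
        values must be scalars, so the group list is comma-joined.
        """
        return {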
            "principalId": user_info["user_id"],
            "policyDocument": {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Action": "execute-api:Invoke",
                        "Effect": "Allow",
                        "Resource": method_arn,
                    }
                ],
            },
            "context": {
                "userId": user_info["user_id"],
                "tier": user_info["tier"],
                "groups": ",".join(user_info["groups"]),
                "locale": user_info.get("locale", "ja-JP"),
                "mfaVerified": str(user_info.get("mfa_verified", False)).lower(),
            },
        }

    def _build_deny_policy(
        self, principal: str, method_arn: str, reason: str = ""
    ) -> dict:
        """Build a Deny policy and carry the reason code in the context."""
        return {
            "principalId": principal,
            "policyDocument": {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Action": "execute-api:Invoke",
                        "Effect": "Deny",
                        "Resource": method_arn,
                    }
                ],
            },
            "context": {"denyReason": reason},
        }


# ---------------------------------------------------------------------------
# Permission Boundary for Developer Roles
# ---------------------------------------------------------------------------
class PermissionBoundaryBuilder:
    """
    Creates permission boundaries that cap the maximum permissions
    any MangaAssist role can have, even if the attached policy
    is overly broad.
    """

    def __init__(self, account_id: str, region: str = REGION):
        self.account_id = account_id
        self.region = region

    def developer_boundary(self) -> dict:
        """
        Permission boundary for developer roles.
        Prevents developers from escalating privileges, modifying
        security resources, or accessing production secrets.
        """
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowDevelopmentActions",
                    "Effect": "Allow",
                    "Action": [
                        "bedrock:InvokeModel",
                        "bedrock:InvokeModelWithResponseStream",
                        "dynamodb:GetItem",
                        "dynamodb:Query",
                        "dynamodb:PutItem",
                        "dynamodb:UpdateItem",
                        "dynamodb:DeleteItem",
                        # OpenSearch Serverless data-plane access
                        # (es:ESHttp* applies only to managed domains).
                        "aoss:APIAccessAll",
                        "logs:*",
                        "cloudwatch:PutMetricData",
                        "cloudwatch:GetMetric*",
                        "xray:PutTelemetryRecords",
                        "xray:PutTraceSegments",
                        "s3:GetObject",
                        "s3:PutObject",
                        "s3:ListBucket",
                    ],
                    "Resource": "*",
                },
                {
                    "Sid": "DenySecurityModification",
                    "Effect": "Deny",
                    "Action": [
                        "iam:CreateUser",
                        "iam:CreateRole",
                        "iam:AttachRolePolicy",
                        "iam:PutRolePolicy",
                        "iam:DeleteRolePolicy",
                        "iam:CreatePolicy",
                        "iam:DeletePolicy",
                        "iam:PassRole",
                        "organizations:*",
                        "account:*",
                    ],
                    "Resource": "*",
                },
                {
                    "Sid": "DenyProductionSecrets",
                    "Effect": "Deny",
                    "Action": [
                        "secretsmanager:GetSecretValue",
                        "secretsmanager:DeleteSecret",
                        "secretsmanager:UpdateSecret",
                    ],
                    "Resource": [
                        f"arn:aws:secretsmanager:{self.region}:{self.account_id}:"
                        f"secret:{PREFIX}/prod/*"
                    ],
                },
                {
                    "Sid": "DenyKMSKeyDeletion",
                    "Effect": "Deny",
                    "Action": [
                        "kms:ScheduleKeyDeletion",
                        "kms:DisableKey",
                    ],
                    "Resource": "*",
                },
            ],
        }

    def cicd_boundary(self) -> dict:
        """
        Permission boundary for CI/CD pipeline roles.
        Allows deployment actions but prevents IAM and security changes.
        """
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowDeploymentActions",
                    "Effect": "Allow",
                    "Action": [
                        "ecs:UpdateService",
                        "ecs:RegisterTaskDefinition",
                        "ecs:DescribeServices",
                        "ecs:DescribeTaskDefinition",
                        "ecr:GetAuthorizationToken",
                        "ecr:BatchCheckLayerAvailability",
                        "ecr:PutImage",
                        "ecr:InitiateLayerUpload",
                        "ecr:UploadLayerPart",
                        "ecr:CompleteLayerUpload",
                        "lambda:UpdateFunctionCode",
                        "lambda:UpdateFunctionConfiguration",
                        "s3:PutObject",
                        "s3:GetObject",
                        "cloudformation:*",
                        "codebuild:*",
                        "codepipeline:*",
                    ],
                    "Resource": "*",
                },
                {
                    "Sid": "DenyIAMEscalation",
                    "Effect": "Deny",
                    "Action": [
                        "iam:CreateUser",
                        "iam:CreateRole",
                        "iam:AttachRolePolicy",
                        "iam:PutRolePolicy",
                        "iam:CreatePolicy",
                    ],
                    "Resource": "*",
                },
            ],
        }
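
How a boundary interacts with an attached policy can be sketched with a small evaluator. This is a deliberately simplified model, not IAM's full evaluation logic: it ignores `Resource` and `Condition`, and the names `is_effectively_allowed`, `broad_policy`, and `boundary` are hypothetical. It shows only the two rules the class above relies on: an explicit Deny in either document wins, and otherwise an action must be allowed by both the identity policy and the boundary.

```python
from fnmatch import fnmatchcase


def _matches(action: str, patterns: list[str]) -> bool:
    """Return True if the action matches any IAM-style wildcard pattern."""
    # IAM action names are case-insensitive, so compare in lowercase.
    return any(fnmatchcase(action.lower(), p.lower()) for p in patterns)


def _actions(policy: dict, effect: str) -> list[str]:
    """Collect every action listed under statements with the given effect."""
    found: list[str] = []
    for stmt in policy["Statement"]:
        if stmt["Effect"] == effect:
            actions = stmt["Action"]
            found.extend([actions] if isinstance(actions, str) else actions)
    return found


def is_effectively_allowed(action: str, identity_policy: dict, boundary: dict) -> bool:
    """Simplified model: explicit Deny wins, else both documents must Allow."""
    for policy in (identity_policy, boundary):
        if _matches(action, _actions(policy, "Deny")):
            return False
    return (_matches(action, _actions(identity_policy, "Allow"))
            and _matches(action, _actions(boundary, "Allow")))


# Hypothetical, overly broad identity policy attached to a developer role.
broad_policy = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": "*", "Resource": "*"}],
}

# Trimmed copy of the developer boundary, for illustration only.
boundary = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["bedrock:InvokeModel", "logs:*"], "Resource": "*"},
        {"Effect": "Deny", "Action": ["iam:CreateRole", "iam:PassRole"], "Resource": "*"},
    ],
}
```

With these inputs, `bedrock:InvokeModel` stays allowed, while `iam:CreateRole` (explicitly denied) and `s3:DeleteBucket` (absent from the boundary's Allow list) are both rejected even though the identity policy allows `*`.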

Cross-Account Access Flow

sequenceDiagram
    participant App as MangaAssist Account<br/>(Application)
    participant STS as AWS STS
    participant AI as Central AI Account<br/>(Bedrock Models)
    participant Sec as Security Account<br/>(Audit Logs)

    Note over App,AI: Cross-Account Bedrock Invocation

    App->>STS: AssumeRole(AI-Bedrock-Role,<br/>ExternalId, MFA)
    STS->>STS: Validate trust policy<br/>+ external ID + MFA
    STS-->>App: Temporary credentials<br/>(15 min duration)

    App->>AI: InvokeModel(Claude 3 Sonnet)<br/>using temp credentials
    AI-->>App: Model response

    Note over App,Sec: Cross-Account Audit Logging

    App->>STS: AssumeRole(Sec-Audit-Writer)
    STS-->>App: Temp credentials (audit)

    App->>Sec: PutItem(audit-log-table)<br/>user, model, tokens, cost
    Sec-->>App: Write confirmed
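
The first half of the flow above can be sketched as a trust policy plus an `AssumeRole` call. This is a minimal sketch under stated assumptions: the function names, the session name, and the injected `sts_client` parameter are hypothetical, and the trust policy trusts the whole application account for brevity. The MFA step from the diagram is omitted because a workload such as a Fargate task has no MFA device to present; for human callers, `assume_role` also accepts `SerialNumber` and `TokenCode`.

```python
from typing import Any


def build_trust_policy(app_account_id: str, external_id: str) -> dict:
    """Trust policy for the role in the AI account, gated on an external ID."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{app_account_id}:root"},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


def assume_bedrock_role(sts_client: Any, role_arn: str, external_id: str) -> dict:
    """Exchange the caller's identity for short-lived credentials in the AI account."""
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName="mangaassist-bedrock",  # hypothetical session name
        ExternalId=external_id,
        DurationSeconds=900,  # 15 minutes, the shortest duration STS accepts
    )
    creds = response["Credentials"]
    # Keys are named to match the keyword arguments boto3.client() expects.
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }
```

In practice the caller would pass `boto3.client("sts")` as `sts_client` and unpack the returned dict into `boto3.client("bedrock-runtime", **credentials)`. Taking the client as a parameter keeps the function easy to exercise with a stub.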

ABAC (Attribute-Based Access Control) Pattern

graph TD
    subgraph "Request Context"
        A[User Request] --> B{Extract Attributes}
        B --> C[User Tier: premium]
        B --> D[Locale: ja-JP]
        B --> E[IP: 203.0.113.42]
        B --> F[Time: 14:30 JST]
        B --> G[Route: chat/recommend]
    end

    subgraph "Policy Evaluation"
        C --> H{Tier Check}
        H -->|premium allows sonnet| I[Model: ALLOW]

        D --> J{Locale Check}
        J -->|ja-JP in allowed| K[Region: ALLOW]

        E --> L{IP Check}
        L -->|Office IP range| M[Network: ALLOW]

        F --> N{Time Check}
        N -->|Business hours| O[Time: ALLOW]

        G --> P{Route Check}
        P -->|premium can recommend| Q[Route: ALLOW]
    end

    subgraph "Combined Decision"
        I --> R{All conditions met?}
        K --> R
        M --> R
        O --> R
        Q --> R

        R -->|Yes| S[ALLOW with constraints:<br/>max_tokens=4096<br/>model=sonnet<br/>rate_limit=100/min]
        R -->|No| T[DENY with reason code]
    end

    style S fill:#C8E6C9,stroke:#2E7D32
    style T fill:#FFCDD2,stroke:#C62828
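
The decision graph above can be sketched as a single function that runs each attribute check and either returns constraints or a reason code. Everything configurable here is an assumption made for illustration: the allowed locales, the office network, the business hours, the route table, the model assigned to the `basic` tier, and the reason-code strings. Only the premium example values (Sonnet, 4096 tokens, 100 requests per minute) come from the diagram.

```python
from dataclasses import dataclass
from datetime import time
from ipaddress import ip_address, ip_network

# Illustrative assumptions, not the production configuration.
TIER_ORDER = ["guest", "basic", "premium", "admin"]
TIER_MODELS = {"guest": "haiku", "basic": "haiku", "premium": "sonnet", "admin": "sonnet"}
TIER_RATE_LIMITS = {"guest": 10, "basic": 30, "premium": 100, "admin": 500}
ALLOWED_LOCALES = {"ja-JP", "en-US"}
OFFICE_NETWORK = ip_network("203.0.113.0/24")
BUSINESS_HOURS = (time(9, 0), time(18, 0))  # JST
ROUTE_MIN_TIER = {"chat/browse": "guest", "chat/recommend": "premium"}


@dataclass
class RequestContext:
    tier: str
    locale: str
    source_ip: str
    local_time: time
    route: str


def _route_allowed(tier: str, route: str) -> bool:
    """A route is allowed when the tier ranks at or above the route's minimum."""
    if tier not in TIER_ORDER or route not in ROUTE_MIN_TIER:
        return False
    return TIER_ORDER.index(tier) >= TIER_ORDER.index(ROUTE_MIN_TIER[route])


def evaluate(ctx: RequestContext) -> dict:
    """Run every attribute check; deny with the first failing reason code."""
    checks = [
        ("TIER_UNKNOWN", ctx.tier in TIER_MODELS),
        ("LOCALE_NOT_ALLOWED", ctx.locale in ALLOWED_LOCALES),
        ("IP_NOT_ALLOWED", ip_address(ctx.source_ip) in OFFICE_NETWORK),
        ("OUTSIDE_BUSINESS_HOURS",
         BUSINESS_HOURS[0] <= ctx.local_time <= BUSINESS_HOURS[1]),
        ("ROUTE_NOT_ALLOWED", _route_allowed(ctx.tier, ctx.route)),
    ]
    for reason, passed in checks:
        if not passed:
            return {"decision": "DENY", "reason": reason}
    return {
        "decision": "ALLOW",
        "constraints": {
            "model": TIER_MODELS[ctx.tier],
            "max_tokens": 4096,
            "rate_limit_per_min": TIER_RATE_LIMITS[ctx.tier],
        },
    }
```

Feeding in the diagram's example request (premium, ja-JP, 203.0.113.42, 14:30, chat/recommend) yields ALLOW with the Sonnet constraints, while a guest on the same route is denied with `ROUTE_NOT_ALLOWED`. Returning the first failing reason keeps the deny path cheap; collecting all failures would be a reasonable alternative for audit logging.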

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|----------|-------------------------|
| 1 | Cognito User Pools handle social federation natively -- LINE, Google, Apple integrate as OIDC/social providers with attribute mapping to a unified user model | LINE Login is the primary JP social provider; user attributes include locale and manga preferences |
| 2 | Lambda triggers customize every auth lifecycle event -- Pre Sign-Up validates, Post Confirmation provisions, Pre Token Generation enriches JWTs with business claims | Pre Token Generation injects tier and locale into the ID token so the authorizer can make decisions without a DB lookup |
| 3 | Cross-account access requires external ID + MFA -- prevents confused deputy attacks and ensures only authorized principals can assume roles in other accounts | MangaAssist app account assumes a role in the central AI account to invoke Bedrock models; external ID and MFA are mandatory |
| 4 | Fine-grained RBAC evaluates user tier, IP, time, and route -- attribute-based access control (ABAC) goes beyond group membership to enforce contextual policies | Admin actions restricted to office IPs during JP business hours; guest users limited to read-only catalog with Haiku-only access |
| 5 | Rate limiting in DynamoDB with TTL -- per-user, per-minute counters use atomic increments and auto-expire to prevent abuse without external rate limiter overhead | Guest: 10/min, Basic: 30/min, Premium: 100/min, Admin: 500/min -- enforced at the Lambda authorizer before reaching ECS |
| 6 | Permission boundaries cap maximum privileges -- even if a developer or CI/CD role has overly broad policies, the boundary prevents IAM escalation and production secret access | Developer roles cannot create IAM users/roles, modify policies, or access production secrets; CI/CD cannot touch IAM at all |
| 7 | Deny statements override Allow -- explicit denies for security-critical actions (IAM mutation, KMS deletion, production secrets) cannot be overridden by any attached policy | SCP denies non-approved models at the org level; permission boundary denies IAM escalation at the role level |
| 8 | Fail-open for rate limiting, fail-closed for auth -- rate limit check failures allow the request (availability over strictness), but token validation failures always deny | If DynamoDB rate limit table is unreachable, requests pass through; if Cognito token validation fails, request is denied |
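
The per-minute counter behind the rate limits can be illustrated without DynamoDB. The sketch below is an in-memory stand-in, assuming the same one-minute bucketing and tier limits as the authorizer; the class name `FixedWindowLimiter` and the injectable `clock` parameter are hypothetical and exist only to make the window behaviour easy to observe.

```python
import time
from collections import defaultdict
from typing import Callable

LIMITS = {"guest": 10, "basic": 30, "premium": 100, "admin": 500}  # requests per minute


class FixedWindowLimiter:
    """In-memory stand-in for the DynamoDB counter table (illustration only)."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self._clock = clock
        self._counts: dict[tuple[str, int], int] = defaultdict(int)

    def allow(self, user_id: str, tier: str) -> bool:
        # Same bucketing as the authorizer: one counter per user per minute.
        window = int(self._clock() // 60)
        self._counts[(user_id, window)] += 1
        # Unknown tiers fall back to the guest limit.
        return self._counts[(user_id, window)] <= LIMITS.get(tier, 10)


# Drive the limiter with a fake clock so the window rollover is visible.
now = [120.0]
limiter = FixedWindowLimiter(clock=lambda: now[0])
results = [limiter.allow("user-1", "guest") for _ in range(11)]
now[0] = 180.0  # the next minute starts a fresh counter
after_rollover = limiter.allow("user-1", "guest")
```

Here the first ten guest requests pass, the eleventh is rejected, and the first request in the next window passes again. One property of fixed windows worth keeping in mind: a client can send a full quota at the end of one window and another at the start of the next, so short bursts of up to twice the limit are possible across a boundary.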