Secure Access Architecture for FM Services
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIP-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.3 — Describe methods to integrate FM-powered applications into enterprise systems |
| Skill | 2.3.3 — Create secure access frameworks to ensure appropriate security controls (identity federation between FM services and enterprise systems, role-based access control for model and data access, least privilege API access to FMs) |
1. Security Architecture Mindmap
mindmap
root((Secure Access<br/>Architecture))
Identity Federation
Cognito User Pools
SAML 2.0 Providers
OIDC Providers
Social Identity Providers
Cognito Identity Pools
Federated Identities
Temporary AWS Credentials
Cross-Account Access
Enterprise IdP Integration
Active Directory
Okta and Auth0
Custom SAML Providers
Token Management
JWT Validation
Token Refresh Flows
Session Duration Controls
RBAC — Role-Based Access Control
Model-Level Permissions
Admin Full Access
Developer Invoke Only
Read-Only Monitoring
Per-Model Granularity
Data Access Boundaries
Vector Store Isolation
Session Data Scoping
Product Catalog Access
Environment Separation
Production Lockdown
Staging Limited Access
Development Sandbox
Audit and Compliance
CloudTrail Logging
Access Reviews
Compliance Reports
Least Privilege
IAM Policy Design
Action-Level Restrictions
Resource-Level Conditions
Tag-Based Access Control
Permission Boundaries
Team-Level Boundaries
Service-Level Boundaries
Account-Level SCPs
Temporary Credentials
STS AssumeRole
Session Policies
Time-Limited Tokens
Automated Enforcement
IAM Access Analyzer
Policy Validation
Drift Detection
API Security
API Gateway Controls
WAF Integration
Rate Limiting
Request Validation
Authentication Layers
Cognito Authorizers
Lambda Authorizers
IAM Authorization
Encryption
TLS 1.3 In-Transit
KMS At-Rest
VPC Endpoints
Monitoring
CloudWatch Alarms
GuardDuty Threats
Security Hub Findings
2. MangaAssist Security Architecture Flowchart
flowchart TB
subgraph ClientLayer["Client Layer"]
WebApp["Web Application<br/>(Manga Store Frontend)"]
MobileApp["Mobile Application"]
AdminConsole["Admin Console"]
end
subgraph AuthLayer["Authentication and Authorization Layer"]
Cognito["Amazon Cognito<br/>User Pool + Identity Pool"]
SAML["Enterprise SAML IdP<br/>(Active Directory)"]
OIDC["OIDC Provider<br/>(Social Login / LINE)"]
LambdaAuth["Lambda Authorizer<br/>(Custom Token Validation)"]
end
subgraph APILayer["API Security Layer"]
APIGW["API Gateway WebSocket<br/>+ WAF + Rate Limiting"]
VPCEndpoint["VPC Endpoint<br/>(AWS PrivateLink)"]
end
subgraph ComputeLayer["Compute and Orchestration Layer"]
ECS["ECS Fargate<br/>(Task Role: manga-ecs-task-role)"]
IAMRole["IAM Role<br/>(Least Privilege Bedrock Access)"]
PermBoundary["Permission Boundary<br/>(Team-Level Restrictions)"]
end
subgraph FMLayer["Foundation Model Layer"]
Bedrock["Amazon Bedrock<br/>(Claude 3 Sonnet / Haiku)"]
ModelAccess["Model Access Policy<br/>(Per-Model Permissions)"]
Guardrails["Bedrock Guardrails<br/>(Content Filtering)"]
end
subgraph DataLayer["Data Layer"]
OpenSearch["OpenSearch Serverless<br/>(Vector Store — AOSS Policy)"]
DynamoDB["DynamoDB<br/>(Sessions/Products — Table Policy)"]
Redis["ElastiCache Redis<br/>(VPC-Only Access)"]
end
subgraph MonitorLayer["Monitoring and Audit Layer"]
CloudTrail["CloudTrail<br/>(API Call Logging)"]
CloudWatch["CloudWatch<br/>(Metrics and Alarms)"]
GuardDuty["GuardDuty<br/>(Threat Detection)"]
SecurityHub["Security Hub<br/>(Compliance Dashboard)"]
end
WebApp -->|"HTTPS + JWT"| Cognito
MobileApp -->|"HTTPS + JWT"| Cognito
AdminConsole -->|"SAML Assertion"| SAML
SAML -->|"Federation"| Cognito
OIDC -->|"OIDC Token"| Cognito
Cognito -->|"ID Token + Access Token"| LambdaAuth
LambdaAuth -->|"Authorize"| APIGW
APIGW -->|"VPC Link"| ECS
ECS -->|"AssumeRole"| IAMRole
IAMRole -->|"Scoped Invoke"| Bedrock
PermBoundary -.->|"Constrains"| IAMRole
Bedrock --> ModelAccess
Bedrock --> Guardrails
ECS -->|"VPC Endpoint"| OpenSearch
ECS -->|"VPC Endpoint"| DynamoDB
ECS -->|"VPC Only"| Redis
ECS -.->|"Logs"| CloudWatch
IAMRole -.->|"API Calls"| CloudTrail
CloudTrail -.->|"Findings"| SecurityHub
GuardDuty -.->|"Alerts"| SecurityHub
style ClientLayer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style AuthLayer fill:#fff3e0,stroke:#e65100,stroke-width:2px
style APILayer fill:#f3e5f5,stroke:#6a1b9a,stroke-width:2px
style ComputeLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style FMLayer fill:#fce4ec,stroke:#b71c1c,stroke-width:2px
style DataLayer fill:#e0f7fa,stroke:#00695c,stroke-width:2px
style MonitorLayer fill:#fff8e1,stroke:#f57f17,stroke-width:2px
3. Identity Federation for FM Access
3.1 Federation Architecture Overview
MangaAssist uses a multi-layer identity federation strategy that bridges enterprise identity providers with AWS services, ultimately enabling authenticated and authorized access to Foundation Models through Amazon Bedrock.
Federation Flow:
- End Users authenticate via Cognito User Pool (email/password, social login, or enterprise SAML)
- Enterprise Staff authenticate via corporate Active Directory federated through SAML 2.0
- Cognito Identity Pool exchanges authenticated tokens for temporary AWS credentials
- STS issues time-limited credentials scoped to the user's role
- ECS Task Role assumes a Bedrock-access role with least privilege permissions
3.2 Why Identity Federation Matters for FM Services
Without federation, every team that needs FM access would need dedicated IAM users with long-lived credentials. At MangaAssist's scale of 1M messages/day, this creates:
- Credential sprawl: Hundreds of access keys across teams
- Audit gaps: No link between the human identity and the API call
- Stale permissions: Departed employees retain FM access
- Compliance violations: No centralized identity governance
Federation solves all four problems by routing every FM access request through a single identity provider, issuing temporary credentials, and logging every access decision.
3.3 Cognito User Pool Configuration
"""
cognito_pool_config.py
Amazon Cognito User Pool configuration for MangaAssist identity federation.
Supports SAML 2.0, OIDC, and native authentication flows.
"""
import json
import boto3
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class SAMLProviderConfig:
"""Configuration for a SAML 2.0 identity provider."""
provider_name: str
metadata_url: Optional[str] = None
metadata_file: Optional[str] = None
idp_sign_out: bool = True
attribute_mapping: Dict[str, str] = field(default_factory=lambda: {
"email": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress",
"given_name": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname",
"family_name": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname",
"custom:department": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/department",
"custom:role": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/role",
})
@dataclass
class OIDCProviderConfig:
"""Configuration for an OIDC identity provider."""
provider_name: str
client_id: str
client_secret: str
issuer_url: str
authorize_scopes: str = "openid email profile"
attribute_mapping: Dict[str, str] = field(default_factory=lambda: {
"email": "email",
"given_name": "given_name",
"family_name": "family_name",
"username": "sub",
})
class CognitoPoolConfigurator:
"""
Manages the Amazon Cognito User Pool configuration for MangaAssist.
Creates and configures user pools with:
- SAML 2.0 federation for enterprise staff (Active Directory)
- OIDC federation for social login providers (LINE for JP market)
- Custom attributes for role-based access control
- Password policies and MFA enforcement
- Token expiration and refresh policies aligned to session duration
"""
def __init__(self, region: str = "ap-northeast-1"):
self.cognito_client = boto3.client("cognito-idp", region_name=region)
self.region = region
def create_user_pool(self, pool_name: str = "MangaAssistUserPool") -> Dict[str, Any]:
"""
Create a Cognito User Pool with security-hardened settings.
Returns the User Pool ID and ARN for downstream configuration.
"""
response = self.cognito_client.create_user_pool(
PoolName=pool_name,
Policies={
"PasswordPolicy": {
"MinimumLength": 12,
"RequireUppercase": True,
"RequireLowercase": True,
"RequireNumbers": True,
"RequireSymbols": True,
"TemporaryPasswordValidityDays": 1,
}
},
AutoVerifiedAttributes=["email"],
UsernameAttributes=["email"],
MfaConfiguration="ON",
SoftwareTokenMfaConfiguration={"Enabled": True},
UserPoolAddOns={"AdvancedSecurityMode": "ENFORCED"},
Schema=[
{
"Name": "email",
"AttributeDataType": "String",
"Required": True,
"Mutable": True,
},
{
"Name": "department",
"AttributeDataType": "String",
"Required": False,
"Mutable": True,
"StringAttributeConstraints": {"MaxLength": "64"},
},
{
"Name": "role",
"AttributeDataType": "String",
"Required": False,
"Mutable": True,
"StringAttributeConstraints": {"MaxLength": "32"},
},
{
"Name": "model_access_tier",
"AttributeDataType": "String",
"Required": False,
"Mutable": True,
"StringAttributeConstraints": {"MaxLength": "16"},
},
],
AccountRecoverySetting={
"RecoveryMechanisms": [
{"Priority": 1, "Name": "verified_email"},
]
},
UserPoolTags={
"Project": "MangaAssist",
"SecurityLevel": "High",
"Environment": "Production",
},
)
pool_id = response["UserPool"]["Id"]
pool_arn = response["UserPool"]["Arn"]
logger.info(f"Created Cognito User Pool: {pool_id}")
return {"pool_id": pool_id, "pool_arn": pool_arn}
def configure_saml_provider(
self, pool_id: str, config: SAMLProviderConfig
) -> Dict[str, Any]:
"""
Register a SAML 2.0 identity provider for enterprise federation.
Used for corporate Active Directory integration, allowing
enterprise staff to authenticate with their corporate credentials
and receive appropriate FM access permissions based on AD group
membership.
"""
provider_details = {}
if config.metadata_url:
provider_details["MetadataURL"] = config.metadata_url
elif config.metadata_file:
provider_details["MetadataFile"] = config.metadata_file
provider_details["IDPSignout"] = str(config.idp_sign_out).lower()
response = self.cognito_client.create_identity_provider(
UserPoolId=pool_id,
ProviderName=config.provider_name,
ProviderType="SAML",
ProviderDetails=provider_details,
AttributeMapping=config.attribute_mapping,
IdpIdentifiers=[config.provider_name.lower()],
)
logger.info(
f"Configured SAML provider '{config.provider_name}' for pool {pool_id}"
)
return response
def configure_oidc_provider(
self, pool_id: str, config: OIDCProviderConfig
) -> Dict[str, Any]:
"""
Register an OIDC identity provider for social or external login.
Supports Google, LINE (popular in Japan for Manga users), and
other custom OIDC providers.
"""
response = self.cognito_client.create_identity_provider(
UserPoolId=pool_id,
ProviderName=config.provider_name,
ProviderType="OIDC",
ProviderDetails={
"client_id": config.client_id,
"client_secret": config.client_secret,
"oidc_issuer": config.issuer_url,
"authorize_scopes": config.authorize_scopes,
"attributes_request_method": "GET",
},
AttributeMapping=config.attribute_mapping,
IdpIdentifiers=[config.provider_name.lower()],
)
logger.info(
f"Configured OIDC provider '{config.provider_name}' for pool {pool_id}"
)
return response
def create_app_client(
self, pool_id: str, client_name: str = "MangaAssistWebApp"
) -> Dict[str, Any]:
"""
Create a Cognito App Client with appropriate OAuth flows.
Configures the app client for the MangaAssist web application
with PKCE-based authorization code flow (no client secret for
public clients like SPAs).
"""
response = self.cognito_client.create_user_pool_client(
UserPoolId=pool_id,
ClientName=client_name,
GenerateSecret=False,
ExplicitAuthFlows=[
"ALLOW_USER_SRP_AUTH",
"ALLOW_REFRESH_TOKEN_AUTH",
],
SupportedIdentityProviders=["COGNITO", "CorporateAD", "LINE"],
AllowedOAuthFlows=["code"],
AllowedOAuthScopes=["openid", "email", "profile"],
AllowedOAuthFlowsUserPoolClient=True,
CallbackURLs=[
"https://manga-assist.example.com/callback",
"https://manga-assist.example.com/admin/callback",
],
LogoutURLs=[
"https://manga-assist.example.com/logout",
],
TokenValidityUnits={
"AccessToken": "minutes",
"IdToken": "minutes",
"RefreshToken": "days",
},
AccessTokenValidity=60,
IdTokenValidity=60,
RefreshTokenValidity=7,
PreventUserExistenceErrors="ENABLED",
)
logger.info(f"Created app client '{client_name}' for pool {pool_id}")
return {
"client_id": response["UserPoolClient"]["ClientId"],
"client_name": client_name,
}
def create_identity_pool(
self,
user_pool_id: str,
app_client_id: str,
identity_pool_name: str = "MangaAssistIdentityPool",
) -> Dict[str, Any]:
"""
Create a Cognito Identity Pool for federated AWS credential exchange.
Maps authenticated users from the User Pool into IAM roles
based on their custom:role attribute, enabling RBAC for
Bedrock FM access.
"""
identity_client = boto3.client(
"cognito-identity", region_name=self.region
)
provider_name = (
f"cognito-idp.{self.region}.amazonaws.com/{user_pool_id}"
)
response = identity_client.create_identity_pool(
IdentityPoolName=identity_pool_name,
AllowUnauthenticatedIdentities=False,
CognitoIdentityProviders=[
{
"ProviderName": provider_name,
"ClientId": app_client_id,
"ServerSideTokenCheck": True,
},
],
IdentityPoolTags={
"Project": "MangaAssist",
"SecurityLevel": "High",
},
)
identity_pool_id = response["IdentityPoolId"]
logger.info(f"Created Identity Pool: {identity_pool_id}")
return {"identity_pool_id": identity_pool_id}
def configure_role_mapping(
self,
identity_pool_id: str,
user_pool_id: str,
app_client_id: str,
admin_role_arn: str,
developer_role_arn: str,
readonly_role_arn: str,
default_role_arn: str,
) -> None:
"""
Configure role mapping rules for the Identity Pool.
Maps the custom:role attribute from Cognito tokens to
specific IAM roles with varying levels of Bedrock access.
This is the critical bridge between enterprise identities and
AWS IAM — it determines which Bedrock models each user can invoke.
"""
identity_client = boto3.client(
"cognito-identity", region_name=self.region
)
provider_name = (
f"cognito-idp.{self.region}.amazonaws.com/"
f"{user_pool_id}:{app_client_id}"
)
identity_client.set_identity_pool_roles(
IdentityPoolId=identity_pool_id,
Roles={
"authenticated": default_role_arn,
},
RoleMappings={
provider_name: {
"Type": "Rules",
"AmbiguousRoleResolution": "Deny",
"RulesConfiguration": {
"Rules": [
{
"Claim": "custom:role",
"MatchType": "Equals",
"Value": "admin",
"RoleARN": admin_role_arn,
},
{
"Claim": "custom:role",
"MatchType": "Equals",
"Value": "developer",
"RoleARN": developer_role_arn,
},
{
"Claim": "custom:role",
"MatchType": "Equals",
"Value": "readonly",
"RoleARN": readonly_role_arn,
},
]
},
}
},
)
logger.info(
f"Configured role mapping for Identity Pool {identity_pool_id}"
)
3.4 Identity Federation Sequence
sequenceDiagram
participant User as Manga Store User
participant App as MangaAssist Frontend
participant Cognito as Cognito User Pool
participant IdP as Enterprise IdP (AD)
participant IdentityPool as Cognito Identity Pool
participant STS as AWS STS
participant ECS as ECS Fargate
participant Bedrock as Amazon Bedrock
User->>App: Open chat interface
App->>Cognito: Redirect to hosted UI
alt Enterprise User (SAML)
Cognito->>IdP: SAML AuthnRequest
IdP->>User: Corporate login page
User->>IdP: Enter AD credentials + MFA
IdP->>Cognito: SAML Response (assertions with role claim)
else Social User (OIDC / LINE)
Cognito->>User: Show login options
User->>Cognito: Authenticate (email or LINE)
end
Cognito->>App: Authorization Code (PKCE)
App->>Cognito: Exchange code for tokens
Cognito->>App: ID Token + Access Token + Refresh Token
App->>IdentityPool: GetId (ID Token)
IdentityPool->>STS: AssumeRoleWithWebIdentity (role mapping)
STS->>IdentityPool: Temporary credentials (scoped by role)
IdentityPool->>App: AWS Credentials (time-limited)
App->>ECS: WebSocket connect (JWT in Authorization header)
ECS->>ECS: Validate JWT, extract role claims
ECS->>Bedrock: InvokeModel (using task role with least privilege)
Bedrock->>ECS: Model response
ECS->>App: Chat response via WebSocket
4. RBAC Design for Model Access
4.1 Role Hierarchy
MangaAssist defines four primary roles for FM access, each with progressively restricted permissions:
| Role | Description | Bedrock Access | Data Access | Use Case |
|---|---|---|---|---|
| Admin | Full platform administration | All models, all actions | Full read/write all stores | Platform team leads |
| Developer | Build and test integrations | Invoke Sonnet + Haiku, list models | Read/write dev data, read prod | Engineering team |
| Operator | Monitor and manage production | Invoke Haiku only, read metrics | Read-only all stores | SRE / operations |
| ReadOnly | View dashboards and logs | No invoke, list/get only | Read-only sessions | Business analysts |
4.2 Per-Model Access Matrix
Claude 3 Sonnet Claude 3 Haiku Custom Fine-tuned
Admin Invoke/Manage Invoke/Manage Invoke/Manage
Developer Invoke Invoke Invoke (staging only)
Operator --- Invoke ---
ReadOnly --- --- ---
4.3 Cost Implications of Per-Model RBAC
At MangaAssist scale (1M messages/day), the model choice has massive cost impact:
| Model | Cost per 1M input tokens | Cost per 1M output tokens | Est. daily cost at 1M msgs |
|---|---|---|---|
| Claude 3 Sonnet | $3.00 | $15.00 | ~$4,500 |
| Claude 3 Haiku | $0.25 | $1.25 | ~$375 |
RBAC ensures operators and automated systems use Haiku (cost-effective), while only developers in staging can invoke Sonnet for testing complex interactions. This alone saves roughly $4,125/day compared to unrestricted Sonnet access.
4.4 RBAC Policy Engine
"""
rbac_policy_engine.py
Role-Based Access Control engine for MangaAssist FM access.
Evaluates access requests against role definitions, model permissions,
and data access boundaries with full audit trail support.
"""
import json
import logging
from typing import Dict, List, Optional, Set, Any
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class AccessLevel(Enum):
"""Defines the level of access a role has to a resource."""
NONE = "none"
READ = "read"
INVOKE = "invoke"
MANAGE = "manage" # invoke + configure + delete
FULL = "full" # manage + grant/revoke permissions
class ModelTier(Enum):
"""Bedrock model tiers with cost implications."""
PREMIUM = "premium" # Claude 3 Sonnet ($3/$15 per 1M tokens)
STANDARD = "standard" # Claude 3 Haiku ($0.25/$1.25 per 1M tokens)
CUSTOM = "custom" # Fine-tuned models
class Environment(Enum):
"""Deployment environments with different access requirements."""
PRODUCTION = "production"
STAGING = "staging"
DEVELOPMENT = "development"
@dataclass
class ModelPermission:
"""Permission definition for a specific Bedrock model."""
model_id: str
model_tier: ModelTier
access_level: AccessLevel
max_tokens_per_request: int = 4096
max_requests_per_minute: int = 100
allowed_environments: List[Environment] = field(
default_factory=lambda: [Environment.PRODUCTION]
)
@dataclass
class DataPermission:
"""Permission definition for a data resource (DynamoDB, OpenSearch, Redis)."""
resource_type: str # "opensearch", "dynamodb", "redis"
resource_name: str
access_level: AccessLevel
allowed_operations: List[str] = field(default_factory=list)
row_filter: Optional[Dict] = None
@dataclass
class RoleDefinition:
"""Complete role definition including model and data permissions."""
role_name: str
description: str
model_permissions: List[ModelPermission] = field(default_factory=list)
data_permissions: List[DataPermission] = field(default_factory=list)
max_concurrent_sessions: int = 10
session_duration_minutes: int = 60
require_mfa: bool = True
allowed_ip_ranges: List[str] = field(default_factory=list)
tags: Dict[str, str] = field(default_factory=dict)
class RBACPolicyEngine:
"""
Evaluates access control decisions for MangaAssist FM operations.
Maintains role definitions and evaluates incoming access requests
against them, returning allow/deny decisions with detailed reasoning
for audit purposes.
Role hierarchy: Admin > Developer > Operator > ReadOnly
Decision logic:
1. Validate the requesting identity and extract role claims
2. Look up the role definition
3. Check model-level permissions (tier, access level, environment)
4. Check data-level permissions (resource, operations)
5. Apply rate limits and session constraints
6. Log the decision to CloudWatch for audit
"""
def __init__(self):
self.roles: Dict[str, RoleDefinition] = {}
self._initialize_default_roles()
def _initialize_default_roles(self) -> None:
"""Set up the four default MangaAssist roles."""
# --- Admin role: full access to all models and data ---
self.roles["admin"] = RoleDefinition(
role_name="admin",
description="Full platform administration with all model and data access",
model_permissions=[
ModelPermission(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
model_tier=ModelTier.PREMIUM,
access_level=AccessLevel.FULL,
max_tokens_per_request=8192,
max_requests_per_minute=500,
allowed_environments=[
Environment.PRODUCTION,
Environment.STAGING,
Environment.DEVELOPMENT,
],
),
ModelPermission(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
model_tier=ModelTier.STANDARD,
access_level=AccessLevel.FULL,
max_tokens_per_request=4096,
max_requests_per_minute=1000,
allowed_environments=[
Environment.PRODUCTION,
Environment.STAGING,
Environment.DEVELOPMENT,
],
),
],
data_permissions=[
DataPermission(
resource_type="opensearch",
resource_name="manga-vectors",
access_level=AccessLevel.FULL,
allowed_operations=["search", "index", "delete", "admin"],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-sessions",
access_level=AccessLevel.FULL,
allowed_operations=[
"GetItem", "PutItem", "DeleteItem", "Query", "Scan",
],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-products",
access_level=AccessLevel.FULL,
allowed_operations=[
"GetItem", "PutItem", "DeleteItem", "Query", "Scan",
],
),
DataPermission(
resource_type="redis",
resource_name="manga-cache",
access_level=AccessLevel.FULL,
allowed_operations=["GET", "SET", "DEL", "KEYS", "FLUSHDB"],
),
],
max_concurrent_sessions=50,
session_duration_minutes=480,
require_mfa=True,
tags={"tier": "admin", "cost_center": "platform"},
)
# --- Developer role: invoke models, limited data write ---
self.roles["developer"] = RoleDefinition(
role_name="developer",
description="Development access with model invoke and limited data write",
model_permissions=[
ModelPermission(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
model_tier=ModelTier.PREMIUM,
access_level=AccessLevel.INVOKE,
max_tokens_per_request=4096,
max_requests_per_minute=100,
allowed_environments=[
Environment.STAGING,
Environment.DEVELOPMENT,
],
),
ModelPermission(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
model_tier=ModelTier.STANDARD,
access_level=AccessLevel.INVOKE,
max_tokens_per_request=4096,
max_requests_per_minute=200,
allowed_environments=[
Environment.PRODUCTION,
Environment.STAGING,
Environment.DEVELOPMENT,
],
),
],
data_permissions=[
DataPermission(
resource_type="opensearch",
resource_name="manga-vectors",
access_level=AccessLevel.READ,
allowed_operations=["search"],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-sessions",
access_level=AccessLevel.INVOKE,
allowed_operations=["GetItem", "PutItem", "Query"],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-products",
access_level=AccessLevel.READ,
allowed_operations=["GetItem", "Query"],
),
DataPermission(
resource_type="redis",
resource_name="manga-cache",
access_level=AccessLevel.INVOKE,
allowed_operations=["GET", "SET"],
),
],
max_concurrent_sessions=20,
session_duration_minutes=120,
require_mfa=True,
tags={"tier": "developer", "cost_center": "engineering"},
)
# --- Operator role: Haiku invoke only, read-only data ---
self.roles["operator"] = RoleDefinition(
role_name="operator",
description="Operations access with Haiku invoke and read-only data",
model_permissions=[
ModelPermission(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
model_tier=ModelTier.STANDARD,
access_level=AccessLevel.INVOKE,
max_tokens_per_request=2048,
max_requests_per_minute=50,
allowed_environments=[Environment.PRODUCTION],
),
],
data_permissions=[
DataPermission(
resource_type="opensearch",
resource_name="manga-vectors",
access_level=AccessLevel.READ,
allowed_operations=["search"],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-sessions",
access_level=AccessLevel.READ,
allowed_operations=["GetItem", "Query"],
),
DataPermission(
resource_type="dynamodb",
resource_name="manga-products",
access_level=AccessLevel.READ,
allowed_operations=["GetItem", "Query"],
),
],
max_concurrent_sessions=10,
session_duration_minutes=60,
require_mfa=True,
tags={"tier": "operator", "cost_center": "operations"},
)
# --- ReadOnly role: no model invoke, read-only data ---
self.roles["readonly"] = RoleDefinition(
role_name="readonly",
description="Read-only access to dashboards and session data",
model_permissions=[],
data_permissions=[
DataPermission(
resource_type="dynamodb",
resource_name="manga-sessions",
access_level=AccessLevel.READ,
allowed_operations=["GetItem", "Query"],
),
],
max_concurrent_sessions=5,
session_duration_minutes=30,
require_mfa=False,
tags={"tier": "readonly", "cost_center": "analytics"},
)
def evaluate_model_access(
self,
role_name: str,
model_id: str,
environment: str,
requested_tokens: int = 1024,
) -> Dict[str, Any]:
"""
Evaluate whether a role can access a specific model in a given environment.
Returns a decision object with allow/deny, reasoning, and constraints.
"""
decision = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"role": role_name,
"model_id": model_id,
"environment": environment,
"requested_tokens": requested_tokens,
"allowed": False,
"reason": "",
"constraints": {},
}
# Step 1: validate role exists
role = self.roles.get(role_name)
if not role:
decision["reason"] = f"Unknown role: {role_name}"
logger.warning(f"Access denied: unknown role '{role_name}'")
return decision
# Step 2: find matching model permission for environment
env = Environment(environment)
matching_permission = None
for perm in role.model_permissions:
if perm.model_id == model_id and env in perm.allowed_environments:
matching_permission = perm
break
if not matching_permission:
decision["reason"] = (
f"Role '{role_name}' has no permission for model "
f"'{model_id}' in environment '{environment}'"
)
logger.info(
f"Access denied: {role_name} cannot access {model_id} "
f"in {environment}"
)
return decision
# Step 3: check access level is not NONE
if matching_permission.access_level == AccessLevel.NONE:
decision["reason"] = "Access level is NONE for this model"
return decision
# Step 4: check token limit
if requested_tokens > matching_permission.max_tokens_per_request:
decision["reason"] = (
f"Requested {requested_tokens} tokens exceeds limit of "
f"{matching_permission.max_tokens_per_request}"
)
return decision
# All checks passed
decision["allowed"] = True
decision["reason"] = "Access granted"
decision["constraints"] = {
"max_tokens": matching_permission.max_tokens_per_request,
"max_rpm": matching_permission.max_requests_per_minute,
"access_level": matching_permission.access_level.value,
"model_tier": matching_permission.model_tier.value,
}
logger.info(
f"Access granted: {role_name} -> {model_id} in {environment}"
)
return decision
def evaluate_data_access(
self,
role_name: str,
resource_type: str,
resource_name: str,
operation: str,
) -> Dict[str, Any]:
"""
Evaluate whether a role can perform an operation on a data resource.
Checks the role's data permissions for a matching resource type,
resource name, and operation. Returns a decision with reasoning.
"""
decision = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"role": role_name,
"resource_type": resource_type,
"resource_name": resource_name,
"operation": operation,
"allowed": False,
"reason": "",
}
role = self.roles.get(role_name)
if not role:
decision["reason"] = f"Unknown role: {role_name}"
return decision
for perm in role.data_permissions:
if (
perm.resource_type == resource_type
and perm.resource_name == resource_name
and operation in perm.allowed_operations
):
decision["allowed"] = True
decision["reason"] = "Access granted"
return decision
decision["reason"] = (
f"Role '{role_name}' has no permission for "
f"{operation} on {resource_type}/{resource_name}"
)
return decision
def get_role_summary(self, role_name: str) -> Optional[Dict[str, Any]]:
"""Return a human-readable summary of a role's permissions."""
role = self.roles.get(role_name)
if not role:
return None
return {
"role_name": role.role_name,
"description": role.description,
"models": [
{
"model_id": p.model_id,
"tier": p.model_tier.value,
"access": p.access_level.value,
"environments": [e.value for e in p.allowed_environments],
}
for p in role.model_permissions
],
"data_resources": [
{
"type": p.resource_type,
"name": p.resource_name,
"access": p.access_level.value,
"operations": p.allowed_operations,
}
for p in role.data_permissions
],
"session_limits": {
"max_concurrent": role.max_concurrent_sessions,
"duration_minutes": role.session_duration_minutes,
"mfa_required": role.require_mfa,
},
}
5. Least Privilege IAM Policies for Bedrock API Calls
5.1 ECS Task Role — Production (Haiku Only)
This policy demonstrates strict least-privilege: the production ECS task can only invoke Claude 3 Haiku (the cost-effective model) and cannot list, create, or delete any Bedrock resources.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockHaikuInvoke",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
],
"Condition": {
"StringEquals": {
"aws:RequestedRegion": "ap-northeast-1"
}
}
},
{
"Sid": "AllowBedrockGuardrails",
"Effect": "Allow",
"Action": [
"bedrock:ApplyGuardrail"
],
"Resource": [
"arn:aws:bedrock:ap-northeast-1:123456789012:guardrail/manga-content-filter"
]
},
{
"Sid": "DenyDestructiveBedrockActions",
"Effect": "Deny",
"Action": [
"bedrock:CreateModelCustomizationJob",
"bedrock:DeleteModelInvocationLoggingConfiguration",
"bedrock:DeleteCustomModel",
"bedrock:CreateProvisionedModelThroughput",
"bedrock:DeleteProvisionedModelThroughput",
"bedrock:UpdateProvisionedModelThroughput"
],
"Resource": "*"
}
]
}
5.2 ECS Task Role — Staging (Sonnet + Haiku)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockInvokeStaging",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
],
"Condition": {
"StringEquals": {
"aws:PrincipalTag/Environment": "staging"
}
}
},
{
"Sid": "AllowBedrockListModels",
"Effect": "Allow",
"Action": [
"bedrock:ListFoundationModels",
"bedrock:GetFoundationModel"
],
"Resource": "*"
}
]
}
5.3 Data Layer Least Privilege Policies
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDynamoDBSessionAccess",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-sessions/index/*"
],
"Condition": {
"ForAllValues:StringEquals": {
"dynamodb:LeadingKeys": [
"${cognito-identity.amazonaws.com:sub}"
]
}
}
},
{
"Sid": "AllowDynamoDBProductReadOnly",
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products",
"arn:aws:dynamodb:ap-northeast-1:123456789012:table/manga-products/index/*"
]
},
{
"Sid": "AllowOpenSearchServerlessVectorAccess",
"Effect": "Allow",
"Action": [
"aoss:APIAccessAll"
],
"Resource": [
"arn:aws:aoss:ap-northeast-1:123456789012:collection/manga-vectors"
]
},
{
"Sid": "DenyDynamoDBTableManagement",
"Effect": "Deny",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DeleteTable",
"dynamodb:UpdateTable"
],
"Resource": "*"
}
]
}
6. FM Access Controller
"""
fm_access_controller.py
Central access controller for MangaAssist Foundation Model requests.
Combines identity validation, RBAC evaluation, rate limiting, and
audit logging into a single enforcement point — the gateway through
which every FM call must pass.
"""
import json
import time
import hashlib
import logging
import boto3
from typing import Dict, Optional, Any, Tuple
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
logger = logging.getLogger(__name__)
@dataclass
class FMRequest:
"""Represents a request to invoke a Foundation Model."""
request_id: str
user_id: str
role: str
model_id: str
environment: str
input_tokens: int
max_output_tokens: int
source_ip: str
session_id: str
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now(timezone.utc)
@dataclass
class FMAccessDecision:
"""Result of an access control evaluation."""
allowed: bool
reason: str
request_id: str
evaluated_at: datetime
constraints: Dict[str, Any] = None
audit_trail: Dict[str, Any] = None
class FMAccessController:
"""
Central enforcement point for all Foundation Model access in MangaAssist.
Implements defense-in-depth by layering multiple security checks:
1. Identity validation -- verify the JWT token is valid and not expired
2. Role extraction -- map the authenticated identity to an RBAC role
3. Model permission -- verify the role can access the requested model
4. Rate limit check -- ensure the request does not exceed per-role limits
5. Cost guard -- block requests that would exceed budget thresholds
6. Audit logging -- record every decision for compliance review
Usage:
controller = FMAccessController(rbac_engine, rate_limiter)
decision = controller.evaluate(fm_request)
if decision.allowed:
response = bedrock_client.invoke_model(...)
else:
raise AccessDeniedException(decision.reason)
"""
# Cost per 1M tokens (input/output) by model
MODEL_COSTS = {
"anthropic.claude-3-sonnet-20240229-v1:0": {
"input": 3.00,
"output": 15.00,
},
"anthropic.claude-3-haiku-20240307-v1:0": {
"input": 0.25,
"output": 1.25,
},
}
# Daily budget limits per role (in USD)
ROLE_BUDGET_LIMITS = {
"admin": 500.00,
"developer": 50.00,
"operator": 20.00,
"readonly": 0.00,
}
def __init__(self, rbac_engine, rate_limiter=None, redis_client=None):
self.rbac_engine = rbac_engine
self.rate_limiter = rate_limiter
self.redis_client = redis_client
self.cloudwatch = boto3.client(
"cloudwatch", region_name="ap-northeast-1"
)
self.cloudtrail_logs = boto3.client(
"logs", region_name="ap-northeast-1"
)
def evaluate(self, request: FMRequest) -> FMAccessDecision:
"""
Evaluate an FM access request through all security layers.
Returns an FMAccessDecision with the result and full audit trail.
"""
audit_trail = {
"request_id": request.request_id,
"user_id": request.user_id,
"role": request.role,
"model_id": request.model_id,
"environment": request.environment,
"source_ip": request.source_ip,
"checks": [],
}
# Layer 1: Role validation
role_check = self._validate_role(request)
audit_trail["checks"].append(role_check)
if not role_check["passed"]:
return self._deny(request, role_check["reason"], audit_trail)
# Layer 2: Model permission check (RBAC)
model_check = self._check_model_permission(request)
audit_trail["checks"].append(model_check)
if not model_check["passed"]:
return self._deny(request, model_check["reason"], audit_trail)
# Layer 3: Rate limit check (Redis sliding window)
rate_check = self._check_rate_limit(request)
audit_trail["checks"].append(rate_check)
if not rate_check["passed"]:
return self._deny(request, rate_check["reason"], audit_trail)
# Layer 4: Cost guard check (daily budget)
cost_check = self._check_cost_guard(request)
audit_trail["checks"].append(cost_check)
if not cost_check["passed"]:
return self._deny(request, cost_check["reason"], audit_trail)
# All checks passed
decision = FMAccessDecision(
allowed=True,
reason="All security checks passed",
request_id=request.request_id,
evaluated_at=datetime.now(timezone.utc),
constraints=model_check.get("constraints", {}),
audit_trail=audit_trail,
)
self._log_decision(decision)
self._emit_metrics(decision, request)
return decision
def _validate_role(self, request: FMRequest) -> Dict[str, Any]:
"""Validate that the role claim is recognized and active."""
check = {"name": "role_validation", "passed": False, "reason": ""}
valid_roles = self.rbac_engine.roles.keys()
if request.role not in valid_roles:
check["reason"] = f"Unrecognized role: {request.role}"
return check
check["passed"] = True
check["reason"] = f"Role '{request.role}' is valid"
return check
def _check_model_permission(self, request: FMRequest) -> Dict[str, Any]:
"""Check RBAC permissions for the requested model and environment."""
check = {
"name": "model_permission",
"passed": False,
"reason": "",
"constraints": {},
}
rbac_decision = self.rbac_engine.evaluate_model_access(
role_name=request.role,
model_id=request.model_id,
environment=request.environment,
requested_tokens=request.max_output_tokens,
)
check["passed"] = rbac_decision["allowed"]
check["reason"] = rbac_decision["reason"]
check["constraints"] = rbac_decision.get("constraints", {})
return check
def _check_rate_limit(self, request: FMRequest) -> Dict[str, Any]:
"""Check per-role rate limits using Redis sliding window counter."""
check = {
"name": "rate_limit",
"passed": True,
"reason": "Rate limit check passed",
}
if not self.redis_client:
return check
# Sliding window counter keyed by role + model + minute
window_key = (
f"rate:{request.role}:{request.model_id}:"
f"{int(time.time()) // 60}"
)
current_count = self.redis_client.incr(window_key)
if current_count == 1:
self.redis_client.expire(window_key, 60)
# Look up the RPM limit from the role definition
role_def = self.rbac_engine.roles.get(request.role)
if role_def:
for perm in role_def.model_permissions:
if perm.model_id == request.model_id:
if current_count > perm.max_requests_per_minute:
check["passed"] = False
check["reason"] = (
f"Rate limit exceeded: {current_count}/"
f"{perm.max_requests_per_minute} RPM for "
f"role '{request.role}'"
)
break
return check
def _check_cost_guard(self, request: FMRequest) -> Dict[str, Any]:
"""Check that the request will not exceed daily budget limits."""
check = {
"name": "cost_guard",
"passed": True,
"reason": "Cost guard check passed",
}
budget_limit = self.ROLE_BUDGET_LIMITS.get(request.role, 0.0)
if budget_limit == 0.0 and request.role == "readonly":
check["passed"] = False
check["reason"] = "ReadOnly role has no FM invocation budget"
return check
model_cost = self.MODEL_COSTS.get(request.model_id)
if not model_cost:
return check # Unknown model; let IAM handle it
estimated_cost = (
(request.input_tokens / 1_000_000) * model_cost["input"]
+ (request.max_output_tokens / 1_000_000) * model_cost["output"]
)
if self.redis_client:
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
cost_key = f"cost:{request.role}:{today}"
current_spend = float(self.redis_client.get(cost_key) or 0)
if current_spend + estimated_cost > budget_limit:
check["passed"] = False
check["reason"] = (
f"Daily budget exceeded: ${current_spend:.2f} + "
f"${estimated_cost:.4f} > ${budget_limit:.2f} limit"
)
return check
# Track cumulative spend
self.redis_client.incrbyfloat(cost_key, estimated_cost)
self.redis_client.expire(cost_key, 86400)
return check
def _deny(
self,
request: FMRequest,
reason: str,
audit_trail: Dict,
) -> FMAccessDecision:
"""Create a denial decision and log it."""
decision = FMAccessDecision(
allowed=False,
reason=reason,
request_id=request.request_id,
evaluated_at=datetime.now(timezone.utc),
audit_trail=audit_trail,
)
self._log_decision(decision)
self._emit_metrics(decision, request)
return decision
def _log_decision(self, decision: FMAccessDecision) -> None:
"""Log the access decision to CloudWatch Logs for audit."""
log_entry = {
"request_id": decision.request_id,
"allowed": decision.allowed,
"reason": decision.reason,
"evaluated_at": decision.evaluated_at.isoformat(),
"audit_trail": decision.audit_trail,
}
logger.info(f"FM Access Decision: {json.dumps(log_entry)}")
def _emit_metrics(
self, decision: FMAccessDecision, request: FMRequest
) -> None:
"""Emit CloudWatch metrics for access control decisions."""
try:
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/Security",
MetricData=[
{
"MetricName": "FMAccessDecision",
"Dimensions": [
{"Name": "Role", "Value": request.role},
{"Name": "Model", "Value": request.model_id},
{
"Name": "Decision",
"Value": (
"Allow" if decision.allowed else "Deny"
),
},
],
"Value": 1,
"Unit": "Count",
"Timestamp": decision.evaluated_at,
},
],
)
except Exception as e:
logger.error(f"Failed to emit security metric: {e}")
7. Identity Federation Manager
"""
identity_federation_manager.py
Manages the complete identity federation lifecycle for MangaAssist,
including token validation, credential exchange, and session management.
"""
import json
import time
import logging
import boto3
from typing import Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
logger = logging.getLogger(__name__)
@dataclass
class FederatedIdentity:
"""Represents a validated federated identity with role claims."""
user_id: str
email: str
role: str
department: str
model_access_tier: str
identity_provider: str
session_expiry: datetime
mfa_verified: bool
groups: list
class AuthenticationError(Exception):
"""Raised when authentication or token validation fails."""
pass
class IdentityFederationManager:
"""
Manages identity federation between enterprise identity providers
and AWS services for MangaAssist FM access.
Responsibilities:
- Validate JWT tokens from Cognito
- Extract role claims and map to RBAC roles
- Exchange tokens for temporary AWS credentials
- Manage federated session lifecycle
- Handle token refresh and session extension
Federation flow:
Enterprise IdP -> Cognito User Pool -> Cognito Identity Pool
-> STS AssumeRoleWithWebIdentity -> Scoped AWS Credentials
"""
# Maps enterprise group names to MangaAssist RBAC roles
ROLE_CLAIM_MAPPING = {
"platform-admin": "admin",
"engineering": "developer",
"sre-team": "operator",
"analytics": "readonly",
"manga-ops": "operator",
}
# Maps RBAC roles to model access tiers
MODEL_TIER_MAPPING = {
"admin": "premium",
"developer": "standard",
"operator": "standard",
"readonly": "none",
}
def __init__(
self,
user_pool_id: str,
identity_pool_id: str,
region: str = "ap-northeast-1",
):
self.user_pool_id = user_pool_id
self.identity_pool_id = identity_pool_id
self.region = region
self.cognito_client = boto3.client("cognito-idp", region_name=region)
self.identity_client = boto3.client(
"cognito-identity", region_name=region
)
self.sts_client = boto3.client("sts", region_name=region)
self._jwks_cache = None
self._jwks_cache_time = 0
def validate_token(self, id_token: str) -> FederatedIdentity:
"""
Validate a Cognito ID token and extract federated identity claims.
Performs:
1. JWT signature verification against Cognito JWKS
2. Token expiration check
3. Audience (client_id) validation
4. Issuer validation
5. Role claim extraction and mapping
"""
try:
import jwt as pyjwt
# Decode header to get key ID
unverified_header = pyjwt.get_unverified_header(id_token)
kid = unverified_header.get("kid")
# Retrieve signing key from Cognito JWKS
signing_key = self._get_signing_key(kid)
issuer = (
f"https://cognito-idp.{self.region}.amazonaws.com/"
f"{self.user_pool_id}"
)
# Full verification with signature, expiry, and issuer
claims = pyjwt.decode(
id_token,
signing_key,
algorithms=["RS256"],
issuer=issuer,
options={
"verify_exp": True,
"verify_aud": False, # Cognito uses client_id
"verify_iss": True,
},
)
# Map custom:role to RBAC role, with group-based override
raw_role = claims.get("custom:role", "")
mapped_role = self.ROLE_CLAIM_MAPPING.get(raw_role, "readonly")
groups = claims.get("cognito:groups", [])
for group in groups:
if group in self.ROLE_CLAIM_MAPPING:
candidate = self.ROLE_CLAIM_MAPPING[group]
if self._role_priority(candidate) > self._role_priority(
mapped_role
):
mapped_role = candidate
identity = FederatedIdentity(
user_id=claims.get("sub", ""),
email=claims.get("email", ""),
role=mapped_role,
department=claims.get("custom:department", "unknown"),
model_access_tier=self.MODEL_TIER_MAPPING.get(
mapped_role, "none"
),
identity_provider=(
claims.get("identities", [{}])[0].get(
"providerName", "Cognito"
)
if claims.get("identities")
else "Cognito"
),
session_expiry=datetime.fromtimestamp(
claims.get("exp", 0), tz=timezone.utc
),
mfa_verified="mfa" in claims.get("amr", []),
groups=groups,
)
logger.info(
f"Validated identity: user={identity.user_id}, "
f"role={identity.role}, provider={identity.identity_provider}"
)
return identity
except Exception as e:
if "ExpiredSignature" in str(type(e).__name__):
logger.warning("Token validation failed: token expired")
raise AuthenticationError("Token has expired")
logger.warning(f"Token validation failed: {e}")
raise AuthenticationError(f"Invalid token: {e}")
def exchange_for_credentials(
self, id_token: str, identity: FederatedIdentity
) -> Dict[str, Any]:
"""
Exchange a validated ID token for temporary AWS credentials
scoped to the user's RBAC role.
The Cognito Identity Pool role mapping rules automatically
select the correct IAM role based on the custom:role claim.
"""
provider_key = (
f"cognito-idp.{self.region}.amazonaws.com/{self.user_pool_id}"
)
# Get identity ID from the Identity Pool
identity_response = self.identity_client.get_id(
IdentityPoolId=self.identity_pool_id,
Logins={provider_key: id_token},
)
identity_id = identity_response["IdentityId"]
# Get temporary credentials (role mapping applies automatically)
credentials_response = (
self.identity_client.get_credentials_for_identity(
IdentityId=identity_id,
Logins={provider_key: id_token},
)
)
credentials = credentials_response["Credentials"]
logger.info(
f"Issued temporary credentials for {identity.user_id} "
f"(role={identity.role}, expires={credentials['Expiration']})"
)
return {
"access_key_id": credentials["AccessKeyId"],
"secret_access_key": credentials["SecretKey"],
"session_token": credentials["SessionToken"],
"expiration": credentials["Expiration"].isoformat(),
"identity_id": identity_id,
"role": identity.role,
}
def refresh_session(
self, refresh_token: str, client_id: str
) -> Dict[str, str]:
"""
Refresh an expired session using the Cognito refresh token.
Returns new ID and access tokens without requiring re-authentication.
Refresh tokens are valid for 7 days (configured in the app client).
"""
try:
response = self.cognito_client.initiate_auth(
AuthFlow="REFRESH_TOKEN_AUTH",
AuthParameters={"REFRESH_TOKEN": refresh_token},
ClientId=client_id,
)
result = response["AuthenticationResult"]
logger.info("Session refreshed successfully")
return {
"id_token": result["IdToken"],
"access_token": result["AccessToken"],
"token_type": result["TokenType"],
"expires_in": result["ExpiresIn"],
}
except self.cognito_client.exceptions.NotAuthorizedException:
logger.warning("Refresh token is invalid or expired")
raise AuthenticationError(
"Refresh token expired -- re-authentication required"
)
def _get_signing_key(self, kid: str):
"""
Retrieve the JWT signing key from Cognito JWKS endpoint.
Caches the JWKS for 1 hour to avoid repeated network calls.
"""
import urllib.request
if (
self._jwks_cache is None
or time.time() - self._jwks_cache_time > 3600
):
jwks_url = (
f"https://cognito-idp.{self.region}.amazonaws.com/"
f"{self.user_pool_id}/.well-known/jwks.json"
)
with urllib.request.urlopen(jwks_url) as response:
self._jwks_cache = json.loads(response.read())
self._jwks_cache_time = time.time()
for key in self._jwks_cache.get("keys", []):
if key["kid"] == kid:
from jwt.algorithms import RSAAlgorithm
return RSAAlgorithm.from_jwk(json.dumps(key))
raise AuthenticationError(f"Signing key not found for kid: {kid}")
@staticmethod
def _role_priority(role: str) -> int:
"""Return numeric priority for role comparison (higher = more privileged)."""
priorities = {
"readonly": 0,
"operator": 1,
"developer": 2,
"admin": 3,
}
return priorities.get(role, -1)
8. VPC Endpoint and Network Security Configuration
8.1 Bedrock VPC Endpoint
To ensure FM API calls never traverse the public internet, MangaAssist uses VPC endpoints (AWS PrivateLink) for all Bedrock traffic.
{
"VPCEndpoint": {
"ServiceName": "com.amazonaws.ap-northeast-1.bedrock-runtime",
"VpcId": "vpc-manga-prod-01",
"SubnetIds": [
"subnet-private-az1",
"subnet-private-az2"
],
"SecurityGroupIds": [
"sg-bedrock-vpce"
],
"PrivateDnsEnabled": true,
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowECSTasksOnly",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:role/manga-ecs-task-role"
},
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0",
"arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0"
]
}
]
}
}
}
8.2 Security Group for Bedrock VPC Endpoint
{
"SecurityGroup": {
"GroupId": "sg-bedrock-vpce",
"GroupName": "manga-bedrock-vpce-sg",
"Description": "Security group for Bedrock VPC Endpoint -- allows only ECS tasks",
"IngressRules": [
{
"Protocol": "tcp",
"FromPort": 443,
"ToPort": 443,
"SourceSecurityGroup": "sg-manga-ecs-tasks",
"Description": "Allow HTTPS from ECS Fargate tasks only"
}
],
"EgressRules": []
}
}
9. API Gateway Security Configuration
9.1 WebSocket API with Lambda Authorizer and WAF
{
"ApiGateway": {
"Name": "MangaAssistWebSocketAPI",
"ProtocolType": "WEBSOCKET",
"RouteSelectionExpression": "$request.body.action",
"Authorizer": {
"AuthorizerType": "REQUEST",
"AuthorizerUri": "arn:aws:lambda:ap-northeast-1:123456789012:function:manga-ws-authorizer",
"IdentitySource": "route.request.header.Authorization",
"AuthorizerResultTtlInSeconds": 300
},
"WAFAssociation": {
"WebACLArn": "arn:aws:wafv2:ap-northeast-1:123456789012:regional/webacl/manga-api-waf",
"Rules": [
{
"Name": "RateLimit",
"Priority": 1,
"Statement": {
"RateBasedStatement": {
"Limit": 2000,
"AggregateKeyType": "IP"
}
},
"Action": "Block"
},
{
"Name": "GeoRestriction",
"Priority": 2,
"Statement": {
"GeoMatchStatement": {
"CountryCodes": ["JP", "US"]
}
},
"Action": "Allow"
},
{
"Name": "SQLInjectionProtection",
"Priority": 3,
"Statement": {
"SqliMatchStatement": {
"FieldToMatch": {"Body": {}},
"TextTransformations": [
{"Priority": 0, "Type": "URL_DECODE"}
]
}
},
"Action": "Block"
}
]
},
"ThrottlingSettings": {
"BurstLimit": 500,
"RateLimit": 1000
},
"Logging": {
"AccessLogDestination": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/apigateway/manga-websocket",
"Format": "$context.identity.sourceIp $context.identity.caller $context.requestTime $context.routeKey $context.status $context.requestId"
}
}
}
10. Security Layer Summary
| Security Layer | Component | Purpose | MangaAssist Implementation |
|---|---|---|---|
| Identity | Cognito User Pool | Authenticate users | SAML (AD), OIDC (LINE), email/password |
| Federation | Cognito Identity Pool | Map to AWS credentials | Role-based credential exchange via STS |
| Authorization | RBAC Policy Engine | Enforce model permissions | 4-tier role hierarchy (admin/dev/ops/readonly) |
| Least Privilege | IAM Policies | Restrict API actions | Per-model, per-environment policies |
| Network | VPC Endpoints | Private connectivity | PrivateLink for Bedrock, OpenSearch, DynamoDB |
| API Security | API Gateway + WAF | Protect entry points | Rate limiting, geo-restriction, SQLi protection |
| Encryption | KMS + TLS 1.3 | Protect data | At-rest and in-transit encryption |
| Monitoring | CloudTrail + GuardDuty | Detect threats | Centralized audit logging |
| Compliance | Security Hub | Continuous compliance | Automated security posture assessment |
| Cost Control | FM Access Controller | Prevent budget overrun | Per-role daily budget limits with Redis tracking |
Key Takeaways
-
Identity Federation bridges enterprise identity providers (Active Directory, social logins) with AWS FM services through Cognito, eliminating the need for separate credentials and enabling centralized governance.
-
RBAC for Models goes beyond simple allow/deny -- it enforces per-model, per-environment, per-token-count restrictions, ensuring developers cannot accidentally invoke premium models in production (saving ~$4,125/day at scale).
-
Least Privilege means every IAM policy is scoped to the minimum actions, resources, and conditions needed. The production ECS task role cannot list models, create jobs, or access Sonnet -- only invoke Haiku through the VPC endpoint.
-
Defense in Depth layers identity, RBAC, network, and API security so that a compromise at any single layer does not grant FM access. The FMAccessController enforces all layers in a single code path.
-
Cost Guards integrated into the access controller prevent runaway costs from unauthorized or excessive model invocations, critical at MangaAssist's 1M messages/day scale where an unrestricted Sonnet deployment would cost ~$4,500/day versus ~$375/day with Haiku.