"""
MangaAssist Identity Federation
Cognito User Pool management with LINE, Google, and Apple social providers.
Cross-account access patterns and fine-grained RBAC.
"""
import json
import time
import logging
from typing import Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger("manga_assist.identity")
logger.setLevel(logging.INFO)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
REGION = "ap-northeast-1" # Tokyo for JP users
ACCOUNT_ID = "123456789012"
PREFIX = "manga-assist"
# ---------------------------------------------------------------------------
# Social Identity Provider Configuration
# ---------------------------------------------------------------------------
class SocialProviderManager:
"""
Configures social identity providers (LINE, Google, Apple)
in the Cognito User Pool for JP manga store users.
"""
def __init__(self, user_pool_id: str, region: str = REGION):
self._cognito = boto3.client("cognito-idp", region_name=region)
self.user_pool_id = user_pool_id
def configure_line_provider(
self,
channel_id: str,
channel_secret_arn: str,
) -> dict:
"""
Configure LINE Login as an OIDC identity provider.
LINE is the primary social login for Japanese users.
"""
# Retrieve channel secret from Secrets Manager
secrets = boto3.client("secretsmanager", region_name=REGION)
secret_value = secrets.get_secret_value(SecretId=channel_secret_arn)
channel_secret = json.loads(secret_value["SecretString"])["secret"]
return self._cognito.create_identity_provider(
UserPoolId=self.user_pool_id,
ProviderName="LINE",
ProviderType="OIDC",
ProviderDetails={
"client_id": channel_id,
"client_secret": channel_secret,
"authorize_scopes": "openid profile email",
"oidc_issuer": "https://access.line.me",
"attributes_request_method": "GET",
"authorize_url": "https://access.line.me/oauth2/v2.1/authorize",
"token_url": "https://api.line.me/oauth2/v2.1/token",
"attributes_url": "https://api.line.me/v2/profile",
"jwks_uri": "https://api.line.me/oauth2/v2.1/certs",
},
AttributeMapping={
"email": "email",
"name": "displayName",
"picture": "pictureUrl",
"username": "sub",
},
IdpIdentifiers=["line.me"],
)
def configure_google_provider(
self,
client_id: str,
client_secret_arn: str,
) -> dict:
"""Configure Google as a social identity provider."""
secrets = boto3.client("secretsmanager", region_name=REGION)
secret_value = secrets.get_secret_value(SecretId=client_secret_arn)
client_secret = json.loads(secret_value["SecretString"])["secret"]
return self._cognito.create_identity_provider(
UserPoolId=self.user_pool_id,
ProviderName="Google",
ProviderType="Google",
ProviderDetails={
"client_id": client_id,
"client_secret": client_secret,
"authorize_scopes": "openid email profile",
},
AttributeMapping={
"email": "email",
"name": "name",
"picture": "picture",
"username": "sub",
},
)
def configure_apple_provider(
self,
services_id: str,
team_id: str,
key_id: str,
private_key_arn: str,
) -> dict:
"""Configure Apple Sign-In for iOS users."""
secrets = boto3.client("secretsmanager", region_name=REGION)
secret_value = secrets.get_secret_value(SecretId=private_key_arn)
private_key = secret_value["SecretString"]
return self._cognito.create_identity_provider(
UserPoolId=self.user_pool_id,
ProviderName="SignInWithApple",
ProviderType="SignInWithApple",
ProviderDetails={
"client_id": services_id,
"team_id": team_id,
"key_id": key_id,
"private_key": private_key,
"authorize_scopes": "email name",
},
AttributeMapping={
"email": "email",
"name": "firstName",
"username": "sub",
},
)
# ---------------------------------------------------------------------------
# Lambda Triggers for Cognito
# ---------------------------------------------------------------------------
class CognitoLambdaTriggers:
"""
Lambda trigger implementations for Cognito User Pool events.
These customize the authentication and registration flow.
"""
@staticmethod
def pre_sign_up_handler(event: dict, context: Any) -> dict:
"""
Pre Sign-Up trigger: validate user attributes before registration.
- Auto-confirm email-verified social provider users
- Validate locale preferences
- Block disposable email domains
"""
trigger_source = event.get("triggerSource", "")
user_attrs = event["request"].get("userAttributes", {})
# Auto-confirm users from trusted social providers
if trigger_source in (
"PreSignUp_ExternalProvider",
"PreSignUp_AdminCreateUser",
):
event["response"]["autoConfirmUser"] = True
if user_attrs.get("email"):
event["response"]["autoVerifyEmail"] = True
logger.info(
"Auto-confirmed social provider user",
extra={"provider": trigger_source},
)
# Validate email domain -- block disposable domains
email = user_attrs.get("email", "")
blocked_domains = {
"tempmail.com", "throwaway.email",
"guerrillamail.com", "10minutemail.com",
}
domain = email.split("@")[-1].lower() if "@" in email else ""
if domain in blocked_domains:
raise Exception("Registration not allowed with disposable email")
return event
@staticmethod
def post_confirmation_handler(event: dict, context: Any) -> dict:
"""
Post Confirmation trigger: create user profile in DynamoDB
after successful registration.
"""
user_attrs = event["request"].get("userAttributes", {})
username = event["userName"]
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(f"{PREFIX}-users")
try:
table.put_item(
Item={
"pk": f"USER#{username}",
"sk": "PROFILE",
"email": user_attrs.get("email", ""),
"locale": user_attrs.get("locale", "ja-JP"),
"display_name": user_attrs.get("name", username),
"tier": "basic",
"manga_preferences": [],
"created_at": int(time.time()),
"updated_at": int(time.time()),
"status": "active",
},
ConditionExpression="attribute_not_exists(pk)",
)
logger.info(f"Created profile for user: {username}")
except ClientError as e:
if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
raise
logger.info(f"Profile already exists for user: {username}")
return event
@staticmethod
def pre_token_generation_handler(event: dict, context: Any) -> dict:
"""
Pre Token Generation trigger: add custom claims to the JWT.
Injects user tier and manga preferences into the ID token.
"""
username = event["userName"]
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(f"{PREFIX}-users")
try:
response = table.get_item(
Key={"pk": f"USER#{username}", "sk": "PROFILE"}
)
profile = response.get("Item", {})
except ClientError:
profile = {}
# Add custom claims to ID token
event["response"]["claimsOverrideDetails"] = {
"claimsToAddOrOverride": {
"custom:tier": profile.get("tier", "basic"),
"custom:locale": profile.get("locale", "ja-JP"),
"custom:preferences": json.dumps(
profile.get("manga_preferences", [])[:10]
),
},
}
return event
# ---------------------------------------------------------------------------
# Cross-Account Access Manager
# ---------------------------------------------------------------------------
class CrossAccountAccessManager:
"""
Manages cross-account access patterns for MangaAssist.
Supports scenarios like:
- Shared Bedrock models in a central AI account
- Cross-account DynamoDB access for analytics
- Centralized logging to a security account
"""
def __init__(
self,
source_account: str,
target_account: str,
region: str = REGION,
):
self.source_account = source_account
self.target_account = target_account
self.region = region
self._iam = boto3.client("iam", region_name=region)
self._sts = boto3.client("sts", region_name=region)
def create_cross_account_role_trust(
self,
role_name: str,
external_id: str,
max_session_duration: int = 3600,
) -> dict:
"""
Create a trust policy for cross-account role assumption.
Uses external ID to prevent confused deputy attacks.
"""
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCrossAccountAssume",
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{self.source_account}:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": external_id,
"aws:PrincipalTag/Project": "manga-assist",
},
"Bool": {
"aws:MultiFactorAuthPresent": "true"
},
},
}
],
}
return {
"RoleName": role_name,
"AssumeRolePolicyDocument": json.dumps(trust_policy),
"MaxSessionDuration": max_session_duration,
"Tags": [
{"Key": "Project", "Value": "manga-assist"},
{"Key": "CrossAccount", "Value": "true"},
],
}
def assume_cross_account_role(
self,
role_arn: str,
external_id: str,
session_name: str = "manga-assist-session",
duration_seconds: int = 900,
) -> dict:
"""
Assume a cross-account role and return temporary credentials.
Used when the MangaAssist orchestrator needs to access
resources in another AWS account.
"""
try:
response = self._sts.assume_role(
RoleArn=role_arn,
ExternalId=external_id,
RoleSessionName=session_name,
DurationSeconds=duration_seconds,
Tags=[
{"Key": "Project", "Value": "manga-assist"},
{"Key": "RequestSource", "Value": "ecs-orchestrator"},
],
)
credentials = response["Credentials"]
logger.info(
"Cross-account role assumed successfully",
extra={
"role_arn": role_arn,
"expiration": str(credentials["Expiration"]),
},
)
return {
"access_key": credentials["AccessKeyId"],
"secret_key": credentials["SecretAccessKey"],
"session_token": credentials["SessionToken"],
"expiration": credentials["Expiration"].isoformat(),
}
except ClientError as e:
error_code = e.response["Error"]["Code"]
logger.error(
f"Cross-account assume failed: {error_code}",
extra={"role_arn": role_arn},
)
raise
def build_bedrock_cross_account_policy(self) -> dict:
"""
IAM policy allowing cross-account Bedrock invocation.
Attached to the role in the source account.
"""
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAssumeBedrockRole",
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": (
f"arn:aws:iam::{self.target_account}:role/"
f"{PREFIX}-bedrock-invoker"
),
"Condition": {
"StringEquals": {
"sts:ExternalId": f"{PREFIX}-cross-account"
}
},
},
{
"Sid": "DenyRoleChaining",
"Effect": "Deny",
"Action": "sts:AssumeRole",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:PrincipalAccount": self.source_account
}
},
},
],
}
# ---------------------------------------------------------------------------
# Fine-Grained Permission Engine
# ---------------------------------------------------------------------------
class FineGrainedPermissionEngine:
"""
Evaluates fine-grained permissions based on user attributes,
resource tags, and contextual conditions.
"""
@dataclass
class PermissionContext:
user_id: str
tier: str
cognito_groups: list[str]
ip_address: str
request_time: int
user_agent: str
locale: str = "ja-JP"
@dataclass
class PermissionDecision:
allowed: bool
reason: str
model_alias: Optional[str] = None
max_tokens: int = 0
rate_limit_remaining: int = 0
def evaluate(
self,
ctx: "FineGrainedPermissionEngine.PermissionContext",
action: str,
resource: str,
) -> "FineGrainedPermissionEngine.PermissionDecision":
"""
Evaluate whether the action is permitted given the context.
Applies rules in order: explicit deny, conditional checks, allow.
"""
# Rule 1: Check IP allowlist for admin actions
if action.startswith("admin:") and not self._is_admin_ip(ctx.ip_address):
return self.PermissionDecision(
allowed=False,
reason="Admin actions require allowlisted IP",
)
# Rule 2: Time-based restriction -- no admin actions outside JP business hours
if action.startswith("admin:") and not self._in_business_hours(ctx.request_time):
return self.PermissionDecision(
allowed=False,
reason="Admin actions restricted to JP business hours (9-18 JST)",
)
# Rule 3: Model access based on tier
if action == "bedrock:InvokeModel":
return self._evaluate_model_access(ctx, resource)
# Rule 4: Data access based on tier and locale
if action.startswith("dynamodb:"):
return self._evaluate_data_access(ctx, action, resource)
# Default: allow if group membership matches
required_group = self._action_to_group(action)
if required_group and required_group not in ctx.cognito_groups:
return self.PermissionDecision(
allowed=False,
reason=f"Requires group membership: {required_group}",
)
return self.PermissionDecision(allowed=True, reason="Permitted")
def _evaluate_model_access(
self,
ctx: "FineGrainedPermissionEngine.PermissionContext",
model_alias: str,
) -> "FineGrainedPermissionEngine.PermissionDecision":
"""Evaluate model-specific access rules."""
tier_rules = {
"guest": {
"allowed_models": ["haiku"],
"max_tokens": 256,
"daily_limit": 50,
},
"basic": {
"allowed_models": ["haiku", "sonnet"],
"max_tokens": 1024,
"daily_limit": 200,
},
"premium": {
"allowed_models": ["haiku", "sonnet"],
"max_tokens": 4096,
"daily_limit": 1000,
},
"admin": {
"allowed_models": ["haiku", "sonnet"],
"max_tokens": 8192,
"daily_limit": -1, # Unlimited
},
}
rules = tier_rules.get(ctx.tier, tier_rules["guest"])
if model_alias not in rules["allowed_models"]:
return self.PermissionDecision(
allowed=False,
reason=f"Tier '{ctx.tier}' cannot access model '{model_alias}'",
model_alias=model_alias,
)
return self.PermissionDecision(
allowed=True,
reason="Model access granted",
model_alias=model_alias,
max_tokens=rules["max_tokens"],
rate_limit_remaining=rules["daily_limit"],
)
def _evaluate_data_access(
self,
ctx: "FineGrainedPermissionEngine.PermissionContext",
action: str,
resource: str,
) -> "FineGrainedPermissionEngine.PermissionDecision":
"""Evaluate DynamoDB data access with item-level scoping."""
# Guests can only read public catalog data
if ctx.tier == "guest":
if action not in ("dynamodb:GetItem", "dynamodb:Query"):
return self.PermissionDecision(
allowed=False,
reason="Guest tier has read-only catalog access",
)
if "sessions" in resource or "users" in resource:
return self.PermissionDecision(
allowed=False,
reason="Guest tier cannot access session or user data",
)
# Non-admin users can only access their own session data
if "sessions" in resource and ctx.tier != "admin":
if not resource.endswith(ctx.user_id):
return self.PermissionDecision(
allowed=False,
reason="Users can only access their own session data",
)
return self.PermissionDecision(allowed=True, reason="Data access granted")
def _is_admin_ip(self, ip: str) -> bool:
"""Check if IP is in the admin allowlist (office IPs)."""
admin_cidrs = [
"10.0.0.0/8", # Internal VPC
"203.0.113.0/24", # Tokyo office
"198.51.100.0/24", # Osaka office
]
# Simplified check; production uses ipaddress module
return ip.startswith("10.") or ip.startswith("203.0.113.") or ip.startswith("198.51.100.")
def _in_business_hours(self, epoch_time: int) -> bool:
"""Check if request is within JP business hours (9-18 JST)."""
import datetime
import zoneinfo
jst = zoneinfo.ZoneInfo("Asia/Tokyo")
dt = datetime.datetime.fromtimestamp(epoch_time, tz=jst)
return 9 <= dt.hour < 18 and dt.weekday() < 5
def _action_to_group(self, action: str) -> Optional[str]:
group_map = {
"admin:": "manga-admins",
"manage:": "manga-admins",
"analytics:": "manga-premium",
}
for prefix, group in group_map.items():
if action.startswith(prefix):
return group
return None
# ---------------------------------------------------------------------------
# Custom Lambda Authorizer (Enhanced)
# ---------------------------------------------------------------------------
class EnhancedLambdaAuthorizer:
"""
Enhanced Lambda authorizer that combines Cognito token validation
with fine-grained permission evaluation and rate limiting.
"""
def __init__(
self,
user_pool_id: str,
client_id: str,
region: str = REGION,
):
self.user_pool_id = user_pool_id
self.client_id = client_id
self.region = region
self._cognito = boto3.client("cognito-idp", region_name=region)
self._dynamodb = boto3.resource("dynamodb", region_name=region)
self._rate_table = self._dynamodb.Table(f"{PREFIX}-rate-limits")
self._permission_engine = FineGrainedPermissionEngine()
def handler(self, event: dict, context: Any) -> dict:
"""Main Lambda authorizer handler."""
request_context = event.get("requestContext", {})
route_key = request_context.get("routeKey", "$default")
source_ip = (
request_context.get("identity", {}).get("sourceIp", "unknown")
)
# Extract token
token = self._extract_token(event)
if not token:
logger.warning("Missing authentication token")
raise Exception("Unauthorized")
# Validate token and get user info
user_info = self._validate_and_enrich(token)
if not user_info:
raise Exception("Unauthorized")
# Check rate limit
if not self._check_rate_limit(user_info["user_id"], user_info["tier"]):
logger.warning(
"Rate limit exceeded",
extra={"user_id": user_info["user_id"]},
)
# Return policy that denies access with rate limit context
return self._build_deny_policy(
user_info["user_id"],
event.get("methodArn", ""),
reason="RATE_LIMIT_EXCEEDED",
)
# Evaluate fine-grained permissions for the route
perm_ctx = FineGrainedPermissionEngine.PermissionContext(
user_id=user_info["user_id"],
tier=user_info["tier"],
cognito_groups=user_info["groups"],
ip_address=source_ip,
request_time=int(time.time()),
user_agent=event.get("headers", {}).get("User-Agent", ""),
locale=user_info.get("locale", "ja-JP"),
)
action = self._route_to_action(route_key)
decision = self._permission_engine.evaluate(
perm_ctx, action, route_key
)
if not decision.allowed:
logger.warning(
f"Permission denied: {decision.reason}",
extra={"user_id": user_info["user_id"], "route": route_key},
)
return self._build_deny_policy(
user_info["user_id"],
event.get("methodArn", ""),
reason=decision.reason,
)
return self._build_allow_policy(user_info, event.get("methodArn", ""))
def _extract_token(self, event: dict) -> Optional[str]:
qs = event.get("queryStringParameters") or {}
token = qs.get("token")
if token:
return token
headers = event.get("headers") or {}
auth = headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
return None
def _validate_and_enrich(self, token: str) -> Optional[dict]:
"""Validate Cognito token and enrich with DynamoDB profile."""
try:
user = self._cognito.get_user(AccessToken=token)
username = user["Username"]
attrs = {
a["Name"]: a["Value"]
for a in user.get("UserAttributes", [])
}
# Fetch groups
groups_resp = self._cognito.admin_list_groups_for_user(
Username=username,
UserPoolId=self.user_pool_id,
)
groups = [g["GroupName"] for g in groups_resp.get("Groups", [])]
# Determine tier from groups
tier = "guest"
for group, t in [
("manga-admins", "admin"),
("manga-premium", "premium"),
("manga-basic", "basic"),
]:
if group in groups:
tier = t
break
return {
"user_id": username,
"email": attrs.get("email", ""),
"tier": tier,
"groups": groups,
"locale": attrs.get("locale", "ja-JP"),
"mfa_verified": attrs.get("phone_number_verified") == "true",
}
except ClientError as e:
logger.warning(f"Token validation failed: {e.response['Error']['Code']}")
return None
def _check_rate_limit(self, user_id: str, tier: str) -> bool:
"""Check and increment rate limit counter in DynamoDB."""
limits = {
"guest": 10, # 10 requests per minute
"basic": 30, # 30 requests per minute
"premium": 100, # 100 requests per minute
"admin": 500, # 500 requests per minute
}
limit = limits.get(tier, 10)
window = int(time.time() // 60) # 1-minute window
try:
response = self._rate_table.update_item(
Key={
"pk": f"RATE#{user_id}",
"sk": f"WINDOW#{window}",
},
UpdateExpression=(
"SET #count = if_not_exists(#count, :zero) + :one, "
"#ttl = :ttl"
),
ExpressionAttributeNames={
"#count": "request_count",
"#ttl": "ttl",
},
ExpressionAttributeValues={
":zero": 0,
":one": 1,
":ttl": int(time.time()) + 120, # 2 minutes TTL
},
ReturnValues="UPDATED_NEW",
)
count = int(response["Attributes"]["request_count"])
return count <= limit
except ClientError:
logger.exception("Rate limit check failed -- allowing request")
return True # Fail open to avoid blocking legitimate users
def _route_to_action(self, route_key: str) -> str:
"""Map WebSocket route to permission action."""
route_actions = {
"$connect": "connect",
"$disconnect": "disconnect",
"$default": "chat:message",
"chat/browse": "chat:browse",
"chat/search": "chat:search",
"chat/recommend": "chat:recommend",
"chat/history": "chat:history",
"chat/admin": "admin:chat",
"analytics/usage": "analytics:usage",
}
return route_actions.get(route_key, "chat:message")
def _build_allow_policy(self, user_info: dict, method_arn: str) -> dict:
return {
"principalId": user_info["user_id"],
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Allow",
"Resource": method_arn,
}
],
},
"context": {
"userId": user_info["user_id"],
"tier": user_info["tier"],
"groups": ",".join(user_info["groups"]),
"locale": user_info.get("locale", "ja-JP"),
"mfaVerified": str(user_info.get("mfa_verified", False)).lower(),
},
}
def _build_deny_policy(
self, principal: str, method_arn: str, reason: str = ""
) -> dict:
return {
"principalId": principal,
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Deny",
"Resource": method_arn,
}
],
},
"context": {"denyReason": reason},
}
# ---------------------------------------------------------------------------
# Permission Boundary for Developer Roles
# ---------------------------------------------------------------------------
class PermissionBoundaryBuilder:
"""
Creates permission boundaries that cap the maximum permissions
any MangaAssist role can have, even if the attached policy
is overly broad.
"""
def __init__(self, account_id: str, region: str = REGION):
self.account_id = account_id
self.region = region
def developer_boundary(self) -> dict:
"""
Permission boundary for developer roles.
Prevents developers from escalating privileges, modifying
security resources, or accessing production secrets.
"""
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDevelopmentActions",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream",
"dynamodb:GetItem",
"dynamodb:Query",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"es:ESHttp*",
"logs:*",
"cloudwatch:PutMetricData",
"cloudwatch:GetMetric*",
"xray:PutTelemetryRecords",
"xray:PutTraceSegments",
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket",
],
"Resource": "*",
},
{
"Sid": "DenySecurityModification",
"Effect": "Deny",
"Action": [
"iam:CreateUser",
"iam:CreateRole",
"iam:AttachRolePolicy",
"iam:PutRolePolicy",
"iam:DeleteRolePolicy",
"iam:CreatePolicy",
"iam:DeletePolicy",
"iam:PassRole",
"organizations:*",
"account:*",
],
"Resource": "*",
},
{
"Sid": "DenyProductionSecrets",
"Effect": "Deny",
"Action": [
"secretsmanager:GetSecretValue",
"secretsmanager:DeleteSecret",
"secretsmanager:UpdateSecret",
],
"Resource": [
f"arn:aws:secretsmanager:{self.region}:{self.account_id}:"
f"secret:{PREFIX}/prod/*"
],
},
{
"Sid": "DenyKMSKeyDeletion",
"Effect": "Deny",
"Action": [
"kms:ScheduleKeyDeletion",
"kms:DisableKey",
],
"Resource": "*",
},
],
}
def cicd_boundary(self) -> dict:
"""
Permission boundary for CI/CD pipeline roles.
Allows deployment actions but prevents IAM and security changes.
"""
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDeploymentActions",
"Effect": "Allow",
"Action": [
"ecs:UpdateService",
"ecs:RegisterTaskDefinition",
"ecs:DescribeServices",
"ecs:DescribeTaskDefinition",
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:PutImage",
"ecr:InitiateLayerUpload",
"ecr:UploadLayerPart",
"ecr:CompleteLayerUpload",
"lambda:UpdateFunctionCode",
"lambda:UpdateFunctionConfiguration",
"s3:PutObject",
"s3:GetObject",
"cloudformation:*",
"codebuild:*",
"codepipeline:*",
],
"Resource": "*",
},
{
"Sid": "DenyIAMEscalation",
"Effect": "Deny",
"Action": [
"iam:CreateUser",
"iam:CreateRole",
"iam:AttachRolePolicy",
"iam:PutRolePolicy",
"iam:CreatePolicy",
],
"Resource": "*",
},
],
}