
Skill 2.1.5 - Human-in-the-Loop Architecture for Collaborative AI Systems

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
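
At the prices quoted above, model choice dominates spend. A quick back-of-envelope sketch (per-message token counts of 500 in / 250 out are assumptions for illustration, not measured values):

```python
# Back-of-envelope Bedrock cost at 1M messages/day.
# Prices per 1M tokens come from the context note; token counts are assumed.
PRICES = {"sonnet": (3.00, 15.00), "haiku": (0.25, 1.25)}  # (input, output) USD

def daily_cost(model: str, messages: int = 1_000_000,
               in_tokens: int = 500, out_tokens: int = 250) -> float:
    """Estimated USD/day for a model at the given per-message token counts."""
    p_in, p_out = PRICES[model]
    return messages * (in_tokens * p_in + out_tokens * p_out) / 1_000_000

# Routing routine traffic to Haiku and reserving Sonnet for hard cases
# changes daily spend by roughly an order of magnitude.
print(f"Sonnet: ${daily_cost('sonnet'):,.0f}/day, Haiku: ${daily_cost('haiku'):,.2f}/day")
```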


Mind Map: Collaborative AI Systems with Human Expertise

Collaborative AI Systems (Skill 2.1.5)
│
├── Human-in-the-Loop Patterns
│   ├── Review Before Publish
│   │   ├── AI generates content → human curator approves
│   │   ├── Confidence-threshold gating (auto-publish above 0.95)
│   │   ├── Batch review queues for efficiency
│   │   └── Priority routing based on content sensitivity
│   │
│   ├── Approval Workflows
│   │   ├── Single-approver (manga description review)
│   │   ├── Multi-approver (content moderation escalation)
│   │   ├── Hierarchical approval (junior → senior curator)
│   │   └── Consensus-based (majority vote for edge cases)
│   │
│   └── Escalation Patterns
│       ├── Low-confidence AI response → human agent
│       ├── User-initiated escalation ("talk to a human")
│       ├── Content-sensitivity escalation (NSFW detection)
│       └── Repeated negative feedback → human review trigger
│
├── Feedback Collection Mechanisms
│   ├── Thumbs Up/Down
│   │   ├── Binary signal for quick assessment
│   │   ├── Aggregated quality score per response type
│   │   └── Low friction → high participation rate
│   │
│   ├── Detailed Ratings
│   │   ├── Multi-dimension (accuracy, helpfulness, tone)
│   │   ├── Star ratings (1-5) for recommendations
│   │   └── Comparative feedback ("was this better?")
│   │
│   └── Correction Input
│       ├── Free-text corrections by users
│       ├── Curator-provided ground truth
│       ├── Structured correction forms
│       └── Before/after comparison tracking
│
├── Human Augmentation Patterns
│   ├── Curator Knowledge Injection
│   │   ├── Manga genre expertise → prompt enrichment
│   │   ├── Seasonal trend annotations
│   │   ├── Author/artist relationship graphs
│   │   └── Cultural context for JP → EN translations
│   │
│   ├── Expert Review of Recommendations
│   │   ├── Curated "staff picks" override lists
│   │   ├── Quality scoring of AI recommendations
│   │   ├── Bias correction (diversity of genres)
│   │   └── New release prioritization input
│   │
│   └── Human-AI Teaming Models
│       ├── AI drafts → human refines
│       ├── Human sets guardrails → AI executes
│       ├── Parallel (AI + human) with merge
│       └── Human coaches AI via feedback loops
│
├── Step Functions Orchestration
│   ├── Wait-for-Callback Pattern
│   │   ├── Task token issued → human reviewer receives
│   │   ├── SendTaskSuccess / SendTaskFailure
│   │   ├── Heartbeat timeout for stale reviews
│   │   └── Token expiry handling (re-queue or auto-approve)
│   │
│   ├── Activity Tasks
│   │   ├── Long-running human review activities
│   │   ├── Worker polling for review items
│   │   ├── Activity heartbeat for progress tracking
│   │   └── Timeout and retry configuration
│   │
│   └── Orchestration Patterns
│       ├── Sequential review → approval → publish
│       ├── Parallel reviews with aggregation
│       ├── Map state for batch content review
│       └── Choice state for confidence-based routing
│
└── API Gateway Feedback Endpoints
    ├── REST Endpoints
    │   ├── POST /feedback (submit rating)
    │   ├── POST /feedback/correction (submit correction)
    │   ├── GET /review-queue (curator dashboard)
    │   └── POST /review/{id}/decision (approve/reject)
    │
    ├── WebSocket Integration
    │   ├── Real-time feedback during conversation
    │   ├── Curator notification for pending reviews
    │   └── Live approval status updates
    │
    └── Security & Throttling
        ├── API key per curator role
        ├── Rate limiting on feedback submission
        ├── Input validation and sanitization
        └── Audit trail for all human decisions
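
The mind map's "aggregated quality score per response type" can be as simple as an approval ratio that is withheld until enough votes accumulate. A minimal sketch (the 30-vote minimum is an assumed threshold, tuned per traffic volume):

```python
from typing import Optional

MIN_SAMPLES = 30  # assumed minimum vote count before a score is trusted

def quality_score(thumbs_up: int, thumbs_down: int) -> Optional[float]:
    """Approval ratio in [0, 1], or None while too few votes to be meaningful."""
    total = thumbs_up + thumbs_down
    if total < MIN_SAMPLES:
        return None
    return thumbs_up / total

scores = {"recommendation": quality_score(240, 60),    # 0.8
          "product_description": quality_score(5, 2)}  # None: only 7 votes
```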

Architecture Diagram: MangaAssist Human Review Workflow

┌─────────────────────────────────────────────────────────────────────────────────┐
│                    MangaAssist Human-in-the-Loop Architecture                   │
│                                                                                 │
│  ┌──────────────┐     ┌──────────────────┐     ┌──────────────────────────┐     │
│  │   User Chat   │────▶│  ECS Fargate     │────▶│  Bedrock Claude 3       │     │
│  │  (WebSocket)  │     │  Orchestrator    │     │  (Sonnet / Haiku)       │     │
│  └──────────────┘     └────────┬─────────┘     └────────────┬─────────────┘     │
│                                │                             │                   │
│                    ┌───────────▼───────────┐                 │                   │
│                    │  Confidence Score     │                 │                   │
│                    │  Evaluator            │                 │                   │
│                    └───────────┬───────────┘                 │                   │
│                                │                             │                   │
│              ┌─────────────────┼─────────────────┐           │                   │
│              ▼                 ▼                  ▼           │                   │
│     ┌────────────┐   ┌────────────────┐  ┌───────────────┐  │                   │
│     │ Score > 0.9│   │ 0.7 < Score    │  │ Score < 0.7  │  │                   │
│     │ Auto-Serve │   │ < 0.9: Queue   │  │ Escalate to  │  │                   │
│     │ to User    │   │ for Review     │  │ Human Agent  │  │                   │
│     └────────────┘   └───────┬────────┘  └──────┬────────┘  │                   │
│                              │                   │           │                   │
│                    ┌─────────▼───────────────────▼────────┐  │                   │
│                    │      Step Functions Workflow          │  │                   │
│                    │  ┌─────────────────────────────────┐ │  │                   │
│                    │  │ 1. Create Review Task           │ │  │                   │
│                    │  │ 2. Assign to Curator Queue      │ │  │                   │
│                    │  │ 3. Wait-for-Callback            │ │  │                   │
│                    │  │ 4. Process Decision              │ │  │                   │
│                    │  │ 5. Update Knowledge Base         │ │  │                   │
│                    │  └─────────────────────────────────┘ │  │                   │
│                    └─────────────┬────────────────────────┘  │                   │
│                                  │                           │                   │
│                    ┌─────────────▼───────────┐               │                   │
│                    │  Curator Dashboard      │               │                   │
│                    │  (React + API Gateway)  │               │                   │
│                    │                         │               │                   │
│                    │  ┌───────────────────┐  │               │                   │
│                    │  │ Review Queue      │  │               │                   │
│                    │  │ ┌───┬───┬───┬───┐ │  │               │                   │
│                    │  │ │ ✓ │ ✗ │ ✎ │ ⟳ │ │  │               │                   │
│                    │  │ └───┴───┴───┴───┘ │  │               │                   │
│                    │  │ Approve│Reject│   │  │               │                   │
│                    │  │ Edit  │Reassign   │  │               │                   │
│                    │  └───────────────────┘  │               │                   │
│                    └─────────────┬───────────┘               │                   │
│                                  │                           │                   │
│              ┌───────────────────┼───────────────────┐       │                   │
│              ▼                   ▼                    ▼       │                   │
│     ┌────────────────┐ ┌────────────────┐  ┌────────────┐   │                   │
│     │  DynamoDB      │ │ OpenSearch     │  │ ElastiCache│   │                   │
│     │  (Feedback +   │ │ Serverless     │  │ Redis      │   │                   │
│     │   Decisions)   │ │ (Updated Docs) │  │ (Cache     │   │                   │
│     └────────────────┘ └────────────────┘  │  Invalidate│   │                   │
│                                            └────────────┘   │                   │
│                                                              │                   │
│  ┌───────────────────────────────────────────────────────┐   │                   │
│  │              Feedback Collection Layer                 │   │                   │
│  │                                                       │   │                   │
│  │  API Gateway (REST)          API Gateway (WebSocket)  │   │                   │
│  │  POST /feedback              Real-time feedback       │   │                   │
│  │  POST /feedback/correction   during chat session      │   │                   │
│  │  POST /review/{id}/decision                           │   │                   │
│  └───────────────────────────────────────────────────────┘   │                   │
└─────────────────────────────────────────────────────────────────────────────────┘
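
The diagram's confidence routing reduces to a small pure function. A sketch using the diagram's thresholds (the diagram uses strict inequalities, so how boundary values like exactly 0.7 or 0.9 are handled is a choice it leaves open):

```python
# Confidence-based routing matching the three diagram branches.
# Boundary handling (>= vs >) is an assumption; the diagram leaves it open.
def route_response(confidence: float) -> str:
    """Map a model confidence score to one of the three diagram branches."""
    if confidence > 0.9:
        return "AUTO_SERVE"
    if confidence >= 0.7:
        return "QUEUE_FOR_REVIEW"
    return "ESCALATE_TO_HUMAN"
```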

Step Functions ASL: Human Approval Workflow with Callback

This state machine orchestrates the human review process for MangaAssist content. When Claude generates a manga description or recommendation with moderate confidence, the content enters this workflow for curator review before publishing.

{
  "Comment": "MangaAssist Human Approval Workflow - Curator Review Pipeline",
  "StartAt": "PrepareReviewItem",
  "States": {
    "PrepareReviewItem": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-prepare-review",
      "Parameters": {
        "content_id.$": "$.content_id",
        "content_type.$": "$.content_type",
        "ai_generated_content.$": "$.ai_generated_content",
        "confidence_score.$": "$.confidence_score",
        "source_context.$": "$.source_context",
        "metadata": {
          "model_id.$": "$.model_id",
          "timestamp.$": "$$.State.EnteredTime",
          "execution_id.$": "$$.Execution.Id"
        }
      },
      "ResultPath": "$.review_item",
      "Next": "DetermineReviewPriority",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandlePreparationError",
          "ResultPath": "$.error"
        }
      ]
    },

    "DetermineReviewPriority": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            {
              "Variable": "$.confidence_score",
              "NumericLessThan": 0.5
            },
            {
              "Variable": "$.content_type",
              "StringEquals": "product_description"
            }
          ],
          "Next": "AssignSeniorReviewer"
        },
        {
          "Variable": "$.content_type",
          "StringEquals": "content_moderation",
          "Next": "AssignSeniorReviewer"
        },
        {
          "Variable": "$.confidence_score",
          "NumericLessThan": 0.7,
          "Next": "AssignStandardReviewer"
        }
      ],
      "Default": "AssignStandardReviewer"
    },

    "AssignSeniorReviewer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-assign-reviewer",
      "Parameters": {
        "review_item.$": "$.review_item",
        "priority": "HIGH",
        "reviewer_tier": "SENIOR",
        "sla_minutes": 30
      },
      "ResultPath": "$.assignment",
      "Next": "NotifyReviewer"
    },

    "AssignStandardReviewer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-assign-reviewer",
      "Parameters": {
        "review_item.$": "$.review_item",
        "priority": "STANDARD",
        "reviewer_tier": "STANDARD",
        "sla_minutes": 120
      },
      "ResultPath": "$.assignment",
      "Next": "NotifyReviewer"
    },

    "NotifyReviewer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-notify-reviewer",
      "Parameters": {
        "reviewer_id.$": "$.assignment.reviewer_id",
        "review_item.$": "$.review_item",
        "priority.$": "$.assignment.priority",
        "dashboard_url.$": "$.assignment.dashboard_url"
      },
      "ResultPath": "$.notification",
      "Next": "WaitForHumanReview"
    },

    "WaitForHumanReview": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-create-review-task",
        "Payload": {
          "task_token.$": "$$.Task.Token",
          "review_item.$": "$.review_item",
          "assignment.$": "$.assignment",
          "content_id.$": "$.content_id"
        }
      },
      "ResultPath": "$.human_decision",
      "HeartbeatSeconds": 900,
      "TimeoutSeconds": 7200,
      "Next": "ProcessDecision",
      "Catch": [
        {
          "ErrorEquals": ["States.Timeout"],
          "Next": "HandleReviewTimeout",
          "ResultPath": "$.error"
        },
        {
          "ErrorEquals": ["States.HeartbeatTimeout"],
          "Next": "CheckReviewerStatus",
          "ResultPath": "$.error"
        },
        {
          "ErrorEquals": ["States.TaskFailed"],
          "Next": "HandleReviewRejection",
          "ResultPath": "$.error"
        }
      ]
    },

    "CheckReviewerStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-check-reviewer",
      "Parameters": {
        "assignment.$": "$.assignment",
        "review_item.$": "$.review_item"
      },
      "ResultPath": "$.reviewer_status",
      "Next": "ReassignOrWait"
    },

    "ReassignOrWait": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.reviewer_status.action",
          "StringEquals": "REASSIGN",
          "Next": "AssignStandardReviewer"
        },
        {
          "Variable": "$.reviewer_status.action",
          "StringEquals": "ESCALATE",
          "Next": "AssignSeniorReviewer"
        }
      ],
      "Default": "WaitForHumanReview"
    },

    "HandleReviewTimeout": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            {
              "Variable": "$.confidence_score",
              "NumericGreaterThanEquals": 0.85
            },
            {
              "Variable": "$.content_type",
              "StringEquals": "recommendation"
            }
          ],
          "Next": "AutoApproveWithFlag"
        }
      ],
      "Default": "RequeueForReview"
    },

    "AutoApproveWithFlag": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-auto-approve",
      "Parameters": {
        "content_id.$": "$.content_id",
        "reason": "TIMEOUT_AUTO_APPROVE",
        "confidence_score.$": "$.confidence_score",
        "flag_for_post_review": true
      },
      "ResultPath": "$.decision",
      "Next": "PublishContent"
    },

    "RequeueForReview": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-requeue-review",
      "Parameters": {
        "review_item.$": "$.review_item",
        "requeue_count.$": "$.review_item.requeue_count",
        "escalate": true
      },
      "ResultPath": "$.requeue_result",
      "Next": "AssignSeniorReviewer"
    },

    "ProcessDecision": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.human_decision.action",
          "StringEquals": "APPROVE",
          "Next": "PublishContent"
        },
        {
          "Variable": "$.human_decision.action",
          "StringEquals": "APPROVE_WITH_EDITS",
          "Next": "ApplyEditsAndPublish"
        },
        {
          "Variable": "$.human_decision.action",
          "StringEquals": "REJECT",
          "Next": "HandleReviewRejection"
        },
        {
          "Variable": "$.human_decision.action",
          "StringEquals": "REQUEST_REGENERATION",
          "Next": "RegenerateContent"
        }
      ],
      "Default": "HandleReviewRejection"
    },

    "ApplyEditsAndPublish": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-apply-edits",
      "Parameters": {
        "content_id.$": "$.content_id",
        "original_content.$": "$.ai_generated_content",
        "edited_content.$": "$.human_decision.edited_content",
        "reviewer_id.$": "$.human_decision.reviewer_id",
        "edit_notes.$": "$.human_decision.notes"
      },
      "ResultPath": "$.edited_result",
      "Next": "StoreFeedbackForTraining"
    },

    "PublishContent": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "UpdateOpenSearch",
          "States": {
            "UpdateOpenSearch": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-update-opensearch",
              "Parameters": {
                "content_id.$": "$.content_id",
                "content.$": "$.ai_generated_content",
                "status": "PUBLISHED"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "InvalidateCache",
          "States": {
            "InvalidateCache": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-invalidate-cache",
              "Parameters": {
                "content_id.$": "$.content_id",
                "cache_keys.$": "$.review_item.affected_cache_keys"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "RecordDecision",
          "States": {
            "RecordDecision": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-record-decision",
              "Parameters": {
                "content_id.$": "$.content_id",
                "decision": "APPROVED",
                "reviewer_id.$": "$.assignment.reviewer_id",
                "timestamp.$": "$$.State.EnteredTime"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.publish_results",
      "Next": "ReviewComplete"
    },

    "StoreFeedbackForTraining": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-store-training-feedback",
      "Parameters": {
        "content_id.$": "$.content_id",
        "original.$": "$.ai_generated_content",
        "corrected.$": "$.human_decision.edited_content",
        "feedback_type": "CURATOR_CORRECTION",
        "reviewer_id.$": "$.human_decision.reviewer_id"
      },
      "ResultPath": "$.training_feedback",
      "Next": "PublishContent"
    },

    "RegenerateContent": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-regenerate-content",
      "Parameters": {
        "content_id.$": "$.content_id",
        "original_content.$": "$.ai_generated_content",
        "reviewer_guidance.$": "$.human_decision.guidance",
        "attempt_number.$": "$.review_item.generation_attempt"
      },
      "ResultPath": "$.regenerated",
      "Next": "PrepareReviewItem"
    },

    "HandleReviewRejection": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-handle-rejection",
      "Parameters": {
        "content_id.$": "$.content_id",
        "rejection_reason.$": "$.human_decision.reason",
        "reviewer_id.$": "$.assignment.reviewer_id"
      },
      "ResultPath": "$.rejection_result",
      "Next": "ReviewComplete"
    },

    "HandlePreparationError": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-handle-error",
      "Parameters": {
        "error.$": "$.error",
        "content_id.$": "$.content_id",
        "stage": "PREPARATION"
      },
      "End": true
    },

    "ReviewComplete": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:manga-review-metrics",
      "Parameters": {
        "content_id.$": "$.content_id",
        "execution_id.$": "$$.Execution.Id",
        "started_at.$": "$$.Execution.StartTime",
        "completed_at.$": "$$.State.EnteredTime"
      },
      "End": true
    }
  }
}
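
The curator-dashboard side of the wait-for-callback pattern is a call to SendTaskSuccess with the stored task token; the payload becomes `$.human_decision` in the state machine above. A sketch of the Lambda behind POST /review/{id}/decision (the function structure and any field names beyond those in the ASL are illustrative):

```python
import json

def build_decision_output(action: str, reviewer_id: str,
                          edited_content: str = "", notes: str = "") -> str:
    """Build the JSON payload the workflow receives as $.human_decision."""
    allowed = {"APPROVE", "APPROVE_WITH_EDITS", "REJECT", "REQUEST_REGENERATION"}
    if action not in allowed:
        raise ValueError(f"Unknown action: {action}")
    return json.dumps({"action": action, "reviewer_id": reviewer_id,
                       "edited_content": edited_content, "notes": notes})

def complete_review(task_token: str, action: str, reviewer_id: str, **kwargs) -> None:
    """Resume the waiting WaitForHumanReview state with the curator's decision."""
    import boto3  # deferred so the pure helper above is testable without AWS deps
    sfn = boto3.client("stepfunctions", region_name="ap-northeast-1")
    sfn.send_task_success(taskToken=task_token,
                          output=build_decision_output(action, reviewer_id, **kwargs))
```

Calling `send_task_failure` with the same token would instead surface as the `States.TaskFailed` catch branch (HandleReviewRejection).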

Production Python: Feedback Collection API

Lambda Handler for Feedback Submission via API Gateway

"""
MangaAssist Feedback Collection API
Handles user feedback (thumbs up/down, ratings, corrections) via API Gateway.
Stores in DynamoDB with real-time aggregation for quality monitoring.
"""

import json
import uuid
import time
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
feedback_table = dynamodb.Table("manga-assist-feedback")
aggregation_table = dynamodb.Table("manga-assist-feedback-aggregation")
sqs = boto3.client("sqs", region_name="ap-northeast-1")
cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

# MessageGroupId / MessageDeduplicationId (used in enqueue_for_processing)
# are only valid on FIFO queues, so the queue URL must end in .fifo.
FEEDBACK_QUEUE_URL = (
    "https://sqs.ap-northeast-1.amazonaws.com/ACCOUNT/manga-feedback-processing.fifo"
)

VALID_FEEDBACK_TYPES = {"thumbs_up", "thumbs_down", "star_rating", "correction", "detailed_rating"}
VALID_DIMENSIONS = {"accuracy", "helpfulness", "tone", "relevance", "completeness"}
MAX_CORRECTION_LENGTH = 2000
MAX_COMMENT_LENGTH = 500


def decimal_default(obj: Any) -> Any:
    """JSON serializer for Decimal types from DynamoDB."""
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


def validate_feedback_payload(body: dict) -> tuple[bool, str]:
    """
    Validate the incoming feedback payload.

    Returns:
        Tuple of (is_valid, error_message)
    """
    required_fields = ["session_id", "message_id", "feedback_type"]
    for field in required_fields:
        if field not in body:
            return False, f"Missing required field: {field}"

    feedback_type = body["feedback_type"]
    if feedback_type not in VALID_FEEDBACK_TYPES:
        return False, f"Invalid feedback_type: {feedback_type}. Valid: {VALID_FEEDBACK_TYPES}"

    if feedback_type == "star_rating":
        rating = body.get("rating")
        # type-check before comparing: a string rating would raise TypeError
        # in the range check and turn a bad request into a 500
        if not isinstance(rating, (int, float)) or not (1 <= rating <= 5):
            return False, "star_rating requires a numeric 'rating' between 1 and 5"

    if feedback_type == "correction":
        correction = body.get("correction_text", "")
        if not correction or len(correction) > MAX_CORRECTION_LENGTH:
            return False, f"correction_text required and must be under {MAX_CORRECTION_LENGTH} chars"

    if feedback_type == "detailed_rating":
        dimensions = body.get("dimensions", {})
        if not dimensions:
            return False, "detailed_rating requires 'dimensions' dict"
        for dim, score in dimensions.items():
            if dim not in VALID_DIMENSIONS:
                return False, f"Invalid dimension: {dim}. Valid: {VALID_DIMENSIONS}"
            # type-check first: comparing a non-numeric score would raise TypeError
            if not isinstance(score, (int, float)) or not (1 <= score <= 5):
                return False, f"Dimension score must be a number 1-5, got {score} for {dim}"

    return True, ""


def store_feedback(feedback_id: str, body: dict, source_ip: str) -> dict:
    """
    Store feedback item in DynamoDB with TTL for GDPR compliance.

    Args:
        feedback_id: Unique identifier for this feedback
        body: Validated feedback payload
        source_ip: Client IP for audit trail

    Returns:
        The stored feedback item
    """
    import hashlib  # stdlib; used for a process-stable source-IP digest below

    now = datetime.now(timezone.utc)
    ttl_90_days = int(now.timestamp()) + (90 * 24 * 60 * 60)

    item = {
        "feedback_id": feedback_id,
        "session_id": body["session_id"],
        "message_id": body["message_id"],
        "feedback_type": body["feedback_type"],
        "user_id": body.get("user_id", "anonymous"),
        "created_at": now.isoformat(),
        "created_at_epoch": Decimal(str(now.timestamp())),
        # built-in hash() is salted per process (PYTHONHASHSEED), so its value
        # cannot be correlated across invocations; use a stable digest instead
        "source_ip_hash": hashlib.sha256(source_ip.encode()).hexdigest()[:16],
        "ttl": ttl_90_days,
        "processed": False,
    }

    # Add type-specific fields
    feedback_type = body["feedback_type"]
    if feedback_type in ("thumbs_up", "thumbs_down"):
        item["sentiment"] = 1 if feedback_type == "thumbs_up" else -1
        item["comment"] = body.get("comment", "")[:MAX_COMMENT_LENGTH]

    elif feedback_type == "star_rating":
        item["rating"] = Decimal(str(body["rating"]))
        item["comment"] = body.get("comment", "")[:MAX_COMMENT_LENGTH]

    elif feedback_type == "correction":
        item["correction_text"] = body["correction_text"][:MAX_CORRECTION_LENGTH]
        item["original_response_snippet"] = body.get("original_response_snippet", "")[:500]
        item["correction_category"] = body.get("correction_category", "general")

    elif feedback_type == "detailed_rating":
        item["dimensions"] = {
            k: Decimal(str(v)) for k, v in body["dimensions"].items()
        }
        item["overall_score"] = Decimal(
            str(sum(body["dimensions"].values()) / len(body["dimensions"]))
        )

    feedback_table.put_item(Item=item)
    return item


def update_realtime_aggregation(body: dict) -> None:
    """
    Update real-time aggregation counters for feedback monitoring.
    Uses atomic counters in DynamoDB for concurrent safety.
    """
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    hour = datetime.now(timezone.utc).strftime("%H")
    agg_key = f"{today}#{hour}"

    feedback_type = body["feedback_type"]

    try:
        update_expr = "SET #count = if_not_exists(#count, :zero) + :one"
        expr_names = {"#count": f"{feedback_type}_count"}
        expr_values = {":zero": Decimal("0"), ":one": Decimal("1")}

        if feedback_type == "star_rating":
            update_expr += ", #total_rating = if_not_exists(#total_rating, :zero) + :rating"
            expr_names["#total_rating"] = "total_star_rating"
            expr_values[":rating"] = Decimal(str(body["rating"]))

        elif feedback_type == "detailed_rating":
            for dim, score in body["dimensions"].items():
                safe_dim = dim.replace("-", "_")
                update_expr += f", #dim_{safe_dim} = if_not_exists(#dim_{safe_dim}, :zero) + :dim_{safe_dim}_val"
                expr_names[f"#dim_{safe_dim}"] = f"dim_{safe_dim}_total"
                expr_values[f":dim_{safe_dim}_val"] = Decimal(str(score))

        aggregation_table.update_item(
            Key={"agg_key": agg_key, "metric_type": "hourly_feedback"},
            UpdateExpression=update_expr,
            ExpressionAttributeNames=expr_names,
            ExpressionAttributeValues=expr_values,
        )
    except ClientError as e:
        logger.error(f"Aggregation update failed: {e}")


def emit_feedback_metrics(feedback_type: str, body: dict) -> None:
    """Emit CloudWatch metrics for real-time monitoring dashboards."""
    metrics = [
        {
            "MetricName": "FeedbackSubmitted",
            "Dimensions": [
                {"Name": "FeedbackType", "Value": feedback_type},
                {"Name": "Environment", "Value": "production"},
            ],
            "Value": 1,
            "Unit": "Count",
        }
    ]

    if feedback_type == "star_rating":
        metrics.append({
            "MetricName": "StarRating",
            "Dimensions": [{"Name": "Environment", "Value": "production"}],
            "Value": body["rating"],
            "Unit": "None",
        })

    if feedback_type in ("thumbs_up", "thumbs_down"):
        sentiment_value = 1.0 if feedback_type == "thumbs_up" else 0.0
        metrics.append({
            "MetricName": "SentimentScore",
            "Dimensions": [{"Name": "Environment", "Value": "production"}],
            "Value": sentiment_value,
            "Unit": "None",
        })

    try:
        cloudwatch.put_metric_data(Namespace="MangaAssist/Feedback", MetricData=metrics)
    except ClientError as e:
        logger.warning(f"CloudWatch metric emission failed: {e}")


def enqueue_for_processing(feedback_id: str, body: dict) -> None:
    """Send feedback to SQS for async processing (aggregation, alerts, training data)."""
    try:
        sqs.send_message(
            QueueUrl=FEEDBACK_QUEUE_URL,
            MessageBody=json.dumps({
                "feedback_id": feedback_id,
                "feedback_type": body["feedback_type"],
                "session_id": body["session_id"],
                "message_id": body["message_id"],
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }),
            MessageGroupId=body.get("session_id", "default"),
            MessageDeduplicationId=feedback_id,
        )
    except ClientError as e:
        logger.error(f"SQS enqueue failed for {feedback_id}: {e}")


def lambda_handler(event: dict, context: Any) -> dict:
    """
    API Gateway Lambda proxy handler for feedback collection.

    Endpoints:
        POST /feedback - Submit user feedback
        POST /feedback/correction - Submit content correction
    """
    try:
        http_method = event.get("httpMethod", "")
        path = event.get("path", "")
        source_ip = event.get("requestContext", {}).get("identity", {}).get("sourceIp", "unknown")

        if http_method != "POST":
            return {
                "statusCode": 405,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": "Method not allowed"}),
            }

        body = json.loads(event.get("body", "{}"))

        # Validate payload
        is_valid, error_msg = validate_feedback_payload(body)
        if not is_valid:
            return {
                "statusCode": 400,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": error_msg}),
            }

        # Generate feedback ID and store
        feedback_id = f"fb-{uuid.uuid4().hex[:12]}-{int(time.time())}"
        stored_item = store_feedback(feedback_id, body, source_ip)

        # Fan-out: aggregation, metrics, queue (sequential here; each step logs
        # its own failures rather than failing the request)
        update_realtime_aggregation(body)
        emit_feedback_metrics(body["feedback_type"], body)
        enqueue_for_processing(feedback_id, body)

        logger.info(f"Feedback stored: {feedback_id} type={body['feedback_type']}")

        return {
            "statusCode": 201,
            "headers": {
                "Content-Type": "application/json",
                "Access-Control-Allow-Origin": "*",
            },
            "body": json.dumps({
                "feedback_id": feedback_id,
                "status": "received",
                "message": "Thank you for your feedback!",
            }),
        }

    except json.JSONDecodeError:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Invalid JSON body"}),
        }
    except Exception as e:
        logger.exception(f"Unexpected error in feedback handler: {e}")
        return {
            "statusCode": 500,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Internal server error"}),
        }

Production Python: Human Review Queue Manager

"""
MangaAssist Human Review Queue Manager
Manages the curator review queue, assignment, and workload balancing.
Integrates with Step Functions for callback-based approval workflows.
"""

import json
import time
import logging
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Optional
from dataclasses import dataclass

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
review_queue_table = dynamodb.Table("manga-assist-review-queue")
reviewer_table = dynamodb.Table("manga-assist-reviewers")
sfn_client = boto3.client("stepfunctions", region_name="ap-northeast-1")
sns_client = boto3.client("sns", region_name="ap-northeast-1")
cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

CURATOR_NOTIFICATION_TOPIC = (
    "arn:aws:sns:ap-northeast-1:ACCOUNT:manga-curator-notifications"
)
MAX_QUEUE_DEPTH_PER_REVIEWER = 10
SLA_BREACH_THRESHOLD_MINUTES = 60


@dataclass
class ReviewItem:
    """Represents an item in the human review queue."""
    review_id: str
    content_id: str
    content_type: str
    ai_generated_content: str
    confidence_score: float
    priority: str  # HIGH, STANDARD, LOW
    status: str  # PENDING, ASSIGNED, IN_REVIEW, COMPLETED, EXPIRED
    assigned_reviewer: Optional[str]
    task_token: str
    created_at: str
    sla_deadline: str
    requeue_count: int = 0
    generation_attempt: int = 1

    def is_sla_breached(self) -> bool:
        """Check if this review item has breached its SLA."""
        deadline = datetime.fromisoformat(self.sla_deadline)
        return datetime.now(timezone.utc) > deadline

    def time_to_sla_breach(self) -> timedelta:
        """Calculate remaining time before SLA breach."""
        deadline = datetime.fromisoformat(self.sla_deadline)
        return deadline - datetime.now(timezone.utc)


class ReviewQueueManager:
    """
    Manages the human review queue for MangaAssist content.
    Handles assignment, load balancing, SLA monitoring, and callback completion.
    """

    def __init__(self):
        self.review_queue = review_queue_table
        self.reviewers = reviewer_table

    def create_review_task(
        self,
        task_token: str,
        review_item_data: dict,
        assignment: dict,
        content_id: str,
    ) -> dict:
        """
        Create a review task and store the Step Functions callback token.
        Called by the WaitForHumanReview state in the Step Functions workflow.

        Args:
            task_token: Step Functions callback token
            review_item_data: Content details for review
            assignment: Reviewer assignment information
            content_id: Unique content identifier

        Returns:
            Created review task details
        """
        now = datetime.now(timezone.utc)
        sla_minutes = assignment.get("sla_minutes", 120)
        sla_deadline = now + timedelta(minutes=sla_minutes)

        review_id = f"rev-{content_id}-{int(now.timestamp())}"

        item = {
            "review_id": review_id,
            "content_id": content_id,
            "content_type": review_item_data.get("content_type", "general"),
            "ai_generated_content": review_item_data.get("content", ""),
            "confidence_score": Decimal(str(review_item_data.get("confidence_score", 0.0))),
            "priority": assignment.get("priority", "STANDARD"),
            "status": "ASSIGNED",
            "assigned_reviewer": assignment.get("reviewer_id"),
            "task_token": task_token,
            "created_at": now.isoformat(),
            "sla_deadline": sla_deadline.isoformat(),
            "sla_minutes": sla_minutes,
            "requeue_count": review_item_data.get("requeue_count", 0),
            "generation_attempt": review_item_data.get("generation_attempt", 1),
            "source_context": review_item_data.get("source_context", {}),
            "ttl": int((now + timedelta(days=30)).timestamp()),
        }

        self.review_queue.put_item(Item=item)

        # Update reviewer workload counter
        self._increment_reviewer_workload(assignment.get("reviewer_id"))

        # Emit queue depth metric
        self._emit_queue_metrics(assignment.get("priority", "STANDARD"))

        logger.info(
            f"Review task created: {review_id} assigned to {assignment.get('reviewer_id')} "
            f"SLA: {sla_minutes}min"
        )

        return {
            "review_id": review_id,
            "status": "ASSIGNED",
            "sla_deadline": sla_deadline.isoformat(),
        }

    def submit_review_decision(
        self,
        review_id: str,
        reviewer_id: str,
        decision: dict,
    ) -> dict:
        """
        Process a curator's review decision and complete the Step Functions callback.

        Args:
            review_id: The review task identifier
            reviewer_id: ID of the reviewing curator
            decision: Review decision including action, notes, edits

        Returns:
            Processing result
        """
        # Fetch the review item
        response = self.review_queue.get_item(Key={"review_id": review_id})
        item = response.get("Item")

        if not item:
            raise ValueError(f"Review item not found: {review_id}")

        if item["assigned_reviewer"] != reviewer_id:
            raise PermissionError(
                f"Reviewer {reviewer_id} is not assigned to {review_id}"
            )

        if item["status"] not in ("ASSIGNED", "IN_REVIEW"):
            raise ValueError(f"Review {review_id} is in invalid state: {item['status']}")

        task_token = item["task_token"]
        action = decision.get("action", "REJECT")

        now = datetime.now(timezone.utc)
        review_duration = (
            now - datetime.fromisoformat(item["created_at"])
        ).total_seconds()

        # Build the callback output
        callback_output = {
            "action": action,
            "reviewer_id": reviewer_id,
            "review_id": review_id,
            "review_duration_seconds": review_duration,
            "notes": decision.get("notes", ""),
            "timestamp": now.isoformat(),
        }

        if action == "APPROVE_WITH_EDITS":
            callback_output["edited_content"] = decision.get("edited_content", "")

        if action == "REQUEST_REGENERATION":
            callback_output["guidance"] = decision.get("guidance", "")

        if action == "REJECT":
            callback_output["reason"] = decision.get("reason", "Quality standards not met")

        # Send callback to Step Functions
        try:
            if action in ("APPROVE", "APPROVE_WITH_EDITS", "REQUEST_REGENERATION"):
                sfn_client.send_task_success(
                    taskToken=task_token,
                    output=json.dumps(callback_output, default=str),
                )
            else:
                sfn_client.send_task_failure(
                    taskToken=task_token,
                    error="ReviewRejected",
                    cause=json.dumps(callback_output, default=str),
                )
        except ClientError as e:
            # Match on the service error code rather than the message text
            if e.response["Error"]["Code"] == "TaskTimedOut":
                logger.warning(f"Task token expired for review {review_id}")
                self._update_review_status(review_id, "EXPIRED")
                return {"status": "EXPIRED", "message": "Review period has expired"}
            raise

        # Update review item status
        self._update_review_status(review_id, "COMPLETED", {
            "decision_action": action,
            "decision_at": now.isoformat(),
            "review_duration_seconds": Decimal(str(round(review_duration, 2))),
        })

        # Decrement reviewer workload
        self._decrement_reviewer_workload(reviewer_id)

        # Emit review completion metrics
        self._emit_review_metrics(action, review_duration, item["priority"])

        logger.info(
            f"Review completed: {review_id} action={action} "
            f"duration={review_duration:.1f}s reviewer={reviewer_id}"
        )

        return {
            "status": "COMPLETED",
            "action": action,
            "review_duration_seconds": review_duration,
        }

    def get_reviewer_queue(self, reviewer_id: str) -> list[dict]:
        """
        Fetch all pending review items for a specific curator.

        Args:
            reviewer_id: The curator's unique identifier

        Returns:
            List of review items sorted by priority and SLA deadline
        """
        # KeyConditionExpression does not support IN, so query the
        # reviewer-status-index GSI once per active status and merge results.
        items: list[dict] = []
        for review_status in ("ASSIGNED", "IN_REVIEW"):
            response = self.review_queue.query(
                IndexName="reviewer-status-index",
                KeyConditionExpression="assigned_reviewer = :rid AND #status = :s",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={":rid": reviewer_id, ":s": review_status},
            )
            items.extend(response.get("Items", []))

        # Sort: HIGH priority first, then by SLA deadline (earliest first)
        priority_order = {"HIGH": 0, "STANDARD": 1, "LOW": 2}
        items.sort(key=lambda x: (
            priority_order.get(x.get("priority", "LOW"), 2),
            x.get("sla_deadline", ""),
        ))

        # Flag SLA-breached items
        now = datetime.now(timezone.utc)
        for item in items:
            deadline = datetime.fromisoformat(item["sla_deadline"])
            item["sla_breached"] = now > deadline
            remaining = (deadline - now).total_seconds()
            item["sla_remaining_minutes"] = max(0, round(remaining / 60, 1))

        return items

    def assign_reviewer(
        self,
        review_item: dict,
        priority: str,
        reviewer_tier: str,
        sla_minutes: int,
    ) -> dict:
        """
        Assign a reviewer based on workload balancing and expertise matching.

        Args:
            review_item: Content to be reviewed
            priority: Review priority level
            reviewer_tier: Required reviewer expertise tier
            sla_minutes: SLA deadline in minutes

        Returns:
            Assignment details including reviewer_id and dashboard_url
        """
        # Fetch available reviewers for the tier
        response = self.reviewers.query(
            IndexName="tier-status-index",
            KeyConditionExpression="reviewer_tier = :tier AND #status = :active",
            ExpressionAttributeNames={"#status": "status"},
            ExpressionAttributeValues={
                ":tier": reviewer_tier,
                ":active": "ACTIVE",
            },
        )

        reviewers = response.get("Items", [])
        if not reviewers:
            # Fallback: try any available reviewer
            logger.warning(f"No {reviewer_tier} reviewers available, falling back")
            response = self.reviewers.scan(
                FilterExpression="#status = :active",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={":active": "ACTIVE"},
            )
            reviewers = response.get("Items", [])

        if not reviewers:
            raise RuntimeError("No reviewers available for assignment")

        # Pick reviewer with lowest current workload
        reviewers.sort(key=lambda r: int(r.get("current_workload", 0)))
        selected = reviewers[0]

        if int(selected.get("current_workload", 0)) >= MAX_QUEUE_DEPTH_PER_REVIEWER:
            logger.warning(
                f"All reviewers at capacity. Lowest workload: "
                f"{selected['reviewer_id']} with {selected['current_workload']} items"
            )
            # Emit capacity alert
            self._emit_capacity_alert(reviewer_tier)

        content_type = review_item.get("content_type", "general")
        dashboard_url = (
            f"https://manga-curator.internal.example.com/review"
            f"?reviewer={selected['reviewer_id']}&type={content_type}"
        )

        return {
            "reviewer_id": selected["reviewer_id"],
            "reviewer_name": selected.get("reviewer_name", ""),
            "reviewer_tier": reviewer_tier,
            "priority": priority,
            "sla_minutes": sla_minutes,
            "dashboard_url": dashboard_url,
            "current_workload": int(selected.get("current_workload", 0)) + 1,
        }

    def check_sla_breaches(self) -> list[dict]:
        """
        Scan for SLA-breached review items and trigger escalation.
        Called by a scheduled EventBridge rule every 5 minutes.

        Returns:
            List of breached review items with escalation actions taken
        """
        now = datetime.now(timezone.utc)
        breached_items = []

        # Scan with pagination for active reviews past their SLA deadline.
        # (At higher volume, a sparse GSI on status would avoid the full scan.)
        scan_kwargs = {
            "FilterExpression": "#status IN (:s1, :s2) AND sla_deadline < :now",
            "ExpressionAttributeNames": {"#status": "status"},
            "ExpressionAttributeValues": {
                ":s1": "ASSIGNED",
                ":s2": "IN_REVIEW",
                ":now": now.isoformat(),
            },
        }
        response = self.review_queue.scan(**scan_kwargs)
        items = response.get("Items", [])
        while "LastEvaluatedKey" in response:
            response = self.review_queue.scan(
                ExclusiveStartKey=response["LastEvaluatedKey"], **scan_kwargs
            )
            items.extend(response.get("Items", []))

        for item in items:
            breach_duration = (
                now - datetime.fromisoformat(item["sla_deadline"])
            ).total_seconds() / 60

            escalation_action = self._determine_escalation(item, breach_duration)

            breached_items.append({
                "review_id": item["review_id"],
                "content_id": item["content_id"],
                "assigned_reviewer": item["assigned_reviewer"],
                "breach_duration_minutes": round(breach_duration, 1),
                "escalation_action": escalation_action,
            })

            # Send SLA breach notification
            self._notify_sla_breach(item, breach_duration, escalation_action)

        if breached_items:
            cloudwatch.put_metric_data(
                Namespace="MangaAssist/ReviewQueue",
                MetricData=[{
                    "MetricName": "SLABreachCount",
                    "Value": len(breached_items),
                    "Unit": "Count",
                }],
            )

        return breached_items

    def _determine_escalation(self, item: dict, breach_minutes: float) -> str:
        """Determine the escalation action based on breach severity."""
        if breach_minutes > 120:
            if float(item.get("confidence_score", 0)) > 0.85:
                return "AUTO_APPROVE_WITH_FLAG"
            return "REASSIGN_URGENT"
        elif breach_minutes > 60:
            return "ESCALATE_TO_SENIOR"
        else:
            return "NOTIFY_REVIEWER"

    def _increment_reviewer_workload(self, reviewer_id: str) -> None:
        """Atomically increment a reviewer's workload counter."""
        try:
            self.reviewers.update_item(
                Key={"reviewer_id": reviewer_id},
                UpdateExpression="SET current_workload = if_not_exists(current_workload, :zero) + :one",
                ExpressionAttributeValues={":zero": 0, ":one": 1},
            )
        except ClientError as e:
            logger.error(f"Failed to increment workload for {reviewer_id}: {e}")

    def _decrement_reviewer_workload(self, reviewer_id: str) -> None:
        """Atomically decrement a reviewer's workload counter."""
        try:
            self.reviewers.update_item(
                Key={"reviewer_id": reviewer_id},
                UpdateExpression="SET current_workload = current_workload - :one",
                ConditionExpression="current_workload > :zero",
                ExpressionAttributeValues={":zero": 0, ":one": 1},
            )
        except ClientError as e:
            logger.error(f"Failed to decrement workload for {reviewer_id}: {e}")

    def _update_review_status(
        self, review_id: str, status: str, extra_attrs: Optional[dict] = None
    ) -> None:
        """Update the status and optional attributes of a review item."""
        update_expr = "SET #status = :status, updated_at = :now"
        expr_names = {"#status": "status"}
        expr_values = {
            ":status": status,
            ":now": datetime.now(timezone.utc).isoformat(),
        }

        if extra_attrs:
            for key, value in extra_attrs.items():
                # Alias attribute names to sidestep DynamoDB reserved words
                safe_key = key.replace("-", "_")
                update_expr += f", #{safe_key} = :{safe_key}"
                expr_names[f"#{safe_key}"] = key
                expr_values[f":{safe_key}"] = value

        self.review_queue.update_item(
            Key={"review_id": review_id},
            UpdateExpression=update_expr,
            ExpressionAttributeNames=expr_names,
            ExpressionAttributeValues=expr_values,
        )

    def _emit_queue_metrics(self, priority: str) -> None:
        """Emit review queue depth metrics."""
        try:
            cloudwatch.put_metric_data(
                Namespace="MangaAssist/ReviewQueue",
                MetricData=[{
                    "MetricName": "QueueDepth",
                    "Dimensions": [{"Name": "Priority", "Value": priority}],
                    "Value": 1,
                    "Unit": "Count",
                }],
            )
        except ClientError:
            pass

    def _emit_review_metrics(
        self, action: str, duration: float, priority: str
    ) -> None:
        """Emit review completion metrics."""
        try:
            cloudwatch.put_metric_data(
                Namespace="MangaAssist/ReviewQueue",
                MetricData=[
                    {
                        "MetricName": "ReviewCompleted",
                        "Dimensions": [
                            {"Name": "Action", "Value": action},
                            {"Name": "Priority", "Value": priority},
                        ],
                        "Value": 1,
                        "Unit": "Count",
                    },
                    {
                        "MetricName": "ReviewDuration",
                        "Dimensions": [{"Name": "Priority", "Value": priority}],
                        "Value": duration,
                        "Unit": "Seconds",
                    },
                ],
            )
        except ClientError:
            pass

    def _emit_capacity_alert(self, reviewer_tier: str) -> None:
        """Emit alert when reviewer capacity is exhausted."""
        try:
            sns_client.publish(
                TopicArn=CURATOR_NOTIFICATION_TOPIC,
                Subject=f"[MangaAssist] Reviewer Capacity Alert - {reviewer_tier}",
                Message=(
                    f"All {reviewer_tier} reviewers are at maximum capacity "
                    f"({MAX_QUEUE_DEPTH_PER_REVIEWER} items each). "
                    f"Consider adding more reviewers or adjusting auto-approval thresholds."
                ),
            )
        except ClientError as e:
            logger.error(f"Failed to send capacity alert: {e}")

    def _notify_sla_breach(
        self, item: dict, breach_minutes: float, action: str
    ) -> None:
        """Send SLA breach notification."""
        try:
            sns_client.publish(
                TopicArn=CURATOR_NOTIFICATION_TOPIC,
                Subject=f"[MangaAssist] SLA Breach - Review {item['review_id']}",
                Message=(
                    f"Review SLA breached by {breach_minutes:.0f} minutes.\n"
                    f"Review ID: {item['review_id']}\n"
                    f"Content ID: {item['content_id']}\n"
                    f"Assigned: {item['assigned_reviewer']}\n"
                    f"Priority: {item['priority']}\n"
                    f"Escalation: {action}"
                ),
            )
        except ClientError as e:
            logger.error(f"Failed to send SLA breach notification: {e}")


# --- Lambda Handlers ---

queue_manager = ReviewQueueManager()


def create_review_task_handler(event: dict, context) -> dict:
    """Lambda handler called by Step Functions WaitForHumanReview state."""
    return queue_manager.create_review_task(
        task_token=event["task_token"],
        review_item_data=event["review_item"],
        assignment=event["assignment"],
        content_id=event["content_id"],
    )


def submit_decision_handler(event: dict, context) -> dict:
    """
    API Gateway Lambda proxy handler for curator review decisions.
    Called from the curator dashboard when a review is submitted.
    """
    try:
        body = json.loads(event.get("body") or "{}")
        # API Gateway sends null (not a missing key) when there are no params
        review_id = (event.get("pathParameters") or {}).get("review_id")
        claims = (
            (event.get("requestContext") or {}).get("authorizer") or {}
        ).get("claims") or {}
        reviewer_id = claims.get("sub", body.get("reviewer_id"))

        if not review_id or not reviewer_id:
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "review_id and reviewer_id required"}),
            }

        result = queue_manager.submit_review_decision(review_id, reviewer_id, body)

        return {
            "statusCode": 200,
            "headers": {
                "Content-Type": "application/json",
                "Access-Control-Allow-Origin": "*",
            },
            "body": json.dumps(result, default=str),
        }

    except ValueError as e:
        return {"statusCode": 404, "body": json.dumps({"error": str(e)})}
    except PermissionError as e:
        return {"statusCode": 403, "body": json.dumps({"error": str(e)})}
    except Exception as e:
        logger.exception(f"Decision handler error: {e}")
        return {"statusCode": 500, "body": json.dumps({"error": "Internal server error"})}


def sla_monitor_handler(event: dict, context) -> dict:
    """
    EventBridge scheduled handler for SLA breach detection.
    Runs every 5 minutes via: rate(5 minutes)
    """
    breached = queue_manager.check_sla_breaches()
    logger.info(f"SLA check complete. Breaches found: {len(breached)}")
    return {"breached_count": len(breached), "items": breached}


def reviewer_queue_handler(event: dict, context) -> dict:
    """API Gateway handler for fetching a curator's review queue."""
    try:
        # pathParameters is null (not absent) when API Gateway has no params
        reviewer_id = (event.get("pathParameters") or {}).get("reviewer_id")
        if not reviewer_id:
            return {"statusCode": 400, "body": json.dumps({"error": "reviewer_id required"})}

        items = queue_manager.get_reviewer_queue(reviewer_id)

        return {
            "statusCode": 200,
            "headers": {
                "Content-Type": "application/json",
                "Access-Control-Allow-Origin": "*",
            },
            "body": json.dumps(items, default=str),
        }
    except Exception as e:
        logger.exception(f"Queue fetch error: {e}")
        return {"statusCode": 500, "body": json.dumps({"error": "Internal server error"})}
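
The handlers above sit on either side of a Step Functions `waitForTaskToken` task state. A hedged sketch of what that state might look like, expressed as a Python dict (the function name, neighboring state names, and timeout values are assumptions, not the actual MangaAssist definition):

```python
# Hypothetical ASL definition for the WaitForHumanReview state. It invokes the
# create-review-task Lambda with .waitForTaskToken, so the workflow pauses
# (consuming no compute) until the curator's decision triggers
# send_task_success / send_task_failure with the stored token.
WAIT_FOR_HUMAN_REVIEW_STATE = {
    "WaitForHumanReview": {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
        "Parameters": {
            "FunctionName": "manga-assist-create-review-task",  # assumed name
            "Payload": {
                "task_token.$": "$$.Task.Token",   # injected callback token
                "review_item.$": "$.review_item",
                "assignment.$": "$.assignment",
                "content_id.$": "$.content_id",
            },
        },
        "TimeoutSeconds": 7200,      # 2-hour hard timeout on the review
        "HeartbeatSeconds": 900,     # abandoned-review detection (15 min)
        "Catch": [
            {"ErrorEquals": ["ReviewRejected"], "Next": "HandleRejection"},
            {"ErrorEquals": ["States.Timeout"], "Next": "EscalateTimeout"},
        ],
        "Next": "PublishApprovedContent",
    }
}
```

The `ReviewRejected` error name in the Catch block matches the error string that `submit_review_decision` passes to `send_task_failure`, so a rejection routes the workflow to its rejection branch instead of failing the execution.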

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|----------|-------------------------|
| 1 | Confidence-threshold gating routes only uncertain AI outputs to human review, keeping the review queue manageable at scale | MangaAssist auto-serves responses above 0.9 confidence; only moderate-confidence manga descriptions and recommendations enter the curator queue |
| 2 | Step Functions waitForTaskToken is the canonical AWS pattern for human-in-the-loop workflows -- it pauses execution without consuming compute, waiting for an external callback | Curator reviews can take minutes to hours; the callback pattern holds zero resources while waiting, with heartbeat and timeout safeguards |
| 3 | SLA monitoring with automated escalation prevents human bottlenecks from blocking AI content pipelines | A scheduled EventBridge rule checks every 5 minutes for breached reviews, auto-escalating or auto-approving based on confidence thresholds |
| 4 | Workload-balanced assignment distributes review tasks to curators with the lowest current queue depth, preventing burnout and uneven latency | When a new manga description needs review, the system picks the curator with the fewest pending items and matching expertise tier |
| 5 | Feedback collection must be low-friction -- thumbs up/down captures high volume with minimal user effort, while detailed ratings and corrections provide richer training signal from engaged users | MangaAssist offers binary feedback inline during chat (WebSocket), with optional star ratings and free-text corrections for users who want to contribute more |
| 6 | Every human decision becomes training data -- curator corrections (original vs. edited content pairs) are stored for future prompt engineering and fine-tuning cycles | When a curator edits an AI-generated manga description, both versions are saved with metadata, feeding into the feedback-driven improvement loop (covered in detail in File 2) |
| 7 | Parallel publish actions (OpenSearch update, cache invalidation, audit record) run concurrently via Step Functions Parallel state, reducing end-to-end publish latency | After curator approval, the manga description updates in OpenSearch, Redis cache is invalidated, and the decision is recorded -- all simultaneously |
| 8 | API Gateway rate limiting and input validation protect the feedback endpoint from abuse while maintaining the 3-second response target for legitimate users | Feedback payloads are validated (type, length, dimensions) and rate-limited per API key, with sanitized storage to prevent injection |
| 9 | DynamoDB TTL for GDPR compliance automatically expires feedback data after 90 days unless explicitly retained for training | User feedback items carry a TTL attribute; expired data is automatically purged without manual cleanup jobs |
| 10 | Heartbeat timeouts detect abandoned reviews before the main timeout expires, enabling early re-assignment to available curators | If a curator goes silent for 15 minutes (no heartbeat), the system checks their status and can reassign the review to another curator without waiting the full 2-hour timeout |
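
Takeaway 1's gating logic can be sketched in a few lines; the thresholds and label names below are illustrative assumptions, not MangaAssist's production values:

```python
# Minimal sketch of confidence-threshold gating: high-confidence outputs are
# served directly, mid-confidence ones enter the curator queue, and weak ones
# are regenerated rather than wasting curator time.
AUTO_PUBLISH_THRESHOLD = 0.9   # serve directly, no human in the loop
REVIEW_THRESHOLD = 0.6         # below this, regenerate instead of reviewing

def route_by_confidence(confidence: float) -> str:
    """Decide whether an AI output is served, queued for review, or retried."""
    if confidence >= AUTO_PUBLISH_THRESHOLD:
        return "AUTO_PUBLISH"
    if confidence >= REVIEW_THRESHOLD:
        return "HUMAN_REVIEW"   # enters the curator review queue
    return "REGENERATE"         # too weak to be worth a curator's time
```

Only the middle band consumes human attention, which is what keeps a 50K-review/day queue tractable.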

Quick Reference: API Gateway Endpoints for Human-in-the-Loop

| Method | Endpoint | Purpose | Auth |
|--------|----------|---------|------|
| POST | /feedback | Submit user feedback (thumbs, rating, correction) | API Key |
| POST | /feedback/correction | Submit detailed content correction | API Key |
| GET | /review-queue/{reviewer_id} | Fetch curator's pending review items | Cognito JWT |
| POST | /review/{review_id}/decision | Submit approve/reject/edit decision | Cognito JWT |
| GET | /review/{review_id} | Fetch review item details | Cognito JWT |
| GET | /review-metrics | Dashboard aggregation data | Cognito JWT |
| POST | /review/{review_id}/heartbeat | Curator heartbeat (in-progress signal) | Cognito JWT |
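
The heartbeat endpoint pairs with Step Functions' `send_task_heartbeat` API, which resets the `HeartbeatSeconds` timer on the waiting task. A minimal sketch of its core, with the SFN client injected so it can be exercised without AWS (`record_heartbeat` and its return shapes are assumptions, not part of the code above):

```python
# Hypothetical core of the POST /review/{review_id}/heartbeat endpoint:
# relay a curator's in-progress signal to Step Functions so the state
# machine's heartbeat timer is reset instead of expiring.
def record_heartbeat(sfn_client, task_token: str) -> dict:
    """Forward a heartbeat; report expiry if the token has already timed out."""
    try:
        sfn_client.send_task_heartbeat(taskToken=task_token)
        return {"status": "HEARTBEAT_OK"}
    except Exception as exc:  # a botocore ClientError in practice
        if "TaskTimedOut" in str(exc):
            # The review window already closed; the dashboard should tell
            # the curator to stop working on this item.
            return {"status": "EXPIRED"}
        raise
```

If the dashboard calls this every few minutes while a curator is actively editing, an abandoned review stops heartbeating and Step Functions raises `States.Timeout` well before the full SLA window elapses.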

Cost Analysis: Human Review Workflow at MangaAssist Scale

Assumptions:
- 1M messages/day, 5% require human review = 50,000 reviews/day
- Average review takes 3 minutes of curator time
- Step Functions Standard workflow

Step Functions:
- 50,000 executions/day x 30 days = 1.5M executions/month
- ~8 state transitions per execution = 12M transitions
- Cost: 12M x $0.025/1000 = $300/month

DynamoDB (feedback + review queue):
- Write: 50K reviews + 200K feedback items = 250K writes/day
- Read: 250K reads/day (queue queries, dashboard)
- On-demand: ~$350/month

Lambda (handlers):
- ~500K invocations/day across all handlers
- Average 200ms, 256MB
- Cost: ~$15/month

API Gateway (REST):
- 250K requests/day for feedback + review
- Cost: 7.5M requests/month x $3.50/million = ~$26/month

SNS (notifications):
- ~5K notifications/day (assignments, SLA alerts)
- Cost: ~$3/month

Total Human Review Infrastructure: ~$694/month
Per review item: ~$0.00046 (less than $0.001 per review)
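
The monthly totals above can be reproduced with a few lines of arithmetic (rounding the API Gateway figure to the $26 quoted):

```python
# Reproducing the cost arithmetic with the prices quoted in this section.
reviews_per_month = 50_000 * 30                    # 1.5M Step Functions executions
sfn_cost = reviews_per_month * 8 / 1_000 * 0.025   # $0.025 per 1K transitions
api_cost = 250_000 * 30 / 1_000_000 * 3.50         # $3.50 per 1M REST requests
total = sfn_cost + 350 + 15 + round(api_cost) + 3  # + DynamoDB, Lambda, SNS
per_review = total / reviews_per_month
```

Step Functions dominates at ~43% of the total, which is why the per-item figure is sensitive to the number of state transitions in the review workflow.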

Next: 02-feedback-augmentation-patterns.md -- Deep-dive into feedback loops, active learning, and human knowledge injection patterns.