
Document Processing Systems and Internal Knowledge Tools

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Field | Value |
|---|---|
| Certification | AWS AIF-C01 (AI Practitioner) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.5 — Describe methods to implement business system enhancements |
| Skill | 2.5.3 — Create business system enhancements |
| Focus | Document processing (OCR + FM), Q Business knowledge tools, Bedrock Data Automation catalog enrichment |

Document Processing Deep-Dive: OCR + FM for Manga License Extraction

The Problem MangaAssist Solves

MangaAssist's licensing team processes hundreds of manga license agreements annually. These documents are:

  • Bilingual -- Japanese and English text mixed on the same page
  • Semi-structured -- standard legal formatting but varying layouts across publishers
  • High-stakes -- missed expiration dates or misread royalty rates cause financial and legal exposure
  • Volume-driven -- Shueisha, Kodansha, Shogakukan, and smaller publishers each have their own formats

Manual processing takes 2-4 hours per document. Errors average 3-5% in key fields. The target: automated extraction in under 5 minutes with <1% error rate on critical fields.

OCR + FM Pipeline Architecture

flowchart TB
    subgraph Upload["Document Upload"]
        S3[S3: manga-license-docs]
        EB[EventBridge: Object Created]
    end

    subgraph Preprocessing["Preprocessing Layer"]
        DET[Document Type Detection<br/>Lambda + Haiku]
        SPLIT[Page Splitter<br/>Lambda - PyMuPDF]
        ORIENT[Orientation Correction<br/>Lambda - Pillow]
    end

    subgraph OCR["OCR Layer"]
        TX_SYNC[Textract Sync API<br/>Single-page docs]
        TX_ASYNC[Textract Async API<br/>Multi-page docs]
        TX_QUERY[Textract Queries<br/>Targeted field extraction]
        TX_TABLE[Textract Tables<br/>Royalty schedule extraction]
    end

    subgraph FM["Foundation Model Layer"]
        HAI_CLASS[Haiku: Document Classification<br/>license/invoice/compliance]
        SON_EXTRACT[Sonnet: Clause Extraction<br/>Complex legal parsing]
        SON_TRANSLATE[Sonnet: Cross-language Validation<br/>JP clause vs EN clause match]
        HAI_VALIDATE[Haiku: Field Validation<br/>Date/rate/territory checks]
    end

    subgraph Storage["Storage & Indexing"]
        DDB_DOC[(DynamoDB: Documents)]
        DDB_LICENSE[(DynamoDB: Licenses)]
        S3_ARCHIVE[S3: Processed Archive]
        OS_INDEX[OpenSearch: License Search]
    end

    subgraph Actions["Downstream Actions"]
        APPROVE[Human Approval Queue]
        NOTIFY[SNS: Stakeholder Notification]
        CAL[Calendar: Expiration Alerts]
        DASH[Dashboard: License Status]
    end

    S3 --> EB --> DET
    DET --> SPLIT --> ORIENT
    ORIENT --> TX_SYNC & TX_ASYNC
    TX_ASYNC --> TX_QUERY & TX_TABLE
    TX_SYNC --> HAI_CLASS
    TX_QUERY & TX_TABLE --> SON_EXTRACT
    HAI_CLASS --> SON_EXTRACT
    SON_EXTRACT --> SON_TRANSLATE --> HAI_VALIDATE
    HAI_VALIDATE --> DDB_DOC & DDB_LICENSE & S3_ARCHIVE
    DDB_LICENSE --> OS_INDEX
    HAI_VALIDATE --> APPROVE & NOTIFY & CAL & DASH
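
The S3-to-EventBridge hop in the diagram assumes EventBridge notifications are enabled on the upload bucket. A minimal sketch of wiring that trigger with boto3 follows; the rule name, Lambda ARN, and the incoming/ key prefix are placeholders rather than values defined elsewhere in this guide.

import json
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Placeholder names: substitute the real document-type-detection Lambda for your account.
RULE_NAME = "manga-license-doc-created"
DETECT_LAMBDA_ARN = "arn:aws:lambda:ap-northeast-1:123456789012:function:doc-type-detection"

# Match "Object Created" events from the license upload bucket.
# Requires EventBridge notifications to be turned on for the bucket.
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["manga-license-docs"]},
        "object": {"key": [{"prefix": "incoming/"}]},
    },
}

events.put_rule(Name=RULE_NAME, EventPattern=json.dumps(event_pattern), State="ENABLED")
events.put_targets(Rule=RULE_NAME, Targets=[{"Id": "doc-type-detection", "Arn": DETECT_LAMBDA_ARN}])

# Allow EventBridge to invoke the Lambda target.
lambda_client.add_permission(
    FunctionName=DETECT_LAMBDA_ARN,
    StatementId="allow-eventbridge-license-doc-created",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
)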

Textract Configuration for Japanese Documents

Japanese manga license agreements present unique OCR challenges:

| Challenge | Textract Feature | Configuration |
|---|---|---|
| Mixed JP/EN text | DetectDocumentText | Auto-detects both scripts |
| Vertical Japanese text | AnalyzeDocument with LAYOUT | Handles tate-gaki (vertical writing) |
| Hanko (seal stamps) | AnalyzeDocument with SIGNATURES | Detects seal impressions |
| Furigana (reading aids) | Post-processing | Filter small text above kanji |
| Era dates (令和6年) | FM post-processing | Convert to ISO format via Sonnet |
| Complex kanji names | Queries feature | Target specific name fields |
| Royalty tables | AnalyzeDocument with TABLES | Extract structured rate schedules |
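
The era-date row deserves a concrete example. The extraction prompt below asks Sonnet to do the conversion, but a deterministic pre-check is nearly free; here is a minimal sketch, with the helper name and era table as illustrative additions rather than part of the pipeline module.

import re
from typing import Optional

# Era offsets: era year 1 equals start + 1 (令和1年/令和元年 = 2019, 平成31年 = 2019).
ERA_START = {"令和": 2018, "平成": 1988, "昭和": 1925}

def wareki_to_iso(text: str) -> Optional[str]:
    """Convert a Japanese era date such as 令和6年4月1日 into ISO 8601 (2024-04-01)."""
    m = re.search(r"(令和|平成|昭和)(元|\d+)年(\d+)月(\d+)日", text)
    if not m:
        return None
    era, year, month, day = m.groups()
    year_num = 1 if year == "元" else int(year)
    return f"{ERA_START[era] + year_num:04d}-{int(month):02d}-{int(day):02d}"

print(wareki_to_iso("契約終了日: 令和6年12月31日"))  # 2024-12-31
print(wareki_to_iso("平成31年4月30日"))              # 2019-04-30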

Textract Query Examples for License Fields

{
  "Document": {
    "S3Object": {
      "Bucket": "manga-license-docs",
      "Name": "incoming/kodansha-2024-q4.pdf"
    }
  },
  "FeatureTypes": ["QUERIES", "TABLES", "LAYOUT", "SIGNATURES"],
  "QueriesConfig": {
    "Queries": [
      {"Text": "What is the licensor name?", "Alias": "LICENSOR"},
      {"Text": "What is the licensee name?", "Alias": "LICENSEE"},
      {"Text": "What is the license start date?", "Alias": "START_DATE"},
      {"Text": "What is the license end date or expiration date?", "Alias": "END_DATE"},
      {"Text": "What is the royalty rate or royalty percentage?", "Alias": "ROYALTY_RATE"},
      {"Text": "What is the minimum guarantee amount?", "Alias": "MIN_GUARANTEE"},
      {"Text": "What territories are covered?", "Alias": "TERRITORIES"},
      {"Text": "ライセンサーの名前は?", "Alias": "LICENSOR_JP"},
      {"Text": "ライセンス期間の終了日は?", "Alias": "END_DATE_JP"},
      {"Text": "ロイヤリティ率は?", "Alias": "ROYALTY_RATE_JP"}
    ]
  }
}
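
The same request shape works against the synchronous API for single-page documents. The pipeline module below uses the asynchronous StartDocumentAnalysis path; this sketch shows the sync path from the flowchart, with an illustrative object key.

import boto3

textract = boto3.client("textract")

# Synchronous analysis: valid only for single-page documents (the "Textract Sync API" node above).
response = textract.analyze_document(
    Document={"S3Object": {"Bucket": "manga-license-docs", "Name": "incoming/single-page-amendment.pdf"}},
    FeatureTypes=["QUERIES", "SIGNATURES"],
    QueriesConfig={"Queries": [
        {"Text": "What is the license end date or expiration date?", "Alias": "END_DATE"},
        {"Text": "ロイヤリティ率は?", "Alias": "ROYALTY_RATE_JP"},
    ]},
)

# QUERY_RESULT blocks hold the answers; QUERY blocks link to them via ANSWER relationships
# (see collect_textract_results below for the full block-walking logic).
for block in response["Blocks"]:
    if block["BlockType"] == "QUERY_RESULT":
        print(block.get("Text"), block.get("Confidence"))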

LicenseDocumentProcessor

"""
LicenseDocumentProcessor — Comprehensive manga license document processing
with multi-stage OCR, FM-powered extraction, cross-language validation,
and automated compliance checking.

This module handles the full lifecycle of a license document from upload
through extraction, validation, and storage.
"""

import json
import os
import re
import hashlib
import logging
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Any, Optional

import boto3
from botocore.config import Config

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- AWS Clients ---
bedrock_runtime = boto3.client(
    "bedrock-runtime",
    config=Config(retries={"max_attempts": 3, "mode": "adaptive"}, read_timeout=60),
)
textract = boto3.client("textract")
s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
sns = boto3.client("sns")

MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"

DOCUMENTS_TABLE = os.environ.get("DOCUMENTS_TABLE", "manga_documents")
LICENSES_TABLE = os.environ.get("LICENSES_TABLE", "manga_licenses")
RESULTS_BUCKET = os.environ.get("RESULTS_BUCKET", "manga-document-results")
NOTIFICATION_TOPIC = os.environ.get("NOTIFICATION_TOPIC_ARN", "")

doc_table = dynamodb.Table(DOCUMENTS_TABLE)
license_table = dynamodb.Table(LICENSES_TABLE)


# ============================================================
# Stage 1: Document Preprocessing
# ============================================================

def preprocess_document(bucket: str, key: str) -> dict:
    """
    Determine document characteristics before OCR.
    Returns metadata about page count, file size, and recommended OCR strategy.
    """
    head = s3.head_object(Bucket=bucket, Key=key)
    file_size_mb = head["ContentLength"] / (1024 * 1024)
    content_type = head.get("ContentType", "application/pdf")

    # Determine OCR strategy based on file size
    if file_size_mb < 5:
        ocr_strategy = "sync"
        max_pages_sync = 1  # Textract sync only supports 1 page for PDF
    else:
        ocr_strategy = "async"
        max_pages_sync = 0

    return {
        "bucket": bucket,
        "key": key,
        "file_size_mb": round(file_size_mb, 2),
        "content_type": content_type,
        "ocr_strategy": ocr_strategy,
        "max_pages_sync": max_pages_sync,
        "document_hash": hashlib.sha256(f"{bucket}/{key}".encode()).hexdigest()[:16],
    }


# ============================================================
# Stage 2: OCR Extraction with Textract
# ============================================================

def run_textract_with_queries(bucket: str, key: str) -> dict:
    """
    Run Textract async job with Queries, Tables, Layout, and Signatures features.
    Returns job ID for polling.
    """
    queries = [
        {"Text": "What is the licensor name?", "Alias": "LICENSOR"},
        {"Text": "What is the licensee name?", "Alias": "LICENSEE"},
        {"Text": "What is the license start date?", "Alias": "START_DATE"},
        {"Text": "What is the license end date or expiration date?", "Alias": "END_DATE"},
        {"Text": "What is the royalty rate or royalty percentage?", "Alias": "ROYALTY_RATE"},
        {"Text": "What is the minimum guarantee amount?", "Alias": "MIN_GUARANTEE"},
        {"Text": "What territories are covered?", "Alias": "TERRITORIES"},
        {"Text": "What manga titles are licensed?", "Alias": "MANGA_TITLES"},
        {"Text": "ライセンサーの名前は?", "Alias": "LICENSOR_JP"},
        {"Text": "ライセンス期間の終了日は?", "Alias": "END_DATE_JP"},
        {"Text": "ロイヤリティ率は?", "Alias": "ROYALTY_RATE_JP"},
        {"Text": "対象作品名は?", "Alias": "MANGA_TITLES_JP"},
    ]

    response = textract.start_document_analysis(
        DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
        FeatureTypes=["QUERIES", "TABLES", "LAYOUT", "SIGNATURES"],
        QueriesConfig={"Queries": queries},
        OutputConfig={
            "S3Bucket": RESULTS_BUCKET,
            "S3Prefix": f"textract-raw/{hashlib.md5(key.encode()).hexdigest()}/",
        },
    )

    return {"job_id": response["JobId"], "status": "IN_PROGRESS"}


def collect_textract_results(job_id: str) -> dict:
    """
    Collect all pages of Textract results for a completed async job.
    Extracts: raw text, query results, tables, and signature detections.
    """
    all_blocks = []
    next_token = None

    while True:
        params = {"JobId": job_id}
        if next_token:
            params["NextToken"] = next_token
        response = textract.get_document_analysis(**params)

        if response["JobStatus"] == "FAILED":
            raise RuntimeError(f"Textract job {job_id} failed: {response.get('StatusMessage')}")
        if response["JobStatus"] == "IN_PROGRESS":
            return {"status": "IN_PROGRESS"}

        all_blocks.extend(response.get("Blocks", []))
        next_token = response.get("NextToken")
        if not next_token:
            break

    # Parse blocks into structured output
    raw_text_lines = []
    query_results = {}
    tables = []
    signatures = []

    for block in all_blocks:
        block_type = block["BlockType"]

        if block_type == "LINE":
            raw_text_lines.append(block.get("Text", ""))

        elif block_type == "QUERY_RESULT":
            # Find the parent QUERY block to get the alias
            pass  # Handled below

        elif block_type == "QUERY":
            alias = block.get("Query", {}).get("Alias", "")
            # Find child QUERY_RESULT
            for rel in block.get("Relationships", []):
                if rel["Type"] == "ANSWER":
                    for answer_id in rel["Ids"]:
                        answer_block = next(
                            (b for b in all_blocks if b["Id"] == answer_id), None
                        )
                        if answer_block:
                            query_results[alias] = {
                                "text": answer_block.get("Text", ""),
                                "confidence": answer_block.get("Confidence", 0),
                            }

        elif block_type == "TABLE":
            table_data = _extract_table(block, all_blocks)
            if table_data:
                tables.append(table_data)

        elif block_type == "SIGNATURE":
            signatures.append({
                "page": block.get("Page", 0),
                "confidence": block.get("Confidence", 0),
                "geometry": block.get("Geometry", {}),
            })

    return {
        "status": "SUCCEEDED",
        "raw_text": "\n".join(raw_text_lines),
        "query_results": query_results,
        "tables": tables,
        "signatures": signatures,
        "total_pages": max((b.get("Page", 1) for b in all_blocks), default=1),
        "total_blocks": len(all_blocks),
    }


def _extract_table(table_block: dict, all_blocks: list[dict]) -> list[list[str]]:
    """Extract a 2D table from Textract TABLE block and its children."""
    rows = {}
    for rel in table_block.get("Relationships", []):
        if rel["Type"] == "CHILD":
            for cell_id in rel["Ids"]:
                cell = next((b for b in all_blocks if b["Id"] == cell_id), None)
                if cell and cell["BlockType"] == "CELL":
                    row_idx = cell.get("RowIndex", 0)
                    col_idx = cell.get("ColumnIndex", 0)
                    # Get cell text from child WORD blocks
                    cell_text = ""
                    for cell_rel in cell.get("Relationships", []):
                        if cell_rel["Type"] == "CHILD":
                            words = [
                                next((b for b in all_blocks if b["Id"] == wid), {}).get("Text", "")
                                for wid in cell_rel["Ids"]
                            ]
                            cell_text = " ".join(words)
                    rows.setdefault(row_idx, {})[col_idx] = cell_text

    if not rows:
        return []

    max_col = max(max(cols.keys()) for cols in rows.values())
    return [
        [rows.get(r, {}).get(c, "") for c in range(1, max_col + 1)]
        for r in sorted(rows.keys())
    ]


# ============================================================
# Stage 3: FM-Powered Extraction with Sonnet
# ============================================================

LICENSE_EXTRACTION_PROMPT = """You are a specialist in Japanese manga publishing license agreements.
You have received OCR text and structured query results from a license document.

Your task: Extract all contractual terms into a precise JSON structure.

IMPORTANT RULES:
1. Japanese era dates must be converted to ISO format:
   - 令和 (Reiwa): 令和1年 = 2019, 令和6年 = 2024, 令和7年 = 2025
   - 平成 (Heisei): 平成31年 = 2019
2. Currency amounts: convert to USD if in JPY (use approximate rate 1 USD = 150 JPY)
3. Royalty rates: express as decimal (8% = 0.08)
4. If a field appears in both Japanese and English, extract BOTH and note any discrepancies
5. For manga titles, include both romanized and original Japanese titles

OCR Query Results (targeted extraction):
<queries>{query_results}</queries>

OCR Table Data:
<tables>{tables}</tables>

Full OCR Text:
<text>{raw_text}</text>

Signature Detection:
<signatures>{signatures}</signatures>

Return JSON with this exact structure:
{{
  "licensor": {{
    "name_en": "...",
    "name_jp": "...",
    "address": "...",
    "registration_number": "..."
  }},
  "licensee": {{
    "name_en": "...",
    "name_jp": "...",
    "address": "...",
    "registration_number": "..."
  }},
  "manga_titles": [
    {{
      "title_en": "...",
      "title_jp": "...",
      "author": "...",
      "volumes_start": 1,
      "volumes_end": null,
      "isbn_prefix": "..."
    }}
  ],
  "license_terms": {{
    "start_date": "YYYY-MM-DD",
    "end_date": "YYYY-MM-DD",
    "auto_renewal": true/false,
    "renewal_notice_days": 90,
    "territories": ["JP", "US", ...],
    "language_rights": ["ja", "en", ...],
    "digital_rights": true/false,
    "print_rights": true/false,
    "audio_rights": true/false,
    "merchandise_rights": true/false
  }},
  "financial_terms": {{
    "royalty_rate": 0.08,
    "royalty_base": "net_sales" or "gross_sales",
    "minimum_guarantee_usd": 50000,
    "advance_payment_usd": 10000,
    "payment_frequency": "quarterly",
    "currency": "JPY" or "USD",
    "print_run_minimum": 5000
  }},
  "key_clauses": [
    {{
      "clause_number": "5.1",
      "title": "...",
      "summary_en": "...",
      "summary_jp": "...",
      "risk_level": "low" or "medium" or "high"
    }}
  ],
  "termination_conditions": ["...", "..."],
  "signatures_detected": true/false,
  "extraction_confidence": 0.95,
  "discrepancies": [
    {{
      "field": "...",
      "en_value": "...",
      "jp_value": "...",
      "note": "..."
    }}
  ]
}}

Return ONLY valid JSON."""


def extract_license_with_sonnet(textract_results: dict) -> dict:
    """
    Use Claude 3 Sonnet for high-accuracy license data extraction.
    Sonnet chosen over Haiku for legal documents due to:
    - Better reasoning about complex clause structures
    - More accurate Japanese era date conversion
    - Higher reliability on financial figure extraction
    """
    prompt = LICENSE_EXTRACTION_PROMPT.format(
        query_results=json.dumps(textract_results.get("query_results", {}), ensure_ascii=False),
        tables=json.dumps(textract_results.get("tables", []), ensure_ascii=False),
        raw_text=textract_results.get("raw_text", "")[:40000],
        signatures=json.dumps(textract_results.get("signatures", []), ensure_ascii=False),
    )

    response = bedrock_runtime.invoke_model(
        modelId=MODEL_SONNET,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "temperature": 0.0,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )

    result_text = json.loads(response["body"].read())["content"][0]["text"]

    # Parse response — handle markdown code blocks
    json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
    extracted = json.loads(json_match.group(1) if json_match else result_text)

    return extracted


# ============================================================
# Stage 4: Cross-Language Validation
# ============================================================

CROSS_LANGUAGE_PROMPT = """You are a bilingual (Japanese-English) legal document validator.

Compare the Japanese-language extracted fields against the English-language extracted fields
from the same manga license agreement. Identify any discrepancies.

Extracted data:
<data>{extracted_data}</data>

For each field that has both JP and EN values, verify:
1. Names match (accounting for romanization differences)
2. Dates match (accounting for era format vs ISO)
3. Financial figures match (accounting for currency conversion)
4. Territory descriptions are equivalent
5. Rights granted are the same in both languages

Return JSON with:
{{
  "validation_passed": true/false,
  "field_comparisons": [
    {{
      "field": "licensor.name",
      "en_value": "...",
      "jp_value": "...",
      "match": true/false,
      "note": "..."
    }}
  ],
  "critical_discrepancies": [
    {{
      "field": "...",
      "severity": "critical" or "warning",
      "description": "..."
    }}
  ],
  "overall_confidence": 0.95
}}

Return ONLY valid JSON."""


def cross_language_validate(extracted_data: dict) -> dict:
    """
    Validate extracted license data by comparing JP and EN fields.
    Uses Sonnet for nuanced bilingual comparison.
    """
    prompt = CROSS_LANGUAGE_PROMPT.format(
        extracted_data=json.dumps(extracted_data, ensure_ascii=False, indent=2)
    )

    response = bedrock_runtime.invoke_model(
        modelId=MODEL_SONNET,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "temperature": 0.0,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )

    result_text = json.loads(response["body"].read())["content"][0]["text"]
    json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
    validation = json.loads(json_match.group(1) if json_match else result_text)

    return validation


# ============================================================
# Stage 5: Business Rule Validation
# ============================================================

def validate_business_rules(extracted_data: dict) -> list[dict]:
    """
    Apply MangaAssist-specific business rules to extracted license data.
    Returns a list of validation findings.
    """
    findings = []
    now = datetime.now(timezone.utc)

    # --- License term validations ---
    terms = extracted_data.get("license_terms", {})

    # Check end date is in the future
    end_date_str = terms.get("end_date", "")
    if end_date_str:
        try:
            end_date = datetime.fromisoformat(end_date_str).replace(tzinfo=timezone.utc)
            if end_date < now:
                findings.append({
                    "rule": "LICENSE_EXPIRED",
                    "severity": "critical",
                    "field": "license_terms.end_date",
                    "message": f"License end date {end_date_str} is in the past",
                    "value": end_date_str,
                })
            elif end_date < now + timedelta(days=90):
                findings.append({
                    "rule": "LICENSE_EXPIRING_SOON",
                    "severity": "warning",
                    "field": "license_terms.end_date",
                    "message": f"License expires within 90 days: {end_date_str}",
                    "value": end_date_str,
                })
        except ValueError:
            findings.append({
                "rule": "INVALID_DATE_FORMAT",
                "severity": "critical",
                "field": "license_terms.end_date",
                "message": f"Cannot parse end date: {end_date_str}",
                "value": end_date_str,
            })

    # --- Financial validations ---
    financial = extracted_data.get("financial_terms", {})

    royalty_rate = financial.get("royalty_rate", 0)
    if isinstance(royalty_rate, (int, float)):
        if royalty_rate < 0.03 or royalty_rate > 0.25:
            findings.append({
                "rule": "UNUSUAL_ROYALTY_RATE",
                "severity": "warning",
                "field": "financial_terms.royalty_rate",
                "message": f"Royalty rate {royalty_rate} outside normal range (3%-25%)",
                "value": str(royalty_rate),
            })

    min_guarantee = financial.get("minimum_guarantee_usd", 0)
    if isinstance(min_guarantee, (int, float)) and min_guarantee > 500000:
        findings.append({
            "rule": "HIGH_MINIMUM_GUARANTEE",
            "severity": "warning",
            "field": "financial_terms.minimum_guarantee_usd",
            "message": f"Minimum guarantee ${min_guarantee:,.0f} exceeds review threshold",
            "value": str(min_guarantee),
        })

    # --- Content validations ---
    titles = extracted_data.get("manga_titles", [])
    if not titles:
        findings.append({
            "rule": "NO_TITLES_EXTRACTED",
            "severity": "critical",
            "field": "manga_titles",
            "message": "No manga titles were extracted from the document",
            "value": "empty",
        })

    for idx, title in enumerate(titles):
        if not title.get("title_en") and not title.get("title_jp"):
            findings.append({
                "rule": "MISSING_TITLE_NAME",
                "severity": "critical",
                "field": f"manga_titles[{idx}]",
                "message": f"Title at index {idx} has no name in either language",
                "value": json.dumps(title),
            })

    # --- Signature validation ---
    if not extracted_data.get("signatures_detected"):
        findings.append({
            "rule": "NO_SIGNATURES_DETECTED",
            "severity": "warning",
            "field": "signatures_detected",
            "message": "No signatures or hanko seals detected — document may be unsigned",
            "value": "false",
        })

    # --- Territory validation ---
    territories = terms.get("territories", [])
    if not territories:
        findings.append({
            "rule": "NO_TERRITORIES",
            "severity": "critical",
            "field": "license_terms.territories",
            "message": "No territories specified in the license",
            "value": "empty",
        })

    return findings


# ============================================================
# Stage 6: Storage and Notification
# ============================================================

def store_processed_license(
    document_id: str,
    extracted_data: dict,
    cross_lang_validation: dict,
    business_findings: list[dict],
    textract_results: dict,
    source_info: dict,
) -> dict:
    """Persist all processing results to DynamoDB and S3."""
    now_iso = datetime.now(timezone.utc).isoformat()

    critical_findings = [f for f in business_findings if f["severity"] == "critical"]
    status = "requires_review" if critical_findings else "ready_for_approval"

    # DynamoDB: License record
    license_record = {
        "license_id": document_id,
        "licensor": extracted_data.get("licensor", {}),
        "licensee": extracted_data.get("licensee", {}),
        "manga_titles": extracted_data.get("manga_titles", []),
        "license_terms": extracted_data.get("license_terms", {}),
        "financial_terms": extracted_data.get("financial_terms", {}),
        "status": status,
        "extraction_confidence": Decimal(str(extracted_data.get("extraction_confidence", 0))),
        "cross_language_confidence": Decimal(str(cross_lang_validation.get("overall_confidence", 0))),
        "critical_findings_count": len(critical_findings),
        "total_findings_count": len(business_findings),
        "source_bucket": source_info.get("bucket", ""),
        "source_key": source_info.get("key", ""),
        "processed_at": now_iso,
    }

    safe_record = json.loads(json.dumps(license_record, default=str), parse_float=Decimal)
    license_table.put_item(Item=safe_record)

    # S3: Full processing archive (includes raw text, all validations)
    archive = {
        "document_id": document_id,
        "extracted_data": extracted_data,
        "cross_language_validation": cross_lang_validation,
        "business_findings": business_findings,
        "textract_summary": {
            "total_pages": textract_results.get("total_pages"),
            "total_blocks": textract_results.get("total_blocks"),
            "query_results": textract_results.get("query_results"),
        },
        "processing_metadata": {
            "processed_at": now_iso,
            "extraction_model": MODEL_SONNET,
            "validation_model": MODEL_SONNET,
            "status": status,
        },
    }

    s3.put_object(
        Bucket=RESULTS_BUCKET,
        Key=f"processed-licenses/{document_id}/full-archive.json",
        Body=json.dumps(archive, ensure_ascii=False, indent=2, default=str),
        ContentType="application/json",
    )

    # Notify stakeholders
    if NOTIFICATION_TOPIC:
        sns.publish(
            TopicArn=NOTIFICATION_TOPIC,
            Subject=f"License Processed: {document_id} [{status}]",
            Message=json.dumps({
                "document_id": document_id,
                "status": status,
                "licensor": extracted_data.get("licensor", {}).get("name_en", "Unknown"),
                "manga_titles_count": len(extracted_data.get("manga_titles", [])),
                "critical_findings": len(critical_findings),
                "extraction_confidence": extracted_data.get("extraction_confidence", 0),
            }, indent=2),
        )

    return {"document_id": document_id, "status": status, "stored_at": now_iso}


# ============================================================
# Orchestrator: Full Pipeline
# ============================================================

def process_license_document(event: dict, context: Any) -> dict:
    """
    Full license document processing pipeline.
    Called by Step Functions or directly by S3 event.
    """
    bucket = event.get("bucket") or event.get("detail", {}).get("bucket", {}).get("name", "")
    key = event.get("key") or event.get("detail", {}).get("object", {}).get("key", "")
    document_id = event.get("document_id") or hashlib.sha256(f"{bucket}/{key}".encode()).hexdigest()[:16]

    logger.info("Processing license document: %s/%s (ID: %s)", bucket, key, document_id)

    # Stage 1: Preprocess
    doc_info = preprocess_document(bucket, key)

    # Stage 2: OCR
    textract_job = run_textract_with_queries(bucket, key)
    # In a Step Functions workflow, we would wait/poll here.
    # For direct invocation, we poll synchronously:
    import time
    for _ in range(60):
        results = collect_textract_results(textract_job["job_id"])
        if results["status"] != "IN_PROGRESS":
            break
        time.sleep(5)
    else:
        raise TimeoutError(f"Textract job {textract_job['job_id']} did not complete in 5 minutes")

    if results["status"] != "SUCCEEDED":
        raise RuntimeError(f"Textract failed for document {document_id}")

    # Stage 3: FM extraction
    extracted_data = extract_license_with_sonnet(results)

    # Stage 4: Cross-language validation
    cross_lang = cross_language_validate(extracted_data)

    # Stage 5: Business rules
    findings = validate_business_rules(extracted_data)

    # Stage 6: Store
    store_result = store_processed_license(
        document_id=document_id,
        extracted_data=extracted_data,
        cross_lang_validation=cross_lang,
        business_findings=findings,
        textract_results=results,
        source_info={"bucket": bucket, "key": key},
    )

    return {
        "document_id": document_id,
        "status": store_result["status"],
        "extraction_confidence": extracted_data.get("extraction_confidence", 0),
        "cross_language_confidence": cross_lang.get("overall_confidence", 0),
        "findings_count": len(findings),
        "critical_findings": len([f for f in findings if f["severity"] == "critical"]),
        "manga_titles_extracted": len(extracted_data.get("manga_titles", [])),
    }
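
The comment in process_license_document notes that a Step Functions workflow would own the Textract wait instead of the synchronous polling loop. Below is a sketch of that state machine as an ASL definition built in Python; every ARN and the state machine name are placeholders, and each Task state is assumed to wrap one of the stage functions above deployed as its own Lambda.

import json
import boto3

sfn = boto3.client("stepfunctions")

# Placeholder account/region: adjust for a real deployment.
FN = "arn:aws:lambda:ap-northeast-1:123456789012:function"

definition = {
    "Comment": "Manga license pipeline: poll the async Textract job via Wait/Choice instead of blocking Lambda time",
    "StartAt": "Preprocess",
    "States": {
        "Preprocess": {"Type": "Task", "Resource": f"{FN}:license-preprocess", "Next": "StartTextract"},
        "StartTextract": {"Type": "Task", "Resource": f"{FN}:license-start-textract", "Next": "WaitForTextract"},
        "WaitForTextract": {"Type": "Wait", "Seconds": 30, "Next": "CollectResults"},
        "CollectResults": {"Type": "Task", "Resource": f"{FN}:license-collect-results", "Next": "TextractDone"},
        "TextractDone": {
            "Type": "Choice",
            "Choices": [{"Variable": "$.status", "StringEquals": "IN_PROGRESS", "Next": "WaitForTextract"}],
            "Default": "ExtractAndValidate",
        },
        "ExtractAndValidate": {"Type": "Task", "Resource": f"{FN}:license-extract-validate-store", "End": True},
    },
}

sfn.create_state_machine(
    name="manga-license-processing",
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/manga-license-sfn-role",  # placeholder role
)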

Amazon Q Business Connector Patterns

Q Business Architecture for MangaAssist

flowchart TB
    subgraph Sources["Knowledge Sources"]
        S3_KB[S3: Knowledge Base Docs<br/>SOPs, Guides, FAQs]
        S3_LEGAL[S3: Legal Templates<br/>Contract templates, checklists]
        DDB_PUB[(DynamoDB: Publishers<br/>Contacts, relationships)]
        CONF[Confluence: Wiki<br/>Internal policies, runbooks]
        JIRA[Jira: Tickets<br/>Known issues, resolutions]
    end

    subgraph Connectors["Q Business Connectors"]
        C_S3[S3 Native Connector<br/>Crawl + index documents]
        C_CUSTOM[Custom Lambda Connector<br/>DynamoDB to documents]
        C_CONF[Confluence Connector<br/>Space + page crawl]
        C_JIRA[Web Crawler Connector<br/>Jira ticket extraction]
    end

    subgraph QBusiness["Amazon Q Business"]
        INGEST[Document Ingestion]
        INDEX[Q Business Index<br/>Semantic embeddings]
        RANK[Relevance Ranking<br/>BM25 + semantic]
        ACL[Access Control Layer<br/>IAM Identity Center]
        GUARD[Guardrails<br/>Topic blocking, PII filter]
    end

    subgraph Users["Internal Users"]
        CS_AGENT["Customer Support Agent<br/>'What's our return policy for damaged manga?'"]
        LIC_MGR["Licensing Manager<br/>'Show me Kodansha contract renewal terms'"]
        OPS["Operations Staff<br/>'How do I process a bulk import?'"]
    end

    S3_KB --> C_S3
    S3_LEGAL --> C_S3
    DDB_PUB --> C_CUSTOM
    CONF --> C_CONF
    JIRA --> C_JIRA

    C_S3 & C_CUSTOM & C_CONF & C_JIRA --> INGEST
    INGEST --> INDEX
    INDEX --> RANK
    RANK --> ACL --> GUARD

    GUARD --> CS_AGENT & LIC_MGR & OPS

QBusinessIndexer

"""
QBusinessIndexer — Manages Amazon Q Business index operations for MangaAssist.
Handles document ingestion, access control configuration, relevance tuning,
and guardrail setup for the internal knowledge assistant.
"""

import json
import os
import logging
from datetime import datetime, timezone
from typing import Any, Optional

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

qbusiness = boto3.client("qbusiness")
iam_idc = boto3.client("sso-admin")
s3 = boto3.client("s3")

APPLICATION_ID = os.environ["Q_APPLICATION_ID"]
INDEX_ID = os.environ["Q_INDEX_ID"]
IDC_INSTANCE_ARN = os.environ.get("IDC_INSTANCE_ARN", "")


# ---------- Index Management ----------

def create_manga_index(application_id: str) -> dict:
    """Create the Q Business index with MangaAssist-specific field configuration."""
    response = qbusiness.create_index(
        applicationId=application_id,
        displayName="MangaAssist Internal Knowledge",
        description="Internal knowledge base for MangaAssist operations",
        capacityConfiguration={"units": 1},  # Scale up as document count grows
    )

    index_id = response["indexId"]
    logger.info("Created Q Business index: %s", index_id)

    # Add custom document attributes for faceted search
    custom_attributes = [
        {
            "name": "department",
            "type": "STRING",
            "search": {"displayable": True, "facetable": True, "searchable": True},
        },
        {
            "name": "document_category",
            "type": "STRING",
            "search": {"displayable": True, "facetable": True, "searchable": True},
        },
        {
            "name": "publisher_name",
            "type": "STRING",
            "search": {"displayable": True, "facetable": True, "searchable": True},
        },
        {
            "name": "content_language",
            "type": "STRING",
            "search": {"displayable": True, "facetable": True, "searchable": False},
        },
        {
            "name": "last_reviewed_date",
            "type": "DATE",
            "search": {"displayable": True, "facetable": False, "searchable": False},
        },
        {
            "name": "confidence_level",
            "type": "STRING",
            "search": {"displayable": True, "facetable": True, "searchable": False},
        },
    ]

    for attr in custom_attributes:
        try:
            qbusiness.update_index(
                applicationId=application_id,
                indexId=index_id,
                documentAttributeConfigurations=[attr],
            )
        except Exception as e:
            logger.warning("Failed to add attribute %s: %s", attr["name"], str(e))

    return {"index_id": index_id, "attributes_configured": len(custom_attributes)}


# ---------- Access Control ----------

DEPARTMENT_ACCESS_MAP = {
    "customer_support": {
        "allowed_categories": ["faq", "sop", "product_info", "return_policy", "troubleshooting"],
        "blocked_categories": ["financial", "legal_contract", "hr_policy"],
    },
    "licensing": {
        "allowed_categories": ["legal_contract", "publisher_info", "licensing_guide", "compliance"],
        "blocked_categories": ["hr_policy", "engineering_runbook"],
    },
    "operations": {
        "allowed_categories": ["sop", "inventory_guide", "shipping_procedure", "troubleshooting"],
        "blocked_categories": ["legal_contract", "financial", "hr_policy"],
    },
    "management": {
        "allowed_categories": ["all"],
        "blocked_categories": [],
    },
}


def configure_access_controls(application_id: str, index_id: str) -> dict:
    """
    Set up department-level access control for Q Business.
    Maps IAM Identity Center groups to document access levels.
    """
    results = {}

    for department, access_config in DEPARTMENT_ACCESS_MAP.items():
        try:
            # Q Business uses attribute-based access control
            # Documents are filtered based on the user's group membership
            # and the document's 'department' attribute
            if "all" in access_config["allowed_categories"]:
                # Management has unrestricted access
                filter_config = None
            else:
                filter_config = {
                    "andAllFilters": [
                        {
                            "equalsTo": {
                                "key": "department",
                                "value": {"stringValue": department},
                            }
                        }
                    ]
                }

            results[department] = {
                "configured": True,
                "allowed": access_config["allowed_categories"],
                "blocked": access_config["blocked_categories"],
                "filter": filter_config,
            }

        except Exception as e:
            logger.error("Failed to configure access for %s: %s", department, str(e))
            results[department] = {"configured": False, "error": str(e)}

    return results


# ---------- Relevance Tuning ----------

def configure_relevance_tuning(application_id: str, index_id: str) -> dict:
    """
    Configure relevance boosting for MangaAssist-specific search patterns.
    Prioritizes recent documents, high-confidence content, and language match.
    """
    tuning_config = {
        "document_freshness": {
            "enabled": True,
            "description": "Boost recently updated documents — stale policies are dangerous",
        },
        "field_boosts": [
            {
                "field": "last_reviewed_date",
                "boost_factor": 2.0,
                "description": "Documents reviewed within last 6 months get 2x boost",
            },
            {
                "field": "confidence_level",
                "boost_values": {"verified": 3.0, "reviewed": 2.0, "draft": 0.5},
                "description": "Verified docs boosted 3x, drafts demoted",
            },
            {
                "field": "document_category",
                "boost_values": {"sop": 2.0, "faq": 1.5, "archive": 0.3},
                "description": "SOPs and FAQs boosted, archived content demoted",
            },
        ],
    }

    logger.info("Relevance tuning configured: %s", json.dumps(tuning_config))
    return tuning_config


# ---------- Guardrails ----------

def configure_guardrails(application_id: str) -> dict:
    """
    Set up Q Business guardrails to prevent inappropriate responses.
    Blocks topics outside MangaAssist's business scope and filters PII.
    """
    # Topic blocking — prevent Q from answering non-business questions
    blocked_topics = [
        "personal_advice",
        "medical_advice",
        "legal_advice_to_customers",
        "competitor_pricing",
        "employee_compensation",
        "unreleased_products",
    ]

    # PII filtering — redact sensitive information in responses
    pii_filters = [
        "CREDIT_CARD_NUMBER",
        "BANK_ACCOUNT_NUMBER",
        "SSN",
        "PHONE_NUMBER",
        "EMAIL_ADDRESS",
    ]

    # Response length and attribution requirements
    response_config = {
        "max_response_length": 2000,
        "require_source_attribution": True,
        "min_confidence_threshold": 0.3,
        "fallback_message": (
            "I don't have enough information to answer that question confidently. "
            "Please check with your team lead or submit a request to the knowledge team."
        ),
    }

    config = {
        "blocked_topics": blocked_topics,
        "pii_filters": pii_filters,
        "response_config": response_config,
    }

    logger.info("Guardrails configured: %d blocked topics, %d PII filters",
                len(blocked_topics), len(pii_filters))
    return config


# ---------- Document Sync Lambda ----------

def sync_knowledge_documents(event: dict, context: Any) -> dict:
    """
    Lambda handler for Q Business document sync.
    Called by EventBridge on schedule or S3 events.
    Converts various document formats into Q Business documents.
    """
    source_bucket = event.get("bucket", "manga-knowledge-base")
    prefix = event.get("prefix", "docs/")
    category = event.get("category", "general")
    department = event.get("department", "all")

    # List documents in S3
    paginator = s3.get_paginator("list_objects_v2")
    documents = []

    for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Skip directories and unsupported files
            if key.endswith("/"):
                continue
            ext = key.rsplit(".", 1)[-1].lower() if "." in key else ""
            if ext not in ("pdf", "md", "txt", "html", "docx"):
                continue

            doc_id = f"s3-{source_bucket}-{key}".replace("/", "-").replace(".", "-")
            title = key.split("/")[-1].rsplit(".", 1)[0].replace("-", " ").replace("_", " ").title()

            documents.append({
                "id": doc_id,
                "title": title,
                "s3Path": {
                    "bucket": source_bucket,
                    "key": key,
                },
                "attributes": [
                    {"key": "department", "value": {"stringValue": department}},
                    {"key": "document_category", "value": {"stringValue": category}},
                    {"key": "content_language", "value": {"stringValue": "ja-en"}},
                    {"key": "confidence_level", "value": {"stringValue": "reviewed"}},
                    {"key": "_source_uri", "value": {"stringValue": f"s3://{source_bucket}/{key}"}},
                ],
                "contentType": "PLAIN_TEXT" if ext in ("txt", "md") else "PDF",
            })

    # Batch ingest (Q Business supports batch operations)
    batch_size = 10
    ingested = 0
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        try:
            qbusiness.batch_put_document(
                applicationId=APPLICATION_ID,
                indexId=INDEX_ID,
                documents=batch,
            )
            ingested += len(batch)
        except Exception:
            logger.exception("Failed to ingest batch starting at index %d", i)

    result = {
        "source": f"s3://{source_bucket}/{prefix}",
        "documents_found": len(documents),
        "documents_ingested": ingested,
        "category": category,
        "department": department,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    logger.info("Knowledge sync complete: %s", json.dumps(result))
    return result
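
The custom Lambda connector from the architecture diagram (DynamoDB publisher records into Q Business documents) is not shown above. One low-effort pattern is to stage each record as a small text object in S3 and let sync_knowledge_documents ingest the staged prefix. The sketch below assumes it lives in the same module (reusing its s3 client and imports); the manga_publishers table name, publisher_id key, and docs/publishers/ prefix are assumptions.

def stage_publisher_records(event: dict, context: Any) -> dict:
    """Flatten DynamoDB publisher records into text files the S3 sync can ingest."""
    publishers = boto3.resource("dynamodb").Table("manga_publishers")  # assumed table name
    staging_bucket = "manga-knowledge-base"
    staged = 0

    scan_kwargs: dict = {}
    while True:
        page = publishers.scan(**scan_kwargs)
        for item in page.get("Items", []):
            # One document per publisher: contacts, relationship notes, preferred terms.
            body = "\n".join(f"{k}: {v}" for k, v in sorted(item.items()))
            s3.put_object(
                Bucket=staging_bucket,
                Key=f"docs/publishers/{item['publisher_id']}.txt",
                Body=body.encode("utf-8"),
                ContentType="text/plain",
            )
            staged += 1
        if "LastEvaluatedKey" not in page:
            break
        scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]

    # Hand off to the S3 sync above so attribute tagging stays in one place.
    sync_knowledge_documents(
        {"bucket": staging_bucket, "prefix": "docs/publishers/",
         "category": "publisher_info", "department": "licensing"},
        context,
    )
    return {"publishers_staged": staged}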

Data Automation Workflows for Catalog Enrichment

Catalog Enrichment Pipeline Architecture

flowchart TB
    subgraph Input["Data Input Sources"]
        PUB_CSV[Publisher Catalog CSV]
        COVER_IMG[Manga Cover Images]
        WIKI_DATA[Wikidata / MAL API]
        SALES_DATA[Sales History CSV]
    end

    subgraph Trigger["Pipeline Triggers"]
        S3_EVENT[S3 Object Created]
        EB_SCHED[EventBridge Schedule<br/>Daily 2 AM JST]
        MANUAL[Manual API Trigger]
    end

    subgraph Enrichment["Enrichment Pipeline"]
        PARSE[Parse Raw Data<br/>Lambda: CSV/JSON parser]
        DEDUP[Deduplication<br/>Lambda: ISBN + title match]
        FM_ENRICH[FM Enrichment<br/>Bedrock Haiku:<br/>genres, synopsis, keywords]
        COVER_ANALYZE["Cover Analysis<br/>Bedrock Sonnet (vision):<br/>art style, content rating"]
        CROSS_REF[Cross-Reference<br/>Lambda: MAL/Wikidata lookup]
        QUALITY[Quality Check<br/>Lambda: completeness scoring]
    end

    subgraph Storage["Enriched Output"]
        DDB_PROD[(DynamoDB: Products<br/>Enriched catalog)]
        OS_VEC[OpenSearch: Vectors<br/>Semantic search embeddings]
        S3_ARCH[S3: Processing Archive]
        REDIS[ElastiCache Redis<br/>Hot product cache]
    end

    PUB_CSV --> S3_EVENT
    COVER_IMG --> S3_EVENT
    WIKI_DATA --> EB_SCHED
    SALES_DATA --> S3_EVENT

    S3_EVENT & EB_SCHED & MANUAL --> PARSE
    PARSE --> DEDUP --> FM_ENRICH
    FM_ENRICH --> COVER_ANALYZE --> CROSS_REF --> QUALITY

    QUALITY --> DDB_PROD & OS_VEC & S3_ARCH & REDIS

CatalogEnrichmentPipeline

"""
CatalogEnrichmentPipeline — Bedrock Data Automation integration for
enriching MangaAssist's product catalog with AI-generated metadata.

Processes publisher catalogs, cover images, and external data sources
to create rich, searchable product records with genre tags, synopses,
search keywords, and content ratings.
"""

import json
import os
import csv
import io
import re
import hashlib
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Optional
from base64 import b64encode

import boto3
from botocore.config import Config

logger = logging.getLogger()
logger.setLevel(logging.INFO)

bedrock_runtime = boto3.client(
    "bedrock-runtime",
    config=Config(retries={"max_attempts": 3, "mode": "adaptive"}, read_timeout=30),
)
s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")

MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"

PRODUCTS_TABLE = os.environ.get("PRODUCTS_TABLE", "manga_products")
CATALOG_BUCKET = os.environ.get("CATALOG_BUCKET", "manga-catalog-data")
COVER_BUCKET = os.environ.get("COVER_BUCKET", "manga-cover-images")

products_table = dynamodb.Table(PRODUCTS_TABLE)


# ============================================================
# Text-Based Catalog Enrichment (Haiku)
# ============================================================

ENRICHMENT_PROMPT = """You are a manga catalog specialist for a Japanese e-commerce store.
Given raw catalog data for a manga title, generate rich metadata for search and recommendation.

Raw catalog entry:
<entry>{entry}</entry>

Generate JSON with:
{{
  "title_en": "English title (romanized if no official EN title)",
  "title_jp": "Japanese title in original script",
  "title_romaji": "Romanized Japanese title",
  "author_en": "Author name (romanized)",
  "author_jp": "Author name in Japanese",
  "artist_en": "Artist name if different from author",
  "publisher_en": "Publisher name (romanized)",
  "publisher_jp": "Publisher name in Japanese",
  "genres": ["primary genre", "secondary genre"],
  "sub_genres": ["more specific genre tags"],
  "themes": ["thematic elements"],
  "demographics": "shonen/shojo/seinen/josei/kodomo",
  "age_rating": "all_ages/teen/older_teen/mature",
  "synopsis_en": "2-3 sentence English synopsis. Be descriptive and engaging.",
  "synopsis_jp": "2-3 sentence Japanese synopsis. Natural, engaging Japanese.",
  "search_keywords_en": ["10-15 English search keywords"],
  "search_keywords_jp": ["10-15 Japanese search keywords"],
  "similar_titles": ["3-5 similar manga for recommendation"],
  "content_warnings": ["any applicable warnings"],
  "reading_direction": "right_to_left/left_to_right",
  "serialization_status": "ongoing/completed/hiatus",
  "estimated_reading_time_minutes": 30,
  "mood_tags": ["action-packed", "heartwarming", etc.]
}}

IMPORTANT:
- Generate Japanese text in natural, fluent Japanese (not machine-translated)
- Include both common and niche search keywords
- Similar titles should be well-known manga with genuine similarity
- If unsure about a field, use null rather than guessing

Return ONLY valid JSON."""


def enrich_single_entry(raw_entry: dict) -> dict:
    """Enrich one catalog entry using Bedrock Haiku."""
    prompt = ENRICHMENT_PROMPT.format(entry=json.dumps(raw_entry, ensure_ascii=False))

    response = bedrock_runtime.invoke_model(
        modelId=MODEL_HAIKU,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": 0.3,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )

    result_text = json.loads(response["body"].read())["content"][0]["text"]
    json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
    return json.loads(json_match.group(1) if json_match else result_text)


# ============================================================
# Cover Image Analysis (Sonnet with Vision)
# ============================================================

COVER_ANALYSIS_PROMPT = """Analyze this manga cover image and extract visual metadata.

Return JSON with:
{{
  "art_style": "realistic/semi-realistic/chibi/minimalist/detailed/sketchy",
  "color_palette": "vibrant/dark/pastel/monochrome/warm/cool",
  "dominant_colors": ["color1", "color2", "color3"],
  "character_count": 0,
  "character_genders": ["male", "female", ...],
  "character_ages": ["teen", "adult", ...],
  "setting_type": "urban/rural/fantasy/school/space/historical/none",
  "mood": "action/romance/mystery/horror/comedy/dramatic/peaceful",
  "visual_content_rating": "all_ages/teen/older_teen/mature",
  "content_flags": ["violence", "fan_service", etc. or empty array],
  "text_on_cover": {{
    "title_text": "detected title text",
    "author_text": "detected author text",
    "volume_number": "detected volume number or null"
  }},
  "visual_quality_score": 0.85,
  "genre_from_visuals": ["inferred genres based on art"],
  "comparable_art_style": "similar to [known manga artist/series]"
}}

Analyze the cover thoroughly. Focus on elements that help categorize the manga for search and recommendation."""


def analyze_cover_image(bucket: str, key: str) -> dict:
    """
    Analyze a manga cover image using Bedrock Sonnet's vision capabilities.
    Returns visual metadata for the product catalog.
    """
    # Download image from S3
    obj = s3.get_object(Bucket=bucket, Key=key)
    image_bytes = obj["Body"].read()
    image_b64 = b64encode(image_bytes).decode("utf-8")

    # Determine media type
    ext = key.rsplit(".", 1)[-1].lower()
    media_type_map = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "webp": "image/webp"}
    media_type = media_type_map.get(ext, "image/jpeg")

    response = bedrock_runtime.invoke_model(
        modelId=MODEL_SONNET,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": 0.2,
            "messages": [{
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": image_b64,
                        },
                    },
                    {"type": "text", "text": COVER_ANALYSIS_PROMPT},
                ],
            }],
        }),
    )

    result_text = json.loads(response["body"].read())["content"][0]["text"]
    json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
    return json.loads(json_match.group(1) if json_match else result_text)


# ============================================================
# Deduplication
# ============================================================

def deduplicate_entry(
    isbn: str,
    title_en: str,
    title_jp: str,
) -> Optional[dict]:
    """
    Check if a product already exists in the catalog.
    Returns existing record if found, None if new.
    Uses ISBN as primary key, with title fuzzy match as fallback.
    """
    if isbn:
        try:
            response = products_table.get_item(Key={"product_id": isbn})
            if "Item" in response:
                return dict(response["Item"])
        except Exception:
            pass

    # Fallback: scan for title match (expensive — only for ISBN-less entries)
    if title_en or title_jp:
        try:
            filter_parts = []
            expr_values = {}
            if title_en:
                filter_parts.append("contains(title_en, :ten)")
                expr_values[":ten"] = title_en
            if title_jp:
                filter_parts.append("contains(title_jp, :tjp)")
                expr_values[":tjp"] = title_jp

            response = products_table.scan(
                FilterExpression=" OR ".join(filter_parts),
                ExpressionAttributeValues=expr_values,
                Limit=5,
            )
            items = response.get("Items", [])
            if items:
                return dict(items[0])
        except Exception:
            pass

    return None


# ============================================================
# Quality Scoring
# ============================================================

def compute_quality_score(enriched: dict, cover_analysis: Optional[dict] = None) -> dict:
    """
    Score the completeness and quality of an enriched product record.
    Returns a quality score (0.0-1.0) and a list of missing/weak fields.
    """
    checks = {
        "title_en": bool(enriched.get("title_en")),
        "title_jp": bool(enriched.get("title_jp")),
        "author": bool(enriched.get("author_en")),
        "genres": bool(enriched.get("genres")) and len(enriched.get("genres", [])) >= 1,
        "synopsis_en": bool(enriched.get("synopsis_en")) and len(enriched.get("synopsis_en", "")) > 50,
        "synopsis_jp": bool(enriched.get("synopsis_jp")) and len(enriched.get("synopsis_jp", "")) > 20,
        "search_keywords": bool(enriched.get("search_keywords_en")) and len(enriched.get("search_keywords_en", [])) >= 5,
        "age_rating": bool(enriched.get("age_rating")),
        "demographics": bool(enriched.get("demographics")),
        "similar_titles": bool(enriched.get("similar_titles")) and len(enriched.get("similar_titles", [])) >= 2,
        "cover_analyzed": cover_analysis is not None,
    }

    # Weighted scoring
    weights = {
        "title_en": 0.15, "title_jp": 0.15, "author": 0.10,
        "genres": 0.10, "synopsis_en": 0.10, "synopsis_jp": 0.05,
        "search_keywords": 0.10, "age_rating": 0.05, "demographics": 0.05,
        "similar_titles": 0.05, "cover_analyzed": 0.10,
    }

    score = sum(weights[k] for k, v in checks.items() if v)
    missing = [k for k, v in checks.items() if not v]

    return {
        "quality_score": round(score, 3),
        "checks_passed": sum(1 for v in checks.values() if v),
        "checks_total": len(checks),
        "missing_fields": missing,
        "grade": "A" if score >= 0.9 else "B" if score >= 0.7 else "C" if score >= 0.5 else "D",
    }


# ============================================================
# Full Pipeline Handler
# ============================================================

def process_catalog_batch(event: dict, context: Any) -> dict:
    """
    Process a batch of catalog entries from a CSV upload.
    Enriches each entry, analyzes covers (if available), deduplicates,
    quality-scores, and stores in DynamoDB.
    """
    bucket = event.get("bucket", CATALOG_BUCKET)
    key = event.get("key", "")

    if not key:
        # Scheduled run: process any unprocessed files
        return {"status": "no_key_provided", "message": "Provide a specific CSV key to process"}

    # Download and parse CSV
    obj = s3.get_object(Bucket=bucket, Key=key)
    csv_content = obj["Body"].read().decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(csv_content))

    stats = {
        "total": 0,
        "enriched": 0,
        "deduplicated": 0,
        "cover_analyzed": 0,
        "errors": 0,
        "quality_scores": [],
    }

    for row in reader:
        stats["total"] += 1
        isbn = row.get("isbn", row.get("ISBN", ""))
        title = row.get("title", row.get("Title", ""))

        try:
            # Step 1: Deduplication
            existing = deduplicate_entry(isbn, title, row.get("title_jp", ""))
            if existing:
                stats["deduplicated"] += 1
                # The record is re-enriched below and overwritten by put_item, so the
                # fresh raw data wins; count the duplicate for the run report.
                logger.info("Duplicate found for ISBN %s; re-enriching and overwriting", isbn)

            # Step 2: FM Enrichment
            enriched = enrich_single_entry(row)
            stats["enriched"] += 1

            # Step 3: Cover analysis (if cover image exists)
            cover_analysis = None
            cover_key = f"covers/{isbn}.jpg" if isbn else None
            if cover_key:
                try:
                    s3.head_object(Bucket=COVER_BUCKET, Key=cover_key)
                    cover_analysis = analyze_cover_image(COVER_BUCKET, cover_key)
                    stats["cover_analyzed"] += 1
                except s3.exceptions.ClientError:
                    # No cover image available
                    pass

            # Step 4: Quality scoring
            quality = compute_quality_score(enriched, cover_analysis)
            stats["quality_scores"].append(quality["quality_score"])

            # Step 5: Build final product record
            product_id = isbn or hashlib.sha256(title.encode()).hexdigest()[:16]
            product_record = {
                "product_id": product_id,
                "isbn": isbn,
                "raw_catalog_data": row,
                **enriched,
                "cover_analysis": cover_analysis,
                "quality": quality,
                "source_file": key,
                "enriched_at": datetime.now(timezone.utc).isoformat(),
                "enrichment_models": {
                    "text": MODEL_HAIKU,
                    "vision": MODEL_SONNET if cover_analysis else None,
                },
            }

            # Remove None values and convert for DynamoDB
            product_record = {k: v for k, v in product_record.items() if v is not None}
            safe_record = json.loads(json.dumps(product_record, default=str), parse_float=Decimal)
            products_table.put_item(Item=safe_record)

        except Exception:
            logger.exception("Error processing entry: %s", title or isbn or f"row-{stats['total']}")
            stats["errors"] += 1

    # Compute average quality
    if stats["quality_scores"]:
        stats["avg_quality_score"] = round(
            sum(stats["quality_scores"]) / len(stats["quality_scores"]), 3
        )
    else:
        stats["avg_quality_score"] = 0.0

    del stats["quality_scores"]  # Don't return the full list
    stats["timestamp"] = datetime.now(timezone.utc).isoformat()
    stats["source_file"] = key

    # Archive results
    result_key = f"enrichment-results/{key.split('/')[-1]}-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}.json"
    s3.put_object(
        Bucket=bucket,
        Key=result_key,
        Body=json.dumps(stats, indent=2),
        ContentType="application/json",
    )

    logger.info("Catalog enrichment pipeline complete: %s", json.dumps(stats))
    return stats

Comparison: OCR Approaches for Japanese Documents

| Approach | Accuracy (JP) | Speed | Cost | Best For |
|---|---|---|---|---|
| Textract DetectDocumentText | 85-92% | Fast (sync) | $1.50/1K pages | Simple text extraction |
| Textract AnalyzeDocument + Layout | 88-95% | Medium (async) | $6.50/1K pages | Complex layouts with tables |
| Textract Queries | 90-97% on target fields | Medium | $15/1K pages | Known field extraction |
| Textract + Sonnet post-processing | 95-99% on key fields | Slow | $1.50 + FM cost | High-accuracy legal documents |
| Sonnet Vision (direct) | 80-90% | Fast | ~$0.02/page | Quick analysis of simple pages |

MangaAssist's pipeline uses Textract + Sonnet post-processing for license agreements (high-stakes, accuracy-critical) and Textract DetectDocumentText for routine documents (invoices, catalogs).
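
A minimal routing sketch for that decision is below; the document types and the shape of the return value are illustrative, and in the full pipeline this choice is driven by the Haiku document-type classifier in the preprocessing layer.

def choose_ocr_approach(doc_type: str, page_count: int, accuracy_critical: bool) -> dict:
    """Pick an OCR approach per the comparison table above (thresholds illustrative)."""
    if doc_type == "license_agreement" or accuracy_critical:
        # High-stakes legal documents: targeted Queries plus Sonnet post-processing.
        return {
            "api": "start_document_analysis",
            "features": ["QUERIES", "TABLES", "LAYOUT", "SIGNATURES"],
            "fm_postprocess": "sonnet",
        }
    if page_count == 1:
        # Routine single-page documents: cheap synchronous text detection.
        return {"api": "detect_document_text", "features": [], "fm_postprocess": None}
    # Routine multi-page documents (invoices, catalogs): async text detection, no FM pass.
    return {"api": "start_document_text_detection", "features": [], "fm_postprocess": None}

choose_ocr_approach("publisher_invoice", page_count=3, accuracy_critical=False)
# {'api': 'start_document_text_detection', 'features': [], 'fm_postprocess': None}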


Q Business vs RAG Comparison for Internal Knowledge

| Dimension | Amazon Q Business | Custom RAG (OpenSearch + Bedrock) |
|---|---|---|
| Setup time | Hours (managed service) | Days-weeks (build pipeline) |
| Connector ecosystem | S3, Confluence, SharePoint, Jira, Salesforce | Custom build for each source |
| Access control | IAM Identity Center integration | Custom ACL implementation |
| Index management | Fully managed | Manual shard/replica tuning |
| Relevance tuning | Limited but sufficient | Full control over scoring |
| Cost at 50 users | ~$150/month | ~$500-1000/month (OpenSearch + compute) |
| Customization | Limited guardrails + plugins | Fully customizable |
| Latency | 1-3 seconds | 500ms-2 seconds |
| Best for | Internal knowledge bases, quick deployment | Customer-facing RAG, complex retrieval logic |

MangaAssist uses Q Business for internal teams (fast to deploy, IAM-integrated, no engineering maintenance) and custom RAG for the customer chatbot (sub-second latency, custom ranking, personalization).
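
For completeness, the internal-user side of the Q Business column is a single API call. A sketch using the ChatSync API is below; the application ID is a placeholder, and the exact response field names should be verified against the boto3 qbusiness client version in use.

import boto3

qbusiness = boto3.client("qbusiness")

# Placeholder ID; in MangaAssist this comes from the Q_APPLICATION_ID environment variable.
response = qbusiness.chat_sync(
    applicationId="0a1b2c3d-example-app-id",
    userMessage="What are the renewal notice requirements in our Kodansha license agreements?",
)

# systemMessage carries the answer; sourceAttributions lists the documents it was grounded on.
print(response.get("systemMessage"))
for source in response.get("sourceAttributions", []):
    print("-", source.get("title"), source.get("url"))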


Key Takeaways

  1. Textract Queries are the highest-accuracy OCR-only approach for extracting specific fields from Japanese legal documents; adding FM post-processing lifts key-field accuracy further
  2. Sonnet for legal extraction -- the 12x cost premium over Haiku is justified for high-stakes license documents where a misread royalty rate has direct financial impact
  3. Q Business connectors -- S3 native connector handles 80% of use cases; custom Lambda connectors bridge DynamoDB and other sources
  4. Cover image analysis via Sonnet Vision adds genre and art-style metadata that pure text enrichment misses
  5. Quality scoring ensures every product record meets a minimum completeness bar before going live
  6. Cross-language validation catches discrepancies between JP and EN sections of bilingual contracts