Document Processing Systems and Internal Knowledge Tools
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS AIF-C01 (AI Practitioner) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.5 — Describe methods to implement business system enhancements |
| Skill | 2.5.3 — Create business system enhancements |
| Focus | Document processing (OCR + FM), Q Business knowledge tools, Bedrock Data Automation catalog enrichment |
Document Processing Deep-Dive: OCR + FM for Manga License Extraction
The Problem MangaAssist Solves
MangaAssist's licensing team processes hundreds of manga license agreements annually. These documents are:
- Bilingual -- Japanese and English text mixed on the same page
- Semi-structured -- standard legal formatting but varying layouts across publishers
- High-stakes -- missed expiration dates or misread royalty rates cause financial and legal exposure
- Volume-driven -- Shueisha, Kodansha, Shogakukan, and smaller publishers each have their own formats
Manual processing takes 2-4 hours per document. Errors average 3-5% in key fields. The target: automated extraction in under 5 minutes with <1% error rate on critical fields.
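Those targets can be sanity-checked against the Bedrock prices quoted in the context line above. A rough per-document cost sketch, where the per-stage token counts are illustrative assumptions rather than measurements:

```python
# Rough per-document FM cost for the OCR + FM pipeline.
# Rates are the Claude 3 prices quoted above ($ per 1M tokens);
# per-stage token counts are illustrative assumptions.
SONNET_IN, SONNET_OUT = 3.00, 15.00
HAIKU_IN, HAIKU_OUT = 0.25, 1.25

def stage_cost(in_tokens: int, out_tokens: int, in_rate: float, out_rate: float) -> float:
    return (in_tokens * in_rate + out_tokens * out_rate) / 1_000_000

cost = (
    stage_cost(2_000, 100, HAIKU_IN, HAIKU_OUT)         # Haiku: classification
    + stage_cost(30_000, 4_000, SONNET_IN, SONNET_OUT)  # Sonnet: clause extraction
    + stage_cost(6_000, 2_000, SONNET_IN, SONNET_OUT)   # Sonnet: cross-language check
    + stage_cost(3_000, 500, HAIKU_IN, HAIKU_OUT)       # Haiku: field validation
)
print(f"~${cost:.3f} per document")  # → ~$0.200 per document
```

At roughly twenty cents of model spend per document, the FM cost is negligible next to the 2-4 hours of manual effort it replaces; Textract page charges dominate instead.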
OCR + FM Pipeline Architecture
flowchart TB
subgraph Upload["Document Upload"]
S3[S3: manga-license-docs]
EB[EventBridge: Object Created]
end
subgraph Preprocessing["Preprocessing Layer"]
DET[Document Type Detection<br/>Lambda + Haiku]
SPLIT[Page Splitter<br/>Lambda - PyMuPDF]
ORIENT[Orientation Correction<br/>Lambda - Pillow]
end
subgraph OCR["OCR Layer"]
TX_SYNC[Textract Sync API<br/>Single-page docs]
TX_ASYNC[Textract Async API<br/>Multi-page docs]
TX_QUERY[Textract Queries<br/>Targeted field extraction]
TX_TABLE[Textract Tables<br/>Royalty schedule extraction]
end
subgraph FM["Foundation Model Layer"]
HAI_CLASS[Haiku: Document Classification<br/>license/invoice/compliance]
SON_EXTRACT[Sonnet: Clause Extraction<br/>Complex legal parsing]
SON_TRANSLATE[Sonnet: Cross-language Validation<br/>JP clause vs EN clause match]
HAI_VALIDATE[Haiku: Field Validation<br/>Date/rate/territory checks]
end
subgraph Storage["Storage & Indexing"]
DDB_DOC[(DynamoDB: Documents)]
DDB_LICENSE[(DynamoDB: Licenses)]
S3_ARCHIVE[S3: Processed Archive]
OS_INDEX[OpenSearch: License Search]
end
subgraph Actions["Downstream Actions"]
APPROVE[Human Approval Queue]
NOTIFY[SNS: Stakeholder Notification]
CAL[Calendar: Expiration Alerts]
DASH[Dashboard: License Status]
end
S3 --> EB --> DET
DET --> SPLIT --> ORIENT
ORIENT --> TX_SYNC & TX_ASYNC
TX_ASYNC --> TX_QUERY & TX_TABLE
TX_SYNC --> HAI_CLASS
TX_QUERY & TX_TABLE --> SON_EXTRACT
HAI_CLASS --> SON_EXTRACT
SON_EXTRACT --> SON_TRANSLATE --> HAI_VALIDATE
HAI_VALIDATE --> DDB_DOC & DDB_LICENSE & S3_ARCHIVE
DDB_LICENSE --> OS_INDEX
HAI_VALIDATE --> APPROVE & NOTIFY & CAL & DASH
Textract Configuration for Japanese Documents
Japanese manga license agreements present unique OCR challenges:
| Challenge | Textract Feature | Configuration |
|---|---|---|
| Mixed JP/EN text | DetectDocumentText | Auto-detects both scripts |
| Vertical Japanese text | AnalyzeDocument with LAYOUT | Handles tate-gaki (vertical writing) |
| Hanko (seal stamps) | AnalyzeDocument with SIGNATURES | Detects seal impressions |
| Furigana (reading aids) | Post-processing | Filter small text above kanji |
| Era dates (令和6年) | FM post-processing | Convert to ISO format via Sonnet |
| Complex kanji names | Queries feature | Target specific name fields |
| Royalty tables | AnalyzeDocument with TABLES | Extract structured rate schedules |
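The era-date row above hands conversion to Sonnet, but the common patterns are deterministic. A regex pre-pass (a sketch covering only 令和/平成/昭和 dates written in the N年N月N日 form) can normalize them before any model call, leaving the FM to handle only the odd cases:

```python
import re

# Era base years: ISO year = base + era_year - 1 (令和1年 = 2019, 平成1年 = 1989).
ERA_BASE = {"令和": 2019, "平成": 1989, "昭和": 1926}

_ERA_DATE = re.compile(r"(令和|平成|昭和)(元|\d{1,2})年(\d{1,2})月(\d{1,2})日")

def normalize_era_dates(text: str) -> str:
    """Rewrite Japanese era dates (N年N月N日 form only) to ISO 8601."""
    def repl(m: re.Match) -> str:
        era, era_year = m.group(1), m.group(2)
        # 元年 ("first year") counts as year 1 of the era.
        year = ERA_BASE[era] + (1 if era_year == "元" else int(era_year)) - 1
        return f"{year:04d}-{int(m.group(3)):02d}-{int(m.group(4)):02d}"
    return _ERA_DATE.sub(repl, text)

print(normalize_era_dates("契約終了日: 令和6年3月31日"))  # → 契約終了日: 2024-03-31
```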
Textract Query Examples for License Fields
{
"Document": {
"S3Object": {
"Bucket": "manga-license-docs",
"Name": "incoming/kodansha-2024-q4.pdf"
}
},
"FeatureTypes": ["QUERIES", "TABLES", "LAYOUT", "SIGNATURES"],
"QueriesConfig": {
"Queries": [
{"Text": "What is the licensor name?", "Alias": "LICENSOR"},
{"Text": "What is the licensee name?", "Alias": "LICENSEE"},
{"Text": "What is the license start date?", "Alias": "START_DATE"},
{"Text": "What is the license end date or expiration date?", "Alias": "END_DATE"},
{"Text": "What is the royalty rate or royalty percentage?", "Alias": "ROYALTY_RATE"},
{"Text": "What is the minimum guarantee amount?", "Alias": "MIN_GUARANTEE"},
{"Text": "What territories are covered?", "Alias": "TERRITORIES"},
{"Text": "ライセンサーの名前は?", "Alias": "LICENSOR_JP"},
{"Text": "ライセンス期間の終了日は?", "Alias": "END_DATE_JP"},
{"Text": "ロイヤリティ率は?", "Alias": "ROYALTY_RATE_JP"}
]
}
}
LicenseDocumentProcessor
"""
LicenseDocumentProcessor — Comprehensive manga license document processing
with multi-stage OCR, FM-powered extraction, cross-language validation,
and automated compliance checking.
This module handles the full lifecycle of a license document from upload
through extraction, validation, and storage.
"""
import json
import os
import re
import hashlib
import logging
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Any, Optional
import boto3
from botocore.config import Config
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# --- AWS Clients ---
bedrock_runtime = boto3.client(
"bedrock-runtime",
config=Config(retries={"max_attempts": 3, "mode": "adaptive"}, read_timeout=60),
)
textract = boto3.client("textract")
s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
sns = boto3.client("sns")
MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
DOCUMENTS_TABLE = os.environ.get("DOCUMENTS_TABLE", "manga_documents")
LICENSES_TABLE = os.environ.get("LICENSES_TABLE", "manga_licenses")
RESULTS_BUCKET = os.environ.get("RESULTS_BUCKET", "manga-document-results")
NOTIFICATION_TOPIC = os.environ.get("NOTIFICATION_TOPIC_ARN", "")
doc_table = dynamodb.Table(DOCUMENTS_TABLE)
license_table = dynamodb.Table(LICENSES_TABLE)
# ============================================================
# Stage 1: Document Preprocessing
# ============================================================
def preprocess_document(bucket: str, key: str) -> dict:
"""
Determine document characteristics before OCR.
Returns metadata about page count, file size, and recommended OCR strategy.
"""
head = s3.head_object(Bucket=bucket, Key=key)
file_size_mb = head["ContentLength"] / (1024 * 1024)
content_type = head.get("ContentType", "application/pdf")
# Determine OCR strategy based on file size
if file_size_mb < 5:
ocr_strategy = "sync"
max_pages_sync = 1 # Textract sync only supports 1 page for PDF
else:
ocr_strategy = "async"
max_pages_sync = 0
return {
"bucket": bucket,
"key": key,
"file_size_mb": round(file_size_mb, 2),
"content_type": content_type,
"ocr_strategy": ocr_strategy,
"max_pages_sync": max_pages_sync,
"document_hash": hashlib.sha256(f"{bucket}/{key}".encode()).hexdigest()[:16],
}
# ============================================================
# Stage 2: OCR Extraction with Textract
# ============================================================
def run_textract_with_queries(bucket: str, key: str) -> dict:
"""
Run Textract async job with Queries, Tables, Layout, and Signatures features.
Returns job ID for polling.
"""
queries = [
{"Text": "What is the licensor name?", "Alias": "LICENSOR"},
{"Text": "What is the licensee name?", "Alias": "LICENSEE"},
{"Text": "What is the license start date?", "Alias": "START_DATE"},
{"Text": "What is the license end date or expiration date?", "Alias": "END_DATE"},
{"Text": "What is the royalty rate or royalty percentage?", "Alias": "ROYALTY_RATE"},
{"Text": "What is the minimum guarantee amount?", "Alias": "MIN_GUARANTEE"},
{"Text": "What territories are covered?", "Alias": "TERRITORIES"},
{"Text": "What manga titles are licensed?", "Alias": "MANGA_TITLES"},
{"Text": "ライセンサーの名前は?", "Alias": "LICENSOR_JP"},
{"Text": "ライセンス期間の終了日は?", "Alias": "END_DATE_JP"},
{"Text": "ロイヤリティ率は?", "Alias": "ROYALTY_RATE_JP"},
{"Text": "対象作品名は?", "Alias": "MANGA_TITLES_JP"},
]
response = textract.start_document_analysis(
DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
FeatureTypes=["QUERIES", "TABLES", "LAYOUT", "SIGNATURES"],
QueriesConfig={"Queries": queries},
OutputConfig={
"S3Bucket": RESULTS_BUCKET,
"S3Prefix": f"textract-raw/{hashlib.md5(key.encode()).hexdigest()}/",
},
)
return {"job_id": response["JobId"], "status": "IN_PROGRESS"}
def collect_textract_results(job_id: str) -> dict:
"""
Collect all pages of Textract results for a completed async job.
Extracts: raw text, query results, tables, and signature detections.
"""
all_blocks = []
next_token = None
while True:
params = {"JobId": job_id}
if next_token:
params["NextToken"] = next_token
response = textract.get_document_analysis(**params)
if response["JobStatus"] == "FAILED":
raise RuntimeError(f"Textract job {job_id} failed: {response.get('StatusMessage')}")
if response["JobStatus"] == "IN_PROGRESS":
return {"status": "IN_PROGRESS"}
all_blocks.extend(response.get("Blocks", []))
next_token = response.get("NextToken")
if not next_token:
break
# Parse blocks into structured output
raw_text_lines = []
query_results = {}
tables = []
signatures = []
for block in all_blocks:
block_type = block["BlockType"]
if block_type == "LINE":
raw_text_lines.append(block.get("Text", ""))
        elif block_type == "QUERY_RESULT":
            pass  # Resolved via the parent QUERY block's ANSWER relationship
elif block_type == "QUERY":
alias = block.get("Query", {}).get("Alias", "")
# Find child QUERY_RESULT
for rel in block.get("Relationships", []):
if rel["Type"] == "ANSWER":
for answer_id in rel["Ids"]:
answer_block = next(
(b for b in all_blocks if b["Id"] == answer_id), None
)
if answer_block:
query_results[alias] = {
"text": answer_block.get("Text", ""),
"confidence": answer_block.get("Confidence", 0),
}
elif block_type == "TABLE":
table_data = _extract_table(block, all_blocks)
if table_data:
tables.append(table_data)
elif block_type == "SIGNATURE":
signatures.append({
"page": block.get("Page", 0),
"confidence": block.get("Confidence", 0),
"geometry": block.get("Geometry", {}),
})
return {
"status": "SUCCEEDED",
"raw_text": "\n".join(raw_text_lines),
"query_results": query_results,
"tables": tables,
"signatures": signatures,
"total_pages": max((b.get("Page", 1) for b in all_blocks), default=1),
"total_blocks": len(all_blocks),
}
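The QUERY-to-ANSWER resolution above can be exercised standalone against a synthetic two-block response. Building an `Id` index once also avoids the repeated linear `next(...)` scans, which matters on multi-hundred-block documents; a sketch:

```python
# Synthetic Textract blocks: one QUERY linked to one QUERY_RESULT.
blocks = [
    {"Id": "q1", "BlockType": "QUERY",
     "Query": {"Alias": "END_DATE"},
     "Relationships": [{"Type": "ANSWER", "Ids": ["a1"]}]},
    {"Id": "a1", "BlockType": "QUERY_RESULT",
     "Text": "2026-03-31", "Confidence": 97.1},
]

# Index blocks by Id once (O(n)), then resolve each QUERY's ANSWER ids.
by_id = {b["Id"]: b for b in blocks}
answers = {
    b["Query"]["Alias"]: by_id[aid]["Text"]
    for b in blocks if b["BlockType"] == "QUERY"
    for rel in b.get("Relationships", [])
    if rel["Type"] == "ANSWER"
    for aid in rel["Ids"]
}
print(answers)  # → {'END_DATE': '2026-03-31'}
```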
def _extract_table(table_block: dict, all_blocks: list[dict]) -> list[list[str]]:
"""Extract a 2D table from Textract TABLE block and its children."""
rows = {}
for rel in table_block.get("Relationships", []):
if rel["Type"] == "CHILD":
for cell_id in rel["Ids"]:
cell = next((b for b in all_blocks if b["Id"] == cell_id), None)
if cell and cell["BlockType"] == "CELL":
row_idx = cell.get("RowIndex", 0)
col_idx = cell.get("ColumnIndex", 0)
# Get cell text from child WORD blocks
cell_text = ""
for cell_rel in cell.get("Relationships", []):
if cell_rel["Type"] == "CHILD":
words = [
next((b for b in all_blocks if b["Id"] == wid), {}).get("Text", "")
for wid in cell_rel["Ids"]
]
cell_text = " ".join(words)
rows.setdefault(row_idx, {})[col_idx] = cell_text
if not rows:
return []
max_col = max(max(cols.keys()) for cols in rows.values())
return [
[rows.get(r, {}).get(c, "") for c in range(1, max_col + 1)]
for r in sorted(rows.keys())
]
# ============================================================
# Stage 3: FM-Powered Extraction with Sonnet
# ============================================================
LICENSE_EXTRACTION_PROMPT = """You are a specialist in Japanese manga publishing license agreements.
You have received OCR text and structured query results from a license document.
Your task: Extract all contractual terms into a precise JSON structure.
IMPORTANT RULES:
1. Japanese era dates must be converted to ISO format:
- 令和 (Reiwa): 令和1年 = 2019, 令和6年 = 2024, 令和7年 = 2025
- 平成 (Heisei): 平成31年 = 2019
2. Currency amounts: convert to USD if in JPY (use approximate rate 1 USD = 150 JPY)
3. Royalty rates: express as decimal (8% = 0.08)
4. If a field appears in both Japanese and English, extract BOTH and note any discrepancies
5. For manga titles, include both romanized and original Japanese titles
OCR Query Results (targeted extraction):
<queries>{query_results}</queries>
OCR Table Data:
<tables>{tables}</tables>
Full OCR Text:
<text>{raw_text}</text>
Signature Detection:
<signatures>{signatures}</signatures>
Return JSON with this exact structure:
{{
"licensor": {{
"name_en": "...",
"name_jp": "...",
"address": "...",
"registration_number": "..."
}},
"licensee": {{
"name_en": "...",
"name_jp": "...",
"address": "...",
"registration_number": "..."
}},
"manga_titles": [
{{
"title_en": "...",
"title_jp": "...",
"author": "...",
"volumes_start": 1,
"volumes_end": null,
"isbn_prefix": "..."
}}
],
"license_terms": {{
"start_date": "YYYY-MM-DD",
"end_date": "YYYY-MM-DD",
"auto_renewal": true/false,
"renewal_notice_days": 90,
"territories": ["JP", "US", ...],
"language_rights": ["ja", "en", ...],
"digital_rights": true/false,
"print_rights": true/false,
"audio_rights": true/false,
"merchandise_rights": true/false
}},
"financial_terms": {{
"royalty_rate": 0.08,
"royalty_base": "net_sales" or "gross_sales",
"minimum_guarantee_usd": 50000,
"advance_payment_usd": 10000,
"payment_frequency": "quarterly",
"currency": "JPY" or "USD",
"print_run_minimum": 5000
}},
"key_clauses": [
{{
"clause_number": "5.1",
"title": "...",
"summary_en": "...",
"summary_jp": "...",
"risk_level": "low" or "medium" or "high"
}}
],
"termination_conditions": ["...", "..."],
"signatures_detected": true/false,
"extraction_confidence": 0.95,
"discrepancies": [
{{
"field": "...",
"en_value": "...",
"jp_value": "...",
"note": "..."
}}
]
}}
Return ONLY valid JSON."""
def extract_license_with_sonnet(textract_results: dict) -> dict:
"""
Use Claude 3 Sonnet for high-accuracy license data extraction.
Sonnet chosen over Haiku for legal documents due to:
- Better reasoning about complex clause structures
- More accurate Japanese era date conversion
- Higher reliability on financial figure extraction
"""
prompt = LICENSE_EXTRACTION_PROMPT.format(
query_results=json.dumps(textract_results.get("query_results", {}), ensure_ascii=False),
tables=json.dumps(textract_results.get("tables", []), ensure_ascii=False),
raw_text=textract_results.get("raw_text", "")[:40000],
signatures=json.dumps(textract_results.get("signatures", []), ensure_ascii=False),
)
response = bedrock_runtime.invoke_model(
modelId=MODEL_SONNET,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result_text = json.loads(response["body"].read())["content"][0]["text"]
# Parse response — handle markdown code blocks
json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
extracted = json.loads(json_match.group(1) if json_match else result_text)
return extracted
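The fence-stripping parse here appears again in the cross-language validator below. Factored into one helper it is easier to test in isolation; a minimal sketch:

```python
import json
import re

def parse_model_json(text: str) -> dict:
    """Parse model output that may arrive bare or wrapped in a markdown code fence."""
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    return json.loads(match.group(1) if match else text)

print(parse_model_json('```json\n{"royalty_rate": 0.08}\n```'))  # → {'royalty_rate': 0.08}
```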
# ============================================================
# Stage 4: Cross-Language Validation
# ============================================================
CROSS_LANGUAGE_PROMPT = """You are a bilingual (Japanese-English) legal document validator.
Compare the Japanese-language extracted fields against the English-language extracted fields
from the same manga license agreement. Identify any discrepancies.
Extracted data:
<data>{extracted_data}</data>
For each field that has both JP and EN values, verify:
1. Names match (accounting for romanization differences)
2. Dates match (accounting for era format vs ISO)
3. Financial figures match (accounting for currency conversion)
4. Territory descriptions are equivalent
5. Rights granted are the same in both languages
Return JSON with:
{{
"validation_passed": true/false,
"field_comparisons": [
{{
"field": "licensor.name",
"en_value": "...",
"jp_value": "...",
"match": true/false,
"note": "..."
}}
],
"critical_discrepancies": [
{{
"field": "...",
"severity": "critical" or "warning",
"description": "..."
}}
],
"overall_confidence": 0.95
}}
Return ONLY valid JSON."""
def cross_language_validate(extracted_data: dict) -> dict:
"""
Validate extracted license data by comparing JP and EN fields.
Uses Sonnet for nuanced bilingual comparison.
"""
prompt = CROSS_LANGUAGE_PROMPT.format(
extracted_data=json.dumps(extracted_data, ensure_ascii=False, indent=2)
)
response = bedrock_runtime.invoke_model(
modelId=MODEL_SONNET,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result_text = json.loads(response["body"].read())["content"][0]["text"]
json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
validation = json.loads(json_match.group(1) if json_match else result_text)
return validation
# ============================================================
# Stage 5: Business Rule Validation
# ============================================================
def validate_business_rules(extracted_data: dict) -> list[dict]:
"""
Apply MangaAssist-specific business rules to extracted license data.
Returns a list of validation findings.
"""
findings = []
now = datetime.now(timezone.utc)
# --- License term validations ---
terms = extracted_data.get("license_terms", {})
# Check end date is in the future
end_date_str = terms.get("end_date", "")
if end_date_str:
try:
end_date = datetime.fromisoformat(end_date_str).replace(tzinfo=timezone.utc)
if end_date < now:
findings.append({
"rule": "LICENSE_EXPIRED",
"severity": "critical",
"field": "license_terms.end_date",
"message": f"License end date {end_date_str} is in the past",
"value": end_date_str,
})
elif end_date < now + timedelta(days=90):
findings.append({
"rule": "LICENSE_EXPIRING_SOON",
"severity": "warning",
"field": "license_terms.end_date",
"message": f"License expires within 90 days: {end_date_str}",
"value": end_date_str,
})
except ValueError:
findings.append({
"rule": "INVALID_DATE_FORMAT",
"severity": "critical",
"field": "license_terms.end_date",
"message": f"Cannot parse end date: {end_date_str}",
"value": end_date_str,
})
# --- Financial validations ---
financial = extracted_data.get("financial_terms", {})
royalty_rate = financial.get("royalty_rate", 0)
if isinstance(royalty_rate, (int, float)):
if royalty_rate < 0.03 or royalty_rate > 0.25:
findings.append({
"rule": "UNUSUAL_ROYALTY_RATE",
"severity": "warning",
"field": "financial_terms.royalty_rate",
"message": f"Royalty rate {royalty_rate} outside normal range (3%-25%)",
"value": str(royalty_rate),
})
min_guarantee = financial.get("minimum_guarantee_usd", 0)
if isinstance(min_guarantee, (int, float)) and min_guarantee > 500000:
findings.append({
"rule": "HIGH_MINIMUM_GUARANTEE",
"severity": "warning",
"field": "financial_terms.minimum_guarantee_usd",
"message": f"Minimum guarantee ${min_guarantee:,.0f} exceeds review threshold",
"value": str(min_guarantee),
})
# --- Content validations ---
titles = extracted_data.get("manga_titles", [])
if not titles:
findings.append({
"rule": "NO_TITLES_EXTRACTED",
"severity": "critical",
"field": "manga_titles",
"message": "No manga titles were extracted from the document",
"value": "empty",
})
for idx, title in enumerate(titles):
if not title.get("title_en") and not title.get("title_jp"):
findings.append({
"rule": "MISSING_TITLE_NAME",
"severity": "critical",
"field": f"manga_titles[{idx}]",
"message": f"Title at index {idx} has no name in either language",
"value": json.dumps(title),
})
# --- Signature validation ---
if not extracted_data.get("signatures_detected"):
findings.append({
"rule": "NO_SIGNATURES_DETECTED",
"severity": "warning",
"field": "signatures_detected",
"message": "No signatures or hanko seals detected — document may be unsigned",
"value": "false",
})
# --- Territory validation ---
territories = terms.get("territories", [])
if not territories:
findings.append({
"rule": "NO_TERRITORIES",
"severity": "critical",
"field": "license_terms.territories",
"message": "No territories specified in the license",
"value": "empty",
})
return findings
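The end-date rule is the highest-stakes check, so it is worth isolating for unit tests. A sketch that takes `now` as a parameter instead of reading the clock inside the function (the names here are illustrative, not part of the module above); injecting the clock keeps the rule deterministic under test:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def check_end_date(end_date_str: str, now: datetime) -> Optional[str]:
    """Return a rule name if the license end date violates a policy, else None."""
    try:
        end = datetime.fromisoformat(end_date_str).replace(tzinfo=timezone.utc)
    except ValueError:
        return "INVALID_DATE_FORMAT"
    if end < now:
        return "LICENSE_EXPIRED"
    if end < now + timedelta(days=90):
        return "LICENSE_EXPIRING_SOON"
    return None

now = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(check_end_date("2025-02-15", now))  # → LICENSE_EXPIRING_SOON
```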
# ============================================================
# Stage 6: Storage and Notification
# ============================================================
def store_processed_license(
document_id: str,
extracted_data: dict,
cross_lang_validation: dict,
business_findings: list[dict],
textract_results: dict,
source_info: dict,
) -> dict:
"""Persist all processing results to DynamoDB and S3."""
now_iso = datetime.now(timezone.utc).isoformat()
critical_findings = [f for f in business_findings if f["severity"] == "critical"]
status = "requires_review" if critical_findings else "ready_for_approval"
# DynamoDB: License record
license_record = {
"license_id": document_id,
"licensor": extracted_data.get("licensor", {}),
"licensee": extracted_data.get("licensee", {}),
"manga_titles": extracted_data.get("manga_titles", []),
"license_terms": extracted_data.get("license_terms", {}),
"financial_terms": extracted_data.get("financial_terms", {}),
"status": status,
"extraction_confidence": Decimal(str(extracted_data.get("extraction_confidence", 0))),
"cross_language_confidence": Decimal(str(cross_lang_validation.get("overall_confidence", 0))),
"critical_findings_count": len(critical_findings),
"total_findings_count": len(business_findings),
"source_bucket": source_info.get("bucket", ""),
"source_key": source_info.get("key", ""),
"processed_at": now_iso,
}
    # Round-trip through JSON to coerce nested floats to Decimal (DynamoDB
    # rejects float); note that fields already Decimal are stored as strings.
    safe_record = json.loads(json.dumps(license_record, default=str), parse_float=Decimal)
license_table.put_item(Item=safe_record)
# S3: Full processing archive (includes raw text, all validations)
archive = {
"document_id": document_id,
"extracted_data": extracted_data,
"cross_language_validation": cross_lang_validation,
"business_findings": business_findings,
"textract_summary": {
"total_pages": textract_results.get("total_pages"),
"total_blocks": textract_results.get("total_blocks"),
"query_results": textract_results.get("query_results"),
},
"processing_metadata": {
"processed_at": now_iso,
"extraction_model": MODEL_SONNET,
"validation_model": MODEL_SONNET,
"status": status,
},
}
s3.put_object(
Bucket=RESULTS_BUCKET,
Key=f"processed-licenses/{document_id}/full-archive.json",
Body=json.dumps(archive, ensure_ascii=False, indent=2, default=str),
ContentType="application/json",
)
# Notify stakeholders
if NOTIFICATION_TOPIC:
sns.publish(
TopicArn=NOTIFICATION_TOPIC,
Subject=f"License Processed: {document_id} [{status}]",
Message=json.dumps({
"document_id": document_id,
"status": status,
"licensor": extracted_data.get("licensor", {}).get("name_en", "Unknown"),
"manga_titles_count": len(extracted_data.get("manga_titles", [])),
"critical_findings": len(critical_findings),
"extraction_confidence": extracted_data.get("extraction_confidence", 0),
}, indent=2),
)
return {"document_id": document_id, "status": status, "stored_at": now_iso}
# ============================================================
# Orchestrator: Full Pipeline
# ============================================================
def process_license_document(event: dict, context: Any) -> dict:
"""
Full license document processing pipeline.
Called by Step Functions or directly by S3 event.
"""
bucket = event.get("bucket") or event.get("detail", {}).get("bucket", {}).get("name", "")
key = event.get("key") or event.get("detail", {}).get("object", {}).get("key", "")
document_id = event.get("document_id") or hashlib.sha256(f"{bucket}/{key}".encode()).hexdigest()[:16]
logger.info("Processing license document: %s/%s (ID: %s)", bucket, key, document_id)
# Stage 1: Preprocess
doc_info = preprocess_document(bucket, key)
# Stage 2: OCR
textract_job = run_textract_with_queries(bucket, key)
# In a Step Functions workflow, we would wait/poll here.
# For direct invocation, we poll synchronously:
import time
for _ in range(60):
results = collect_textract_results(textract_job["job_id"])
if results["status"] != "IN_PROGRESS":
break
time.sleep(5)
else:
raise TimeoutError(f"Textract job {textract_job['job_id']} did not complete in 5 minutes")
if results["status"] != "SUCCEEDED":
raise RuntimeError(f"Textract failed for document {document_id}")
# Stage 3: FM extraction
extracted_data = extract_license_with_sonnet(results)
# Stage 4: Cross-language validation
cross_lang = cross_language_validate(extracted_data)
# Stage 5: Business rules
findings = validate_business_rules(extracted_data)
# Stage 6: Store
store_result = store_processed_license(
document_id=document_id,
extracted_data=extracted_data,
cross_lang_validation=cross_lang,
business_findings=findings,
textract_results=results,
source_info={"bucket": bucket, "key": key},
)
return {
"document_id": document_id,
"status": store_result["status"],
"extraction_confidence": extracted_data.get("extraction_confidence", 0),
"cross_language_confidence": cross_lang.get("overall_confidence", 0),
"findings_count": len(findings),
"critical_findings": len([f for f in findings if f["severity"] == "critical"]),
"manga_titles_extracted": len(extracted_data.get("manga_titles", [])),
}
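The synchronous polling loop above burns Lambda time while Textract runs. In Step Functions the same loop becomes a Wait/Choice cycle; a hypothetical ASL fragment (state names and Lambda ARNs are placeholders, not resources from this stack):

```json
{
  "Comment": "Hypothetical wait/poll fragment; Lambda ARNs are placeholders",
  "StartAt": "StartTextract",
  "States": {
    "StartTextract": {
      "Type": "Task",
      "Resource": "${StartTextractLambdaArn}",
      "Next": "WaitForOcr"
    },
    "WaitForOcr": { "Type": "Wait", "Seconds": 10, "Next": "CheckTextractJob" },
    "CheckTextractJob": {
      "Type": "Task",
      "Resource": "${CollectResultsLambdaArn}",
      "Next": "OcrComplete"
    },
    "OcrComplete": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "IN_PROGRESS", "Next": "WaitForOcr" }
      ],
      "Default": "ExtractAndValidate"
    },
    "ExtractAndValidate": {
      "Type": "Task",
      "Resource": "${ExtractValidateStoreLambdaArn}",
      "End": true
    }
  }
}
```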
Amazon Q Business Connector Patterns
Q Business Architecture for MangaAssist
flowchart TB
subgraph Sources["Knowledge Sources"]
S3_KB[S3: Knowledge Base Docs<br/>SOPs, Guides, FAQs]
S3_LEGAL[S3: Legal Templates<br/>Contract templates, checklists]
DDB_PUB[(DynamoDB: Publishers<br/>Contacts, relationships)]
CONF[Confluence: Wiki<br/>Internal policies, runbooks]
JIRA[Jira: Tickets<br/>Known issues, resolutions]
end
subgraph Connectors["Q Business Connectors"]
C_S3[S3 Native Connector<br/>Crawl + index documents]
C_CUSTOM[Custom Lambda Connector<br/>DynamoDB to documents]
C_CONF[Confluence Connector<br/>Space + page crawl]
C_JIRA[Web Crawler Connector<br/>Jira ticket extraction]
end
subgraph QBusiness["Amazon Q Business"]
INGEST[Document Ingestion]
INDEX[Q Business Index<br/>Semantic embeddings]
RANK[Relevance Ranking<br/>BM25 + semantic]
ACL[Access Control Layer<br/>IAM Identity Center]
GUARD[Guardrails<br/>Topic blocking, PII filter]
end
subgraph Users["Internal Users"]
CS_AGENT["Customer Support Agent<br/>&quot;What's our return policy for damaged manga?&quot;"]
LIC_MGR["Licensing Manager<br/>&quot;Show me Kodansha contract renewal terms&quot;"]
OPS["Operations Staff<br/>&quot;How do I process a bulk import?&quot;"]
end
S3_KB --> C_S3
S3_LEGAL --> C_S3
DDB_PUB --> C_CUSTOM
CONF --> C_CONF
JIRA --> C_JIRA
C_S3 & C_CUSTOM & C_CONF & C_JIRA --> INGEST
INGEST --> INDEX
INDEX --> RANK
RANK --> ACL --> GUARD
GUARD --> CS_AGENT & LIC_MGR & OPS
QBusinessIndexer
"""
QBusinessIndexer — Manages Amazon Q Business index operations for MangaAssist.
Handles document ingestion, access control configuration, relevance tuning,
and guardrail setup for the internal knowledge assistant.
"""
import json
import os
import logging
from datetime import datetime, timezone
from typing import Any, Optional
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
qbusiness = boto3.client("qbusiness")
iam_idc = boto3.client("sso-admin")
s3 = boto3.client("s3")
APPLICATION_ID = os.environ["Q_APPLICATION_ID"]
INDEX_ID = os.environ["Q_INDEX_ID"]
IDC_INSTANCE_ARN = os.environ.get("IDC_INSTANCE_ARN", "")
# ---------- Index Management ----------
def create_manga_index(application_id: str) -> dict:
"""Create the Q Business index with MangaAssist-specific field configuration."""
response = qbusiness.create_index(
applicationId=application_id,
displayName="MangaAssist Internal Knowledge",
description="Internal knowledge base for MangaAssist operations",
capacityConfiguration={"units": 1}, # Scale up as document count grows
)
index_id = response["indexId"]
logger.info("Created Q Business index: %s", index_id)
# Add custom document attributes for faceted search
custom_attributes = [
{
"name": "department",
"type": "STRING",
"search": {"displayable": True, "facetable": True, "searchable": True},
},
{
"name": "document_category",
"type": "STRING",
"search": {"displayable": True, "facetable": True, "searchable": True},
},
{
"name": "publisher_name",
"type": "STRING",
"search": {"displayable": True, "facetable": True, "searchable": True},
},
{
"name": "content_language",
"type": "STRING",
"search": {"displayable": True, "facetable": True, "searchable": False},
},
{
"name": "last_reviewed_date",
"type": "DATE",
"search": {"displayable": True, "facetable": False, "searchable": False},
},
{
"name": "confidence_level",
"type": "STRING",
"search": {"displayable": True, "facetable": True, "searchable": False},
},
]
    # Register all custom attributes in a single call; updating them one at a
    # time risks each call replacing the previously configured set.
    try:
        qbusiness.update_index(
            applicationId=application_id,
            indexId=index_id,
            documentAttributeConfigurations=custom_attributes,
        )
    except Exception as e:
        logger.warning("Failed to add custom attributes: %s", str(e))
return {"index_id": index_id, "attributes_configured": len(custom_attributes)}
# ---------- Access Control ----------
DEPARTMENT_ACCESS_MAP = {
"customer_support": {
"allowed_categories": ["faq", "sop", "product_info", "return_policy", "troubleshooting"],
"blocked_categories": ["financial", "legal_contract", "hr_policy"],
},
"licensing": {
"allowed_categories": ["legal_contract", "publisher_info", "licensing_guide", "compliance"],
"blocked_categories": ["hr_policy", "engineering_runbook"],
},
"operations": {
"allowed_categories": ["sop", "inventory_guide", "shipping_procedure", "troubleshooting"],
"blocked_categories": ["legal_contract", "financial", "hr_policy"],
},
"management": {
"allowed_categories": ["all"],
"blocked_categories": [],
},
}
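The attribute filters below encode only the department; the category allow/block lists also need enforcing at query time. A trimmed, standalone sketch of that check (two departments only, names illustrative, deny-by-default for unknown callers):

```python
# Trimmed copy of the access map above, plus the category check
# the attribute filters are meant to enforce.
ACCESS = {
    "customer_support": {"allowed": ["faq", "sop"], "blocked": ["legal_contract"]},
    "management": {"allowed": ["all"], "blocked": []},
}

def can_see(department: str, category: str) -> bool:
    cfg = ACCESS.get(department)
    if cfg is None:
        return False  # unknown department: deny by default
    if category in cfg["blocked"]:
        return False
    return "all" in cfg["allowed"] or category in cfg["allowed"]

print(can_see("customer_support", "legal_contract"))  # → False
print(can_see("management", "legal_contract"))        # → True
```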
def configure_access_controls(application_id: str, index_id: str) -> dict:
"""
Set up department-level access control for Q Business.
Maps IAM Identity Center groups to document access levels.
"""
results = {}
for department, access_config in DEPARTMENT_ACCESS_MAP.items():
try:
# Q Business uses attribute-based access control
# Documents are filtered based on the user's group membership
# and the document's 'department' attribute
if "all" in access_config["allowed_categories"]:
# Management has unrestricted access
filter_config = None
else:
filter_config = {
"andAllFilters": [
{
"equalsTo": {
"key": "department",
"value": {"stringValue": department},
}
}
]
}
results[department] = {
"configured": True,
"allowed": access_config["allowed_categories"],
"blocked": access_config["blocked_categories"],
"filter": filter_config,
}
except Exception as e:
logger.error("Failed to configure access for %s: %s", department, str(e))
results[department] = {"configured": False, "error": str(e)}
return results
# ---------- Relevance Tuning ----------
def configure_relevance_tuning(application_id: str, index_id: str) -> dict:
"""
Configure relevance boosting for MangaAssist-specific search patterns.
Prioritizes recent documents, high-confidence content, and language match.
"""
tuning_config = {
"document_freshness": {
"enabled": True,
"description": "Boost recently updated documents — stale policies are dangerous",
},
"field_boosts": [
{
"field": "last_reviewed_date",
"boost_factor": 2.0,
"description": "Documents reviewed within last 6 months get 2x boost",
},
{
"field": "confidence_level",
"boost_values": {"verified": 3.0, "reviewed": 2.0, "draft": 0.5},
"description": "Verified docs boosted 3x, drafts demoted",
},
{
"field": "document_category",
"boost_values": {"sop": 2.0, "faq": 1.5, "archive": 0.3},
"description": "SOPs and FAQs boosted, archived content demoted",
},
],
}
logger.info("Relevance tuning configured: %s", json.dumps(tuning_config))
return tuning_config
# ---------- Guardrails ----------
def configure_guardrails(application_id: str) -> dict:
"""
Set up Q Business guardrails to prevent inappropriate responses.
Blocks topics outside MangaAssist's business scope and filters PII.
"""
# Topic blocking — prevent Q from answering non-business questions
blocked_topics = [
"personal_advice",
"medical_advice",
"legal_advice_to_customers",
"competitor_pricing",
"employee_compensation",
"unreleased_products",
]
# PII filtering — redact sensitive information in responses
pii_filters = [
"CREDIT_CARD_NUMBER",
"BANK_ACCOUNT_NUMBER",
"SSN",
"PHONE_NUMBER",
"EMAIL_ADDRESS",
]
# Response length and attribution requirements
response_config = {
"max_response_length": 2000,
"require_source_attribution": True,
"min_confidence_threshold": 0.3,
"fallback_message": (
"I don't have enough information to answer that question confidently. "
"Please check with your team lead or submit a request to the knowledge team."
),
}
config = {
"blocked_topics": blocked_topics,
"pii_filters": pii_filters,
"response_config": response_config,
}
logger.info("Guardrails configured: %d blocked topics, %d PII filters",
len(blocked_topics), len(pii_filters))
return config
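The `response_config` values interact at answer time roughly as a post-processing step: check confidence first, then length. A small sketch of that logic (`apply_response_guardrails` is a hypothetical helper; the real enforcement is inside Q Business's managed guardrails):

```python
def apply_response_guardrails(answer: str, confidence: float, config: dict) -> str:
    """Fall back when confidence is too low; truncate overlong answers."""
    if confidence < config["min_confidence_threshold"]:
        return config["fallback_message"]
    if len(answer) > config["max_response_length"]:
        return answer[: config["max_response_length"]].rstrip() + "..."
    return answer

response_config = {
    "max_response_length": 2000,
    "min_confidence_threshold": 0.3,
    "fallback_message": "I don't have enough information to answer that question confidently.",
}
print(apply_response_guardrails("Refunds are processed within 5 business days.", 0.9, response_config))
print(apply_response_guardrails("Maybe...", 0.1, response_config))  # prints the fallback message
```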
# ---------- Document Sync Lambda ----------
def sync_knowledge_documents(event: dict, context: Any) -> dict:
"""
Lambda handler for Q Business document sync.
Called by EventBridge on schedule or S3 events.
Converts various document formats into Q Business documents.
"""
source_bucket = event.get("bucket", "manga-knowledge-base")
prefix = event.get("prefix", "docs/")
category = event.get("category", "general")
department = event.get("department", "all")
# List documents in S3
paginator = s3.get_paginator("list_objects_v2")
documents = []
for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
# Skip directories and unsupported files
if key.endswith("/"):
continue
ext = key.rsplit(".", 1)[-1].lower() if "." in key else ""
if ext not in ("pdf", "md", "txt", "html", "docx"):
continue
doc_id = f"s3-{source_bucket}-{key}".replace("/", "-").replace(".", "-")
title = key.split("/")[-1].rsplit(".", 1)[0].replace("-", " ").replace("_", " ").title()
documents.append({
"id": doc_id,
"title": title,
"s3Path": {
"bucket": source_bucket,
"key": key,
},
"attributes": [
{"key": "department", "value": {"stringValue": department}},
{"key": "document_category", "value": {"stringValue": category}},
{"key": "content_language", "value": {"stringValue": "ja-en"}},
{"key": "confidence_level", "value": {"stringValue": "reviewed"}},
{"key": "_source_uri", "value": {"stringValue": f"s3://{source_bucket}/{key}"}},
],
"contentType": "PLAIN_TEXT" if ext in ("txt", "md") else "PDF",
})
# Batch ingest (Q Business supports batch operations)
batch_size = 10
ingested = 0
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
try:
qbusiness.batch_put_document(
applicationId=APPLICATION_ID,
indexId=INDEX_ID,
documents=batch,
)
ingested += len(batch)
except Exception:
logger.exception("Failed to ingest batch starting at index %d", i)
result = {
"source": f"s3://{source_bucket}/{prefix}",
"documents_found": len(documents),
"documents_ingested": ingested,
"category": category,
"department": department,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
logger.info("Knowledge sync complete: %s", json.dumps(result))
return result
Data Automation Workflows for Catalog Enrichment
Catalog Enrichment Pipeline Architecture
flowchart TB
subgraph Input["Data Input Sources"]
PUB_CSV[Publisher Catalog CSV]
COVER_IMG[Manga Cover Images]
WIKI_DATA[Wikidata / MAL API]
SALES_DATA[Sales History CSV]
end
subgraph Trigger["Pipeline Triggers"]
S3_EVENT[S3 Object Created]
EB_SCHED[EventBridge Schedule<br/>Daily 2 AM JST]
MANUAL[Manual API Trigger]
end
subgraph Enrichment["Enrichment Pipeline"]
PARSE[Parse Raw Data<br/>Lambda: CSV/JSON parser]
DEDUP[Deduplication<br/>Lambda: ISBN + title match]
FM_ENRICH[FM Enrichment<br/>Bedrock Haiku:<br/>genres, synopsis, keywords]
COVER_ANALYZE[Cover Analysis<br/>Bedrock Sonnet (vision):<br/>art style, content rating]
CROSS_REF[Cross-Reference<br/>Lambda: MAL/Wikidata lookup]
QUALITY[Quality Check<br/>Lambda: completeness scoring]
end
subgraph Storage["Enriched Output"]
DDB_PROD[(DynamoDB: Products<br/>Enriched catalog)]
OS_VEC[OpenSearch: Vectors<br/>Semantic search embeddings]
S3_ARCH[S3: Processing Archive]
REDIS[ElastiCache Redis<br/>Hot product cache]
end
PUB_CSV --> S3_EVENT
COVER_IMG --> S3_EVENT
WIKI_DATA --> EB_SCHED
SALES_DATA --> S3_EVENT
S3_EVENT & EB_SCHED & MANUAL --> PARSE
PARSE --> DEDUP --> FM_ENRICH
FM_ENRICH --> COVER_ANALYZE --> CROSS_REF --> QUALITY
QUALITY --> DDB_PROD & OS_VEC & S3_ARCH & REDIS
CatalogEnrichmentPipeline
"""
CatalogEnrichmentPipeline — Bedrock Data Automation integration for
enriching MangaAssist's product catalog with AI-generated metadata.
Processes publisher catalogs, cover images, and external data sources
to create rich, searchable product records with genre tags, synopses,
search keywords, and content ratings.
"""
import json
import os
import csv
import io
import re
import hashlib
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Optional
from base64 import b64encode
import boto3
from botocore.config import Config
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock_runtime = boto3.client(
"bedrock-runtime",
config=Config(retries={"max_attempts": 3, "mode": "adaptive"}, read_timeout=30),
)
s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
PRODUCTS_TABLE = os.environ.get("PRODUCTS_TABLE", "manga_products")
CATALOG_BUCKET = os.environ.get("CATALOG_BUCKET", "manga-catalog-data")
COVER_BUCKET = os.environ.get("COVER_BUCKET", "manga-cover-images")
products_table = dynamodb.Table(PRODUCTS_TABLE)
# ============================================================
# Text-Based Catalog Enrichment (Haiku)
# ============================================================
ENRICHMENT_PROMPT = """You are a manga catalog specialist for a Japanese e-commerce store.
Given raw catalog data for a manga title, generate rich metadata for search and recommendation.
Raw catalog entry:
<entry>{entry}</entry>
Generate JSON with:
{{
"title_en": "English title (romanized if no official EN title)",
"title_jp": "Japanese title in original script",
"title_romaji": "Romanized Japanese title",
"author_en": "Author name (romanized)",
"author_jp": "Author name in Japanese",
"artist_en": "Artist name if different from author",
"publisher_en": "Publisher name (romanized)",
"publisher_jp": "Publisher name in Japanese",
"genres": ["primary genre", "secondary genre"],
"sub_genres": ["more specific genre tags"],
"themes": ["thematic elements"],
"demographics": "shonen/shojo/seinen/josei/kodomo",
"age_rating": "all_ages/teen/older_teen/mature",
"synopsis_en": "2-3 sentence English synopsis. Be descriptive and engaging.",
"synopsis_jp": "2-3 sentence Japanese synopsis. Natural, engaging Japanese.",
"search_keywords_en": ["10-15 English search keywords"],
"search_keywords_jp": ["10-15 Japanese search keywords"],
"similar_titles": ["3-5 similar manga for recommendation"],
"content_warnings": ["any applicable warnings"],
"reading_direction": "right_to_left/left_to_right",
"serialization_status": "ongoing/completed/hiatus",
"estimated_reading_time_minutes": 30,
"mood_tags": ["action-packed", "heartwarming", etc.]
}}
IMPORTANT:
- Generate Japanese text in natural, fluent Japanese (not machine-translated)
- Include both common and niche search keywords
- Similar titles should be well-known manga with genuine similarity
- If unsure about a field, use null rather than guessing
Return ONLY valid JSON."""
def enrich_single_entry(raw_entry: dict) -> dict:
"""Enrich one catalog entry using Bedrock Haiku."""
prompt = ENRICHMENT_PROMPT.format(entry=json.dumps(raw_entry, ensure_ascii=False))
response = bedrock_runtime.invoke_model(
modelId=MODEL_HAIKU,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"temperature": 0.3,
"messages": [{"role": "user", "content": prompt}],
}),
)
result_text = json.loads(response["body"].read())["content"][0]["text"]
json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
return json.loads(json_match.group(1) if json_match else result_text)
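The regex at the end of `enrich_single_entry` handles a common model behavior: Claude sometimes wraps JSON in a markdown code fence despite the "Return ONLY valid JSON" instruction. A quick demonstration with simulated model responses:

```python
import json
import re

def extract_json(result_text: str) -> dict:
    # Same fence-stripping logic as enrich_single_entry above
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
    return json.loads(m.group(1) if m else result_text)

fenced = '```json\n{"title_en": "One Piece", "demographics": "shonen"}\n```'
bare = '{"title_en": "One Piece", "demographics": "shonen"}'
print(extract_json(fenced)["title_en"])        # One Piece
print(extract_json(fenced) == extract_json(bare))  # True: both paths yield the same dict
```

A stricter alternative is to reject non-JSON output and retry the model call, but tolerating fences keeps the pipeline cheap.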
# ============================================================
# Cover Image Analysis (Sonnet with Vision)
# ============================================================
COVER_ANALYSIS_PROMPT = """Analyze this manga cover image and extract visual metadata.
Return JSON with:
{{
"art_style": "realistic/semi-realistic/chibi/minimalist/detailed/sketchy",
"color_palette": "vibrant/dark/pastel/monochrome/warm/cool",
"dominant_colors": ["color1", "color2", "color3"],
"character_count": 0,
"character_genders": ["male", "female", ...],
"character_ages": ["teen", "adult", ...],
"setting_type": "urban/rural/fantasy/school/space/historical/none",
"mood": "action/romance/mystery/horror/comedy/dramatic/peaceful",
"visual_content_rating": "all_ages/teen/older_teen/mature",
"content_flags": ["violence", "fan_service", etc. or empty array],
"text_on_cover": {{
"title_text": "detected title text",
"author_text": "detected author text",
"volume_number": "detected volume number or null"
}},
"visual_quality_score": 0.85,
"genre_from_visuals": ["inferred genres based on art"],
"comparable_art_style": "similar to [known manga artist/series]"
}}
Analyze the cover thoroughly. Focus on elements that help categorize the manga for search and recommendation."""
def analyze_cover_image(bucket: str, key: str) -> dict:
"""
Analyze a manga cover image using Bedrock Sonnet's vision capabilities.
Returns visual metadata for the product catalog.
"""
# Download image from S3
obj = s3.get_object(Bucket=bucket, Key=key)
image_bytes = obj["Body"].read()
image_b64 = b64encode(image_bytes).decode("utf-8")
# Determine media type
ext = key.rsplit(".", 1)[-1].lower()
media_type_map = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "webp": "image/webp"}
media_type = media_type_map.get(ext, "image/jpeg")
response = bedrock_runtime.invoke_model(
modelId=MODEL_SONNET,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"temperature": 0.2,
"messages": [{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": image_b64,
},
},
{"type": "text", "text": COVER_ANALYSIS_PROMPT},
],
}],
}),
)
result_text = json.loads(response["body"].read())["content"][0]["text"]
json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", result_text, re.DOTALL)
return json.loads(json_match.group(1) if json_match else result_text)
# ============================================================
# Deduplication
# ============================================================
def deduplicate_entry(
isbn: str,
title_en: str,
title_jp: str,
) -> Optional[dict]:
"""
Check if a product already exists in the catalog.
Returns existing record if found, None if new.
Uses ISBN as primary key, with title fuzzy match as fallback.
"""
if isbn:
try:
response = products_table.get_item(Key={"product_id": isbn})
if "Item" in response:
return dict(response["Item"])
except Exception:
pass
# Fallback: scan for title match (expensive; only for ISBN-less entries).
# Note: Scan's Limit caps items *evaluated* before the FilterExpression runs,
# so this is a best-effort check; paginate or use a GSI for exhaustive dedup.
if title_en or title_jp:
try:
filter_parts = []
expr_values = {}
if title_en:
filter_parts.append("contains(title_en, :ten)")
expr_values[":ten"] = title_en
if title_jp:
filter_parts.append("contains(title_jp, :tjp)")
expr_values[":tjp"] = title_jp
response = products_table.scan(
FilterExpression=" OR ".join(filter_parts),
ExpressionAttributeValues=expr_values,
Limit=5,
)
items = response.get("Items", [])
if items:
return dict(items[0])
except Exception:
pass
return None
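Because Scan is expensive and its `Limit` applies before the filter, a common alternative is to store a normalized title key on each item and query it through a GSI. A sketch of the normalization (the GSI itself is an assumption about table design, not part of this module):

```python
import hashlib
import re
import unicodedata

def normalized_title_key(title: str) -> str:
    """Stable dedup key: NFKC-normalize, lowercase, strip punctuation and spaces."""
    t = unicodedata.normalize("NFKC", title).lower()
    # Keep only latin letters/digits, kana, and kanji
    t = re.sub(r"[^0-9a-z\u3040-\u30ff\u4e00-\u9fff]", "", t)
    return hashlib.sha256(t.encode()).hexdigest()[:16]

# Variant spellings collapse to the same key
print(normalized_title_key("One-Punch Man") == normalized_title_key("one punch man"))  # True
# NFKC also folds half-width katakana into full-width
print(normalized_title_key("ﾜﾝﾊﾟﾝﾏﾝ") == normalized_title_key("ワンパンマン"))  # True
```

Writing this key as an attribute at ingest time turns the O(n) scan into a single-digest GSI query.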
# ============================================================
# Quality Scoring
# ============================================================
def compute_quality_score(enriched: dict, cover_analysis: Optional[dict] = None) -> dict:
"""
Score the completeness and quality of an enriched product record.
Returns a quality score (0.0-1.0) and a list of missing/weak fields.
"""
checks = {
"title_en": bool(enriched.get("title_en")),
"title_jp": bool(enriched.get("title_jp")),
"author": bool(enriched.get("author_en")),
"genres": bool(enriched.get("genres")) and len(enriched.get("genres", [])) >= 1,
"synopsis_en": bool(enriched.get("synopsis_en")) and len(enriched.get("synopsis_en", "")) > 50,
"synopsis_jp": bool(enriched.get("synopsis_jp")) and len(enriched.get("synopsis_jp", "")) > 20,
"search_keywords": bool(enriched.get("search_keywords_en")) and len(enriched.get("search_keywords_en", [])) >= 5,
"age_rating": bool(enriched.get("age_rating")),
"demographics": bool(enriched.get("demographics")),
"similar_titles": bool(enriched.get("similar_titles")) and len(enriched.get("similar_titles", [])) >= 2,
"cover_analyzed": cover_analysis is not None,
}
# Weighted scoring
weights = {
"title_en": 0.15, "title_jp": 0.15, "author": 0.10,
"genres": 0.10, "synopsis_en": 0.10, "synopsis_jp": 0.05,
"search_keywords": 0.10, "age_rating": 0.05, "demographics": 0.05,
"similar_titles": 0.05, "cover_analyzed": 0.10,
}
score = sum(weights[k] for k, v in checks.items() if v)
missing = [k for k, v in checks.items() if not v]
return {
"quality_score": round(score, 3),
"checks_passed": sum(1 for v in checks.values() if v),
"checks_total": len(checks),
"missing_fields": missing,
"grade": "A" if score >= 0.9 else "B" if score >= 0.7 else "C" if score >= 0.5 else "D",
}
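As a worked example of the weighting above: a record with only the two titles, author, and genres filled in scores 0.15 + 0.15 + 0.10 + 0.10 = 0.50, landing exactly at grade C. A standalone check, restating the weights and grade thresholds from `compute_quality_score`:

```python
# Weights copied from compute_quality_score above; they sum to 1.0
weights = {
    "title_en": 0.15, "title_jp": 0.15, "author": 0.10,
    "genres": 0.10, "synopsis_en": 0.10, "synopsis_jp": 0.05,
    "search_keywords": 0.10, "age_rating": 0.05, "demographics": 0.05,
    "similar_titles": 0.05, "cover_analyzed": 0.10,
}
passed = {"title_en", "title_jp", "author", "genres"}
score = round(sum(w for k, w in weights.items() if k in passed), 3)
grade = "A" if score >= 0.9 else "B" if score >= 0.7 else "C" if score >= 0.5 else "D"
print(score, grade)  # 0.5 C
```

Dropping any one of those four fields pushes the record to grade D, which is why titles and author carry the heaviest weights.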
# ============================================================
# Full Pipeline Handler
# ============================================================
def process_catalog_batch(event: dict, context: Any) -> dict:
"""
Process a batch of catalog entries from a CSV upload.
Enriches each entry, analyzes covers (if available), deduplicates,
quality-scores, and stores in DynamoDB.
"""
bucket = event.get("bucket", CATALOG_BUCKET)
key = event.get("key", "")
if not key:
# Scheduled run: process any unprocessed files
return {"status": "no_key_provided", "message": "Provide a specific CSV key to process"}
# Download and parse CSV
obj = s3.get_object(Bucket=bucket, Key=key)
csv_content = obj["Body"].read().decode("utf-8-sig")
reader = csv.DictReader(io.StringIO(csv_content))
stats = {
"total": 0,
"enriched": 0,
"deduplicated": 0,
"cover_analyzed": 0,
"errors": 0,
"quality_scores": [],
}
for row in reader:
stats["total"] += 1
isbn = row.get("isbn", row.get("ISBN", ""))
title = row.get("title", row.get("Title", ""))
try:
# Step 1: Deduplication
existing = deduplicate_entry(isbn, title, row.get("title_jp", ""))
if existing:
stats["deduplicated"] += 1
# Existing record found: fall through and re-enrich so the
# put_item below acts as an upsert (newest catalog data wins)
logger.info("Duplicate found for ISBN %s, re-enriching as upsert", isbn)
# Step 2: FM Enrichment
enriched = enrich_single_entry(row)
stats["enriched"] += 1
# Step 3: Cover analysis (if cover image exists)
cover_analysis = None
cover_key = f"covers/{isbn}.jpg" if isbn else None
if cover_key:
try:
s3.head_object(Bucket=COVER_BUCKET, Key=cover_key)
cover_analysis = analyze_cover_image(COVER_BUCKET, cover_key)
stats["cover_analyzed"] += 1
except s3.exceptions.ClientError:
# No cover image available
pass
# Step 4: Quality scoring
quality = compute_quality_score(enriched, cover_analysis)
stats["quality_scores"].append(quality["quality_score"])
# Step 5: Build final product record
product_id = isbn or hashlib.sha256(title.encode()).hexdigest()[:16]
product_record = {
"product_id": product_id,
"isbn": isbn,
"raw_catalog_data": row,
**enriched,
"cover_analysis": cover_analysis,
"quality": quality,
"source_file": key,
"enriched_at": datetime.now(timezone.utc).isoformat(),
"enrichment_models": {
"text": MODEL_HAIKU,
"vision": MODEL_SONNET if cover_analysis else None,
},
}
# Remove None values and convert for DynamoDB
product_record = {k: v for k, v in product_record.items() if v is not None}
safe_record = json.loads(json.dumps(product_record, default=str), parse_float=Decimal)
products_table.put_item(Item=safe_record)
except Exception:
logger.exception("Error processing entry: %s", title or isbn or f"row-{stats['total']}")
stats["errors"] += 1
# Compute average quality
if stats["quality_scores"]:
stats["avg_quality_score"] = round(
sum(stats["quality_scores"]) / len(stats["quality_scores"]), 3
)
else:
stats["avg_quality_score"] = 0.0
del stats["quality_scores"] # Don't return the full list
stats["timestamp"] = datetime.now(timezone.utc).isoformat()
stats["source_file"] = key
# Archive results
result_key = f"enrichment-results/{key.split('/')[-1]}-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}.json"
s3.put_object(
Bucket=bucket,
Key=result_key,
Body=json.dumps(stats, indent=2),
ContentType="application/json",
)
logger.info("Catalog enrichment pipeline complete: %s", json.dumps(stats))
return stats
Comparison: OCR Approaches for Japanese Documents
| Approach | Accuracy (JP) | Speed | Cost | Best For |
|---|---|---|---|---|
| Textract DetectDocumentText | 85-92% | Fast (sync) | $1.50/1K pages | Simple text extraction |
| Textract AnalyzeDocument + Layout | 88-95% | Medium (async) | $6.50/1K pages | Complex layouts with tables |
| Textract Queries | 90-97% on target fields | Medium | $15/1K pages | Known field extraction |
| Textract + Sonnet post-processing | 95-99% on key fields | Slow | $1.50 + FM cost | High-accuracy legal documents |
| Sonnet Vision (direct) | 80-90% | Fast | ~$0.02/page | Quick analysis of simple pages |
MangaAssist's pipeline uses Textract + Sonnet post-processing for license agreements (high-stakes, accuracy-critical) and Textract DetectDocumentText for routine documents (invoices, catalogs).
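To make the table's cost column concrete, here is a back-of-envelope estimate for one 20-page license under the Textract + Sonnet approach. The per-page token counts are assumptions; the Textract and Bedrock rates come from the table and the architecture brief:

```python
pages = 20
textract_cost = pages * 1.50 / 1000   # DetectDocumentText: $1.50 per 1K pages
in_tokens = pages * 2000              # assumed ~2K OCR tokens per page fed to Sonnet
out_tokens = 4000                     # assumed structured-extraction output
sonnet_cost = in_tokens / 1e6 * 3.00 + out_tokens / 1e6 * 15.00  # $3/$15 per 1M tokens
total = textract_cost + sonnet_cost
print(f"Textract: ${textract_cost:.3f}  Sonnet: ${sonnet_cost:.3f}  Total: ${total:.3f}")
# Roughly $0.03 OCR + $0.18 FM, about $0.21 per 20-page document
```

Even at a few hundred documents per year, the FM post-processing cost is negligible next to the 2-4 hours of manual review it replaces.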
Q Business vs RAG Comparison for Internal Knowledge
| Dimension | Amazon Q Business | Custom RAG (OpenSearch + Bedrock) |
|---|---|---|
| Setup time | Hours (managed service) | Days-weeks (build pipeline) |
| Connector ecosystem | S3, Confluence, SharePoint, Jira, Salesforce | Custom build for each source |
| Access control | IAM Identity Center integration | Custom ACL implementation |
| Index management | Fully managed | Manual shard/replica tuning |
| Relevance tuning | Limited but sufficient | Full control over scoring |
| Cost at 50 users | ~$150/month | ~$500-1000/month (OpenSearch + compute) |
| Customization | Limited guardrails + plugins | Fully customizable |
| Latency | 1-3 seconds | 500ms-2 seconds |
| Best for | Internal knowledge bases, quick deployment | Customer-facing RAG, complex retrieval logic |
MangaAssist uses Q Business for internal teams (fast to deploy, IAM-integrated, no engineering maintenance) and custom RAG for the customer chatbot (sub-second latency, custom ranking, personalization).
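The "~$150/month" figure in the table lines up with per-user subscription pricing under one assumption: all 50 internal users on Q Business's lower tier. The tier rates below are illustrative assumptions for the arithmetic; check current AWS pricing before budgeting:

```python
users = 50
lite_rate = 3.00   # assumed $/user/month, lower Q Business tier
pro_rate = 20.00   # assumed $/user/month, full-featured tier
print(f"All Lite: ${users * lite_rate:.0f}/month")                                # $150/month
print(f"All Pro:  ${users * pro_rate:.0f}/month")                                 # $1000/month
print(f"Mixed (10 Pro + 40 Lite): ${10 * pro_rate + 40 * lite_rate:.0f}/month")   # $320/month
```

The break-even point against the $500-1000/month custom RAG estimate arrives quickly once a team needs Pro-tier features for everyone.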
Key Takeaways
- Textract + Sonnet post-processing delivers the highest accuracy (95-99%) on key fields in Japanese legal documents; Textract Queries are the strongest OCR-only option when the target fields are known up front
- Sonnet for legal extraction -- the 12x cost premium over Haiku is justified for high-stakes license documents where a misread royalty rate has direct financial impact
- Q Business connectors -- S3 native connector handles 80% of use cases; custom Lambda connectors bridge DynamoDB and other sources
- Cover image analysis via Sonnet Vision adds genre and art-style metadata that pure text enrichment misses
- Quality scoring ensures every product record meets a minimum completeness bar before going live
- Cross-language validation catches discrepancies between JP and EN sections of bilingual contracts