
Skill 2.3.1 -- Enterprise Connectivity Architecture for FM Integration

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Mind Map -- Enterprise Connectivity Solutions

Enterprise Connectivity Solutions (Skill 2.3.1)
│
├── API-Based Legacy Integration
│   ├── REST Adapters
│   │   ├── Resource mapping to legacy operations
│   │   ├── HTTP verb translation (GET/POST/PUT/DELETE)
│   │   ├── Pagination wrapping for bulk legacy queries
│   │   └── Content negotiation (JSON <-> XML/CSV/fixed-width)
│   ├── SOAP Wrappers
│   │   ├── WSDL-to-REST auto-generation
│   │   ├── Envelope construction/deconstruction
│   │   ├── WS-Security token passthrough
│   │   └── Schema validation at boundary
│   └── Message Format Translation
│       ├── Canonical data model (CDM)
│       ├── Field mapping rules engine
│       ├── Character encoding normalization (Shift-JIS <-> UTF-8)
│       └── Date/currency format harmonization
│
├── Event-Driven Architecture
│   ├── Amazon EventBridge
│   │   ├── Custom event bus per domain
│   │   ├── Schema registry for event contracts
│   │   ├── Content-based filtering rules
│   │   └── Archive & replay for debugging
│   ├── SNS/SQS Patterns
│   │   ├── Fan-out for multi-consumer events
│   │   ├── Dead-letter queues for poison messages
│   │   ├── FIFO ordering for transactional events
│   │   └── Message deduplication
│   └── Loose Coupling Patterns
│       ├── Publish-subscribe for inventory updates
│       ├── Command-query separation (CQRS)
│       ├── Saga orchestration for multi-step workflows
│       └── Circuit breaker at integration points
│
├── Data Synchronization
│   ├── Change Data Capture (CDC) with AWS DMS
│   │   ├── Source: legacy Oracle/MySQL databases
│   │   ├── Target: DynamoDB / OpenSearch
│   │   ├── Continuous replication with minimal lag
│   │   └── Schema evolution handling
│   ├── Batch ETL
│   │   ├── Nightly full sync for catalog data
│   │   ├── AWS Glue jobs for transformation
│   │   ├── S3 staging for checkpointed recovery
│   │   └── Data quality validation gates
│   └── Real-Time Sync
│       ├── DynamoDB Streams -> Lambda -> downstream
│       ├── Kinesis Data Streams for high-throughput
│       ├── Conflict resolution (last-writer-wins vs merge)
│       └── Eventual consistency SLA targets
│
├── Enterprise Patterns
│   ├── Anti-Corruption Layer (ACL)
│   │   ├── Isolates FM domain from legacy models
│   │   ├── Translates legacy schemas to clean domain objects
│   │   ├── Prevents legacy coupling from leaking upstream
│   │   └── Versioned interface contracts
│   ├── Adapter Pattern
│   │   ├── Wraps legacy client libraries
│   │   ├── Normalizes error codes and exceptions
│   │   ├── Provides consistent retry semantics
│   │   └── Implements circuit breaker per adapter
│   └── Facade Pattern
│       ├── Unified API surface for multiple backends
│       ├── Aggregates data from 3+ legacy sources
│       ├── Caches composite responses in Redis
│       └── Simplifies downstream FM prompt construction
│
└── Integration Middleware
    ├── API Gateway
    │   ├── Rate limiting per legacy backend capacity
    │   ├── Request/response transformation templates
    │   ├── API key management for partner systems
    │   └── Usage plans aligned to legacy SLAs
    └── AWS AppSync
        ├── GraphQL aggregation layer
        ├── Real-time subscriptions for live inventory
        ├── Pipeline resolvers chaining multiple sources
        └── Conflict detection for offline-first clients
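
The content-based filtering rules in the EventBridge branch above can be sketched as follows. This is a minimal, illustrative matcher mirroring EventBridge's exact-match semantics (the real service also supports prefix, anything-but, and numeric filters); the rule name, bus name, and warehouse filter are assumptions, not values from the source.

```python
# Hypothetical EventBridge rule pattern: content-based filter that routes only
# depletion events from the TK01 warehouse to a cache-invalidation target.
INVENTORY_DEPLETED_PATTERN = {
    "source": ["manga-assist.inventory"],
    "detail-type": ["inventory.depleted"],
    "detail": {"warehouse_code": ["TK01"]},
}


def matches(pattern: dict, event: dict) -> bool:
    """Minimal matcher for EventBridge exact-match semantics: every pattern
    key must be present in the event, with the value in the allowed list."""
    for key, allowed in pattern.items():
        value = event.get(key)
        if isinstance(allowed, dict):
            if not isinstance(value, dict) or not matches(allowed, value):
                return False
        elif value not in allowed:
            return False
    return True


# Deploying the rule would look roughly like (requires AWS credentials):
#   boto3.client("events").put_rule(
#       Name="inventory-depleted-to-cache-invalidation",  # illustrative name
#       EventBusName="manga-assist-inventory",
#       EventPattern=json.dumps(INVENTORY_DEPLETED_PATTERN),
#       State="ENABLED",
#   )
```

The pattern lives in the schema registry as an event contract; consumers subscribe by pattern rather than by producer, which is what keeps the coupling loose.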

Architecture Diagram -- MangaAssist Enterprise Integration

┌─────────────────────────────────────────────────────────────────────────────────────┐
│                           MangaAssist Enterprise Integration                        │
│                                                                                     │
│  ┌──────────────┐    WebSocket     ┌──────────────────┐                             │
│  │   Customer    │◄───────────────►│   API Gateway     │                             │
│  │  (Browser /   │                 │   (WebSocket)     │                             │
│  │   Mobile)     │                 └────────┬─────────┘                             │
│  └──────────────┘                           │                                       │
│                                             ▼                                       │
│                                  ┌──────────────────┐     ┌───────────────────┐     │
│                                  │   ECS Fargate     │────►│  ElastiCache      │     │
│                                  │   (Orchestrator)  │◄────│  Redis            │     │
│                                  │                   │     │  (Session +       │     │
│                                  │  - Route intent   │     │   Response Cache)  │     │
│                                  │  - Compose prompt │     └───────────────────┘     │
│                                  │  - Call Bedrock   │                               │
│                                  │  - Merge results  │                               │
│                                  └──┬───┬───┬───┬───┘                               │
│                                     │   │   │   │                                    │
│               ┌─────────────────────┘   │   │   └─────────────────────┐              │
│               ▼                         ▼   ▼                         ▼              │
│  ┌────────────────────┐  ┌─────────────────────────┐  ┌────────────────────────┐    │
│  │  Anti-Corruption   │  │   Amazon Bedrock         │  │   OpenSearch           │    │
│  │  Layer (ACL)       │  │   Claude 3 Sonnet/Haiku  │  │   Serverless           │    │
│  │                    │  │                           │  │   (Vector Store)       │    │
│  │  - Schema xlate    │  │   - Answer generation     │  │                        │    │
│  │  - Error normalize │  │   - Intent classification │  │   - Manga embeddings   │    │
│  │  - Encoding fix    │  │   - Summarization         │  │   - Semantic search    │    │
│  └──┬────┬────┬──────┘  └───────────────────────────┘  └────────────────────────┘    │
│     │    │    │                                                                      │
│     │    │    │         ┌────────────────────────────────────────┐                    │
│     │    │    │         │         EventBridge (Event Bus)        │                    │
│     │    │    │         │                                        │                    │
│     │    │    │         │  Rules:                                │                    │
│     │    │    │         │  - inventory.updated -> sync pipeline  │                    │
│     │    │    │         │  - order.placed -> order mgmt          │                    │
│     │    │    │         │  - payment.completed -> ERP update     │                    │
│     │    │    │         └────────┬──────────┬───────────┬───────┘                    │
│     │    │    │                  │          │           │                             │
│     ▼    ▼    ▼                  ▼          ▼           ▼                             │
│  ┌──────────────┐  ┌───────────────┐ ┌──────────┐ ┌──────────────┐                  │
│  │ Legacy ERP   │  │ Order Mgmt    │ │ Payment  │ │  DMS (CDC)   │                  │
│  │ (Inventory)  │  │ System        │ │ Gateway  │ │              │                  │
│  │              │  │               │ │          │ │ Oracle ──►   │                  │
│  │ SOAP/XML API │  │ REST API v2   │ │ REST API │ │ DynamoDB     │                  │
│  │ Shift-JIS    │  │ (paginated)   │ │ (PCI)    │ │              │                  │
│  │ 50 req/s max │  │ 200 req/s     │ │ mTLS     │ │ Continuous   │                  │
│  └──────────────┘  └───────────────┘ └──────────┘ │ replication  │                  │
│                                                    └──────────────┘                  │
│                                                                                      │
│  ┌──────────────────────────────────────────────────────────────────────────────┐    │
│  │                        Data Synchronization Layer                            │    │
│  │                                                                              │    │
│  │  ┌────────────┐    ┌──────────────┐    ┌──────────────┐    ┌─────────────┐  │    │
│  │  │ DMS CDC    │───►│  S3 Staging  │───►│  Glue ETL    │───►│ DynamoDB /  │  │    │
│  │  │ (Real-time)│    │  (Checkpoint)│    │  (Transform) │    │ OpenSearch  │  │    │
│  │  └────────────┘    └──────────────┘    └──────────────┘    └─────────────┘  │    │
│  │                                                                              │    │
│  │  Nightly batch: Full catalog sync (Glue) ──► S3 ──► OpenSearch re-index     │    │
│  └──────────────────────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────────────┘
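
The real-time sync path above needs the conflict resolution called out in the mind map (last-writer-wins vs merge). A minimal last-writer-wins sketch, assuming records carry an ISO-8601 `source_ts` attribute (an illustrative name, not from the source):

```python
from typing import Optional


def resolve_last_writer_wins(stored: Optional[dict], incoming: dict) -> dict:
    """Keep the record with the newer source timestamp. ISO-8601 UTC strings
    compare correctly as plain strings, so no datetime parsing is needed."""
    if stored is None or stored["source_ts"] < incoming["source_ts"]:
        return incoming
    return stored


# The same rule expressed as a DynamoDB conditional write, so a stale CDC
# replay cannot clobber a newer update (attribute names illustrative):
#   ConditionExpression="attribute_not_exists(source_ts) OR source_ts < :ts"
```

Last-writer-wins is the simpler choice for stock quantities, where the newest ERP value is authoritative; merge strategies only pay off when different sources own different fields of the same item.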

Data Flow for "Is Volume 42 of One Piece in stock?"

1. Customer sends WebSocket message
2. API Gateway -> ECS Fargate orchestrator
3. Orchestrator checks Redis cache (hit? return in <200ms)
4. Cache miss -> Bedrock Haiku classifies intent: "inventory_check"
5. Orchestrator routes to ACL -> Legacy ERP adapter
6. ACL translates: REST GET /inventory?isbn=978-4088... -> SOAP envelope (Shift-JIS)
7. Legacy ERP responds with XML stock data (50 req/s limit, ~800ms p99)
8. ACL translates response: XML -> JSON, Shift-JIS -> UTF-8
9. Orchestrator enriches with OpenSearch (related volumes, reviews)
10. Bedrock Sonnet generates natural language answer with stock info
11. Response cached in Redis (TTL=300s for inventory data)
12. Total latency budget: ~2.4s (within 3s target)
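
The cache-check, intent-classification, and cache-write steps above (3, 4, and 11) can be sketched as one orchestrator function. This is a simplified sketch: `cache`, `classify`, and `answer` are injected async callables standing in for the real Redis and Bedrock clients, and the per-intent TTL table is an assumption beyond the 300s inventory TTL stated above.

```python
import asyncio
import hashlib
import json

# TTL per intent; only the 300s inventory value comes from the flow above.
CACHE_TTL = {"inventory_check": 300, "recommendation": 3600}


def cache_key(session_id: str, message: str) -> str:
    digest = hashlib.sha256(message.encode("utf-8")).hexdigest()[:16]
    return f"response:{session_id}:{digest}"


async def handle_message(message: str, session_id: str, *,
                         cache, classify, answer) -> dict:
    # Step 3: Redis cache check -- the <200ms fast path.
    key = cache_key(session_id, message)
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    # Step 4: on a miss, the cheap model (Haiku) classifies intent.
    intent = await classify(message)
    # Steps 5-10, collapsed: route to ACL/OpenSearch, generate with Sonnet.
    response = await answer(message, intent)
    # Step 11: cache the response with an intent-appropriate TTL.
    await cache.set(key, json.dumps(response), ttl=CACHE_TTL.get(intent, 60))
    return response
```

Keying the cache on a hash of the message keeps repeated "is X in stock?" questions off the expensive path entirely, which is what makes the 2.4s budget hold at 1M messages/day.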

Production Code -- Enterprise Connectivity Components

1. API Adapter for Legacy Inventory System

"""
enterprise_connectivity/adapters/legacy_inventory_adapter.py

Adapter that wraps the legacy ERP's SOAP/XML inventory API behind a clean
async interface. Handles Shift-JIS encoding, SOAP envelope construction,
circuit breaking, and response normalization.

MangaAssist production component -- sits inside the Anti-Corruption Layer.
"""

import asyncio
import hashlib
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from xml.etree import ElementTree as ET

import aiohttp
from aiohttp import ClientTimeout

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Domain value objects
# ---------------------------------------------------------------------------

class StockStatus(Enum):
    IN_STOCK = "in_stock"
    LOW_STOCK = "low_stock"
    OUT_OF_STOCK = "out_of_stock"
    PREORDER = "preorder"
    UNKNOWN = "unknown"


@dataclass(frozen=True)
class InventoryItem:
    """Clean domain object returned by the adapter -- no legacy coupling."""
    isbn: str
    title: str
    quantity_available: int
    warehouse_location: str
    stock_status: StockStatus
    last_restock_date: Optional[str] = None
    price_jpy: Optional[int] = None
    estimated_restock_days: Optional[int] = None


@dataclass
class CircuitBreakerState:
    """Tracks failures to implement circuit-breaker pattern."""
    failure_count: int = 0
    last_failure_time: float = 0.0
    is_open: bool = False
    half_open_deadline: float = 0.0

    # Thresholds
    failure_threshold: int = 5
    recovery_timeout_seconds: float = 30.0

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.is_open = True
            self.half_open_deadline = (
                time.monotonic() + self.recovery_timeout_seconds
            )
            logger.warning(
                "Circuit breaker OPEN after %d failures. "
                "Will half-open at +%.0fs.",
                self.failure_count,
                self.recovery_timeout_seconds,
            )

    def record_success(self) -> None:
        if self.is_open:
            logger.info("Circuit breaker CLOSED after successful probe.")
        self.failure_count = 0
        self.is_open = False

    def allow_request(self) -> bool:
        if not self.is_open:
            return True
        if time.monotonic() >= self.half_open_deadline:
            logger.info("Circuit breaker HALF-OPEN -- allowing probe request.")
            return True  # half-open: allow one probe
        return False


# ---------------------------------------------------------------------------
# SOAP envelope helpers
# ---------------------------------------------------------------------------

SOAP_ENVELOPE_TEMPLATE = """<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
    xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:inv="http://legacy-erp.manga-store.co.jp/inventory/v1">
  <soapenv:Header>
    <inv:AuthToken>{auth_token}</inv:AuthToken>
  </soapenv:Header>
  <soapenv:Body>
    <inv:GetStockLevel>
      <inv:ISBN>{isbn}</inv:ISBN>
      <inv:WarehouseCode>{warehouse_code}</inv:WarehouseCode>
    </inv:GetStockLevel>
  </soapenv:Body>
</soapenv:Envelope>"""

SOAP_BATCH_ENVELOPE_TEMPLATE = """<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
    xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:inv="http://legacy-erp.manga-store.co.jp/inventory/v1">
  <soapenv:Header>
    <inv:AuthToken>{auth_token}</inv:AuthToken>
  </soapenv:Header>
  <soapenv:Body>
    <inv:GetBatchStockLevels>
      {isbn_elements}
    </inv:GetBatchStockLevels>
  </soapenv:Body>
</soapenv:Envelope>"""


def _build_soap_request(isbn: str, warehouse: str, token: str) -> bytes:
    """Build SOAP envelope and encode to Shift-JIS bytes."""
    xml_str = SOAP_ENVELOPE_TEMPLATE.format(
        auth_token=token,
        isbn=isbn,
        warehouse_code=warehouse,
    )
    return xml_str.encode("shift_jis")


def _build_batch_soap_request(
    isbns: list[str], warehouse: str, token: str
) -> bytes:
    """Build batch SOAP envelope for up to 20 ISBNs per call."""
    isbn_elements = "\n      ".join(
        f"<inv:ISBN>{isbn}</inv:ISBN>" for isbn in isbns
    )
    xml_str = SOAP_BATCH_ENVELOPE_TEMPLATE.format(
        auth_token=token,
        isbn_elements=isbn_elements,
    )
    return xml_str.encode("shift_jis")


def _parse_stock_response(raw_xml: bytes) -> list[dict]:
    """
    Parse legacy ERP XML response (Shift-JIS encoded) into Python dicts.
    Handles both single-item and batch responses.
    """
    text = raw_xml.decode("shift_jis", errors="replace")
    root = ET.fromstring(text)

    ns = {
        "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
        "inv": "http://legacy-erp.manga-store.co.jp/inventory/v1",
    }

    items = root.findall(".//inv:StockItem", ns)
    results = []
    for item in items:
        results.append({
            "isbn": _text(item, "inv:ISBN", ns),
            "title": _text(item, "inv:Title", ns),
            "quantity": int(_text(item, "inv:Quantity", ns) or "0"),
            "warehouse": _text(item, "inv:WarehouseCode", ns),
            "status_code": _text(item, "inv:StatusCode", ns),
            "last_restock": _text(item, "inv:LastRestockDate", ns),
            "price": _text(item, "inv:PriceJPY", ns),
            "restock_eta_days": _text(item, "inv:RestockETADays", ns),
        })
    return results


def _text(element: ET.Element, path: str, ns: dict) -> Optional[str]:
    """Safely extract text from an XML element."""
    node = element.find(path, ns)
    return node.text.strip() if node is not None and node.text else None


def _map_status_code(code: Optional[str]) -> StockStatus:
    """Map legacy numeric status codes to clean domain enum."""
    mapping = {
        "01": StockStatus.IN_STOCK,
        "02": StockStatus.LOW_STOCK,
        "03": StockStatus.OUT_OF_STOCK,
        "04": StockStatus.PREORDER,
    }
    return mapping.get(code or "", StockStatus.UNKNOWN)


# ---------------------------------------------------------------------------
# Main adapter class
# ---------------------------------------------------------------------------

class LegacyInventoryAdapter:
    """
    Production adapter for the legacy manga inventory ERP.

    Features:
    - SOAP/XML <-> Python dict translation
    - Shift-JIS <-> UTF-8 encoding
    - Circuit breaker (opens after 5 consecutive failures, 30s recovery)
    - Concurrency limiter (caps requests in flight to approximate the legacy 50 req/s ceiling)
    - Retry with exponential back-off (max 3 attempts)
    - Request deduplication within a short window
    - Batch query support (up to 20 ISBNs per SOAP call)

    Usage:
        adapter = LegacyInventoryAdapter(
            endpoint="https://erp.manga-store.co.jp/soap/inventory",
            auth_token="<token>",
        )
        item = await adapter.get_stock("978-4-08-882751-6")
    """

    MAX_RETRIES = 3
    BASE_BACKOFF_SECONDS = 0.5
    REQUEST_TIMEOUT_SECONDS = 5.0
    MAX_CONCURRENT_REQUESTS = 50  # legacy system ceiling
    BATCH_SIZE = 20
    DEDUP_WINDOW_SECONDS = 2.0

    def __init__(
        self,
        endpoint: str,
        auth_token: str,
        warehouse_code: str = "TK01",
    ) -> None:
        self._endpoint = endpoint
        self._auth_token = auth_token
        self._warehouse = warehouse_code

        self._circuit = CircuitBreakerState()
        self._semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_REQUESTS)
        self._session: Optional[aiohttp.ClientSession] = None

        # In-flight dedup: isbn -> (timestamp, Future)
        self._inflight: dict[str, tuple[float, asyncio.Future]] = {}

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS),
                headers={"Content-Type": "text/xml; charset=Shift_JIS"},
            )
        return self._session

    # -- Public API ----------------------------------------------------------

    async def get_stock(self, isbn: str) -> InventoryItem:
        """
        Query stock level for a single ISBN.
        Returns a clean InventoryItem domain object.
        """
        if not self._circuit.allow_request():
            raise LegacySystemUnavailableError(
                "Circuit breaker is OPEN for legacy ERP."
            )

        # Dedup: if same ISBN is already in flight, wait for that result
        now = time.monotonic()
        if isbn in self._inflight:
            ts, fut = self._inflight[isbn]
            if now - ts < self.DEDUP_WINDOW_SECONDS:
                logger.debug("Dedup hit for ISBN %s, reusing in-flight.", isbn)
                return await fut

        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._inflight[isbn] = (now, future)

        try:
            result = await self._execute_with_retry(isbn)
            future.set_result(result)
            return result
        except Exception as exc:
            future.set_exception(exc)
            raise
        finally:
            self._inflight.pop(isbn, None)

    async def get_stock_batch(self, isbns: list[str]) -> list[InventoryItem]:
        """
        Batch query for multiple ISBNs (up to BATCH_SIZE per call).
        Splits into chunks automatically.
        """
        if not self._circuit.allow_request():
            raise LegacySystemUnavailableError(
                "Circuit breaker is OPEN for legacy ERP."
            )

        results: list[InventoryItem] = []
        chunks = [
            isbns[i : i + self.BATCH_SIZE]
            for i in range(0, len(isbns), self.BATCH_SIZE)
        ]
        for chunk in chunks:
            body = _build_batch_soap_request(
                chunk, self._warehouse, self._auth_token
            )
            raw = await self._send_soap_request(body)
            parsed = _parse_stock_response(raw)
            results.extend(self._to_domain_objects(parsed))

        return results

    async def health_check(self) -> dict:
        """Lightweight probe -- calls the ERP healthcheck endpoint."""
        session = await self._get_session()
        try:
            async with session.get(
                self._endpoint.replace("/soap/inventory", "/health"),
                timeout=ClientTimeout(total=3.0),
            ) as resp:
                ok = resp.status == 200
                if ok:
                    self._circuit.record_success()
                else:
                    self._circuit.record_failure()
                return {
                    "healthy": ok,
                    "status_code": resp.status,
                    "circuit_open": self._circuit.is_open,
                }
        except Exception as exc:
            self._circuit.record_failure()
            return {
                "healthy": False,
                "error": str(exc),
                "circuit_open": self._circuit.is_open,
            }

    async def close(self) -> None:
        if self._session and not self._session.closed:
            await self._session.close()

    # -- Private helpers -----------------------------------------------------

    async def _execute_with_retry(self, isbn: str) -> InventoryItem:
        body = _build_soap_request(isbn, self._warehouse, self._auth_token)
        last_exc: Optional[Exception] = None

        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                raw = await self._send_soap_request(body)
                parsed = _parse_stock_response(raw)
                if not parsed:
                    raise LegacyDataError(f"No stock data for ISBN {isbn}")

                self._circuit.record_success()
                return self._to_domain_objects(parsed)[0]

            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                last_exc = exc
                self._circuit.record_failure()
                backoff = self.BASE_BACKOFF_SECONDS * (2 ** (attempt - 1))
                logger.warning(
                    "Legacy ERP attempt %d/%d failed: %s. Retrying in %.1fs.",
                    attempt, self.MAX_RETRIES, exc, backoff,
                )
                await asyncio.sleep(backoff)

        raise LegacySystemUnavailableError(
            f"Legacy ERP unreachable after {self.MAX_RETRIES} attempts."
        ) from last_exc

    async def _send_soap_request(self, body: bytes) -> bytes:
        session = await self._get_session()
        async with self._semaphore:
            async with session.post(self._endpoint, data=body) as resp:
                if resp.status != 200:
                    raise LegacySoapFaultError(
                        f"SOAP fault: HTTP {resp.status}"
                    )
                return await resp.read()

    @staticmethod
    def _to_domain_objects(parsed: list[dict]) -> list[InventoryItem]:
        items = []
        for row in parsed:
            items.append(InventoryItem(
                isbn=row["isbn"] or "",
                title=row["title"] or "",
                quantity_available=row["quantity"],
                warehouse_location=row["warehouse"] or "",
                stock_status=_map_status_code(row["status_code"]),
                last_restock_date=row.get("last_restock"),
                price_jpy=int(row["price"]) if row.get("price") else None,
                estimated_restock_days=(
                    int(row["restock_eta_days"])
                    if row.get("restock_eta_days") else None
                ),
            ))
        return items


# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------

class LegacySystemUnavailableError(Exception):
    """Raised when the legacy ERP is unreachable or circuit is open."""

class LegacySoapFaultError(Exception):
    """Raised when the legacy ERP returns a SOAP fault."""

class LegacyDataError(Exception):
    """Raised when the response data is missing or unparseable."""

2. EventBridge Integration -- Inventory Event Pipeline

"""
enterprise_connectivity/events/inventory_event_pipeline.py

EventBridge integration that publishes and consumes inventory events.
Bridges the gap between legacy ERP batch inventory updates and the
real-time needs of the MangaAssist chatbot.

Events flow:
  Legacy ERP -> DMS CDC -> Lambda -> EventBridge -> (fan-out)
    -> SQS -> Lambda -> DynamoDB (product catalog)
    -> SQS -> Lambda -> OpenSearch (vector re-index)
    -> SQS -> Lambda -> Redis cache invalidation
"""

import json
import logging
import os
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Optional

import boto3
from botocore.config import Config

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

EVENT_BUS_NAME = os.environ.get(
    "INVENTORY_EVENT_BUS", "manga-assist-inventory"
)
EVENT_SOURCE = "manga-assist.inventory"
AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")

boto_config = Config(
    region_name=AWS_REGION,
    retries={"max_attempts": 3, "mode": "adaptive"},
)

eventbridge = boto3.client("events", config=boto_config)
sqs = boto3.client("sqs", config=boto_config)


# ---------------------------------------------------------------------------
# Event schema
# ---------------------------------------------------------------------------

@dataclass
class InventoryChangeEvent:
    """
    Canonical event schema for inventory changes.
    Published to EventBridge for fan-out to multiple consumers.
    """
    event_type: str          # "inventory.updated" | "inventory.restocked" | "inventory.depleted"
    isbn: str
    title: str
    previous_quantity: int
    new_quantity: int
    warehouse_code: str
    change_source: str       # "erp_cdc" | "manual_adjustment" | "order_fulfillment"
    timestamp_utc: str
    correlation_id: str
    metadata: Optional[dict] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

    def to_eventbridge_entry(self) -> dict:
        """Format for EventBridge PutEvents API."""
        return {
            "Source": EVENT_SOURCE,
            "DetailType": self.event_type,
            "Detail": json.dumps(asdict(self), default=str),
            "EventBusName": EVENT_BUS_NAME,
            "Time": datetime.fromisoformat(self.timestamp_utc),
        }

    @classmethod
    def from_cdc_record(cls, cdc_record: dict, correlation_id: str) -> "InventoryChangeEvent":
        """
        Factory: build from a DMS CDC change record.
        CDC records arrive as: {"data": {"ISBN": ..., "QTY": ..., ...}, "metadata": {...}}
        """
        data = cdc_record.get("data", {})
        meta = cdc_record.get("metadata", {})
        old_qty = int(data.get("OLD_QTY", 0))
        new_qty = int(data.get("QTY", 0))

        if new_qty == 0:
            event_type = "inventory.depleted"
        elif old_qty == 0 and new_qty > 0:
            event_type = "inventory.restocked"
        else:
            event_type = "inventory.updated"

        return cls(
            event_type=event_type,
            isbn=data.get("ISBN", ""),
            title=data.get("TITLE", ""),
            previous_quantity=old_qty,
            new_quantity=new_qty,
            warehouse_code=data.get("WAREHOUSE_CD", "TK01"),
            change_source="erp_cdc",
            timestamp_utc=datetime.now(timezone.utc).isoformat(),
            correlation_id=correlation_id,
            metadata={
                "cdc_operation": meta.get("operation", "unknown"),
                "cdc_timestamp": meta.get("timestamp", ""),
                "table_name": meta.get("table-name", ""),
            },
        )


# ---------------------------------------------------------------------------
# Publisher
# ---------------------------------------------------------------------------

class InventoryEventPublisher:
    """
    Publishes inventory change events to EventBridge.
    Supports batching (up to 10 entries per PutEvents call).
    """

    MAX_BATCH = 10  # EventBridge limit

    def __init__(self, event_bus: str = EVENT_BUS_NAME):
        self._event_bus = event_bus
        self._buffer: list[dict] = []

    def enqueue(self, event: InventoryChangeEvent) -> None:
        """Add event to buffer. Auto-flushes when buffer is full."""
        self._buffer.append(event.to_eventbridge_entry())
        if len(self._buffer) >= self.MAX_BATCH:
            self.flush()

    def flush(self) -> dict:
        """Send buffered events to EventBridge."""
        if not self._buffer:
            return {"sent": 0}

        batch = self._buffer[: self.MAX_BATCH]
        self._buffer = self._buffer[self.MAX_BATCH :]

        response = eventbridge.put_events(Entries=batch)
        failed = response.get("FailedEntryCount", 0)

        if failed > 0:
            logger.error(
                "EventBridge PutEvents: %d/%d entries failed.",
                failed, len(batch),
            )
            # Re-enqueue failed entries for retry
            for i, entry_resp in enumerate(response.get("Entries", [])):
                if entry_resp.get("ErrorCode"):
                    self._buffer.append(batch[i])
                    logger.error(
                        "  Failed entry %d: %s - %s",
                        i,
                        entry_resp.get("ErrorCode"),
                        entry_resp.get("ErrorMessage"),
                    )
        else:
            logger.info("Published %d events to EventBridge.", len(batch))

        return {"sent": len(batch) - failed, "failed": failed}

    def flush_all(self) -> dict:
        """Flush all remaining buffered events."""
        total_sent = 0
        total_failed = 0
        while self._buffer:
            result = self.flush()
            total_sent += result.get("sent", 0)
            total_failed += result.get("failed", 0)
        return {"total_sent": total_sent, "total_failed": total_failed}


# ---------------------------------------------------------------------------
# Consumer -- Lambda handler for downstream processing
# ---------------------------------------------------------------------------

def lambda_handler_inventory_sync(event: dict, context: Any) -> dict:
    """
    Lambda function triggered by SQS (fed by EventBridge rule).
    Updates DynamoDB product catalog with new stock levels.

    EventBridge Rule:
      Source: manga-assist.inventory
      DetailType: inventory.updated | inventory.restocked | inventory.depleted
      Target: SQS queue -> this Lambda
    """
    dynamodb = boto3.resource("dynamodb", config=boto_config)
    table = dynamodb.Table(os.environ.get("PRODUCTS_TABLE", "manga-products"))

    processed = 0
    errors = 0

    for record in event.get("Records", []):
        try:
            body = json.loads(record["body"])
            # EventBridge wraps detail in "detail" key
            detail = body if "isbn" in body else body.get("detail", {})

            isbn = detail["isbn"]
            new_qty = detail["new_quantity"]
            event_type = detail["event_type"]

            # Determine stock_status for DynamoDB
            if new_qty == 0:
                stock_status = "out_of_stock"
            elif new_qty <= 5:
                stock_status = "low_stock"
            else:
                stock_status = "in_stock"

            table.update_item(
                Key={"isbn": isbn},
                UpdateExpression=(
                    "SET quantity_available = :qty, "
                    "stock_status = :status, "
                    "last_sync_utc = :ts, "
                    "last_event_type = :etype"
                ),
                ExpressionAttributeValues={
                    ":qty": new_qty,
                    ":status": stock_status,
                    ":ts": datetime.now(timezone.utc).isoformat(),
                    ":etype": event_type,
                },
            )

            processed += 1
            logger.info(
                "Updated DynamoDB: ISBN=%s qty=%d status=%s",
                isbn, new_qty, stock_status,
            )

        except Exception as exc:
            errors += 1
            # Counting the error but returning normally acknowledges the whole
            # SQS batch; enable ReportBatchItemFailures on the event source
            # mapping if failed records should be retried individually.
            logger.error("Failed to process record: %s", exc)

    return {"processed": processed, "errors": errors}
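The routing described in the handler's docstring corresponds to an EventBridge content-filtering pattern. A sketch of that rule pattern (the bus and queue wiring are assumptions from the docstring, not shown here):

```python
import json

# Event pattern for the rule that feeds the SQS queue driving this Lambda.
# "source" and "detail-type" must match exactly what the publisher emits.
INVENTORY_RULE_PATTERN = {
    "source": ["manga-assist.inventory"],
    "detail-type": [
        "inventory.updated",
        "inventory.restocked",
        "inventory.depleted",
    ],
}

print(json.dumps(INVENTORY_RULE_PATTERN, indent=2))
```

Content-based filtering at the bus means the Lambda never sees unrelated events, so no defensive event-type checks are needed in the handler itself.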


def lambda_handler_cache_invalidation(event: dict, context: Any) -> dict:
    """
    Lambda function triggered by SQS (fed by EventBridge rule).
    Invalidates Redis cache entries for changed inventory items.

    This ensures the chatbot does not serve stale stock data after
    an inventory update event.
    """
    import redis

    redis_client = redis.Redis(
        host=os.environ.get("REDIS_HOST", "manga-cache.xxxxx.apne1.cache.amazonaws.com"),
        port=6379,
        ssl=True,
        decode_responses=True,
    )

    invalidated = 0

    for record in event.get("Records", []):
        try:
            body = json.loads(record["body"])
            detail = body if "isbn" in body else body.get("detail", {})
            isbn = detail["isbn"]

            # Invalidate all cache keys containing this ISBN
            pattern = f"inventory:*{isbn}*"
            keys = list(redis_client.scan_iter(match=pattern, count=100))

            if keys:
                redis_client.delete(*keys)
                invalidated += len(keys)
                logger.info("Invalidated %d cache keys for ISBN %s.", len(keys), isbn)

        except Exception as exc:
            logger.error("Cache invalidation failed: %s", exc)

    return {"invalidated": invalidated}
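Both handlers above repeat the same envelope-unwrapping expression. A shared helper (hypothetical name, not in the original code) would keep the two in sync if the envelope format changes:

```python
def unwrap_detail(body: dict) -> dict:
    """Return the event payload whether the SQS body is the raw detail
    or a full EventBridge envelope (which nests the payload under "detail")."""
    return body if "isbn" in body else body.get("detail", {})
```

Usage is identical in both Lambdas: `detail = unwrap_detail(json.loads(record["body"]))`.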

3. Data Synchronization Pipeline -- CDC and Batch Sync

"""
enterprise_connectivity/sync/data_sync_pipeline.py

Data synchronization pipeline for MangaAssist:
  - Real-time CDC via DMS -> DynamoDB Streams -> Lambda
  - Nightly batch ETL via Glue -> S3 -> OpenSearch re-index
  - Conflict resolution and data quality validation

Keeps the chatbot's product catalog, vector embeddings, and cache
in sync with the legacy ERP as the source of truth.
"""

import json
import logging
import os
from datetime import datetime, timezone

import boto3
from botocore.config import Config

logger = logging.getLogger(__name__)

AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
boto_config = Config(
    region_name=AWS_REGION,
    retries={"max_attempts": 3, "mode": "adaptive"},
)

s3 = boto3.client("s3", config=boto_config)
glue = boto3.client("glue", config=boto_config)
dynamodb = boto3.resource("dynamodb", config=boto_config)


# ---------------------------------------------------------------------------
# Data quality validation
# ---------------------------------------------------------------------------

class DataQualityValidator:
    """
    Validates incoming data before it reaches DynamoDB / OpenSearch.
    Prevents corrupt or incomplete legacy data from poisoning the chatbot.
    """

    REQUIRED_FIELDS = {"isbn", "title", "quantity", "warehouse_code"}
    ISBN_PATTERN_10 = r"^\d{9}[\dX]$"
    ISBN_PATTERN_13 = r"^\d{13}$"

    def __init__(self):
        import re
        self._isbn10_re = re.compile(self.ISBN_PATTERN_10)
        self._isbn13_re = re.compile(self.ISBN_PATTERN_13)
        self._validation_errors: list[dict] = []

    def validate_record(self, record: dict) -> tuple[bool, list[str]]:
        """Validate a single record. Returns (is_valid, list_of_errors)."""
        errors = []

        # Check required fields
        missing = self.REQUIRED_FIELDS - set(record.keys())
        if missing:
            errors.append(f"Missing required fields: {missing}")

        # Validate ISBN format (shape only -- the check digit is not verified).
        # str() guards against legacy feeds delivering the ISBN as a number.
        isbn = str(record.get("isbn", ""))
        isbn_clean = isbn.replace("-", "")
        if not (self._isbn10_re.match(isbn_clean) or self._isbn13_re.match(isbn_clean)):
            errors.append(f"Invalid ISBN format: {isbn}")

        # Validate quantity is non-negative
        qty = record.get("quantity")
        if qty is not None and (not isinstance(qty, (int, float)) or qty < 0):
            errors.append(f"Invalid quantity: {qty}")

        # Validate title is non-empty and reasonable length
        title = record.get("title", "")
        if not title or len(title) > 500:
            errors.append(f"Invalid title length: {len(title)}")

        is_valid = len(errors) == 0
        if not is_valid:
            self._validation_errors.append({
                "record": record,
                "errors": errors,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })

        return is_valid, errors

    def get_error_report(self) -> dict:
        """Summary of all validation errors in this batch."""
        return {
            "total_errors": len(self._validation_errors),
            "errors": self._validation_errors[-100:],  # last 100
        }
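The regexes above validate shape only; a transposed digit still passes. An ISBN-13 check-digit verification (a sketch, not part of the validator) catches such transcription errors:

```python
def isbn13_checksum_ok(isbn: str) -> bool:
    """Verify the ISBN-13 check digit: digits are weighted alternately
    1 and 3, and the check digit makes the total a multiple of 10."""
    digits_str = isbn.replace("-", "")
    if len(digits_str) != 13 or not digits_str.isdigit():
        return False
    digits = [int(c) for c in digits_str]
    total = sum(d * (1 if i % 2 == 0 else 3) for i, d in enumerate(digits[:12]))
    check = (10 - total % 10) % 10
    return check == digits[12]

print(isbn13_checksum_ok("978-0-306-40615-7"))  # True
```

This could be layered into `validate_record` as an additional error case once the format regex passes.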


# ---------------------------------------------------------------------------
# Real-time CDC sync handler
# ---------------------------------------------------------------------------

class CDCSyncHandler:
    """
    Processes DynamoDB Streams events triggered by DMS CDC writes.
    Re-indexes changed records in OpenSearch for vector search.
    """

    def __init__(self, opensearch_endpoint: str, index_name: str = "manga-catalog"):
        self._os_endpoint = opensearch_endpoint
        self._index_name = index_name
        self._validator = DataQualityValidator()
        self._bedrock = boto3.client("bedrock-runtime", config=boto_config)

    def handle_stream_event(self, event: dict) -> dict:
        """
        Lambda handler for DynamoDB Streams events.
        Processes INSERT and MODIFY events to keep OpenSearch in sync.
        """
        processed = 0
        skipped = 0
        errors = 0

        for record in event.get("Records", []):
            event_name = record.get("eventName", "")

            if event_name not in ("INSERT", "MODIFY"):
                skipped += 1
                continue

            try:
                new_image = record.get("dynamodb", {}).get("NewImage", {})
                item = self._deserialize_dynamodb_item(new_image)

                is_valid, validation_errors = self._validator.validate_record(item)
                if not is_valid:
                    logger.warning(
                        "Skipping invalid record ISBN=%s: %s",
                        item.get("isbn"), validation_errors,
                    )
                    skipped += 1
                    continue

                # Generate embedding for vector search
                embedding = self._generate_embedding(item)

                # Upsert into OpenSearch
                self._upsert_opensearch(item, embedding)
                processed += 1

            except Exception as exc:
                errors += 1
                logger.error("CDC sync error: %s", exc)

        return {
            "processed": processed,
            "skipped": skipped,
            "errors": errors,
            "validation_report": self._validator.get_error_report(),
        }

    def _generate_embedding(self, item: dict) -> list[float]:
        """Generate text embedding via Bedrock for OpenSearch vector field."""
        text = f"{item.get('title', '')} {item.get('author', '')} {item.get('genre', '')}"

        response = self._bedrock.invoke_model(
            modelId="amazon.titan-embed-text-v2:0",
            body=json.dumps({"inputText": text}),
        )
        body = json.loads(response["body"].read())
        return body.get("embedding", [])

    def _upsert_opensearch(self, item: dict, embedding: list[float]) -> None:
        """Upsert document into OpenSearch Serverless."""
        from opensearchpy import OpenSearch, RequestsHttpConnection
        from requests_aws4auth import AWS4Auth

        # Build the signed client once per handler instance; re-creating it
        # (and re-resolving credentials) for every stream record adds
        # needless latency to the sync path.
        client = getattr(self, "_os_client", None)
        if client is None:
            credentials = boto3.Session().get_credentials()
            awsauth = AWS4Auth(
                credentials.access_key,
                credentials.secret_key,
                AWS_REGION,
                "aoss",
                session_token=credentials.token,
            )
            client = OpenSearch(
                hosts=[{"host": self._os_endpoint, "port": 443}],
                http_auth=awsauth,
                use_ssl=True,
                connection_class=RequestsHttpConnection,
            )
            self._os_client = client

        doc = {
            "isbn": item["isbn"],
            "title": item["title"],
            "author": item.get("author", ""),
            "genre": item.get("genre", ""),
            "quantity_available": item.get("quantity", 0),
            "stock_status": item.get("stock_status", "unknown"),
            "price_jpy": item.get("price_jpy"),
            "embedding": embedding,
            "last_sync_utc": datetime.now(timezone.utc).isoformat(),
        }

        client.index(
            index=self._index_name,
            id=item["isbn"],
            body=doc,
        )

    @staticmethod
    def _deserialize_dynamodb_item(image: dict) -> dict:
        """Convert DynamoDB Streams typed attribute values to a plain dict.
        Note: numeric attributes deserialize to decimal.Decimal, not int/float.
        """
        # Explicit import: `import boto3` alone does not load the
        # boto3.dynamodb subpackage, so attribute access is fragile.
        from boto3.dynamodb.types import TypeDeserializer

        deserializer = TypeDeserializer()
        return {k: deserializer.deserialize(v) for k, v in image.items()}
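`_generate_embedding` sends only `inputText`; Titan Text Embeddings V2 also accepts optional `dimensions` (256, 512, or 1024; default 1024) and `normalize` parameters, which can shrink the vectors stored in OpenSearch. A sketch of the fuller request body:

```python
import json

# Titan Text Embeddings V2 request body. "dimensions" and "normalize"
# are optional; the sample text is illustrative.
request_body = json.dumps({
    "inputText": "One Piece Eiichiro Oda shonen adventure",
    "dimensions": 512,   # smaller vectors cut OpenSearch storage and kNN latency
    "normalize": True,   # unit-length vectors suit cosine similarity
})

payload = json.loads(request_body)
```

If the dimension is reduced, the OpenSearch index mapping's vector field must be created with the matching dimension.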


# ---------------------------------------------------------------------------
# Batch ETL orchestrator
# ---------------------------------------------------------------------------

class BatchSyncOrchestrator:
    """
    Orchestrates nightly full catalog sync:
      1. Export from legacy ERP (via DMS full-load task)
      2. Stage in S3
      3. Run Glue ETL for transformation + quality checks
      4. Load into DynamoDB and trigger OpenSearch re-index

    Provides checkpoint/restart for fault tolerance.
    """

    def __init__(
        self,
        s3_bucket: str,
        glue_job_name: str,
        products_table: str = "manga-products",
    ):
        self._bucket = s3_bucket
        self._glue_job = glue_job_name
        self._table_name = products_table

    def start_full_sync(self) -> dict:
        """Kick off the nightly batch sync pipeline."""
        run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        s3_prefix = f"batch-sync/{run_id}/"

        logger.info("Starting batch sync run_id=%s", run_id)

        # Step 1: Start Glue job
        glue_run = glue.start_job_run(
            JobName=self._glue_job,
            Arguments={
                "--S3_BUCKET": self._bucket,
                "--S3_PREFIX": s3_prefix,
                "--RUN_ID": run_id,
                "--TARGET_TABLE": self._table_name,
            },
        )

        return {
            "run_id": run_id,
            "glue_job_run_id": glue_run["JobRunId"],
            "s3_staging": f"s3://{self._bucket}/{s3_prefix}",
            "status": "started",
        }

    def check_sync_status(self, glue_run_id: str) -> dict:
        """Poll Glue job status."""
        response = glue.get_job_run(
            JobName=self._glue_job,
            RunId=glue_run_id,
        )
        run = response["JobRun"]
        return {
            "status": run["JobRunState"],
            "started": str(run.get("StartedOn", "")),
            "completed": str(run.get("CompletedOn", "")),
            "error": run.get("ErrorMessage", ""),
            "execution_time_sec": run.get("ExecutionTime", 0),
        }

    def validate_sync_results(self, run_id: str) -> dict:
        """
        Post-sync validation: compare record counts between
        legacy ERP export and DynamoDB to detect data loss.
        """
        # Count objects under the S3 staging prefix. This assumes the Glue
        # job writes one record per object; for multi-record files, compare
        # row counts from the Glue job metrics instead.
        s3_prefix = f"batch-sync/{run_id}/transformed/"
        paginator = s3.get_paginator("list_objects_v2")
        s3_count = 0
        for page in paginator.paginate(Bucket=self._bucket, Prefix=s3_prefix):
            s3_count += page.get("KeyCount", 0)

        # Count records in DynamoDB
        table = dynamodb.Table(self._table_name)
        dynamo_count = table.item_count  # approximate, updated every ~6 hours

        discrepancy = abs(s3_count - dynamo_count)
        threshold = max(10, int(s3_count * 0.01))  # 1% tolerance

        return {
            "run_id": run_id,
            "s3_record_count": s3_count,
            "dynamodb_record_count": dynamo_count,
            "discrepancy": discrepancy,
            "within_tolerance": discrepancy <= threshold,
            "tolerance_threshold": threshold,
        }
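Two small helpers round out the orchestrator (sketches -- the names and signatures are assumptions, not part of the file above): a polling loop over `check_sync_status`, and the tolerance rule from `validate_sync_results` extracted as a pure function so it can be unit-tested without AWS calls:

```python
import time

# Terminal Glue job run states (non-terminal: STARTING, RUNNING, STOPPING, WAITING)
GLUE_TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "ERROR", "TIMEOUT"}

def wait_for_glue_job(get_status, timeout_sec=3600, poll_sec=30, sleep=time.sleep):
    """Poll a zero-arg status callable until a terminal Glue state.

    get_status: e.g. lambda: orchestrator.check_sync_status(run_id).
    The injectable sleep makes the loop testable without real waiting.
    """
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        result = get_status()
        if result["status"] in GLUE_TERMINAL_STATES:
            return result
        sleep(poll_sec)
    raise TimeoutError(f"Glue job still running after {timeout_sec}s")

def reconciliation_ok(s3_count: int, dynamo_count: int,
                      pct: float = 0.01, floor: int = 10) -> bool:
    """Same rule as validate_sync_results: the discrepancy must stay
    within max(floor, pct * s3_count)."""
    threshold = max(floor, int(s3_count * pct))
    return abs(s3_count - dynamo_count) <= threshold
```

For production scheduling, a Step Functions state machine with a Wait state is the usual replacement for an in-process polling loop.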

Key Takeaways

1. Anti-corruption layers isolate FM domains from legacy coupling.
   MangaAssist: the ACL translates SOAP/XML + Shift-JIS from the ERP into clean Python domain objects before Bedrock prompt construction.
2. Circuit breakers protect throughput when legacy systems degrade.
   MangaAssist: if the ERP fails 5x in a row, the breaker opens for 30s -- the chatbot falls back to cached data instead of blocking.
3. EventBridge enables loose coupling via publish-subscribe.
   MangaAssist: inventory changes fan out to DynamoDB, OpenSearch, and Redis cache invalidation without point-to-point wiring.
4. CDC (Change Data Capture) keeps read stores in near-real-time sync.
   MangaAssist: DMS streams ERP database changes -> DynamoDB Streams -> Lambda -> OpenSearch re-index pipeline.
5. Batch ETL provides a safety net for full reconciliation.
   MangaAssist: a nightly Glue job re-syncs the entire manga catalog; post-sync validation catches any CDC drift.
6. Request deduplication prevents wasted legacy capacity.
   MangaAssist: concurrent chatbot queries for the same ISBN share a single in-flight ERP call.
7. Data quality gates stop corrupt data before it reaches the chatbot.
   MangaAssist: the validator rejects records with invalid ISBNs, negative quantities, or missing fields.
8. Encoding normalization is critical for JP content.
   MangaAssist: Shift-JIS to UTF-8 conversion happens at the adapter boundary -- all downstream systems operate in UTF-8 only.
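The breaker policy in takeaway 2 (open after 5 consecutive failures, retry after 30s) can be sketched as a minimal state machine; the class name and injectable clock are illustrative assumptions, not the implementation used elsewhere in this skill:

```python
import time

class CircuitBreaker:
    """Minimal breaker: opens after `threshold` consecutive failures,
    rejects calls for `cooldown_sec`, then allows one half-open trial."""

    def __init__(self, threshold=5, cooldown_sec=30.0, clock=time.monotonic):
        self._threshold = threshold
        self._cooldown = cooldown_sec
        self._clock = clock          # injectable for testing
        self._failures = 0
        self._opened_at = None       # None means the breaker is closed

    def allow(self) -> bool:
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self._cooldown:
            # Half-open: permit one trial call; a single new failure reopens.
            self._opened_at = None
            self._failures = self._threshold - 1
            return True
        return False

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._opened_at = self._clock()
```

In the chatbot's call path, `allow()` gates the ERP request and a rejected call routes straight to the Redis/DynamoDB fallback.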

Exam-Relevant Connections

  • API Gateway + VTL transforms can perform lightweight request/response mapping without Lambda, reducing latency for simple format translations.
  • AppSync pipeline resolvers can aggregate data from DynamoDB (products), OpenSearch (search), and the legacy ERP (stock) in a single GraphQL query -- relevant for Skill 2.3.1's "seamless incorporation."
  • DMS + Kinesis Data Streams is the AWS-native answer to CDC from on-premises databases; contrast with third-party tools like Debezium.
  • EventBridge Schema Registry auto-discovers event schemas from the bus -- important for governance when multiple teams publish events.
  • SQS FIFO queues provide exactly-once processing (message deduplication within a 5-minute interval) and strict ordering for transactional events (e.g., payment confirmations) -- standard SQS only guarantees at-least-once delivery with best-effort ordering.