Skill 2.3.1 -- Enterprise Connectivity Architecture for FM Integration
MangaAssist context: Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds at 1M messages/day scale.
Mind Map -- Enterprise Connectivity Solutions
Enterprise Connectivity Solutions (Skill 2.3.1)
│
├── API-Based Legacy Integration
│ ├── REST Adapters
│ │ ├── Resource mapping to legacy operations
│ │ ├── HTTP verb translation (GET/POST/PUT/DELETE)
│ │ ├── Pagination wrapping for bulk legacy queries
│ │ └── Content negotiation (JSON <-> XML/CSV/fixed-width)
│ ├── SOAP Wrappers
│ │ ├── WSDL-to-REST auto-generation
│ │ ├── Envelope construction/deconstruction
│ │ ├── WS-Security token passthrough
│ │ └── Schema validation at boundary
│ └── Message Format Translation
│ ├── Canonical data model (CDM)
│ ├── Field mapping rules engine
│ ├── Character encoding normalization (Shift-JIS <-> UTF-8)
│ └── Date/currency format harmonization
│
├── Event-Driven Architecture
│ ├── Amazon EventBridge
│ │ ├── Custom event bus per domain
│ │ ├── Schema registry for event contracts
│ │ ├── Content-based filtering rules
│ │ └── Archive & replay for debugging
│ ├── SNS/SQS Patterns
│ │ ├── Fan-out for multi-consumer events
│ │ ├── Dead-letter queues for poison messages
│ │ ├── FIFO ordering for transactional events
│ │ └── Message deduplication
│ └── Loose Coupling Patterns
│ ├── Publish-subscribe for inventory updates
│ ├── Command-query separation (CQRS)
│ ├── Saga orchestration for multi-step workflows
│ └── Circuit breaker at integration points
│
├── Data Synchronization
│ ├── CDC with DMS (Change Data Capture)
│ │ ├── Source: legacy Oracle/MySQL databases
│ │ ├── Target: DynamoDB / OpenSearch
│ │ ├── Continuous replication with minimal lag
│ │ └── Schema evolution handling
│ ├── Batch ETL
│ │ ├── Nightly full sync for catalog data
│ │ ├── AWS Glue jobs for transformation
│ │ ├── S3 staging for checkpointed recovery
│ │ └── Data quality validation gates
│ └── Real-Time Sync
│ ├── DynamoDB Streams -> Lambda -> downstream
│ ├── Kinesis Data Streams for high-throughput
│ ├── Conflict resolution (last-writer-wins vs merge)
│ └── Eventual consistency SLA targets
│
├── Enterprise Patterns
│ ├── Anti-Corruption Layer (ACL)
│ │ ├── Isolates FM domain from legacy models
│ │ ├── Translates legacy schemas to clean domain objects
│ │ ├── Prevents legacy coupling from leaking upstream
│ │ └── Versioned interface contracts
│ ├── Adapter Pattern
│ │ ├── Wraps legacy client libraries
│ │ ├── Normalizes error codes and exceptions
│ │ ├── Provides consistent retry semantics
│ │ └── Implements circuit breaker per adapter
│ └── Facade Pattern
│ ├── Unified API surface for multiple backends
│ ├── Aggregates data from 3+ legacy sources
│ ├── Caches composite responses in Redis
│ └── Simplifies downstream FM prompt construction
│
└── Integration Middleware
├── API Gateway
│ ├── Rate limiting per legacy backend capacity
│ ├── Request/response transformation templates
│ ├── API key management for partner systems
│ └── Usage plans aligned to legacy SLAs
└── AWS AppSync
├── GraphQL aggregation layer
├── Real-time subscriptions for live inventory
├── Pipeline resolvers chaining multiple sources
└── Conflict detection for offline-first clients
Architecture Diagram -- MangaAssist Enterprise Integration
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ MangaAssist Enterprise Integration │
│ │
│ ┌──────────────┐ WebSocket ┌──────────────────┐ │
│ │ Customer │◄───────────────►│ API Gateway │ │
│ │ (Browser / │ │ (WebSocket) │ │
│ │ Mobile) │ └────────┬─────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌──────────────────┐ ┌───────────────────┐ │
│ │ ECS Fargate │────►│ ElastiCache │ │
│ │ (Orchestrator) │◄────│ Redis │ │
│ │ │ │ (Session + │ │
│ │ - Route intent │ │ Response Cache) │ │
│ │ - Compose prompt │ └───────────────────┘ │
│ │ - Call Bedrock │ │
│ │ - Merge results │ │
│ └──┬───┬───┬───┬───┘ │
│ │ │ │ │ │
│ ┌─────────────────────┘ │ │ └─────────────────────┐ │
│ ▼ ▼ ▼ ▼ │
│ ┌────────────────────┐ ┌─────────────────────────┐ ┌────────────────────────┐ │
│ │ Anti-Corruption │ │ Amazon Bedrock │ │ OpenSearch │ │
│ │ Layer (ACL) │ │ Claude 3 Sonnet/Haiku │ │ Serverless │ │
│ │ │ │ │ │ (Vector Store) │ │
│ │ - Schema xlate │ │ - Answer generation │ │ │ │
│ │ - Error normalize │ │ - Intent classification │ │ - Manga embeddings │ │
│ │ - Encoding fix │ │ - Summarization │ │ - Semantic search │ │
│ └──┬────┬────┬──────┘ └───────────────────────────┘ └────────────────────────┘ │
│ │ │ │ │
│ │ │ │ ┌────────────────────────────────────────┐ │
│ │ │ │ │ EventBridge (Event Bus) │ │
│ │ │ │ │ │ │
│ │ │ │ │ Rules: │ │
│ │ │ │ │ - inventory.updated -> sync pipeline │ │
│ │ │ │ │ - order.placed -> order mgmt │ │
│ │ │ │ │ - payment.completed -> ERP update │ │
│ │ │ │ └────────┬──────────┬───────────┬───────┘ │
│ │ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌───────────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Legacy ERP │ │ Order Mgmt │ │ Payment │ │ DMS (CDC) │ │
│ │ (Inventory) │ │ System │ │ Gateway │ │ │ │
│ │ │ │ │ │ │ │ Oracle ──► │ │
│ │ SOAP/XML API │ │ REST API v2 │ │ REST API │ │ DynamoDB │ │
│ │ Shift-JIS │ │ (paginated) │ │ (PCI) │ │ │ │
│ │ 50 req/s max │ │ 200 req/s │ │ mTLS │ │ Continuous │ │
│ └──────────────┘ └───────────────┘ └──────────┘ │ replication │ │
│ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────────────────┐ │
│ │ Data Synchronization Layer │ │
│ │ │ │
│ │ ┌────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ DMS CDC │───►│ S3 Staging │───►│ Glue ETL │───►│ DynamoDB / │ │ │
│ │ │ (Real-time)│ │ (Checkpoint)│ │ (Transform) │ │ OpenSearch │ │ │
│ │ └────────────┘ └──────────────┘ └──────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ Nightly batch: Full catalog sync (Glue) ──► S3 ──► OpenSearch re-index │ │
│ └──────────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────────┘
Data Flow -- "Is Volume 42 of One Piece in stock?"
1. Customer sends WebSocket message
2. API Gateway -> ECS Fargate orchestrator
3. Orchestrator checks Redis cache (hit? return in <200ms)
4. Cache miss -> Bedrock Haiku classifies intent: "inventory_check"
5. Orchestrator routes to ACL -> Legacy ERP adapter
6. ACL translates: REST GET /inventory?isbn=978-4088... -> SOAP envelope (Shift-JIS)
7. Legacy ERP responds with XML stock data (50 req/s limit, ~800ms p99)
8. ACL translates response: XML -> JSON, Shift-JIS -> UTF-8
9. Orchestrator enriches with OpenSearch (related volumes, reviews)
10. Bedrock Sonnet generates natural language answer with stock info
11. Response cached in Redis (TTL=300s for inventory data)
12. Total latency budget: ~2.4s (within 3s target)
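The ~2.4s figure decomposes roughly as follows. Only the ~800ms ERP p99 and the 3s target come from the text above; the per-step numbers are illustrative assumptions for the cache-miss path, not measurements.

```python
# Hypothetical per-step latency estimates (seconds) for a cache miss.
budget = {
    "websocket_and_api_gateway": 0.05,
    "haiku_intent_classification": 0.30,
    "acl_translation": 0.05,
    "legacy_erp_p99": 0.80,          # from the flow above
    "opensearch_enrichment": 0.20,
    "sonnet_answer_generation": 0.95,
    "cache_write_and_response": 0.05,
}
total = sum(budget.values())  # should land near the ~2.4s budget
```

A breakdown like this makes it obvious where to optimize first: the ERP call and Sonnet generation dominate, which is exactly why the Redis cache hit path (step 3) matters.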
Production Code -- Enterprise Connectivity Components
1. API Adapter for Legacy Inventory System
"""
enterprise_connectivity/adapters/legacy_inventory_adapter.py
Adapter that wraps the legacy ERP's SOAP/XML inventory API behind a clean
async interface. Handles Shift-JIS encoding, SOAP envelope construction,
circuit breaking, and response normalization.
MangaAssist production component -- sits inside the Anti-Corruption Layer.
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from xml.etree import ElementTree as ET

import aiohttp
from aiohttp import ClientTimeout
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Domain value objects
# ---------------------------------------------------------------------------
class StockStatus(Enum):
IN_STOCK = "in_stock"
LOW_STOCK = "low_stock"
OUT_OF_STOCK = "out_of_stock"
PREORDER = "preorder"
UNKNOWN = "unknown"
@dataclass(frozen=True)
class InventoryItem:
"""Clean domain object returned by the adapter -- no legacy coupling."""
isbn: str
title: str
quantity_available: int
warehouse_location: str
stock_status: StockStatus
last_restock_date: Optional[str] = None
price_jpy: Optional[int] = None
estimated_restock_days: Optional[int] = None
@dataclass
class CircuitBreakerState:
"""Tracks failures to implement circuit-breaker pattern."""
failure_count: int = 0
last_failure_time: float = 0.0
is_open: bool = False
half_open_deadline: float = 0.0
# Thresholds
failure_threshold: int = 5
recovery_timeout_seconds: float = 30.0
def record_failure(self) -> None:
self.failure_count += 1
self.last_failure_time = time.monotonic()
if self.failure_count >= self.failure_threshold:
self.is_open = True
self.half_open_deadline = (
time.monotonic() + self.recovery_timeout_seconds
)
logger.warning(
"Circuit breaker OPEN after %d failures. "
"Will half-open at +%.0fs.",
self.failure_count,
self.recovery_timeout_seconds,
)
def record_success(self) -> None:
if self.is_open:
logger.info("Circuit breaker CLOSED after successful probe.")
self.failure_count = 0
self.is_open = False
def allow_request(self) -> bool:
if not self.is_open:
return True
if time.monotonic() >= self.half_open_deadline:
logger.info("Circuit breaker HALF-OPEN -- allowing probe request.")
return True # half-open: allow one probe
return False
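To see the breaker's transitions (CLOSED -> OPEN -> HALF-OPEN -> CLOSED) deterministically, here is a compact restatement of the same state machine with an injectable clock. `MiniBreaker` is a test sketch for illustration, not part of the adapter.

```python
class MiniBreaker:
    """Same state machine as CircuitBreakerState, with an injectable clock."""

    def __init__(self, clock, threshold: int = 5, recovery: float = 30.0):
        self.clock = clock
        self.threshold = threshold
        self.recovery = recovery
        self.failures = 0
        self.is_open = False
        self.half_open_at = 0.0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.is_open = True
            self.half_open_at = self.clock() + self.recovery

    def record_success(self) -> None:
        self.failures = 0
        self.is_open = False

    def allow_request(self) -> bool:
        return (not self.is_open) or self.clock() >= self.half_open_at


now = [0.0]  # fake monotonic clock we can advance by hand
breaker = MiniBreaker(clock=lambda: now[0])
for _ in range(5):
    breaker.record_failure()
blocked = breaker.allow_request()        # OPEN: requests rejected
now[0] = 31.0                            # advance past the 30s recovery window
probe_allowed = breaker.allow_request()  # HALF-OPEN: one probe allowed
breaker.record_success()                 # probe succeeded -> CLOSED
```

Injecting the clock is what makes the 30-second recovery window testable without sleeping; the production class uses `time.monotonic()` directly.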
# ---------------------------------------------------------------------------
# SOAP envelope helpers
# ---------------------------------------------------------------------------
SOAP_ENVELOPE_TEMPLATE = """<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:inv="http://legacy-erp.manga-store.co.jp/inventory/v1">
<soapenv:Header>
<inv:AuthToken>{auth_token}</inv:AuthToken>
</soapenv:Header>
<soapenv:Body>
<inv:GetStockLevel>
<inv:ISBN>{isbn}</inv:ISBN>
<inv:WarehouseCode>{warehouse_code}</inv:WarehouseCode>
</inv:GetStockLevel>
</soapenv:Body>
</soapenv:Envelope>"""
SOAP_BATCH_ENVELOPE_TEMPLATE = """<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:inv="http://legacy-erp.manga-store.co.jp/inventory/v1">
<soapenv:Header>
<inv:AuthToken>{auth_token}</inv:AuthToken>
</soapenv:Header>
<soapenv:Body>
<inv:GetBatchStockLevels>
{isbn_elements}
</inv:GetBatchStockLevels>
</soapenv:Body>
</soapenv:Envelope>"""
def _build_soap_request(isbn: str, warehouse: str, token: str) -> bytes:
"""Build SOAP envelope and encode to Shift-JIS bytes."""
xml_str = SOAP_ENVELOPE_TEMPLATE.format(
auth_token=token,
isbn=isbn,
warehouse_code=warehouse,
)
return xml_str.encode("shift_jis")
def _build_batch_soap_request(
isbns: list[str], warehouse: str, token: str
) -> bytes:
"""Build batch SOAP envelope for up to 20 ISBNs per call."""
isbn_elements = "\n ".join(
f"<inv:ISBN>{isbn}</inv:ISBN>" for isbn in isbns
)
xml_str = SOAP_BATCH_ENVELOPE_TEMPLATE.format(
auth_token=token,
isbn_elements=isbn_elements,
)
return xml_str.encode("shift_jis")
def _parse_stock_response(raw_xml: bytes) -> list[dict]:
    """
    Parse legacy ERP XML response (Shift-JIS encoded) into Python dicts.
    Handles both single-item and batch responses.
    """
    text = raw_xml.decode("shift_jis", errors="replace")
    # ElementTree rejects str input that still carries an encoding
    # declaration, so drop the <?xml ... ?> prolog after decoding.
    if text.lstrip().startswith("<?xml"):
        text = text.split("?>", 1)[1]
    root = ET.fromstring(text)
ns = {
"soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
"inv": "http://legacy-erp.manga-store.co.jp/inventory/v1",
}
items = root.findall(".//inv:StockItem", ns)
results = []
for item in items:
results.append({
"isbn": _text(item, "inv:ISBN", ns),
"title": _text(item, "inv:Title", ns),
"quantity": int(_text(item, "inv:Quantity", ns) or "0"),
"warehouse": _text(item, "inv:WarehouseCode", ns),
"status_code": _text(item, "inv:StatusCode", ns),
"last_restock": _text(item, "inv:LastRestockDate", ns),
"price": _text(item, "inv:PriceJPY", ns),
"restock_eta_days": _text(item, "inv:RestockETADays", ns),
})
return results
def _text(element: ET.Element, path: str, ns: dict) -> Optional[str]:
"""Safely extract text from an XML element."""
node = element.find(path, ns)
return node.text.strip() if node is not None and node.text else None
def _map_status_code(code: Optional[str]) -> StockStatus:
"""Map legacy numeric status codes to clean domain enum."""
mapping = {
"01": StockStatus.IN_STOCK,
"02": StockStatus.LOW_STOCK,
"03": StockStatus.OUT_OF_STOCK,
"04": StockStatus.PREORDER,
}
return mapping.get(code or "", StockStatus.UNKNOWN)
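A self-contained round trip through the encoding boundary: build a Shift-JIS response, decode it, strip the XML prolog, and extract fields the same way the helpers above do. The `GetStockLevelResponse` wrapper element is an assumed response shape for illustration only.

```python
from xml.etree import ElementTree as ET

NS = {"inv": "http://legacy-erp.manga-store.co.jp/inventory/v1"}

# A fake ERP response, encoded to Shift-JIS bytes as the real ERP would send.
fake_response = """<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:inv="http://legacy-erp.manga-store.co.jp/inventory/v1">
  <soapenv:Body>
    <inv:GetStockLevelResponse>
      <inv:StockItem>
        <inv:ISBN>9784088827516</inv:ISBN>
        <inv:Title>ワンピース 42</inv:Title>
        <inv:Quantity>7</inv:Quantity>
      </inv:StockItem>
    </inv:GetStockLevelResponse>
  </soapenv:Body>
</soapenv:Envelope>""".encode("shift_jis")

def parse_stock(raw: bytes) -> dict:
    """Decode Shift-JIS, drop the XML prolog, parse with namespaces."""
    text = raw.decode("shift_jis", errors="replace")
    if text.lstrip().startswith("<?xml"):
        # ElementTree refuses str input with an encoding declaration.
        text = text.split("?>", 1)[1]
    root = ET.fromstring(text)
    item = root.find(".//inv:StockItem", NS)
    return {
        "isbn": item.findtext("inv:ISBN", default="", namespaces=NS),
        "quantity": int(item.findtext("inv:Quantity", default="0", namespaces=NS)),
    }

parsed = parse_stock(fake_response)
```

Note the katakana title survives the Shift-JIS round trip; `errors="replace"` only kicks in for bytes the codec cannot map, so malformed legacy data degrades instead of crashing the parser.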
# ---------------------------------------------------------------------------
# Main adapter class
# ---------------------------------------------------------------------------
class LegacyInventoryAdapter:
"""
Production adapter for the legacy manga inventory ERP.
Features:
- SOAP/XML <-> Python dict translation
- Shift-JIS <-> UTF-8 encoding
- Circuit breaker (opens after 5 consecutive failures, 30s recovery)
- Concurrency limiter (respects legacy 50 req/s ceiling)
- Retry with exponential back-off (max 3 attempts)
- Request deduplication within a short window
- Batch query support (up to 20 ISBNs per SOAP call)
Usage:
adapter = LegacyInventoryAdapter(
endpoint="https://erp.manga-store.co.jp/soap/inventory",
auth_token="<token>",
)
item = await adapter.get_stock("978-4-08-882751-6")
"""
MAX_RETRIES = 3
BASE_BACKOFF_SECONDS = 0.5
REQUEST_TIMEOUT_SECONDS = 5.0
MAX_CONCURRENT_REQUESTS = 50 # legacy system ceiling
BATCH_SIZE = 20
DEDUP_WINDOW_SECONDS = 2.0
def __init__(
self,
endpoint: str,
auth_token: str,
warehouse_code: str = "TK01",
) -> None:
self._endpoint = endpoint
self._auth_token = auth_token
self._warehouse = warehouse_code
self._circuit = CircuitBreakerState()
self._semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_REQUESTS)
self._session: Optional[aiohttp.ClientSession] = None
# In-flight dedup: isbn -> (timestamp, Future)
self._inflight: dict[str, tuple[float, asyncio.Future]] = {}
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS),
headers={"Content-Type": "text/xml; charset=Shift_JIS"},
)
return self._session
# -- Public API ----------------------------------------------------------
async def get_stock(self, isbn: str) -> InventoryItem:
"""
Query stock level for a single ISBN.
Returns a clean InventoryItem domain object.
"""
if not self._circuit.allow_request():
raise LegacySystemUnavailableError(
"Circuit breaker is OPEN for legacy ERP."
)
# Dedup: if same ISBN is already in flight, wait for that result
now = time.monotonic()
if isbn in self._inflight:
ts, fut = self._inflight[isbn]
if now - ts < self.DEDUP_WINDOW_SECONDS:
logger.debug("Dedup hit for ISBN %s, reusing in-flight.", isbn)
return await fut
        future: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[isbn] = (now, future)
try:
result = await self._execute_with_retry(isbn)
future.set_result(result)
return result
except Exception as exc:
future.set_exception(exc)
raise
finally:
self._inflight.pop(isbn, None)
async def get_stock_batch(self, isbns: list[str]) -> list[InventoryItem]:
"""
Batch query for multiple ISBNs (up to BATCH_SIZE per call).
Splits into chunks automatically.
"""
if not self._circuit.allow_request():
raise LegacySystemUnavailableError(
"Circuit breaker is OPEN for legacy ERP."
)
results: list[InventoryItem] = []
chunks = [
isbns[i : i + self.BATCH_SIZE]
for i in range(0, len(isbns), self.BATCH_SIZE)
]
        for chunk in chunks:
            body = _build_batch_soap_request(
                chunk, self._warehouse, self._auth_token
            )
            try:
                raw = await self._send_soap_request(body)
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                # Feed the breaker in the batch path too, not just get_stock.
                self._circuit.record_failure()
                raise LegacySystemUnavailableError(
                    "Legacy ERP batch query failed."
                ) from exc
            self._circuit.record_success()
            parsed = _parse_stock_response(raw)
            results.extend(self._to_domain_objects(parsed))
        return results
async def health_check(self) -> dict:
"""Lightweight probe -- calls the ERP healthcheck endpoint."""
session = await self._get_session()
try:
async with session.get(
self._endpoint.replace("/soap/inventory", "/health"),
timeout=ClientTimeout(total=3.0),
) as resp:
                ok = resp.status == 200
                if ok:
                    self._circuit.record_success()
                else:
                    self._circuit.record_failure()
return {
"healthy": ok,
"status_code": resp.status,
"circuit_open": self._circuit.is_open,
}
except Exception as exc:
self._circuit.record_failure()
return {
"healthy": False,
"error": str(exc),
"circuit_open": self._circuit.is_open,
}
async def close(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
# -- Private helpers -----------------------------------------------------
async def _execute_with_retry(self, isbn: str) -> InventoryItem:
body = _build_soap_request(isbn, self._warehouse, self._auth_token)
last_exc: Optional[Exception] = None
for attempt in range(1, self.MAX_RETRIES + 1):
try:
raw = await self._send_soap_request(body)
parsed = _parse_stock_response(raw)
if not parsed:
raise LegacyDataError(f"No stock data for ISBN {isbn}")
self._circuit.record_success()
return self._to_domain_objects(parsed)[0]
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                last_exc = exc
                self._circuit.record_failure()
                if attempt == self.MAX_RETRIES:
                    break  # no point backing off before the final raise
                backoff = self.BASE_BACKOFF_SECONDS * (2 ** (attempt - 1))
                logger.warning(
                    "Legacy ERP attempt %d/%d failed: %s. Retrying in %.1fs.",
                    attempt, self.MAX_RETRIES, exc, backoff,
                )
                await asyncio.sleep(backoff)
raise LegacySystemUnavailableError(
f"Legacy ERP unreachable after {self.MAX_RETRIES} attempts."
) from last_exc
async def _send_soap_request(self, body: bytes) -> bytes:
session = await self._get_session()
async with self._semaphore:
async with session.post(self._endpoint, data=body) as resp:
if resp.status != 200:
raise LegacySoapFaultError(
f"SOAP fault: HTTP {resp.status}"
)
return await resp.read()
@staticmethod
def _to_domain_objects(parsed: list[dict]) -> list[InventoryItem]:
items = []
for row in parsed:
items.append(InventoryItem(
isbn=row["isbn"] or "",
title=row["title"] or "",
quantity_available=row["quantity"],
warehouse_location=row["warehouse"] or "",
stock_status=_map_status_code(row["status_code"]),
last_restock_date=row.get("last_restock"),
price_jpy=int(row["price"]) if row.get("price") else None,
estimated_restock_days=(
int(row["restock_eta_days"])
if row.get("restock_eta_days") else None
),
))
return items
# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------
class LegacySystemUnavailableError(Exception):
"""Raised when the legacy ERP is unreachable or circuit is open."""
class LegacySoapFaultError(Exception):
"""Raised when the legacy ERP returns a SOAP fault."""
class LegacyDataError(Exception):
"""Raised when the response data is missing or unparseable."""
2. EventBridge Integration -- Inventory Event Pipeline
"""
enterprise_connectivity/events/inventory_event_pipeline.py
EventBridge integration that publishes and consumes inventory events.
Bridges the gap between legacy ERP batch inventory updates and the
real-time needs of the MangaAssist chatbot.
Events flow:
Legacy ERP -> DMS CDC -> Lambda -> EventBridge -> (fan-out)
-> SQS -> Lambda -> DynamoDB (product catalog)
-> SQS -> Lambda -> OpenSearch (vector re-index)
-> SQS -> Lambda -> Redis cache invalidation
"""
import json
import logging
import os
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Optional

import boto3
from botocore.config import Config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
EVENT_BUS_NAME = os.environ.get(
"INVENTORY_EVENT_BUS", "manga-assist-inventory"
)
EVENT_SOURCE = "manga-assist.inventory"
AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
boto_config = Config(
region_name=AWS_REGION,
retries={"max_attempts": 3, "mode": "adaptive"},
)
eventbridge = boto3.client("events", config=boto_config)
sqs = boto3.client("sqs", config=boto_config)
# ---------------------------------------------------------------------------
# Event schema
# ---------------------------------------------------------------------------
@dataclass
class InventoryChangeEvent:
"""
Canonical event schema for inventory changes.
Published to EventBridge for fan-out to multiple consumers.
"""
event_type: str # "inventory.updated" | "inventory.restocked" | "inventory.depleted"
isbn: str
title: str
previous_quantity: int
new_quantity: int
warehouse_code: str
change_source: str # "erp_cdc" | "manual_adjustment" | "order_fulfillment"
timestamp_utc: str
correlation_id: str
    metadata: Optional[dict] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def to_eventbridge_entry(self) -> dict:
"""Format for EventBridge PutEvents API."""
return {
"Source": EVENT_SOURCE,
"DetailType": self.event_type,
"Detail": json.dumps(asdict(self), default=str),
"EventBusName": EVENT_BUS_NAME,
"Time": datetime.fromisoformat(self.timestamp_utc),
}
@classmethod
def from_cdc_record(cls, cdc_record: dict, correlation_id: str) -> "InventoryChangeEvent":
"""
Factory: build from a DMS CDC change record.
CDC records arrive as: {"data": {"ISBN": ..., "QTY": ..., ...}, "metadata": {...}}
"""
data = cdc_record.get("data", {})
meta = cdc_record.get("metadata", {})
old_qty = int(data.get("OLD_QTY", 0))
new_qty = int(data.get("QTY", 0))
if new_qty == 0:
event_type = "inventory.depleted"
elif old_qty == 0 and new_qty > 0:
event_type = "inventory.restocked"
else:
event_type = "inventory.updated"
return cls(
event_type=event_type,
isbn=data.get("ISBN", ""),
title=data.get("TITLE", ""),
previous_quantity=old_qty,
new_quantity=new_qty,
warehouse_code=data.get("WAREHOUSE_CD", "TK01"),
change_source="erp_cdc",
timestamp_utc=datetime.now(timezone.utc).isoformat(),
correlation_id=correlation_id,
metadata={
"cdc_operation": meta.get("operation", "unknown"),
"cdc_timestamp": meta.get("timestamp", ""),
"table_name": meta.get("table-name", ""),
},
)
# ---------------------------------------------------------------------------
# Publisher
# ---------------------------------------------------------------------------
class InventoryEventPublisher:
"""
Publishes inventory change events to EventBridge.
Supports batching (up to 10 entries per PutEvents call).
"""
MAX_BATCH = 10 # EventBridge limit
def __init__(self, event_bus: str = EVENT_BUS_NAME):
self._event_bus = event_bus
self._buffer: list[dict] = []
def enqueue(self, event: InventoryChangeEvent) -> None:
"""Add event to buffer. Auto-flushes when buffer is full."""
self._buffer.append(event.to_eventbridge_entry())
if len(self._buffer) >= self.MAX_BATCH:
self.flush()
def flush(self) -> dict:
"""Send buffered events to EventBridge."""
if not self._buffer:
return {"sent": 0}
batch = self._buffer[: self.MAX_BATCH]
self._buffer = self._buffer[self.MAX_BATCH :]
response = eventbridge.put_events(Entries=batch)
failed = response.get("FailedEntryCount", 0)
if failed > 0:
logger.error(
"EventBridge PutEvents: %d/%d entries failed.",
failed, len(batch),
)
# Re-enqueue failed entries for retry
for i, entry_resp in enumerate(response.get("Entries", [])):
if entry_resp.get("ErrorCode"):
self._buffer.append(batch[i])
logger.error(
" Failed entry %d: %s - %s",
i,
entry_resp.get("ErrorCode"),
entry_resp.get("ErrorMessage"),
)
else:
logger.info("Published %d events to EventBridge.", len(batch))
return {"sent": len(batch) - failed, "failed": failed}
def flush_all(self) -> dict:
"""Flush all remaining buffered events."""
total_sent = 0
total_failed = 0
while self._buffer:
result = self.flush()
total_sent += result.get("sent", 0)
total_failed += result.get("failed", 0)
return {"total_sent": total_sent, "total_failed": total_failed}
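The `flush`/`flush_all` pair drains the buffer ten entries at a time because PutEvents accepts at most 10 entries per call. Stripped of the retry bookkeeping, the chunking reduces to:

```python
def chunk_entries(entries: list, batch_size: int = 10) -> list[list]:
    """Split PutEvents entries into EventBridge-sized batches (max 10)."""
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]

# 23 buffered events -> three PutEvents calls: 10 + 10 + 3.
batches = chunk_entries(list(range(23)))
```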
# ---------------------------------------------------------------------------
# Consumer -- Lambda handler for downstream processing
# ---------------------------------------------------------------------------
def lambda_handler_inventory_sync(event: dict, context: Any) -> dict:
"""
Lambda function triggered by SQS (fed by EventBridge rule).
Updates DynamoDB product catalog with new stock levels.
EventBridge Rule:
Source: manga-assist.inventory
DetailType: inventory.updated | inventory.restocked | inventory.depleted
Target: SQS queue -> this Lambda
"""
dynamodb = boto3.resource("dynamodb", config=boto_config)
table = dynamodb.Table(os.environ.get("PRODUCTS_TABLE", "manga-products"))
processed = 0
errors = 0
for record in event.get("Records", []):
try:
body = json.loads(record["body"])
# EventBridge wraps detail in "detail" key
detail = body if "isbn" in body else body.get("detail", {})
isbn = detail["isbn"]
new_qty = detail["new_quantity"]
event_type = detail["event_type"]
# Determine stock_status for DynamoDB
if new_qty == 0:
stock_status = "out_of_stock"
elif new_qty <= 5:
stock_status = "low_stock"
else:
stock_status = "in_stock"
table.update_item(
Key={"isbn": isbn},
UpdateExpression=(
"SET quantity_available = :qty, "
"stock_status = :status, "
"last_sync_utc = :ts, "
"last_event_type = :etype"
),
ExpressionAttributeValues={
":qty": new_qty,
":status": stock_status,
":ts": datetime.now(timezone.utc).isoformat(),
":etype": event_type,
},
)
processed += 1
logger.info(
"Updated DynamoDB: ISBN=%s qty=%d status=%s",
isbn, new_qty, stock_status,
)
except Exception as exc:
errors += 1
logger.error("Failed to process record: %s", exc)
return {"processed": processed, "errors": errors}
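Whether the SQS body is the bare detail or the full EventBridge envelope depends on how the rule's target input is configured, which is why the handler above probes for both shapes. A sketch of that unwrapping, with hypothetical payloads:

```python
import json

def extract_detail(sqs_body: str) -> dict:
    """Return the inventory payload whether or not the envelope is present."""
    body = json.loads(sqs_body)
    return body if "isbn" in body else body.get("detail", {})

# Shape 1: target input transformer already unwrapped the detail.
bare = json.dumps({"isbn": "9784088827516", "new_quantity": 3})

# Shape 2: default target input -- the full EventBridge envelope.
wrapped = json.dumps({
    "version": "0",
    "source": "manga-assist.inventory",
    "detail-type": "inventory.updated",
    "detail": {"isbn": "9784088827516", "new_quantity": 3},
})
```

Accepting both shapes keeps the Lambda working if someone later adds an input transformer to the EventBridge rule.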
def lambda_handler_cache_invalidation(event: dict, context: Any) -> dict:
"""
Lambda function triggered by SQS (fed by EventBridge rule).
Invalidates Redis cache entries for changed inventory items.
This ensures the chatbot does not serve stale stock data after
an inventory update event.
"""
    # Imported inside the handler so only this function needs the redis client lib.
    import redis
redis_client = redis.Redis(
host=os.environ.get("REDIS_HOST", "manga-cache.xxxxx.apne1.cache.amazonaws.com"),
port=6379,
ssl=True,
decode_responses=True,
)
invalidated = 0
for record in event.get("Records", []):
try:
body = json.loads(record["body"])
detail = body if "isbn" in body else body.get("detail", {})
isbn = detail["isbn"]
# Invalidate all cache keys containing this ISBN
pattern = f"inventory:*{isbn}*"
keys = list(redis_client.scan_iter(match=pattern, count=100))
if keys:
redis_client.delete(*keys)
invalidated += len(keys)
logger.info("Invalidated %d cache keys for ISBN %s.", len(keys), isbn)
except Exception as exc:
logger.error("Cache invalidation failed: %s", exc)
return {"invalidated": invalidated}
3. Data Synchronization Pipeline -- CDC and Batch Sync
"""
enterprise_connectivity/sync/data_sync_pipeline.py
Data synchronization pipeline for MangaAssist:
- Real-time CDC via DMS -> DynamoDB Streams -> Lambda
- Nightly batch ETL via Glue -> S3 -> OpenSearch re-index
- Conflict resolution and data quality validation
Keeps the chatbot's product catalog, vector embeddings, and cache
in sync with the legacy ERP as the source of truth.
"""
import json
import logging
import os
import hashlib
from datetime import datetime, timezone
from typing import Any, Optional
import boto3
from botocore.config import Config
logger = logging.getLogger(__name__)
AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
boto_config = Config(
region_name=AWS_REGION,
retries={"max_attempts": 3, "mode": "adaptive"},
)
s3 = boto3.client("s3", config=boto_config)
glue = boto3.client("glue", config=boto_config)
dynamodb = boto3.resource("dynamodb", config=boto_config)
# ---------------------------------------------------------------------------
# Data quality validation
# ---------------------------------------------------------------------------
class DataQualityValidator:
"""
Validates incoming data before it reaches DynamoDB / OpenSearch.
Prevents corrupt or incomplete legacy data from poisoning the chatbot.
"""
REQUIRED_FIELDS = {"isbn", "title", "quantity", "warehouse_code"}
ISBN_PATTERN_10 = r"^\d{9}[\dX]$"
ISBN_PATTERN_13 = r"^\d{13}$"
def __init__(self):
import re
self._isbn10_re = re.compile(self.ISBN_PATTERN_10)
self._isbn13_re = re.compile(self.ISBN_PATTERN_13)
self._validation_errors: list[dict] = []
def validate_record(self, record: dict) -> tuple[bool, list[str]]:
"""Validate a single record. Returns (is_valid, list_of_errors)."""
errors = []
# Check required fields
missing = self.REQUIRED_FIELDS - set(record.keys())
if missing:
errors.append(f"Missing required fields: {missing}")
# Validate ISBN format
isbn = record.get("isbn", "")
isbn_clean = isbn.replace("-", "")
if not (self._isbn10_re.match(isbn_clean) or self._isbn13_re.match(isbn_clean)):
errors.append(f"Invalid ISBN format: {isbn}")
# Validate quantity is non-negative
qty = record.get("quantity")
if qty is not None and (not isinstance(qty, (int, float)) or qty < 0):
errors.append(f"Invalid quantity: {qty}")
# Validate title is non-empty and reasonable length
title = record.get("title", "")
if not title or len(title) > 500:
errors.append(f"Invalid title length: {len(title)}")
is_valid = len(errors) == 0
if not is_valid:
self._validation_errors.append({
"record": record,
"errors": errors,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
return is_valid, errors
def get_error_report(self) -> dict:
"""Summary of all validation errors in this batch."""
return {
"total_errors": len(self._validation_errors),
"errors": self._validation_errors[-100:], # last 100
}
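The validator's regexes only check the shape of an ISBN. A check-digit test would additionally catch single-digit transcription errors from the legacy feed; this is a possible extra gate, not part of the pipeline above.

```python
def is_valid_isbn13(isbn: str) -> bool:
    """ISBN-13 check digit: alternating 1/3 weights must sum to a multiple of 10."""
    digits = isbn.replace("-", "")
    if len(digits) != 13 or not digits.isdigit():
        return False
    total = sum(int(d) * (1 if i % 2 == 0 else 3) for i, d in enumerate(digits))
    return total % 10 == 0
```

The same weighted-sum scheme underlies barcode scanning at the warehouse, so a record failing this check almost certainly means a mangled field upstream rather than a real product.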
# ---------------------------------------------------------------------------
# Real-time CDC sync handler
# ---------------------------------------------------------------------------
class CDCSyncHandler:
"""
Processes DynamoDB Streams events triggered by DMS CDC writes.
Re-indexes changed records in OpenSearch for vector search.
"""
def __init__(self, opensearch_endpoint: str, index_name: str = "manga-catalog"):
self._os_endpoint = opensearch_endpoint
self._index_name = index_name
self._validator = DataQualityValidator()
self._bedrock = boto3.client("bedrock-runtime", config=boto_config)
def handle_stream_event(self, event: dict) -> dict:
"""
Lambda handler for DynamoDB Streams events.
Processes INSERT and MODIFY events to keep OpenSearch in sync.
"""
processed = 0
skipped = 0
errors = 0
for record in event.get("Records", []):
event_name = record.get("eventName", "")
if event_name not in ("INSERT", "MODIFY"):
skipped += 1
continue
try:
new_image = record.get("dynamodb", {}).get("NewImage", {})
item = self._deserialize_dynamodb_item(new_image)
is_valid, validation_errors = self._validator.validate_record(item)
if not is_valid:
logger.warning(
"Skipping invalid record ISBN=%s: %s",
item.get("isbn"), validation_errors,
)
skipped += 1
continue
# Generate embedding for vector search
embedding = self._generate_embedding(item)
# Upsert into OpenSearch
self._upsert_opensearch(item, embedding)
processed += 1
except Exception as exc:
errors += 1
logger.error("CDC sync error: %s", exc)
return {
"processed": processed,
"skipped": skipped,
"errors": errors,
"validation_report": self._validator.get_error_report(),
}
def _generate_embedding(self, item: dict) -> list[float]:
"""Generate text embedding via Bedrock for OpenSearch vector field."""
text = f"{item.get('title', '')} {item.get('author', '')} {item.get('genre', '')}"
response = self._bedrock.invoke_model(
modelId="amazon.titan-embed-text-v2:0",
body=json.dumps({"inputText": text}),
)
body = json.loads(response["body"].read())
return body.get("embedding", [])
    _os_client = None  # cached client, reused across warm Lambda invocations

    def _get_opensearch_client(self):
        """Build the signed OpenSearch Serverless client once and reuse it."""
        from opensearchpy import OpenSearch, RequestsHttpConnection
        from requests_aws4auth import AWS4Auth

        if self._os_client is None:
            credentials = boto3.Session().get_credentials()
            awsauth = AWS4Auth(
                credentials.access_key,
                credentials.secret_key,
                AWS_REGION,
                "aoss",
                session_token=credentials.token,
            )
            self._os_client = OpenSearch(
                hosts=[{"host": self._os_endpoint, "port": 443}],
                http_auth=awsauth,
                use_ssl=True,
                connection_class=RequestsHttpConnection,
            )
        return self._os_client

    def _upsert_opensearch(self, item: dict, embedding: list[float]) -> None:
        """Upsert document into OpenSearch Serverless."""
        client = self._get_opensearch_client()
doc = {
"isbn": item["isbn"],
"title": item["title"],
"author": item.get("author", ""),
"genre": item.get("genre", ""),
"quantity_available": item.get("quantity", 0),
"stock_status": item.get("stock_status", "unknown"),
"price_jpy": item.get("price_jpy"),
"embedding": embedding,
"last_sync_utc": datetime.now(timezone.utc).isoformat(),
}
client.index(
index=self._index_name,
id=item["isbn"],
body=doc,
)
    @staticmethod
    def _deserialize_dynamodb_item(image: dict) -> dict:
        """Convert DynamoDB stream format to plain dict."""
        deserializer = boto3.dynamodb.types.TypeDeserializer()
        return {k: deserializer.deserialize(v) for k, v in image.items()}
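boto3's `TypeDeserializer` does the real work above. To make the stream wire format concrete, here is a simplified, dependency-free stand-in (illustrative only, not part of the handler) covering the common type descriptors:

```python
from decimal import Decimal

def deserialize_image(image: dict) -> dict:
    """Simplified stand-in for boto3's TypeDeserializer, covering the
    common DynamoDB type descriptors: S (string), N (number), BOOL,
    L (list), and M (map)."""
    def one(value: dict):
        (tag, raw), = value.items()  # each attribute is {"<type>": <raw>}
        if tag == "S":
            return raw
        if tag == "N":
            return Decimal(raw)  # boto3 also returns Decimal for N
        if tag == "BOOL":
            return raw
        if tag == "L":
            return [one(v) for v in raw]
        if tag == "M":
            return {k: one(v) for k, v in raw.items()}
        raise ValueError(f"unsupported type descriptor: {tag}")
    return {k: one(v) for k, v in image.items()}

# A NewImage as it arrives inside a DynamoDB Streams record:
image = {
    "isbn": {"S": "9784088725093"},
    "quantity": {"N": "12"},
    "in_print": {"BOOL": True},
}
print(deserialize_image(image))
```

Note that numbers arrive as strings on the wire (`{"N": "12"}`), which is why both boto3 and this sketch convert them to `Decimal` rather than trusting float parsing.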
# ---------------------------------------------------------------------------
# Batch ETL orchestrator
# ---------------------------------------------------------------------------
class BatchSyncOrchestrator:
"""
Orchestrates nightly full catalog sync:
1. Export from legacy ERP (via DMS full-load task)
2. Stage in S3
3. Run Glue ETL for transformation + quality checks
4. Load into DynamoDB and trigger OpenSearch re-index
Provides checkpoint/restart for fault tolerance.
"""
def __init__(
self,
s3_bucket: str,
glue_job_name: str,
products_table: str = "manga-products",
):
self._bucket = s3_bucket
self._glue_job = glue_job_name
self._table_name = products_table
    def start_full_sync(self) -> dict:
        """Kick off the nightly batch sync pipeline."""
        run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        s3_prefix = f"batch-sync/{run_id}/"
        logger.info("Starting batch sync run_id=%s", run_id)
        # Start the Glue ETL job (transformation + quality checks);
        # the DMS full-load export to S3 is assumed to run upstream
        glue_run = glue.start_job_run(
            JobName=self._glue_job,
            Arguments={
                "--S3_BUCKET": self._bucket,
                "--S3_PREFIX": s3_prefix,
                "--RUN_ID": run_id,
                "--TARGET_TABLE": self._table_name,
            },
        )
        return {
            "run_id": run_id,
            "glue_job_run_id": glue_run["JobRunId"],
            "s3_staging": f"s3://{self._bucket}/{s3_prefix}",
            "status": "started",
        }
    def check_sync_status(self, glue_run_id: str) -> dict:
        """Poll Glue job status."""
        response = glue.get_job_run(
            JobName=self._glue_job,
            RunId=glue_run_id,
        )
        run = response["JobRun"]
        return {
            "status": run["JobRunState"],
            "started": str(run.get("StartedOn", "")),
            "completed": str(run.get("CompletedOn", "")),
            "error": run.get("ErrorMessage", ""),
            "execution_time_sec": run.get("ExecutionTime", 0),
        }
    def validate_sync_results(self, run_id: str) -> dict:
        """
        Post-sync validation: compare record counts between
        legacy ERP export and DynamoDB to detect data loss.
        """
        # Count objects in S3 staging (assumes the Glue job writes
        # one record per object; adjust if records are batched per file)
        s3_prefix = f"batch-sync/{run_id}/transformed/"
        paginator = s3.get_paginator("list_objects_v2")
        s3_count = 0
        for page in paginator.paginate(Bucket=self._bucket, Prefix=s3_prefix):
            s3_count += page.get("KeyCount", 0)
        # Count records in DynamoDB
        table = dynamodb.Table(self._table_name)
        dynamo_count = table.item_count  # approximate, updated every ~6 hours
        discrepancy = abs(s3_count - dynamo_count)
        threshold = max(10, int(s3_count * 0.01))  # 1% tolerance, floor of 10
        return {
            "run_id": run_id,
            "s3_record_count": s3_count,
            "dynamodb_record_count": dynamo_count,
            "discrepancy": discrepancy,
            "within_tolerance": discrepancy <= threshold,
            "tolerance_threshold": threshold,
        }
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Anti-corruption layers isolate FM domains from legacy coupling | ACL translates SOAP/XML + Shift-JIS from the ERP into clean Python domain objects before Bedrock prompt construction |
| 2 | Circuit breakers protect throughput when legacy systems degrade | If the ERP fails 5x in a row, the breaker opens for 30s -- chatbot falls back to cached data instead of blocking |
| 3 | EventBridge enables loose coupling via publish-subscribe | Inventory changes fan out to DynamoDB, OpenSearch, and Redis cache invalidation without point-to-point wiring |
| 4 | CDC (Change Data Capture) keeps read stores in near-real-time sync | DMS streams ERP database changes -> DynamoDB Streams -> Lambda -> OpenSearch re-index pipeline |
| 5 | Batch ETL provides a safety net for full reconciliation | Nightly Glue job re-syncs the entire manga catalog; post-sync validation catches any CDC drift |
| 6 | Request deduplication prevents wasted legacy capacity | Concurrent chatbot queries for the same ISBN share a single in-flight ERP call |
| 7 | Data quality gates stop corrupt data before it reaches the chatbot | Validator rejects records with invalid ISBNs, negative quantities, or missing fields |
| 8 | Encoding normalization is critical for JP content | Shift-JIS to UTF-8 conversion at the adapter boundary -- all downstream systems operate in UTF-8 only |
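Takeaway 2's breaker behavior can be sketched as a small state machine. This is a minimal illustration, not a production implementation; the thresholds and the `fallback` hook (e.g. a Redis cache read) are assumptions matching the takeaway, and real deployments usually add a half-open trial budget and per-endpoint state:

```python
import time

class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; while open,
    calls are short-circuited to `fallback` for `reset_timeout` seconds,
    after which one trial call is allowed through (half-open)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: float | None = None

    def call(self, fn, fallback):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                return fallback()          # open: serve cached data, don't touch ERP
            self.opened_at = None          # timeout elapsed: half-open, allow a trial
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            return fallback()
        self.failures = 0                  # any success fully closes the breaker
        return result
```

With the defaults above, five consecutive ERP failures open the breaker, and for 30 seconds the chatbot answers from cache instead of blocking on a degraded legacy system, exactly as described in the table.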
Exam-Relevant Connections
- API Gateway mapping templates (written in VTL) can perform lightweight request/response mapping without Lambda, reducing latency and cost for simple format translations.
- AppSync pipeline resolvers can aggregate data from DynamoDB (products), OpenSearch (search), and the legacy ERP (stock) in a single GraphQL query -- relevant for Skill 2.3.1's "seamless incorporation."
- DMS + Kinesis Data Streams is the AWS-native answer to CDC from on-premises databases; contrast with third-party tools like Debezium.
- EventBridge Schema Registry auto-discovers event schemas from the bus -- important for governance when multiple teams publish events.
- SQS FIFO queues provide exactly-once processing (via content- or ID-based deduplication within a 5-minute interval) and strict ordering per message group for transactional events (e.g., payment confirmations) -- standard SQS only guarantees at-least-once delivery with best-effort ordering.