
Legacy System Integration Patterns for FM-Powered Applications

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
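A back-of-envelope model cost at the stated scale helps ground the Sonnet/Haiku trade-off. The per-message token counts below (500 input / 200 output) are an assumption for illustration, not measured figures; only the per-1M-token prices come from the context above.

```python
# Daily Bedrock cost at 1M messages/day. Token counts per message are
# ASSUMED (500 in / 200 out) for illustration; prices are from the
# MangaAssist context above ($ per 1M tokens, input/output).
MESSAGES_PER_DAY = 1_000_000
IN_TOK, OUT_TOK = 500, 200  # assumed per-message token counts

def daily_cost_usd(in_price_per_m: float, out_price_per_m: float) -> float:
    """Daily model cost in USD given per-1M-token input/output prices."""
    return MESSAGES_PER_DAY * (
        IN_TOK * in_price_per_m + OUT_TOK * out_price_per_m
    ) / 1_000_000

sonnet = daily_cost_usd(3.00, 15.00)   # Claude 3 Sonnet
haiku = daily_cost_usd(0.25, 1.25)     # Claude 3 Haiku
print(f"Sonnet: ${sonnet:,.0f}/day, Haiku: ${haiku:,.0f}/day")
# → Sonnet: $4,500/day, Haiku: $375/day
```

Under these assumptions Sonnet costs roughly 12x Haiku per day, which is why routing cheap lookups to Haiku and reserving Sonnet for complex turns is a common pattern.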


Skill Mapping

| Certification | Task | Skill | This File |
| --- | --- | --- | --- |
| AWS AI Practitioner (AIP-C01) | Task 2.3 — Describe methods to integrate FM into applications | Skill 2.3.1 — Identify enterprise connectivity solutions | Legacy ERP/CRM/mainframe integration, anti-corruption layers, adapter patterns, SOAP-to-REST conversion, message format translation |

Skill Scope

Skill 2.3.1 requires understanding how Foundation Models integrate with existing enterprise systems. This file focuses on the integration layer between modern GenAI services and legacy backends — ERP inventory systems, CRM platforms, mainframe order processors — that still power the business. The key challenge: legacy systems use SOAP/XML, fixed-width formats, Shift-JIS encoding, and throttled APIs, while FM pipelines expect JSON, UTF-8, and sub-second responses.
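The encoding half of that challenge can be shown in a few lines: the same Japanese title, transmitted as a legacy system would send it, must be decoded with the right codec before it can enter an FM prompt.

```python
# Minimal demonstration of the Shift-JIS vs UTF-8 mismatch described above.
title = "進撃の巨人"
legacy_bytes = title.encode("shift_jis")   # bytes as a legacy ERP sends them

assert legacy_bytes.decode("shift_jis") == title  # correct decode

try:
    legacy_bytes.decode("utf-8")           # naive decode without translation
except UnicodeDecodeError:
    print("Shift-JIS bytes are not valid UTF-8 — decode fails (or yields mojibake)")
```

Every adapter in this file performs this translation at the boundary so nothing downstream ever sees legacy bytes.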


Mind Map

mindmap
  root((Legacy System Integration))
    Anti-Corruption Layer
      Schema Translation
        Legacy XML to Domain Objects
        Shift-JIS to UTF-8
        Fixed-width to JSON
      Error Normalization
        SOAP faults to HTTP status codes
        Mainframe return codes to exceptions
        Timeout standardization
      Contract Versioning
        Interface stability guarantees
        Backward-compatible adapters
        Deprecation schedules
    Adapter Patterns
      ERP Inventory Adapter
        SOAP envelope builder
        Batch query coalescing
        Rate-limited request queue
      CRM Customer Adapter
        REST wrapper over ODBC
        Field mapping engine
        PII masking at boundary
      Mainframe Order Adapter
        MQ Series bridge
        COBOL copybook parser
        Transaction coordinator
    SOAP-to-REST Translation
      WSDL Analysis
        Operation to endpoint mapping
        Complex type flattening
        Namespace resolution
      Runtime Proxy
        Envelope construction
        Header injection
        Response unwrapping
      Schema Validation
        Request contract enforcement
        Response integrity checks
        Version negotiation
    Message Format Translation
      Canonical Data Model
        Domain-driven field names
        Standard types and units
        Extensible schema design
      Encoding Pipeline
        Character set detection
        Lossy conversion handling
        BOM management
      Date-Currency Harmonization
        JP era to ISO 8601
        JPY formatting rules
        Timezone normalization
    Integration Middleware
      Request Queue
        Priority lanes
        Deduplication window
        Backpressure signaling
      Response Cache
        TTL per data type
        Cache invalidation events
        Stale-while-revalidate
      Health Monitoring
        Heartbeat probes
        Latency percentile tracking
        Degradation alerting

Architecture — Legacy Integration Layer

flowchart TB
    subgraph Client["Client Layer"]
        Browser["Browser / Mobile"]
    end

    subgraph Gateway["API Layer"]
        APIGW["API Gateway WebSocket"]
        Fargate["ECS Fargate Orchestrator"]
    end

    subgraph ACL["Anti-Corruption Layer"]
        Router["Integration Router"]
        ERPAdapter["ERP Adapter<br/>SOAP/XML, Shift-JIS<br/>50 req/s limit"]
        CRMAdapter["CRM Adapter<br/>REST/JSON<br/>OAuth2"]
        MFAdapter["Mainframe Adapter<br/>MQ Series<br/>COBOL copybook"]
        CDM["Canonical Data Model<br/>Transform Engine"]
    end

    subgraph Legacy["Legacy Systems"]
        ERP["Legacy ERP<br/>Inventory + Pricing<br/>Oracle DB"]
        CRM["CRM System<br/>Customer Profiles<br/>SQL Server"]
        MF["Mainframe<br/>Order Processing<br/>VSAM/DB2"]
    end

    subgraph Cache["Caching Layer"]
        Redis["ElastiCache Redis<br/>Response Cache"]
        DDB["DynamoDB<br/>Session + Products"]
    end

    Browser --> APIGW --> Fargate
    Fargate --> Router
    Router --> ERPAdapter --> ERP
    Router --> CRMAdapter --> CRM
    Router --> MFAdapter --> MF
    ERPAdapter --> CDM
    CRMAdapter --> CDM
    MFAdapter --> CDM
    CDM --> Fargate
    Fargate <--> Redis
    Fargate <--> DDB

1. Anti-Corruption Layer (ACL) — Deep Dive

The ACL is a Domain-Driven Design pattern that prevents legacy system models, naming conventions, and data formats from corrupting the clean domain model used by the FM pipeline. Without an ACL, legacy coupling leaks into prompts, embeddings, and downstream services.
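The core move can be sketched in a few lines of schema translation. The XML payload, namespace, and field name below are illustrative (ZaikoSuuryo is Japanese for "stock quantity"); the point is that the legacy name never leaks past this one function.

```python
from xml.etree import ElementTree as ET

# Schema-translation sketch at the ACL boundary. Payload and namespace
# are ASSUMED for illustration, not the real ERP contract.
LEGACY_NS = {"inv": "http://legacy-erp.example/inventory/v1"}
LEGACY_XML = (
    '<inv:Stock xmlns:inv="http://legacy-erp.example/inventory/v1">'
    "<inv:ZaikoSuuryo>42</inv:ZaikoSuuryo>"
    "</inv:Stock>"
)

def to_domain(xml_text: str) -> dict:
    """Translate a legacy XML fragment into a clean domain dict."""
    root = ET.fromstring(xml_text)
    qty = root.find("inv:ZaikoSuuryo", LEGACY_NS)
    # The legacy field name stays inside this function; callers only
    # ever see the domain vocabulary.
    return {"quantity_available": int(qty.text) if qty is not None else 0}

print(to_domain(LEGACY_XML))  # {'quantity_available': 42}
```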

Why ACL Matters for GenAI

| Without ACL | With ACL |
| --- | --- |
| Prompt contains raw SOAP field names like `<inv:ZaikoSuuryo>` | Prompt receives `quantity_available: 42` |
| Shift-JIS mojibake in Claude responses | All strings UTF-8-clean before Bedrock |
| Legacy error codes (`ERR-0x4F2A`) shown to users | Friendly domain errors: `InventoryUnavailableError` |
| Model version change breaks legacy parsers | Versioned interface contracts isolate changes |
| Legacy downtime cascades to chatbot 500s | Circuit breaker returns cached fallback |
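The error-normalization row deserves a concrete shape. A minimal sketch, assuming illustrative code values (the mapping table and exception names are not the real MangaAssist registry):

```python
# Error normalization at the ACL boundary: opaque legacy codes become
# typed domain exceptions. Code values below are ASSUMED for illustration.
class LegacyIntegrationError(Exception):
    """Fallback for any unmapped legacy error code."""

class InventoryUnavailableError(LegacyIntegrationError):
    """The ERP reported that the stock lookup failed."""

ERROR_CODE_MAP: dict[str, type[LegacyIntegrationError]] = {
    "ERR-0x4F2A": InventoryUnavailableError,
}

def raise_domain_error(legacy_code: str) -> None:
    """Raise the domain exception mapped to a legacy error code."""
    raise ERROR_CODE_MAP.get(legacy_code, LegacyIntegrationError)(
        f"legacy code {legacy_code}"
    )
```

Callers catch `InventoryUnavailableError` and render a friendly message; the raw code survives only in the exception detail for operators.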

Production Python — Anti-Corruption Layer Gateway

"""
enterprise_connectivity/acl/acl_gateway.py

Anti-Corruption Layer gateway that routes requests to the appropriate
legacy adapter, applies the Canonical Data Model transform, and
enforces circuit breaker / caching policies.

MangaAssist production component.
"""

import asyncio
import logging
import time
from dataclasses import dataclass, field
from enum import Enum

import redis.asyncio as aioredis

logger = logging.getLogger(__name__)


class LegacySystem(Enum):
    """Enumeration of backend legacy systems."""
    ERP_INVENTORY = "erp_inventory"
    CRM_CUSTOMER = "crm_customer"
    MAINFRAME_ORDER = "mainframe_order"


class RequestPriority(Enum):
    """Priority lanes for the request queue."""
    CRITICAL = 1    # Payment, order confirmation
    HIGH = 2        # Inventory checks for active chat
    NORMAL = 3      # Recommendations, browsing
    LOW = 4         # Analytics, batch enrichment


@dataclass
class ACLRequest:
    """Normalized request entering the ACL."""
    system: LegacySystem
    operation: str
    parameters: dict
    priority: RequestPriority = RequestPriority.NORMAL
    correlation_id: str = ""
    timeout_seconds: float = 5.0


@dataclass
class ACLResponse:
    """Normalized response exiting the ACL — always clean domain objects."""
    success: bool
    data: dict
    source_system: LegacySystem
    latency_ms: float
    from_cache: bool = False
    warnings: list = field(default_factory=list)


class CircuitBreaker:
    """Per-system circuit breaker with half-open probe support."""

    def __init__(
        self,
        system_name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.system_name = system_name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "closed"  # closed | open | half-open

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                self.state = "half-open"
                logger.info("[%s] Circuit breaker HALF-OPEN", self.system_name)
                return True
            return False
        return True  # half-open: allow probe traffic until success/failure is recorded

    def record_success(self) -> None:
        if self.state == "half-open":
            logger.info("[%s] Circuit breaker CLOSED", self.system_name)
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            logger.warning(
                "[%s] Circuit breaker OPEN after %d failures",
                self.system_name,
                self.failure_count,
            )


class AntiCorruptionLayerGateway:
    """
    Central gateway for all legacy system interactions.

    Responsibilities:
    - Route to correct adapter based on LegacySystem enum
    - Enforce circuit breaker per backend
    - Check Redis cache before hitting legacy
    - Transform response through Canonical Data Model
    - Emit CloudWatch metrics for every call
    """

    def __init__(self, redis_client: aioredis.Redis, adapters: dict):
        self._redis = redis_client
        self._adapters = adapters  # LegacySystem -> adapter instance
        self._breakers: dict[LegacySystem, CircuitBreaker] = {
            system: CircuitBreaker(system.value)
            for system in LegacySystem
        }
        # TTL per system (seconds) — ERP data is volatile, CRM is stable
        self._cache_ttl = {
            LegacySystem.ERP_INVENTORY: 300,     # 5 min for stock levels
            LegacySystem.CRM_CUSTOMER: 3600,     # 1 hour for profiles
            LegacySystem.MAINFRAME_ORDER: 600,   # 10 min for order status
        }

    def _cache_key(self, request: ACLRequest) -> str:
        """Deterministic cache key from request parameters."""
        import hashlib, json
        payload = json.dumps(
            {"sys": request.system.value, "op": request.operation,
             "params": request.parameters},
            sort_keys=True,
        )
        return f"acl:{hashlib.sha256(payload.encode()).hexdigest()[:16]}"

    async def execute(self, request: ACLRequest) -> ACLResponse:
        """
        Execute a legacy system request through the ACL pipeline:
        1. Check cache
        2. Check circuit breaker
        3. Call adapter
        4. Transform via CDM
        5. Cache result
        """
        start = time.monotonic()

        # --- Step 1: Cache lookup ---
        cache_key = self._cache_key(request)
        cached = await self._redis.get(cache_key)
        if cached:
            import json
            data = json.loads(cached)
            latency = (time.monotonic() - start) * 1000
            logger.info(
                "[%s] Cache HIT for %s (%.1fms)",
                request.system.value, request.operation, latency,
            )
            return ACLResponse(
                success=True, data=data, source_system=request.system,
                latency_ms=latency, from_cache=True,
            )

        # --- Step 2: Circuit breaker check ---
        breaker = self._breakers[request.system]
        if not breaker.allow_request():
            latency = (time.monotonic() - start) * 1000
            logger.warning(
                "[%s] Circuit breaker OPEN — returning fallback",
                request.system.value,
            )
            return ACLResponse(
                success=False, data={"error": "circuit_breaker_open"},
                source_system=request.system, latency_ms=latency,
                warnings=["Legacy system temporarily unavailable"],
            )

        # --- Step 3: Call adapter ---
        adapter = self._adapters[request.system]
        try:
            raw_response = await asyncio.wait_for(
                adapter.call(request.operation, request.parameters),
                timeout=request.timeout_seconds,
            )
            breaker.record_success()
        except asyncio.TimeoutError:
            breaker.record_failure()
            latency = (time.monotonic() - start) * 1000
            return ACLResponse(
                success=False,
                data={"error": "timeout", "timeout_s": request.timeout_seconds},
                source_system=request.system, latency_ms=latency,
                warnings=[f"Legacy call timed out after {request.timeout_seconds}s"],
            )
        except Exception as exc:
            breaker.record_failure()
            latency = (time.monotonic() - start) * 1000
            logger.exception("[%s] Adapter error", request.system.value)
            return ACLResponse(
                success=False, data={"error": str(exc)},
                source_system=request.system, latency_ms=latency,
            )

        # --- Step 4: Transform via CDM ---
        transformed = adapter.to_canonical(raw_response)

        # --- Step 5: Cache result ---
        import json
        ttl = self._cache_ttl.get(request.system, 300)
        await self._redis.setex(cache_key, ttl, json.dumps(transformed))

        latency = (time.monotonic() - start) * 1000
        logger.info(
            "[%s] %s completed in %.1fms",
            request.system.value, request.operation, latency,
        )
        return ACLResponse(
            success=True, data=transformed, source_system=request.system,
            latency_ms=latency,
        )

2. Adapter Pattern — ERP, CRM, and Mainframe

Each legacy system gets a dedicated adapter that encapsulates all communication details — protocol, encoding, authentication, error mapping — behind a uniform interface.
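That uniform interface can be written out as a structural Protocol. Method names mirror the adapters in this file (`call` / `to_canonical`); the Protocol itself is a sketch added for illustration, not part of the original codebase.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class LegacyAdapter(Protocol):
    """Structural interface every legacy adapter satisfies (sketch)."""

    async def call(self, operation: str, params: dict) -> dict:
        """Invoke a named legacy operation; return the raw backend response."""
        ...

    def to_canonical(self, raw: dict) -> dict:
        """Translate a raw backend response into the Canonical Data Model."""
        ...
```

Because it is a Protocol rather than a base class, existing adapters satisfy it without inheriting from anything, and the ACL gateway can be typed against `LegacyAdapter` alone.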

Adapter Comparison Table

| Aspect | ERP Inventory Adapter | CRM Customer Adapter | Mainframe Order Adapter |
| --- | --- | --- | --- |
| Protocol | SOAP 1.1 over HTTPS | REST/JSON over HTTPS | IBM MQ Series |
| Encoding | Shift-JIS | UTF-8 | EBCDIC (COBOL copybook) |
| Auth | WS-Security token | OAuth2 client credentials | Mutual TLS + API key |
| Rate limit | 50 req/s | 200 req/s | 20 msg/s per queue |
| Latency (p99) | 800 ms | 200 ms | 1200 ms |
| Payload format | XML with namespaces | JSON | Fixed-width 80-byte records |
| Error model | SOAP Fault element | HTTP status + error JSON | MQ completion code |
| Retry strategy | Exponential backoff, max 3 | Retry on 429/503, max 5 | Re-enqueue with delay |
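The mainframe column implies fixed-width COBOL record parsing, which none of the code in this file shows. A minimal sketch under an assumed copybook layout — the field names, widths, and the cp500 EBCDIC code page are all illustrative, not the real mainframe contract:

```python
# ASSUMED copybook layout, given as (field, start, end) byte offsets:
RECORD_LAYOUT = [
    ("order_id", 0, 10),    # ORDER-ID  PIC X(10)
    ("status", 10, 12),     # STATUS    PIC X(2)
    ("quantity", 12, 17),   # QTY       PIC 9(5)
    ("isbn", 17, 30),       # ISBN      PIC X(13)
]

def parse_record(raw: bytes) -> dict:
    """Decode one EBCDIC fixed-width record into a flat dict."""
    text = raw.decode("cp500")  # EBCDIC code page (assumed for illustration)
    record = {}
    for name, start, end in RECORD_LAYOUT:
        value = text[start:end].strip()
        # Numeric PIC 9 fields become ints; everything else stays a string.
        record[name] = int(value) if name == "quantity" else value
    return record
```

In production, real copybooks also carry packed-decimal (COMP-3) and signed-overpunch fields, which need dedicated decoders rather than plain slicing.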

Production Python — CRM Customer Adapter

"""
enterprise_connectivity/adapters/crm_customer_adapter.py

Adapter for the CRM system. Wraps OAuth2-authenticated REST API behind
the standard adapter interface. Handles field mapping, PII masking,
and JP-specific customer data normalization.

MangaAssist production component.
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class CustomerProfile:
    """Clean domain object — no CRM-specific field names."""
    customer_id: str
    display_name: str
    email_masked: str          # PII masked at boundary
    membership_tier: str       # "free" | "premium" | "vip"
    preferred_genres: list[str]
    purchase_count_90d: int
    lifetime_value_jpy: int
    locale: str                # "ja-JP", "en-US"
    created_at: str            # ISO 8601


# CRM field -> domain field mapping
CRM_FIELD_MAP = {
    "CustomerID": "customer_id",
    "FullName": "display_name",
    "EmailAddress": "email_masked",
    "MembershipLevel": "membership_tier",
    "FavoriteCategories": "preferred_genres",
    "OrderCount90Days": "purchase_count_90d",
    "TotalSpend": "lifetime_value_jpy",
    "Language": "locale",
    "RegistrationDate": "created_at",
}

# CRM membership codes -> domain tier names
MEMBERSHIP_MAP = {
    "STD": "free",
    "PRM": "premium",
    "VIP": "vip",
    "TRIAL": "free",
}


def _mask_email(email: str) -> str:
    """Mask email for PII safety: john.doe@example.com -> j***e@example.com"""
    if "@" not in email:
        return "***@***.***"
    local, domain = email.rsplit("@", 1)
    if len(local) <= 2:
        masked_local = "*" * len(local)
    else:
        masked_local = local[0] + "***" + local[-1]
    return f"{masked_local}@{domain}"


class CRMCustomerAdapter:
    """
    Adapter for CRM REST API.

    The CRM exposes endpoints:
      GET /api/v2/customers/{id}
      GET /api/v2/customers/{id}/preferences
      GET /api/v2/customers/search?email=...

    Authentication: OAuth2 client_credentials flow.
    Rate limit: 200 req/s with 429 back-off.
    """

    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str,
    ):
        self._base_url = base_url.rstrip("/")
        self._client_id = client_id
        self._client_secret = client_secret
        self._token_url = token_url
        self._access_token: Optional[str] = None
        self._token_expiry: float = 0.0
        self._session: Optional[aiohttp.ClientSession] = None

    async def _ensure_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=10),
            )
        return self._session

    async def _get_token(self) -> str:
        """Obtain or refresh OAuth2 access token."""
        if self._access_token and time.time() < self._token_expiry - 60:
            return self._access_token

        session = await self._ensure_session()
        async with session.post(
            self._token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": self._client_id,
                "client_secret": self._client_secret,
                "scope": "customers.read",
            },
        ) as resp:
            resp.raise_for_status()
            body = await resp.json()
            self._access_token = body["access_token"]
            self._token_expiry = time.time() + body.get("expires_in", 3600)
            return self._access_token

    async def call(self, operation: str, params: dict) -> dict:
        """
        Standard adapter interface.
        Operations: get_customer, get_preferences
        """
        token = await self._get_token()
        session = await self._ensure_session()
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}

        if operation == "get_customer":
            url = f"{self._base_url}/api/v2/customers/{params['customer_id']}"
            for attempt in range(2):  # at most one retry on 429
                async with session.get(url, headers=headers) as resp:
                    if resp.status == 429 and attempt == 0:
                        retry_after = int(resp.headers.get("Retry-After", "2"))
                        await asyncio.sleep(retry_after)
                        continue
                    resp.raise_for_status()
                    return await resp.json()

        elif operation == "get_preferences":
            cid = params["customer_id"]
            url = f"{self._base_url}/api/v2/customers/{cid}/preferences"
            async with session.get(url, headers=headers) as resp:
                resp.raise_for_status()
                return await resp.json()

        raise ValueError(f"Unknown CRM operation: {operation}")

    def to_canonical(self, raw: dict) -> dict:
        """Transform CRM response into canonical domain model."""
        result = {}
        for crm_field, domain_field in CRM_FIELD_MAP.items():
            value = raw.get(crm_field)
            if value is None:
                continue
            # Apply transforms per field
            if domain_field == "email_masked":
                value = _mask_email(value)
            elif domain_field == "membership_tier":
                value = MEMBERSHIP_MAP.get(value, "free")
            elif domain_field == "preferred_genres":
                if isinstance(value, str):
                    value = [g.strip() for g in value.split(",")]
            result[domain_field] = value
        return result

3. SOAP-to-REST Translation

Many enterprise systems still expose SOAP/XML APIs. The translation layer converts modern REST calls into SOAP envelopes and vice versa, making legacy services accessible to the FM pipeline without forcing upstream changes.

Translation Flow

flowchart LR
    subgraph REST["REST Interface"]
        REQ["GET /inventory?isbn=978-4088..."]
        RES["JSON Response<br/>{quantity: 42, status: in_stock}"]
    end

    subgraph Translator["SOAP-to-REST Translator"]
        Parse["Parse REST params"]
        Build["Build SOAP Envelope<br/>+ Shift-JIS encode"]
        Send["POST to WSDL endpoint"]
        Unwrap["Unwrap SOAP response"]
        Convert["XML → JSON<br/>Shift-JIS → UTF-8"]
    end

    subgraph SOAP["Legacy SOAP Service"]
        Endpoint["SOAP 1.1 Endpoint<br/>/ws/InventoryService"]
    end

    REQ --> Parse --> Build --> Send --> Endpoint
    Endpoint --> Unwrap --> Convert --> RES

WSDL-to-Endpoint Mapping Table

| REST Endpoint | HTTP Method | SOAP Operation | SOAPAction |
| --- | --- | --- | --- |
| /inventory/{isbn} | GET | GetStockLevel | urn:InventoryService#GetStockLevel |
| /inventory/batch | POST | GetBatchStockLevels | urn:InventoryService#GetBatchStockLevels |
| /inventory/{isbn}/reserve | POST | ReserveStock | urn:InventoryService#ReserveStock |
| /pricing/{isbn} | GET | GetCurrentPrice | urn:PricingService#GetCurrentPrice |
| /pricing/bulk | POST | GetBulkPricing | urn:PricingService#GetBulkPricing |

Production Python — SOAP-to-REST Proxy

"""
enterprise_connectivity/translators/soap_rest_proxy.py

Runtime proxy that exposes legacy SOAP services as REST endpoints.
Handles envelope construction, namespace resolution, encoding,
and response flattening.

MangaAssist production component — deployed as a FastAPI sidecar
on ECS Fargate alongside the orchestrator.
"""

import logging
import re
from dataclasses import dataclass
from typing import Optional
from xml.etree import ElementTree as ET

import aiohttp

logger = logging.getLogger(__name__)

# Namespace map for the legacy ERP WSDL
NS = {
    "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
    "inv": "http://legacy-erp.manga-store.co.jp/inventory/v1",
    "prc": "http://legacy-erp.manga-store.co.jp/pricing/v1",
}


@dataclass
class SOAPOperation:
    """Mapping between a REST endpoint and a SOAP operation."""
    rest_path: str
    rest_method: str
    soap_action: str
    soap_operation: str
    namespace_prefix: str
    request_builder: str     # method name for building SOAP body
    response_parser: str     # method name for parsing SOAP response


# Operation registry
OPERATIONS = {
    "get_stock": SOAPOperation(
        rest_path="/inventory/{isbn}",
        rest_method="GET",
        soap_action="urn:InventoryService#GetStockLevel",
        soap_operation="GetStockLevel",
        namespace_prefix="inv",
        request_builder="_build_stock_request",
        response_parser="_parse_stock_response",
    ),
    "get_batch_stock": SOAPOperation(
        rest_path="/inventory/batch",
        rest_method="POST",
        soap_action="urn:InventoryService#GetBatchStockLevels",
        soap_operation="GetBatchStockLevels",
        namespace_prefix="inv",
        request_builder="_build_batch_stock_request",
        response_parser="_parse_batch_stock_response",
    ),
    "get_price": SOAPOperation(
        rest_path="/pricing/{isbn}",
        rest_method="GET",
        soap_action="urn:PricingService#GetCurrentPrice",
        soap_operation="GetCurrentPrice",
        namespace_prefix="prc",
        request_builder="_build_price_request",
        response_parser="_parse_price_response",
    ),
}


class SOAPRestProxy:
    """
    Translates REST requests into SOAP calls and back.

    Flow:
    1. Match incoming REST path/method to a SOAPOperation
    2. Build SOAP envelope with Shift-JIS encoding
    3. POST to legacy endpoint with SOAPAction header
    4. Parse SOAP response XML, extract data
    5. Return flat JSON dict
    """

    def __init__(self, legacy_endpoint: str, auth_token: str):
        self._endpoint = legacy_endpoint
        self._auth_token = auth_token
        self._session: Optional[aiohttp.ClientSession] = None

    async def _ensure_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=8),
            )
        return self._session

    def _wrap_envelope(self, body_xml: str, ns_prefix: str) -> bytes:
        """Wrap a SOAP body in the full envelope with auth header."""
        ns_uri = NS.get(ns_prefix, NS["inv"])
        envelope = f"""<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
    xmlns:soapenv="{NS['soapenv']}"
    xmlns:{ns_prefix}="{ns_uri}">
  <soapenv:Header>
    <{ns_prefix}:AuthToken>{self._auth_token}</{ns_prefix}:AuthToken>
  </soapenv:Header>
  <soapenv:Body>
    {body_xml}
  </soapenv:Body>
</soapenv:Envelope>"""
        return envelope.encode("shift_jis")

    # --- Request builders ---

    def _build_stock_request(self, params: dict) -> str:
        isbn = params["isbn"]
        wh = params.get("warehouse", "TKY-01")
        return (
            f"<inv:GetStockLevel>"
            f"<inv:ISBN>{isbn}</inv:ISBN>"
            f"<inv:WarehouseCode>{wh}</inv:WarehouseCode>"
            f"</inv:GetStockLevel>"
        )

    def _build_batch_stock_request(self, params: dict) -> str:
        items = "".join(
            f"<inv:ISBN>{isbn}</inv:ISBN>" for isbn in params["isbns"]
        )
        return f"<inv:GetBatchStockLevels>{items}</inv:GetBatchStockLevels>"

    def _build_price_request(self, params: dict) -> str:
        isbn = params["isbn"]
        return (
            f"<prc:GetCurrentPrice>"
            f"<prc:ISBN>{isbn}</prc:ISBN>"
            f"</prc:GetCurrentPrice>"
        )

    # --- Response parsers ---

    def _parse_stock_response(self, root: ET.Element) -> dict:
        """Extract stock data from SOAP response body."""
        body = root.find(".//soapenv:Body", NS)
        if body is None:
            return {"error": "no_body"}

        result = body.find(".//inv:StockResult", NS)
        if result is None:
            return {"error": "no_stock_result"}

        return {
            "isbn": self._text(result, "inv:ISBN"),
            "title": self._text(result, "inv:Title"),
            "quantity_available": int(self._text(result, "inv:Quantity") or "0"),
            "warehouse": self._text(result, "inv:WarehouseCode"),
            "status": self._text(result, "inv:StockStatus"),
            "price_jpy": int(self._text(result, "inv:PriceJPY") or "0"),
            "last_restock": self._text(result, "inv:LastRestockDate"),
        }

    def _parse_batch_stock_response(self, root: ET.Element) -> dict:
        body = root.find(".//soapenv:Body", NS)
        # Element truth-testing is deprecated (a childless Element is falsy);
        # compare against None explicitly
        items = body.findall(".//inv:StockResult", NS) if body is not None else []
        return {
            "items": [
                {
                    "isbn": self._text(item, "inv:ISBN"),
                    "quantity": int(self._text(item, "inv:Quantity") or "0"),
                    "status": self._text(item, "inv:StockStatus"),
                }
                for item in items
            ]
        }

    def _parse_price_response(self, root: ET.Element) -> dict:
        body = root.find(".//soapenv:Body", NS)
        result = body.find(".//prc:PriceResult", NS) if body is not None else None
        if result is None:
            return {"error": "no_price_result"}
        return {
            "isbn": self._text(result, "prc:ISBN"),
            "price_jpy": int(self._text(result, "prc:Amount") or "0"),
            "currency": "JPY",
            "discount_pct": float(self._text(result, "prc:DiscountPercent") or "0"),
        }

    def _text(self, element: ET.Element, tag: str) -> Optional[str]:
        child = element.find(tag, NS)
        return child.text if child is not None else None

    # --- Main execution ---

    async def execute(self, operation_key: str, params: dict) -> dict:
        """Execute a REST-to-SOAP translated call."""
        op = OPERATIONS.get(operation_key)
        if not op:
            raise ValueError(f"Unknown operation: {operation_key}")

        # Build SOAP body
        builder = getattr(self, op.request_builder)
        body_xml = builder(params)

        # Wrap in envelope and encode
        envelope_bytes = self._wrap_envelope(body_xml, op.namespace_prefix)

        # Send to legacy SOAP endpoint
        session = await self._ensure_session()
        headers = {
            "Content-Type": "text/xml; charset=Shift_JIS",
            "SOAPAction": op.soap_action,
        }

        async with session.post(
            self._endpoint, data=envelope_bytes, headers=headers,
        ) as resp:
            raw_bytes = await resp.read()

            # Decode Shift-JIS response to UTF-8 string
            try:
                xml_str = raw_bytes.decode("shift_jis")
            except UnicodeDecodeError:
                xml_str = raw_bytes.decode("utf-8", errors="replace")

            if resp.status != 200:
                logger.error(
                    "SOAP fault: status=%d body=%s",
                    resp.status, xml_str[:500],
                )
                return {"error": "soap_fault", "status": resp.status}

            # Parse XML and extract data. ET.fromstring rejects str input
            # that still carries an encoding declaration, so strip the
            # <?xml ...?> prolog after decoding.
            xml_str = re.sub(r"^\s*<\?xml[^>]*\?>", "", xml_str)
            root = ET.fromstring(xml_str)
            parser = getattr(self, op.response_parser)
            return parser(root)

4. Message Format Translation — Canonical Data Model

The Canonical Data Model (CDM) provides a single, clean representation of domain objects that all systems converge to. Legacy systems translate into CDM at the adapter boundary; FM prompts consume CDM exclusively.

CDM Design Principles

flowchart TB
    subgraph Sources["Legacy Sources (Different Formats)"]
        ERP["ERP<br/>XML + Shift-JIS<br/>JP field names"]
        CRM["CRM<br/>JSON + UTF-8<br/>English field names"]
        MF["Mainframe<br/>Fixed-width + EBCDIC<br/>COBOL copybook"]
    end

    subgraph CDM["Canonical Data Model"]
        Norm["Normalized Domain Objects<br/>- UTF-8 strings<br/>- ISO 8601 dates<br/>- Snake_case fields<br/>- Standard enums"]
    end

    subgraph Consumers["Consumers"]
        Prompt["Bedrock Prompt Builder"]
        Search["OpenSearch Indexer"]
        Cache["Redis Cache"]
        API["WebSocket Response"]
    end

    ERP -->|"Adapter A"| CDM
    CRM -->|"Adapter B"| CDM
    MF -->|"Adapter C"| CDM
    CDM --> Prompt
    CDM --> Search
    CDM --> Cache
    CDM --> API

Encoding Pipeline — Shift-JIS to UTF-8

| Step | Input | Output | Notes |
| --- | --- | --- | --- |
| 1. Detect encoding | Raw bytes | Charset name | Check HTTP header, XML declaration, BOM |
| 2. Decode | Raw bytes | Python str | Handle shift_jis, euc-jp, iso-2022-jp |
| 3. Normalize | Unicode str | NFKC-normalized str | Folds half-width katakana to full-width (NFC alone does not) |
| 4. Validate | str | Validated str | Reject control chars and lone surrogates |
| 5. Encode output | str | UTF-8 bytes | All downstream systems expect UTF-8 |

Production Python — Format Translation Engine

"""
enterprise_connectivity/translators/format_translator.py

Canonical Data Model transformation engine. Handles character encoding,
date/currency normalization, and field mapping for all legacy systems.

MangaAssist production component.
"""

import logging
import re
import unicodedata
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional

logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------
# Encoding utilities
# -----------------------------------------------------------------------

def normalize_encoding(raw: bytes, declared_charset: str = "shift_jis") -> str:
    """
    Decode legacy bytes to Python str with fallback chain.
    Japanese legacy systems may use Shift-JIS, EUC-JP, or ISO-2022-JP.
    """
    codecs_to_try = [declared_charset, "shift_jis", "euc-jp", "utf-8"]
    for codec in codecs_to_try:
        try:
            decoded = raw.decode(codec)
            # NFKC normalization folds half-width katakana to full-width
            # (plain NFC does not apply this compatibility mapping)
            return unicodedata.normalize("NFKC", decoded)
        except (UnicodeDecodeError, LookupError):
            continue
    # Last resort: decode as UTF-8 with replacement chars
    logger.warning("All codec attempts failed; using UTF-8 with replacements")
    return raw.decode("utf-8", errors="replace")


def sanitize_string(text: str) -> str:
    """Remove ASCII control characters except tab, newline, and carriage return."""
    return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)


# -----------------------------------------------------------------------
# Date normalization
# -----------------------------------------------------------------------

# Japanese era mappings (Reiwa, Heisei, Showa)
JP_ERA_MAP = {
    "令和": 2018,   # Reiwa: year 1 = 2019, so base = 2018
    "平成": 1988,   # Heisei: year 1 = 1989
    "昭和": 1925,   # Showa: year 1 = 1926
}

JP_ERA_PATTERN = re.compile(
    r"(令和|平成|昭和)(\d{1,2})年(\d{1,2})月(\d{1,2})日"
)


def normalize_date(raw_date: str) -> Optional[str]:
    """
    Convert various date formats to ISO 8601.
    Handles: JP era dates, YYYY/MM/DD, DD-MM-YYYY, epoch seconds.
    """
    if not raw_date or raw_date.strip() == "":
        return None

    raw_date = raw_date.strip()

    # JP era date: 令和6年3月15日 -> 2024-03-15
    match = JP_ERA_PATTERN.match(raw_date)
    if match:
        era, year, month, day = match.groups()
        western_year = JP_ERA_MAP[era] + int(year)
        return f"{western_year:04d}-{int(month):02d}-{int(day):02d}"

    # Slash format: 2024/03/15
    if re.match(r"\d{4}/\d{1,2}/\d{1,2}", raw_date):
        parts = raw_date.split("/")
        return f"{int(parts[0]):04d}-{int(parts[1]):02d}-{int(parts[2]):02d}"

    # Already ISO 8601
    if re.match(r"\d{4}-\d{2}-\d{2}", raw_date):
        return raw_date[:10]

    # Epoch timestamp (13+ digits are treated as milliseconds)
    if raw_date.isdigit() and len(raw_date) >= 10:
        ts = int(raw_date)
        if len(raw_date) >= 13:
            ts //= 1000  # millisecond epoch -> seconds
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
        return dt.strftime("%Y-%m-%d")

    logger.warning("Unrecognized date format: %s", raw_date)
    return raw_date  # pass through


# -----------------------------------------------------------------------
# Currency normalization
# -----------------------------------------------------------------------

def normalize_price_jpy(raw_price: Any) -> Optional[int]:
    """
    Normalize Japanese Yen values.
    Handles: "¥1,234", "1234円", 1234, "1234.00"
    JPY is integer-only (no fractional yen).
    """
    if raw_price is None:
        return None

    if isinstance(raw_price, (int, float)):
        return int(raw_price)

    text = str(raw_price).strip()
    # Remove currency symbols (half- and full-width) and delimiters
    cleaned = re.sub(r"[¥￥円，,\s]", "", text)
    # Remove trailing decimal (JPY has no fractions)
    cleaned = re.sub(r"\.\d+$", "", cleaned)

    try:
        return int(cleaned)
    except ValueError:
        logger.warning("Cannot parse price: %s", raw_price)
        return None


# -----------------------------------------------------------------------
# Canonical Data Model transformer
# -----------------------------------------------------------------------

@dataclass
class FieldMapping:
    """Describes how to map a source field to a canonical field."""
    source_field: str
    target_field: str
    transform: str = "passthrough"  # passthrough|date|price|encoding|enum


class CanonicalTransformer:
    """
    Transforms raw legacy responses into Canonical Data Model objects.
    Each legacy system has a registered set of field mappings.
    """

    def __init__(self):
        self._mappings: dict[str, list[FieldMapping]] = {}
        self._enum_maps: dict[str, dict] = {}

    def register_mapping(
        self, system_name: str, mappings: list[FieldMapping],
    ) -> None:
        self._mappings[system_name] = mappings

    def register_enum(self, enum_name: str, value_map: dict) -> None:
        self._enum_maps[enum_name] = value_map

    def transform(self, system_name: str, raw: dict) -> dict:
        """Apply registered mappings to produce a canonical dict."""
        mappings = self._mappings.get(system_name, [])
        result = {}

        for m in mappings:
            value = raw.get(m.source_field)
            if value is None:
                continue

            if m.transform == "date":
                value = normalize_date(str(value))
            elif m.transform == "price":
                value = normalize_price_jpy(value)
            elif m.transform == "encoding":
                if isinstance(value, bytes):
                    value = normalize_encoding(value)
                else:
                    value = sanitize_string(str(value))
            elif m.transform == "enum":
                enum_map = self._enum_maps.get(m.target_field, {})
                value = enum_map.get(value, value)

            result[m.target_field] = value

        return result


# -----------------------------------------------------------------------
# Pre-configured transformer for MangaAssist
# -----------------------------------------------------------------------

def build_manga_assist_transformer() -> CanonicalTransformer:
    """Factory: returns a transformer pre-loaded with MangaAssist mappings."""
    t = CanonicalTransformer()

    # ERP inventory mappings
    t.register_mapping("erp_inventory", [
        FieldMapping("ISBN", "isbn", "passthrough"),
        FieldMapping("Title", "title", "encoding"),
        FieldMapping("Quantity", "quantity_available", "passthrough"),
        FieldMapping("WarehouseCode", "warehouse", "passthrough"),
        FieldMapping("StockStatus", "stock_status", "enum"),
        FieldMapping("LastRestockDate", "last_restock_date", "date"),
        FieldMapping("PriceJPY", "price_jpy", "price"),
    ])

    # Stock status enum
    t.register_enum("stock_status", {
        "在庫あり": "in_stock",
        "在庫少": "low_stock",
        "在庫切れ": "out_of_stock",
        "予約受付中": "preorder",
    })

    # CRM customer mappings
    t.register_mapping("crm_customer", [
        FieldMapping("CustomerID", "customer_id", "passthrough"),
        FieldMapping("FullName", "display_name", "encoding"),
        FieldMapping("MembershipLevel", "membership_tier", "enum"),
        FieldMapping("FavoriteCategories", "preferred_genres", "passthrough"),
        FieldMapping("RegistrationDate", "created_at", "date"),
    ])

    t.register_enum("membership_tier", {
        "STD": "free", "PRM": "premium", "VIP": "vip", "TRIAL": "free",
    })

    return t
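To make the mapping concrete, here is a condensed, self-contained rehearsal of what `build_manga_assist_transformer().transform("erp_inventory", raw)` produces for an ERP payload. The field values (ISBN, dates, prices) are illustrative, and the two helpers are trimmed-down stand-ins for `normalize_date` and `normalize_price_jpy` above:

```python
import re

JP_ERA_MAP = {"令和": 2018, "平成": 1988, "昭和": 1925}
ERA_RE = re.compile(r"(令和|平成|昭和)(\d{1,2})年(\d{1,2})月(\d{1,2})日")


def era_to_iso(raw: str) -> str:
    """JP era date -> ISO 8601 (condensed normalize_date)."""
    era, y, mo, d = ERA_RE.match(raw).groups()
    return f"{JP_ERA_MAP[era] + int(y):04d}-{int(mo):02d}-{int(d):02d}"


def yen_to_int(raw: str) -> int:
    """Strip currency symbols and delimiters (condensed normalize_price_jpy)."""
    return int(re.sub(r"[¥円,\s]", "", raw))


# Raw ERP payload (illustrative values) -> canonical dict
raw = {"ISBN": "9781234567890", "StockStatus": "在庫あり",
       "LastRestockDate": "令和6年3月15日", "PriceJPY": "¥1,234"}
STATUS = {"在庫あり": "in_stock", "在庫切れ": "out_of_stock"}

canonical = {
    "isbn": raw["ISBN"],
    "stock_status": STATUS[raw["StockStatus"]],
    "last_restock_date": era_to_iso(raw["LastRestockDate"]),
    "price_jpy": yen_to_int(raw["PriceJPY"]),
}
# canonical["last_restock_date"] == "2024-03-15"
# canonical["price_jpy"] == 1234
```

Whatever the source format, the FM prompt only ever sees the canonical keys (`isbn`, `stock_status`, `price_jpy`), which is the point of the CDM.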

5. Message Queue Integration — Backpressure and Deduplication

When legacy systems cannot absorb the chatbot's request volume (1M messages/day = ~11.6 req/s average, 50+ req/s peak), a request queue provides backpressure, deduplication, and priority routing.
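The sizing arithmetic behind that claim is worth working through once. The average and the 50 req/s ceiling come from the traffic profile above; the 80 req/s burst is an illustrative assumption:

```python
avg_rate = 1_000_000 / 86_400   # ~11.57 req/s steady-state arrivals
legacy_limit = 50.0             # ERP's maximum sustainable req/s

# Illustrative burst: 80 req/s arriving for 120 seconds
burst_rate, burst_secs = 80.0, 120.0

# Queue grows whenever arrivals exceed the drain rate
backlog = (burst_rate - legacy_limit) * burst_secs      # 3600 messages queued

# After the burst, the queue drains at (ceiling - average arrivals)
drain_secs = backlog / (legacy_limit - avg_rate)        # roughly 94 seconds
```

The takeaway: even a two-minute burst only delays answers by about a minute and a half, versus dropped requests or a crashed ERP without the queue.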

Queue Architecture

flowchart LR
    subgraph Producers["ECS Fargate Tasks"]
        T1["Task 1"]
        T2["Task 2"]
        T3["Task N"]
    end

    subgraph Queue["SQS FIFO Queue"]
        PRI["Priority Lanes<br/>CRITICAL | HIGH | NORMAL | LOW"]
        DEDUP["Dedup Window<br/>5-min content hash"]
        DLQ["Dead Letter Queue<br/>after 3 failures"]
    end

    subgraph Consumer["Queue Consumer Lambda"]
        Batch["Batch up to 10 msgs"]
        Route["Route to adapter"]
        Respond["Write result to DynamoDB"]
    end

    subgraph Legacy["Legacy ERP<br/>50 req/s max"]
        ERP["SOAP Endpoint"]
    end

    T1 --> PRI
    T2 --> PRI
    T3 --> PRI
    PRI --> DEDUP --> Consumer
    Route -->|"after 3 failures"| DLQ
    Batch --> Route --> ERP
    Route --> Respond

Production Python — Queue-Based Legacy Gateway

"""
enterprise_connectivity/queue/legacy_queue_gateway.py

SQS-based request queue that provides backpressure, deduplication,
and priority-based routing for legacy system calls.

MangaAssist production component.
"""

import hashlib
import json
import logging
import time
from typing import Optional

import boto3

logger = logging.getLogger(__name__)

sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")


class LegacyQueueGateway:
    """
    Manages asynchronous legacy system requests via SQS FIFO.

    Deduplication: content-based hash within SQS's 5-minute window.
    Priority: MessageGroupId creates separate processing lanes. Note that
    SQS FIFO does not reorder across groups; the consumer must poll or
    weight lanes to enforce actual priority.
    Backpressure: ApproximateNumberOfMessages metric triggers scaling.
    """

    def __init__(
        self,
        queue_url: str,
        dlq_url: str,
        result_table_name: str,
    ):
        self._queue_url = queue_url
        self._dlq_url = dlq_url
        self._result_table = dynamodb.Table(result_table_name)

    def _dedup_id(self, operation: str, params: dict) -> str:
        """
        Content-based deduplication ID (SHA-256 of operation + params).
        The 64-char hex digest fits within SQS's 128-character limit.
        """
        payload = json.dumps(
            {"op": operation, "params": params}, sort_keys=True,
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def enqueue_request(
        self,
        operation: str,
        params: dict,
        priority: str = "NORMAL",
        correlation_id: str = "",
    ) -> dict:
        """
        Enqueue a legacy system request.
        Returns the SQS MessageId for tracking.
        """
        message_body = json.dumps({
            "operation": operation,
            "params": params,
            "priority": priority,
            "correlation_id": correlation_id,
            "enqueued_at": time.time(),
        })

        dedup_id = self._dedup_id(operation, params)
        response = sqs.send_message(
            QueueUrl=self._queue_url,
            MessageBody=message_body,
            MessageGroupId=f"legacy-{priority}",
            MessageDeduplicationId=dedup_id,
            MessageAttributes={
                "Priority": {
                    "DataType": "String",
                    "StringValue": priority,
                },
                "Operation": {
                    "DataType": "String",
                    "StringValue": operation,
                },
            },
        )

        logger.info(
            "Enqueued %s (priority=%s, dedup=%s) -> MessageId=%s",
            operation, priority, dedup_id[:8], response["MessageId"],
        )
        return {"message_id": response["MessageId"]}

    def poll_result(self, correlation_id: str) -> Optional[dict]:
        """
        Poll DynamoDB for the result of an async legacy call.
        The consumer Lambda writes results here after processing.
        """
        try:
            response = self._result_table.get_item(
                Key={"correlation_id": correlation_id},
            )
            return response.get("Item")
        except Exception:
            logger.exception("Result lookup failed for %s", correlation_id)
            return None

    def get_queue_depth(self) -> dict:
        """Return queue metrics for backpressure decisions."""
        attrs = sqs.get_queue_attributes(
            QueueUrl=self._queue_url,
            AttributeNames=[
                "ApproximateNumberOfMessages",
                "ApproximateNumberOfMessagesNotVisible",
                "ApproximateNumberOfMessagesDelayed",
            ],
        )["Attributes"]

        return {
            "pending": int(attrs.get("ApproximateNumberOfMessages", 0)),
            "in_flight": int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
            "delayed": int(attrs.get("ApproximateNumberOfMessagesDelayed", 0)),
        }
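The consumer side still has to honor the ERP's 50 req/s ceiling while draining batches. A minimal token-bucket sketch follows; the class name `RateLimiter` and the injectable `clock` parameter are illustrative choices for testability, not part of the gateway above:

```python
import time


class RateLimiter:
    """Token bucket: refills at `rate` tokens/s up to `burst` capacity."""

    def __init__(self, rate: float, burst: float, clock=time.monotonic):
        self._rate = rate
        self._burst = burst
        self._clock = clock
        self._tokens = burst
        self._last = clock()

    def try_acquire(self, n: float = 1.0) -> bool:
        """Consume n tokens if available; False means the caller backs off."""
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self._burst, self._tokens + elapsed * self._rate)
        self._last = now
        if self._tokens >= n:
            self._tokens -= n
            return True
        return False


# Consumer loop sketch: gate each adapter call on the ERP's 50 req/s limit
erp_limiter = RateLimiter(rate=50.0, burst=10.0)
```

In the consumer Lambda, a failed `try_acquire()` would leave the SQS message in flight so it reappears after the visibility timeout, rather than hammering the ERP.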

Integration Pattern Comparison

| Pattern | Best For | Latency | Coupling | Complexity | MangaAssist Use |
|---|---|---|---|---|---|
| Direct Adapter | Low-latency sync calls | Low (< 1s) | Medium | Low | Inventory check during chat |
| ACL + CDM | Multiple legacy backends | Medium (1-2s) | Low | Medium | All legacy interactions |
| SOAP-to-REST Proxy | Teams that cannot modify legacy API | Medium | Low | Medium | ERP inventory + pricing |
| Message Queue | High-volume async processing | High (seconds) | Very Low | Medium | Batch catalog updates |
| CDC + Streaming | Near-real-time data sync | Low (sub-second lag) | Very Low | High | DMS -> DynamoDB pipeline |
| Batch ETL | Full reconciliation | Very High (hours) | None | Low | Nightly catalog refresh |

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Anti-corruption layers prevent legacy models from contaminating FM prompts | ACL transforms SOAP/Shift-JIS ERP responses into clean UTF-8 domain objects before Bedrock sees them |
| 2 | Adapter pattern normalizes heterogeneous backends behind a uniform interface | ERP (SOAP), CRM (REST), Mainframe (MQ) all expose identical `call()` + `to_canonical()` methods |
| 3 | SOAP-to-REST proxies modernize access without modifying the legacy system | Sidecar proxy on ECS converts REST inventory requests to SOAP envelopes and back |
| 4 | Canonical Data Model gives FM prompts a single, consistent vocabulary | All systems output `isbn`, `title`, `quantity_available` regardless of source format |
| 5 | Character encoding normalization is essential for JP content | Shift-JIS and EUC-JP decoded at adapter boundary; everything downstream is NFKC-normalized UTF-8 |
| 6 | Circuit breakers protect chatbot latency when legacy degrades | 5 failures -> 30s open -> half-open probe -> auto-recovery, with Redis cache fallback |
| 7 | SQS FIFO queues add backpressure against rate-limited legacy systems | 50 req/s ERP limit enforced via message groups + content deduplication |
| 8 | PII masking at the adapter boundary prevents sensitive data from reaching the FM | Email addresses masked before entering Redis cache or Bedrock prompt context |
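The circuit-breaker policy in takeaway 6 (5 failures, 30s open, half-open probe) can be sketched as a small state machine. This is a minimal sketch of the policy, not the production implementation; the injectable `clock` parameter is an assumption added for testability:

```python
import time
from typing import Optional


class CircuitBreaker:
    """CLOSED -> OPEN after `threshold` consecutive failures;
    OPEN -> HALF_OPEN after `cooldown` seconds have elapsed;
    one successful half-open probe returns the breaker to CLOSED."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0,
                 clock=time.monotonic):
        self._threshold = threshold
        self._cooldown = cooldown
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self._cooldown:
            return "HALF_OPEN"
        return "OPEN"

    def allow_request(self) -> bool:
        """OPEN rejects immediately; caller falls back to the Redis cache."""
        return self.state in ("CLOSED", "HALF_OPEN")

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._opened_at = self._clock()  # (re)open, restart cooldown
```

When `allow_request()` returns False, the adapter serves the cached inventory snapshot instead of waiting on a degraded ERP, which is what keeps the chatbot inside its 3-second budget.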

Exam-Relevant Connections

  • API Gateway VTL transforms can perform lightweight XML-to-JSON conversion without Lambda, suitable for simple field remapping.
  • AWS AppSync pipeline resolvers chain multiple data sources (DynamoDB, OpenSearch, legacy adapter) in a single GraphQL query.
  • Step Functions orchestrate multi-step legacy integrations with built-in error handling, retries, and timeout management.
  • EventBridge Pipes connect SQS queues to processing targets with built-in filtering and enrichment, reducing Lambda glue code.
  • AWS DMS is the managed CDC solution for replicating legacy Oracle/MySQL data to DynamoDB, avoiding custom polling adapters.