Legacy System Integration Patterns for FM-Powered Applications
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Certification | Task | Skill | This File |
|---|---|---|---|
| AWS AI Practitioner (AIP-C01) | Task 2.3 — Describe methods to integrate FM into applications | Skill 2.3.1 — Identify enterprise connectivity solutions | Legacy ERP/CRM/mainframe integration, anti-corruption layers, adapter patterns, SOAP-to-REST conversion, message format translation |
Skill Scope
Skill 2.3.1 requires understanding how Foundation Models integrate with existing enterprise systems. This file focuses on the integration layer between modern GenAI services and legacy backends — ERP inventory systems, CRM platforms, mainframe order processors — that still power the business. The key challenge: legacy systems use SOAP/XML, fixed-width formats, Shift-JIS encoding, and throttled APIs, while FM pipelines expect JSON, UTF-8, and sub-second responses.
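To make the mismatch concrete, here is a hypothetical 40-byte fixed-width inventory record converted to the JSON shape an FM pipeline expects. The layout, offsets, and ISBN are invented for illustration; the key point is that fixed-width records are byte-based, so you slice bytes before decoding Shift-JIS:

```python
import json

def parse_record(record: bytes) -> dict:
    """Slice fixed byte offsets first (multi-byte Shift-JIS breaks character
    offsets), then decode each field into a JSON-ready dict."""
    return {
        "isbn": record[0:13].decode("ascii").strip(),
        "title": record[13:33].decode("shift_jis").strip(),
        "quantity_available": int(record[33:40].decode("ascii")),
    }

# Build a sample record: 13-byte ISBN, 20-byte Shift-JIS title, 7-byte quantity
title_field = "ワンピース第1巻".encode("shift_jis").ljust(20)
raw = b"9784088820000" + title_field + b"0000042"
item = parse_record(raw)
print(json.dumps(item, ensure_ascii=False))
```

Slicing `record[13:33]` works because the title field is padded to 20 bytes, not 20 characters; decoding first and slicing by character position would shift every field after the first double-byte character.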
Mind Map
mindmap
root((Legacy System Integration))
Anti-Corruption Layer
Schema Translation
Legacy XML to Domain Objects
Shift-JIS to UTF-8
Fixed-width to JSON
Error Normalization
SOAP faults to HTTP status codes
Mainframe return codes to exceptions
Timeout standardization
Contract Versioning
Interface stability guarantees
Backward-compatible adapters
Deprecation schedules
Adapter Patterns
ERP Inventory Adapter
SOAP envelope builder
Batch query coalescing
Rate-limited request queue
CRM Customer Adapter
REST wrapper over ODBC
Field mapping engine
PII masking at boundary
Mainframe Order Adapter
MQ Series bridge
COBOL copybook parser
Transaction coordinator
SOAP-to-REST Translation
WSDL Analysis
Operation to endpoint mapping
Complex type flattening
Namespace resolution
Runtime Proxy
Envelope construction
Header injection
Response unwrapping
Schema Validation
Request contract enforcement
Response integrity checks
Version negotiation
Message Format Translation
Canonical Data Model
Domain-driven field names
Standard types and units
Extensible schema design
Encoding Pipeline
Character set detection
Lossy conversion handling
BOM management
Date-Currency Harmonization
JP era to ISO 8601
JPY formatting rules
Timezone normalization
Integration Middleware
Request Queue
Priority lanes
Deduplication window
Backpressure signaling
Response Cache
TTL per data type
Cache invalidation events
Stale-while-revalidate
Health Monitoring
Heartbeat probes
Latency percentile tracking
Degradation alerting
Architecture — Legacy Integration Layer
flowchart TB
subgraph Client["Client Layer"]
Browser["Browser / Mobile"]
end
subgraph Gateway["API Layer"]
APIGW["API Gateway WebSocket"]
Fargate["ECS Fargate Orchestrator"]
end
subgraph ACL["Anti-Corruption Layer"]
Router["Integration Router"]
ERPAdapter["ERP Adapter<br/>SOAP/XML, Shift-JIS<br/>50 req/s limit"]
CRMAdapter["CRM Adapter<br/>REST/JSON<br/>OAuth2"]
MFAdapter["Mainframe Adapter<br/>MQ Series<br/>COBOL copybook"]
CDM["Canonical Data Model<br/>Transform Engine"]
end
subgraph Legacy["Legacy Systems"]
ERP["Legacy ERP<br/>Inventory + Pricing<br/>Oracle DB"]
CRM["CRM System<br/>Customer Profiles<br/>SQL Server"]
MF["Mainframe<br/>Order Processing<br/>VSAM/DB2"]
end
subgraph Cache["Caching Layer"]
Redis["ElastiCache Redis<br/>Response Cache"]
DDB["DynamoDB<br/>Session + Products"]
end
Browser --> APIGW --> Fargate
Fargate --> Router
Router --> ERPAdapter --> ERP
Router --> CRMAdapter --> CRM
Router --> MFAdapter --> MF
ERPAdapter --> CDM
CRMAdapter --> CDM
MFAdapter --> CDM
CDM --> Fargate
Fargate <--> Redis
Fargate <--> DDB
1. Anti-Corruption Layer (ACL) — Deep Dive
The ACL is a Domain-Driven Design pattern that prevents legacy system models, naming conventions, and data formats from corrupting the clean domain model used by the FM pipeline. Without an ACL, legacy coupling leaks into prompts, embeddings, and downstream services.
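A ten-line sketch shows the leak the ACL prevents: raw ERP field names reaching the prompt builder. The payload and mapping here are illustrative, not the real WSDL vocabulary:

```python
# Hypothetical raw ERP payload with legacy romaji field names
RAW_ERP = {"ZaikoSuuryo": 42, "ShohinMei": "ONE PIECE 1", "SokoCode": "TKY-01"}

# Boundary translation table: legacy names never cross into the domain
DOMAIN_MAP = {
    "ZaikoSuuryo": "quantity_available",
    "ShohinMei": "title",
    "SokoCode": "warehouse",
}

def to_domain(raw: dict) -> dict:
    """Rename legacy fields so prompts only ever see domain vocabulary."""
    return {DOMAIN_MAP.get(k, k): v for k, v in raw.items()}

prompt_context = "\n".join(
    f"{k}: {v}" for k, v in sorted(to_domain(RAW_ERP).items())
)
```

Everything downstream (prompt templates, embeddings, cached responses) now depends only on `DOMAIN_MAP`'s right-hand side, so an ERP field rename touches exactly one translation table.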
Why ACL Matters for GenAI
| Without ACL | With ACL |
|---|---|
| Prompt contains raw SOAP field names like `<inv:ZaikoSuuryo>` | Prompt receives `quantity_available: 42` |
| Shift-JIS mojibake in Claude responses | All strings UTF-8-clean before Bedrock |
| Legacy error codes `ERR-0x4F2A` shown to users | Friendly domain errors: `InventoryUnavailableError` |
| Model version change breaks legacy parsers | Versioned interface contracts isolate changes |
| Legacy downtime cascades to chatbot 500s | Circuit breaker returns cached fallback |
Production Python — Anti-Corruption Layer Gateway
"""
enterprise_connectivity/acl/acl_gateway.py
Anti-Corruption Layer gateway that routes requests to the appropriate
legacy adapter, applies the Canonical Data Model transform, and
enforces circuit breaker / caching policies.
MangaAssist production component.
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import aiohttp
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
class LegacySystem(Enum):
"""Enumeration of backend legacy systems."""
ERP_INVENTORY = "erp_inventory"
CRM_CUSTOMER = "crm_customer"
MAINFRAME_ORDER = "mainframe_order"
class RequestPriority(Enum):
"""Priority lanes for the request queue."""
CRITICAL = 1 # Payment, order confirmation
HIGH = 2 # Inventory checks for active chat
NORMAL = 3 # Recommendations, browsing
LOW = 4 # Analytics, batch enrichment
@dataclass
class ACLRequest:
"""Normalized request entering the ACL."""
system: LegacySystem
operation: str
parameters: dict
priority: RequestPriority = RequestPriority.NORMAL
correlation_id: str = ""
timeout_seconds: float = 5.0
@dataclass
class ACLResponse:
"""Normalized response exiting the ACL — always clean domain objects."""
success: bool
data: dict
source_system: LegacySystem
latency_ms: float
from_cache: bool = False
warnings: list = field(default_factory=list)
class CircuitBreaker:
"""Per-system circuit breaker with half-open probe support."""
def __init__(
self,
system_name: str,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
):
self.system_name = system_name
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "closed" # closed | open | half-open
def allow_request(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
self.state = "half-open"
logger.info("[%s] Circuit breaker HALF-OPEN", self.system_name)
return True
return False
        return True  # half-open: let probe traffic through until a result is recorded
def record_success(self) -> None:
if self.state == "half-open":
logger.info("[%s] Circuit breaker CLOSED", self.system_name)
self.failure_count = 0
self.state = "closed"
def record_failure(self) -> None:
self.failure_count += 1
self.last_failure_time = time.monotonic()
if self.failure_count >= self.failure_threshold:
self.state = "open"
logger.warning(
"[%s] Circuit breaker OPEN after %d failures",
self.system_name,
self.failure_count,
)
class AntiCorruptionLayerGateway:
"""
Central gateway for all legacy system interactions.
Responsibilities:
- Route to correct adapter based on LegacySystem enum
- Enforce circuit breaker per backend
- Check Redis cache before hitting legacy
- Transform response through Canonical Data Model
- Emit CloudWatch metrics for every call
"""
def __init__(self, redis_client: aioredis.Redis, adapters: dict):
self._redis = redis_client
self._adapters = adapters # LegacySystem -> adapter instance
self._breakers: dict[LegacySystem, CircuitBreaker] = {
system: CircuitBreaker(system.value)
for system in LegacySystem
}
# TTL per system (seconds) — ERP data is volatile, CRM is stable
self._cache_ttl = {
LegacySystem.ERP_INVENTORY: 300, # 5 min for stock levels
LegacySystem.CRM_CUSTOMER: 3600, # 1 hour for profiles
LegacySystem.MAINFRAME_ORDER: 600, # 10 min for order status
}
def _cache_key(self, request: ACLRequest) -> str:
"""Deterministic cache key from request parameters."""
        import hashlib
        import json
payload = json.dumps(
{"sys": request.system.value, "op": request.operation,
"params": request.parameters},
sort_keys=True,
)
return f"acl:{hashlib.sha256(payload.encode()).hexdigest()[:16]}"
async def execute(self, request: ACLRequest) -> ACLResponse:
"""
Execute a legacy system request through the ACL pipeline:
1. Check cache
2. Check circuit breaker
3. Call adapter
4. Transform via CDM
5. Cache result
"""
start = time.monotonic()
# --- Step 1: Cache lookup ---
cache_key = self._cache_key(request)
cached = await self._redis.get(cache_key)
if cached:
import json
data = json.loads(cached)
latency = (time.monotonic() - start) * 1000
logger.info(
"[%s] Cache HIT for %s (%.1fms)",
request.system.value, request.operation, latency,
)
return ACLResponse(
success=True, data=data, source_system=request.system,
latency_ms=latency, from_cache=True,
)
# --- Step 2: Circuit breaker check ---
breaker = self._breakers[request.system]
if not breaker.allow_request():
latency = (time.monotonic() - start) * 1000
logger.warning(
"[%s] Circuit breaker OPEN — returning fallback",
request.system.value,
)
return ACLResponse(
success=False, data={"error": "circuit_breaker_open"},
source_system=request.system, latency_ms=latency,
warnings=["Legacy system temporarily unavailable"],
)
# --- Step 3: Call adapter ---
adapter = self._adapters[request.system]
try:
raw_response = await asyncio.wait_for(
adapter.call(request.operation, request.parameters),
timeout=request.timeout_seconds,
)
breaker.record_success()
except asyncio.TimeoutError:
breaker.record_failure()
latency = (time.monotonic() - start) * 1000
return ACLResponse(
success=False,
data={"error": "timeout", "timeout_s": request.timeout_seconds},
source_system=request.system, latency_ms=latency,
warnings=[f"Legacy call timed out after {request.timeout_seconds}s"],
)
except Exception as exc:
breaker.record_failure()
latency = (time.monotonic() - start) * 1000
logger.exception("[%s] Adapter error", request.system.value)
return ACLResponse(
success=False, data={"error": str(exc)},
source_system=request.system, latency_ms=latency,
)
# --- Step 4: Transform via CDM ---
transformed = adapter.to_canonical(raw_response)
# --- Step 5: Cache result ---
import json
ttl = self._cache_ttl.get(request.system, 300)
await self._redis.setex(cache_key, ttl, json.dumps(transformed))
latency = (time.monotonic() - start) * 1000
logger.info(
"[%s] %s completed in %.1fms",
request.system.value, request.operation, latency,
)
return ACLResponse(
success=True, data=transformed, source_system=request.system,
latency_ms=latency,
)
2. Adapter Pattern — ERP, CRM, and Mainframe
Each legacy system gets a dedicated adapter that encapsulates all communication details — protocol, encoding, authentication, error mapping — behind a uniform interface.
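That uniform interface can be pinned down as a `Protocol`, so the ACL gateway holds adapters without knowing their wire protocols. This is a sketch; the `EchoAdapter` and ISBN are illustrative stand-ins, not production classes:

```python
import asyncio
from typing import Protocol

class LegacyAdapter(Protocol):
    """Contract every adapter satisfies, whatever the wire protocol."""
    async def call(self, operation: str, params: dict) -> dict: ...
    def to_canonical(self, raw: dict) -> dict: ...

class EchoAdapter:
    """Trivial stand-in showing the shape of the contract."""
    async def call(self, operation: str, params: dict) -> dict:
        return {"Operation": operation, **params}

    def to_canonical(self, raw: dict) -> dict:
        # A real adapter maps field names; lowercasing stands in for that here
        return {k.lower(): v for k, v in raw.items()}

result = asyncio.run(EchoAdapter().call("get_stock", {"ISBN": "9784088820000"}))
canonical = EchoAdapter().to_canonical(result)
```

Because the gateway only depends on `call`/`to_canonical`, swapping a SOAP adapter for a REST one is invisible above the ACL boundary.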
Adapter Comparison Table
| Aspect | ERP Inventory Adapter | CRM Customer Adapter | Mainframe Order Adapter |
|---|---|---|---|
| Protocol | SOAP 1.1 over HTTPS | REST/JSON over HTTPS | IBM MQ Series |
| Encoding | Shift-JIS | UTF-8 | EBCDIC (COBOL copybook) |
| Auth | WS-Security token | OAuth2 client credentials | Mutual TLS + API key |
| Rate limit | 50 req/s | 200 req/s | 20 msg/s per queue |
| Latency (p99) | 800ms | 200ms | 1200ms |
| Payload format | XML with namespaces | JSON | Fixed-width 80-byte records |
| Error model | SOAP Fault element | HTTP status + error JSON | MQ completion code |
| Retry strategy | Exponential backoff, max 3 | Retry on 429/503, max 5 | Re-enqueue with delay |
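The ERP row's "exponential backoff, max 3" can be sketched as a full-jitter delay generator. The `base` and `cap` values are illustrative defaults, not parameters from the real ERP client:

```python
import random

def backoff_delays(max_attempts: int = 3, base: float = 0.5, cap: float = 8.0,
                   rng=random.random):
    """Full-jitter backoff: each delay is rng() * min(cap, base * 2^attempt)."""
    for attempt in range(max_attempts):
        yield rng() * min(cap, base * (2 ** attempt))

# Pinning rng to 1.0 makes the generator yield the upper bounds themselves
delays = list(backoff_delays(rng=lambda: 1.0))
```

Full jitter (randomizing over the whole interval rather than adding a small random offset) is the variant that best de-correlates retries from many concurrent Fargate tasks hammering the same 50 req/s endpoint.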
Production Python — CRM Customer Adapter
"""
enterprise_connectivity/adapters/crm_customer_adapter.py
Adapter for the CRM system. Wraps OAuth2-authenticated REST API behind
the standard adapter interface. Handles field mapping, PII masking,
and JP-specific customer data normalization.
MangaAssist production component.
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional
import aiohttp
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class CustomerProfile:
"""Clean domain object — no CRM-specific field names."""
customer_id: str
display_name: str
email_masked: str # PII masked at boundary
membership_tier: str # "free" | "premium" | "vip"
preferred_genres: list[str]
purchase_count_90d: int
lifetime_value_jpy: int
locale: str # "ja-JP", "en-US"
created_at: str # ISO 8601
# CRM field -> domain field mapping
CRM_FIELD_MAP = {
"CustomerID": "customer_id",
"FullName": "display_name",
"EmailAddress": "email_masked",
"MembershipLevel": "membership_tier",
"FavoriteCategories": "preferred_genres",
"OrderCount90Days": "purchase_count_90d",
"TotalSpend": "lifetime_value_jpy",
"Language": "locale",
"RegistrationDate": "created_at",
}
# CRM membership codes -> domain tier names
MEMBERSHIP_MAP = {
"STD": "free",
"PRM": "premium",
"VIP": "vip",
"TRIAL": "free",
}
def _mask_email(email: str) -> str:
"""Mask email for PII safety: john.doe@example.com -> j***e@example.com"""
if "@" not in email:
return "***@***.***"
local, domain = email.rsplit("@", 1)
if len(local) <= 2:
masked_local = "*" * len(local)
else:
masked_local = local[0] + "***" + local[-1]
return f"{masked_local}@{domain}"
class CRMCustomerAdapter:
"""
Adapter for CRM REST API.
The CRM exposes endpoints:
GET /api/v2/customers/{id}
GET /api/v2/customers/{id}/preferences
GET /api/v2/customers/search?email=...
Authentication: OAuth2 client_credentials flow.
Rate limit: 200 req/s with 429 back-off.
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str,
):
self._base_url = base_url.rstrip("/")
self._client_id = client_id
self._client_secret = client_secret
self._token_url = token_url
self._access_token: Optional[str] = None
self._token_expiry: float = 0.0
self._session: Optional[aiohttp.ClientSession] = None
async def _ensure_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10),
)
return self._session
async def _get_token(self) -> str:
"""Obtain or refresh OAuth2 access token."""
if self._access_token and time.time() < self._token_expiry - 60:
return self._access_token
session = await self._ensure_session()
async with session.post(
self._token_url,
data={
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_secret": self._client_secret,
"scope": "customers.read",
},
) as resp:
resp.raise_for_status()
body = await resp.json()
self._access_token = body["access_token"]
self._token_expiry = time.time() + body.get("expires_in", 3600)
return self._access_token
async def call(self, operation: str, params: dict) -> dict:
"""
Standard adapter interface.
Operations: get_customer, search_customer, get_preferences
"""
token = await self._get_token()
session = await self._ensure_session()
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
if operation == "get_customer":
url = f"{self._base_url}/api/v2/customers/{params['customer_id']}"
async with session.get(url, headers=headers) as resp:
                if resp.status == 429 and not params.get("_retried"):
                    retry_after = int(resp.headers.get("Retry-After", "2"))
                    await asyncio.sleep(retry_after)
                    # Bounded retry: a second 429 falls through to raise_for_status
                    return await self.call(operation, {**params, "_retried": True})
resp.raise_for_status()
return await resp.json()
elif operation == "get_preferences":
cid = params["customer_id"]
url = f"{self._base_url}/api/v2/customers/{cid}/preferences"
async with session.get(url, headers=headers) as resp:
resp.raise_for_status()
return await resp.json()
raise ValueError(f"Unknown CRM operation: {operation}")
def to_canonical(self, raw: dict) -> dict:
"""Transform CRM response into canonical domain model."""
result = {}
for crm_field, domain_field in CRM_FIELD_MAP.items():
value = raw.get(crm_field)
if value is None:
continue
# Apply transforms per field
if domain_field == "email_masked":
value = _mask_email(value)
elif domain_field == "membership_tier":
value = MEMBERSHIP_MAP.get(value, "free")
elif domain_field == "preferred_genres":
if isinstance(value, str):
value = [g.strip() for g in value.split(",")]
result[domain_field] = value
return result
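The boundary masking rule is easy to spot-check in isolation. This is the same logic as `_mask_email` above, copied standalone so the edge cases (short local parts, malformed addresses) are visible:

```python
def mask_email(email: str) -> str:
    """PII mask: keep first and last char of the local part, hide the rest."""
    if "@" not in email:
        return "***@***.***"
    local, domain = email.rsplit("@", 1)
    masked = "*" * len(local) if len(local) <= 2 else local[0] + "***" + local[-1]
    return f"{masked}@{domain}"

examples = [mask_email(e) for e in
            ("john.doe@example.com", "ab@example.com", "no-at-sign")]
```

Masking at the adapter boundary (rather than in the prompt builder) means no raw email can ever reach Bedrock, logs, or the cache, even if a new consumer forgets to mask.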
3. SOAP-to-REST Translation
Many enterprise systems still expose SOAP/XML APIs. The translation layer converts modern REST calls into SOAP envelopes and vice versa, making legacy services accessible to the FM pipeline without forcing upstream changes.
Translation Flow
flowchart LR
subgraph REST["REST Interface"]
REQ["GET /inventory?isbn=978-4088..."]
RES["JSON Response<br/>{quantity: 42, status: in_stock}"]
end
subgraph Translator["SOAP-to-REST Translator"]
Parse["Parse REST params"]
Build["Build SOAP Envelope<br/>+ Shift-JIS encode"]
Send["POST to WSDL endpoint"]
Unwrap["Unwrap SOAP response"]
Convert["XML → JSON<br/>Shift-JIS → UTF-8"]
end
subgraph SOAP["Legacy SOAP Service"]
Endpoint["SOAP 1.1 Endpoint<br/>/ws/InventoryService"]
end
REQ --> Parse --> Build --> Send --> Endpoint
Endpoint --> Unwrap --> Convert --> RES
WSDL-to-Endpoint Mapping Table
| REST Endpoint | HTTP Method | SOAP Operation | SOAP Action |
|---|---|---|---|
| `/inventory/{isbn}` | GET | `GetStockLevel` | `urn:InventoryService#GetStockLevel` |
| `/inventory/batch` | POST | `GetBatchStockLevels` | `urn:InventoryService#GetBatchStockLevels` |
| `/inventory/{isbn}/reserve` | POST | `ReserveStock` | `urn:InventoryService#ReserveStock` |
| `/pricing/{isbn}` | GET | `GetCurrentPrice` | `urn:PricingService#GetCurrentPrice` |
| `/pricing/bulk` | POST | `GetBulkPricing` | `urn:PricingService#GetBulkPricing` |
Production Python — SOAP-to-REST Proxy
"""
enterprise_connectivity/translators/soap_rest_proxy.py
Runtime proxy that exposes legacy SOAP services as REST endpoints.
Handles envelope construction, namespace resolution, encoding,
and response flattening.
MangaAssist production component — deployed as a FastAPI sidecar
on ECS Fargate alongside the orchestrator.
"""
import logging
import re
from dataclasses import dataclass
from typing import Any, Optional
from xml.etree import ElementTree as ET
import aiohttp
logger = logging.getLogger(__name__)
# Namespace map for the legacy ERP WSDL
NS = {
"soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
"inv": "http://legacy-erp.manga-store.co.jp/inventory/v1",
"prc": "http://legacy-erp.manga-store.co.jp/pricing/v1",
}
@dataclass
class SOAPOperation:
"""Mapping between a REST endpoint and a SOAP operation."""
rest_path: str
rest_method: str
soap_action: str
soap_operation: str
namespace_prefix: str
request_builder: str # method name for building SOAP body
response_parser: str # method name for parsing SOAP response
# Operation registry
OPERATIONS = {
"get_stock": SOAPOperation(
rest_path="/inventory/{isbn}",
rest_method="GET",
soap_action="urn:InventoryService#GetStockLevel",
soap_operation="GetStockLevel",
namespace_prefix="inv",
request_builder="_build_stock_request",
response_parser="_parse_stock_response",
),
"get_batch_stock": SOAPOperation(
rest_path="/inventory/batch",
rest_method="POST",
soap_action="urn:InventoryService#GetBatchStockLevels",
soap_operation="GetBatchStockLevels",
namespace_prefix="inv",
request_builder="_build_batch_stock_request",
response_parser="_parse_batch_stock_response",
),
"get_price": SOAPOperation(
rest_path="/pricing/{isbn}",
rest_method="GET",
soap_action="urn:PricingService#GetCurrentPrice",
soap_operation="GetCurrentPrice",
namespace_prefix="prc",
request_builder="_build_price_request",
response_parser="_parse_price_response",
),
}
class SOAPRestProxy:
"""
Translates REST requests into SOAP calls and back.
Flow:
1. Match incoming REST path/method to a SOAPOperation
2. Build SOAP envelope with Shift-JIS encoding
3. POST to legacy endpoint with SOAPAction header
4. Parse SOAP response XML, extract data
5. Return flat JSON dict
"""
def __init__(self, legacy_endpoint: str, auth_token: str):
self._endpoint = legacy_endpoint
self._auth_token = auth_token
self._session: Optional[aiohttp.ClientSession] = None
async def _ensure_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=8),
)
return self._session
def _wrap_envelope(self, body_xml: str, ns_prefix: str) -> bytes:
"""Wrap a SOAP body in the full envelope with auth header."""
ns_uri = NS.get(ns_prefix, NS["inv"])
envelope = f"""<?xml version="1.0" encoding="Shift_JIS"?>
<soapenv:Envelope
xmlns:soapenv="{NS['soapenv']}"
xmlns:{ns_prefix}="{ns_uri}">
<soapenv:Header>
<{ns_prefix}:AuthToken>{self._auth_token}</{ns_prefix}:AuthToken>
</soapenv:Header>
<soapenv:Body>
{body_xml}
</soapenv:Body>
</soapenv:Envelope>"""
return envelope.encode("shift_jis")
# --- Request builders ---
def _build_stock_request(self, params: dict) -> str:
isbn = params["isbn"]
wh = params.get("warehouse", "TKY-01")
return (
f"<inv:GetStockLevel>"
f"<inv:ISBN>{isbn}</inv:ISBN>"
f"<inv:WarehouseCode>{wh}</inv:WarehouseCode>"
f"</inv:GetStockLevel>"
)
def _build_batch_stock_request(self, params: dict) -> str:
items = "".join(
f"<inv:ISBN>{isbn}</inv:ISBN>" for isbn in params["isbns"]
)
return f"<inv:GetBatchStockLevels>{items}</inv:GetBatchStockLevels>"
def _build_price_request(self, params: dict) -> str:
isbn = params["isbn"]
return (
f"<prc:GetCurrentPrice>"
f"<prc:ISBN>{isbn}</prc:ISBN>"
f"</prc:GetCurrentPrice>"
)
# --- Response parsers ---
def _parse_stock_response(self, root: ET.Element) -> dict:
"""Extract stock data from SOAP response body."""
body = root.find(".//soapenv:Body", NS)
if body is None:
return {"error": "no_body"}
result = body.find(".//inv:StockResult", NS)
if result is None:
return {"error": "no_stock_result"}
return {
"isbn": self._text(result, "inv:ISBN"),
"title": self._text(result, "inv:Title"),
"quantity_available": int(self._text(result, "inv:Quantity") or "0"),
"warehouse": self._text(result, "inv:WarehouseCode"),
"status": self._text(result, "inv:StockStatus"),
"price_jpy": int(self._text(result, "inv:PriceJPY") or "0"),
"last_restock": self._text(result, "inv:LastRestockDate"),
}
def _parse_batch_stock_response(self, root: ET.Element) -> dict:
body = root.find(".//soapenv:Body", NS)
        items = body.findall(".//inv:StockResult", NS) if body is not None else []
return {
"items": [
{
"isbn": self._text(item, "inv:ISBN"),
"quantity": int(self._text(item, "inv:Quantity") or "0"),
"status": self._text(item, "inv:StockStatus"),
}
for item in items
]
}
def _parse_price_response(self, root: ET.Element) -> dict:
body = root.find(".//soapenv:Body", NS)
        result = body.find(".//prc:PriceResult", NS) if body is not None else None
if result is None:
return {"error": "no_price_result"}
return {
"isbn": self._text(result, "prc:ISBN"),
"price_jpy": int(self._text(result, "prc:Amount") or "0"),
"currency": "JPY",
"discount_pct": float(self._text(result, "prc:DiscountPercent") or "0"),
}
def _text(self, element: ET.Element, tag: str) -> Optional[str]:
child = element.find(tag, NS)
return child.text if child is not None else None
# --- Main execution ---
async def execute(self, operation_key: str, params: dict) -> dict:
"""Execute a REST-to-SOAP translated call."""
op = OPERATIONS.get(operation_key)
if not op:
raise ValueError(f"Unknown operation: {operation_key}")
# Build SOAP body
builder = getattr(self, op.request_builder)
body_xml = builder(params)
# Wrap in envelope and encode
envelope_bytes = self._wrap_envelope(body_xml, op.namespace_prefix)
# Send to legacy SOAP endpoint
session = await self._ensure_session()
headers = {
"Content-Type": "text/xml; charset=Shift_JIS",
"SOAPAction": op.soap_action,
}
async with session.post(
self._endpoint, data=envelope_bytes, headers=headers,
) as resp:
raw_bytes = await resp.read()
# Decode Shift-JIS response to UTF-8 string
try:
xml_str = raw_bytes.decode("shift_jis")
except UnicodeDecodeError:
xml_str = raw_bytes.decode("utf-8", errors="replace")
if resp.status != 200:
logger.error(
"SOAP fault: status=%d body=%s",
resp.status, xml_str[:500],
)
return {"error": "soap_fault", "status": resp.status}
            # Parse XML and extract data. ET.fromstring() rejects str input that
            # still carries an encoding declaration, so strip it before parsing.
            xml_str = re.sub(r"^\s*<\?xml[^>]*\?>\s*", "", xml_str)
            root = ET.fromstring(xml_str)
parser = getattr(self, op.response_parser)
return parser(root)
4. Message Format Translation — Canonical Data Model
The Canonical Data Model (CDM) provides a single, clean representation of domain objects that all systems converge to. Legacy systems translate into CDM at the adapter boundary; FM prompts consume CDM exclusively.
CDM Design Principles
flowchart TB
subgraph Sources["Legacy Sources (Different Formats)"]
ERP["ERP<br/>XML + Shift-JIS<br/>JP field names"]
CRM["CRM<br/>JSON + UTF-8<br/>English field names"]
MF["Mainframe<br/>Fixed-width + EBCDIC<br/>COBOL copybook"]
end
subgraph CDM["Canonical Data Model"]
Norm["Normalized Domain Objects<br/>- UTF-8 strings<br/>- ISO 8601 dates<br/>- Snake_case fields<br/>- Standard enums"]
end
subgraph Consumers["Consumers"]
Prompt["Bedrock Prompt Builder"]
Search["OpenSearch Indexer"]
Cache["Redis Cache"]
API["WebSocket Response"]
end
ERP -->|"Adapter A"| CDM
CRM -->|"Adapter B"| CDM
MF -->|"Adapter C"| CDM
CDM --> Prompt
CDM --> Search
CDM --> Cache
CDM --> API
Encoding Pipeline — Shift-JIS to UTF-8
| Step | Input | Output | Notes |
|---|---|---|---|
| 1. Detect encoding | Raw bytes | Charset name | Check HTTP header, XML declaration, BOM |
| 2. Decode | Raw bytes | Python str | Handle shift_jis, euc-jp, iso-2022-jp |
| 3. Normalize Unicode | str | NFKC-normalized str | NFKC folds half-width katakana to full-width (plain NFC does not) |
| 4. Validate | str | Validated str | Reject control chars, surrogate pairs |
| 5. Encode output | str | UTF-8 bytes | All downstream systems expect UTF-8 |
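Step 3 is the one that trips people up: canonical (NFC) composition leaves half-width katakana untouched, and only compatibility (NFKC) normalization folds the width variants. A quick check:

```python
import unicodedata

halfwidth = "ﾜﾝﾋﾟｰｽ"                                 # half-width katakana input
nfc = unicodedata.normalize("NFC", halfwidth)         # canonical only: unchanged
nfkc = unicodedata.normalize("NFKC", halfwidth)       # compatibility: width folded
```

NFKC also recomposes split voicing marks (the half-width `ﾋ` + `ﾟ` pair becomes the single code point `ピ`), which keeps embeddings and exact-match cache keys consistent across source systems.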
Production Python — Format Translation Engine
"""
enterprise_connectivity/translators/format_translator.py
Canonical Data Model transformation engine. Handles character encoding,
date/currency normalization, and field mapping for all legacy systems.
MangaAssist production component.
"""
import logging
import re
import unicodedata
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------
# Encoding utilities
# -----------------------------------------------------------------------
def normalize_encoding(raw: bytes, declared_charset: str = "shift_jis") -> str:
"""
Decode legacy bytes to Python str with fallback chain.
Japanese legacy systems may use Shift-JIS, EUC-JP, or ISO-2022-JP.
"""
codecs_to_try = [declared_charset, "shift_jis", "euc-jp", "utf-8"]
for codec in codecs_to_try:
try:
decoded = raw.decode(codec)
            # NFKC normalize: compatibility folding merges half-width kana into
            # full-width (plain NFC leaves half-width forms untouched)
            normalized = unicodedata.normalize("NFKC", decoded)
            return normalized
except (UnicodeDecodeError, LookupError):
continue
# Last resort: decode as UTF-8 with replacement chars
logger.warning("All codec attempts failed; using UTF-8 with replacements")
return raw.decode("utf-8", errors="replace")
def sanitize_string(text: str) -> str:
"""Remove control characters except newline/tab."""
return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
# -----------------------------------------------------------------------
# Date normalization
# -----------------------------------------------------------------------
# Japanese era mappings (Reiwa, Heisei, Showa)
JP_ERA_MAP = {
"令和": 2018, # Reiwa: year 1 = 2019, so base = 2018
"平成": 1988, # Heisei: year 1 = 1989
"昭和": 1925, # Showa: year 1 = 1926
}
JP_ERA_PATTERN = re.compile(
r"(令和|平成|昭和)(\d{1,2})年(\d{1,2})月(\d{1,2})日"
)
def normalize_date(raw_date: str) -> Optional[str]:
"""
Convert various date formats to ISO 8601.
Handles: JP era dates, YYYY/MM/DD, DD-MM-YYYY, epoch seconds.
"""
if not raw_date or raw_date.strip() == "":
return None
raw_date = raw_date.strip()
# JP era date: 令和6年3月15日 -> 2024-03-15
match = JP_ERA_PATTERN.match(raw_date)
if match:
era, year, month, day = match.groups()
western_year = JP_ERA_MAP[era] + int(year)
return f"{western_year:04d}-{int(month):02d}-{int(day):02d}"
# Slash format: 2024/03/15
if re.match(r"\d{4}/\d{1,2}/\d{1,2}", raw_date):
parts = raw_date.split("/")
return f"{int(parts[0]):04d}-{int(parts[1]):02d}-{int(parts[2]):02d}"
# Already ISO 8601
if re.match(r"\d{4}-\d{2}-\d{2}", raw_date):
return raw_date[:10]
# Epoch seconds
if raw_date.isdigit() and len(raw_date) >= 10:
dt = datetime.fromtimestamp(int(raw_date), tz=timezone.utc)
return dt.strftime("%Y-%m-%d")
logger.warning("Unrecognized date format: %s", raw_date)
return raw_date # pass through
# -----------------------------------------------------------------------
# Currency normalization
# -----------------------------------------------------------------------
def normalize_price_jpy(raw_price: Any) -> Optional[int]:
"""
Normalize Japanese Yen values.
Handles: "¥1,234", "1234円", 1234, "1234.00"
JPY is integer-only (no fractional yen).
"""
if raw_price is None:
return None
if isinstance(raw_price, (int, float)):
return int(raw_price)
text = str(raw_price).strip()
# Remove currency symbols and delimiters
cleaned = re.sub(r"[¥円,\s]", "", text)
# Remove trailing decimal (JPY has no fractions)
cleaned = re.sub(r"\.\d+$", "", cleaned)
try:
return int(cleaned)
except ValueError:
logger.warning("Cannot parse price: %s", raw_price)
return None
# -----------------------------------------------------------------------
# Canonical Data Model transformer
# -----------------------------------------------------------------------
@dataclass
class FieldMapping:
"""Describes how to map a source field to a canonical field."""
source_field: str
target_field: str
transform: str = "passthrough" # passthrough|date|price|encoding|enum
class CanonicalTransformer:
"""
Transforms raw legacy responses into Canonical Data Model objects.
Each legacy system has a registered set of field mappings.
"""
def __init__(self):
self._mappings: dict[str, list[FieldMapping]] = {}
self._enum_maps: dict[str, dict] = {}
def register_mapping(
self, system_name: str, mappings: list[FieldMapping],
) -> None:
self._mappings[system_name] = mappings
def register_enum(self, enum_name: str, value_map: dict) -> None:
self._enum_maps[enum_name] = value_map
def transform(self, system_name: str, raw: dict) -> dict:
"""Apply registered mappings to produce a canonical dict."""
mappings = self._mappings.get(system_name, [])
result = {}
for m in mappings:
value = raw.get(m.source_field)
if value is None:
continue
if m.transform == "date":
value = normalize_date(str(value))
elif m.transform == "price":
value = normalize_price_jpy(value)
elif m.transform == "encoding":
if isinstance(value, bytes):
value = normalize_encoding(value)
else:
value = sanitize_string(str(value))
elif m.transform == "enum":
enum_map = self._enum_maps.get(m.target_field, {})
value = enum_map.get(value, value)
result[m.target_field] = value
return result
# -----------------------------------------------------------------------
# Pre-configured transformer for MangaAssist
# -----------------------------------------------------------------------
def build_manga_assist_transformer() -> CanonicalTransformer:
"""Factory: returns a transformer pre-loaded with MangaAssist mappings."""
t = CanonicalTransformer()
# ERP inventory mappings
t.register_mapping("erp_inventory", [
FieldMapping("ISBN", "isbn", "passthrough"),
FieldMapping("Title", "title", "encoding"),
FieldMapping("Quantity", "quantity_available", "passthrough"),
FieldMapping("WarehouseCode", "warehouse", "passthrough"),
FieldMapping("StockStatus", "stock_status", "enum"),
FieldMapping("LastRestockDate", "last_restock_date", "date"),
FieldMapping("PriceJPY", "price_jpy", "price"),
])
# Stock status enum
t.register_enum("stock_status", {
"在庫あり": "in_stock",
"在庫少": "low_stock",
"在庫切れ": "out_of_stock",
"予約受付中": "preorder",
})
# CRM customer mappings
t.register_mapping("crm_customer", [
FieldMapping("CustomerID", "customer_id", "passthrough"),
FieldMapping("FullName", "display_name", "encoding"),
FieldMapping("MembershipLevel", "membership_tier", "enum"),
FieldMapping("FavoriteCategories", "preferred_genres", "passthrough"),
FieldMapping("RegistrationDate", "created_at", "date"),
])
t.register_enum("membership_tier", {
"STD": "free", "PRM": "premium", "VIP": "vip", "TRIAL": "free",
})
return t
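To make the mapping concrete, here is a minimal, self-contained sketch of what the ERP mapping above produces for one record. It re-declares a stand-in `FieldMapping` and a stripped-down transform loop rather than reusing the full `CanonicalTransformer`; the field names and enum values come from the registrations above, while the sample ISBN and quantity are invented.

```python
from dataclasses import dataclass

@dataclass
class FieldMapping:  # same shape as the class used by CanonicalTransformer
    source_field: str
    target_field: str
    transform: str

ERP_MAPPINGS = [
    FieldMapping("ISBN", "isbn", "passthrough"),
    FieldMapping("StockStatus", "stock_status", "enum"),
    FieldMapping("Quantity", "quantity_available", "passthrough"),
]
STOCK_ENUM = {"在庫あり": "in_stock", "在庫切れ": "out_of_stock"}

def transform(raw: dict) -> dict:
    """Stripped-down version of CanonicalTransformer.transform for one system."""
    result = {}
    for m in ERP_MAPPINGS:
        value = raw.get(m.source_field)
        if value is None:
            continue  # absent source fields are simply skipped
        if m.transform == "enum":
            value = STOCK_ENUM.get(value, value)  # unknown values pass through
        result[m.target_field] = value
    return result

raw = {"ISBN": "978-4-08-882345-1", "StockStatus": "在庫あり", "Quantity": 12}
print(transform(raw))
# {'isbn': '978-4-08-882345-1', 'stock_status': 'in_stock', 'quantity_available': 12}
```

The FM prompt builder only ever sees the canonical keys (`isbn`, `stock_status`), never the Japanese-labeled ERP vocabulary.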
5. Message Queue Integration — Backpressure and Deduplication
When legacy systems cannot absorb the chatbot's request volume (1M messages/day = ~11.6 req/s average, 50+ req/s peak), a request queue provides backpressure, deduplication, and priority routing.
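The arithmetic behind the backpressure decision can be sketched directly from the numbers above. This is an illustrative helper, not part of the gateway: it estimates how long a backlog takes to drain given an ingress rate and the legacy service cap.

```python
def drain_time_s(queue_depth: int, ingress_rps: float, service_rps: float) -> float:
    """Seconds until the backlog clears, or inf if ingress >= service rate."""
    if ingress_rps >= service_rps:
        return float("inf")  # backlog only grows (or holds steady)
    return queue_depth / (service_rps - ingress_rps)

# During a 50 req/s burst against the 50 req/s ERP cap, the backlog holds
# steady; once traffic falls back to the ~11.6 req/s average, a 5,000-message
# backlog drains in about 5000 / (50 - 11.6) ≈ 130 seconds.
print(round(drain_time_s(5000, 11.6, 50.0)))  # 130
```

A drain time above the chatbot's 3-second answer budget is the signal to serve cached or degraded answers instead of waiting on the queue.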
Queue Architecture
flowchart LR
subgraph Producers["ECS Fargate Tasks"]
T1["Task 1"]
T2["Task 2"]
T3["Task N"]
end
subgraph Queue["SQS FIFO Queue"]
PRI["Priority Lanes<br/>CRITICAL | HIGH | NORMAL | LOW"]
DEDUP["Dedup Window<br/>5-min content hash"]
DLQ["Dead Letter Queue<br/>after 3 failures"]
end
subgraph Consumer["Queue Consumer Lambda"]
Batch["Batch up to 10 msgs"]
Route["Route to adapter"]
Respond["Write result to DynamoDB"]
end
subgraph Legacy["Legacy ERP<br/>50 req/s max"]
ERP["SOAP Endpoint"]
end
T1 --> PRI
T2 --> PRI
T3 --> PRI
PRI --> DEDUP --> Consumer
Consumer -.->|"after 3 failed receives"| DLQ
Batch --> Route --> ERP
Route --> Respond
Production Python — Queue-Based Legacy Gateway
"""
enterprise_connectivity/queue/legacy_queue_gateway.py
SQS-based request queue that provides backpressure, deduplication,
and priority-based routing for legacy system calls.
MangaAssist production component.
"""
import hashlib
import json
import logging
import time
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")
class LegacyQueueGateway:
"""
Manages asynchronous legacy system requests via SQS FIFO.
Deduplication: content-based hash within a 5-minute window.
Priority: uses MessageGroupId to create separate processing lanes.
Backpressure: ApproximateNumberOfMessages metric triggers scaling.
"""
def __init__(
self,
queue_url: str,
dlq_url: str,
result_table_name: str,
):
self._queue_url = queue_url
self._dlq_url = dlq_url
self._result_table = dynamodb.Table(result_table_name)
    def _dedup_id(self, operation: str, params: dict) -> str:
        """Content-based deduplication ID: SHA-256 over the operation and
        sorted params. A hex digest is 64 chars, within SQS's 128-char limit."""
        payload = json.dumps(
            {"op": operation, "params": params}, sort_keys=True,
        )
        return hashlib.sha256(payload.encode()).hexdigest()
def enqueue_request(
self,
operation: str,
params: dict,
priority: str = "NORMAL",
correlation_id: str = "",
) -> dict:
"""
Enqueue a legacy system request.
Returns the SQS MessageId for tracking.
"""
message_body = json.dumps({
"operation": operation,
"params": params,
"priority": priority,
"correlation_id": correlation_id,
"enqueued_at": time.time(),
})
        dedup_id = self._dedup_id(operation, params)  # compute once, reuse below
        response = sqs.send_message(
            QueueUrl=self._queue_url,
            MessageBody=message_body,
            MessageGroupId=f"legacy-{priority}",
            MessageDeduplicationId=dedup_id,
            MessageAttributes={
                "Priority": {
                    "DataType": "String",
                    "StringValue": priority,
                },
                "Operation": {
                    "DataType": "String",
                    "StringValue": operation,
                },
            },
        )
        logger.info(
            "Enqueued %s (priority=%s, dedup=%s) -> MessageId=%s",
            operation, priority, dedup_id[:8], response["MessageId"],
        )
        return {"message_id": response["MessageId"]}
def poll_result(self, correlation_id: str) -> Optional[dict]:
"""
Poll DynamoDB for the result of an async legacy call.
The consumer Lambda writes results here after processing.
"""
        try:
            response = self._result_table.get_item(
                Key={"correlation_id": correlation_id},
            )
            return response.get("Item")
        except Exception:
            logger.exception("Result poll failed for %s", correlation_id)
            return None
def get_queue_depth(self) -> dict:
"""Return queue metrics for backpressure decisions."""
attrs = sqs.get_queue_attributes(
QueueUrl=self._queue_url,
AttributeNames=[
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
"ApproximateNumberOfMessagesDelayed",
],
)["Attributes"]
return {
"pending": int(attrs.get("ApproximateNumberOfMessages", 0)),
"in_flight": int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
"delayed": int(attrs.get("ApproximateNumberOfMessagesDelayed", 0)),
}
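The consumer side of the diagram can be sketched as a Lambda batch handler. This is a hypothetical sketch: `call_legacy` is a placeholder for the real SOAP adapter, `ERP_MAX_RPS` comes from the 50 req/s cap in the architecture above, and the `batchItemFailures` return shape is the standard SQS partial-batch contract (requires `ReportBatchItemFailures` on the event source mapping).

```python
import json
import time

ERP_MAX_RPS = 50  # legacy ERP cap from the architecture above

def call_legacy(operation: str, params: dict) -> dict:
    """Stand-in for the real adapter call; replace with the SOAP adapter."""
    return {"operation": operation, "ok": True}

def handler(event: dict, context=None) -> dict:
    """SQS batch handler: process each message and report partial failures
    via batchItemFailures, so only the failed messages are retried."""
    failures = []
    for record in event.get("Records", []):
        try:
            body = json.loads(record["body"])
            call_legacy(body["operation"], body["params"])
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
        time.sleep(1.0 / ERP_MAX_RPS)  # crude per-consumer pacing
    return {"batchItemFailures": failures}
```

In production the pacing would be enforced globally (e.g. reserved concurrency of 1 per priority lane) rather than with a per-record sleep, but the partial-failure contract is the part worth memorizing.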
Integration Pattern Comparison
| Pattern | Best For | Latency | Coupling | Complexity | MangaAssist Use |
|---|---|---|---|---|---|
| Direct Adapter | Low-latency sync calls | Low (< 1s) | Medium | Low | Inventory check during chat |
| ACL + CDM | Multiple legacy backends | Medium (1-2s) | Low | Medium | All legacy interactions |
| SOAP-to-REST Proxy | Teams that cannot modify legacy API | Medium | Low | Medium | ERP inventory + pricing |
| Message Queue | High-volume async processing | High (seconds) | Very Low | Medium | Batch catalog updates |
| CDC + Streaming | Near-real-time data sync | Low (sub-second lag) | Very Low | High | DMS -> DynamoDB pipeline |
| Batch ETL | Full reconciliation | Very High (hours) | None | Low | Nightly catalog refresh |
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Anti-corruption layers prevent legacy models from contaminating FM prompts | ACL transforms SOAP/Shift-JIS ERP responses into clean UTF-8 domain objects before Bedrock sees them |
| 2 | Adapter pattern normalizes heterogeneous backends behind a uniform interface | ERP (SOAP), CRM (REST), Mainframe (MQ) all expose identical call() + to_canonical() methods |
| 3 | SOAP-to-REST proxies modernize legacy without modifying the legacy system | Sidecar proxy on ECS converts REST inventory requests to SOAP envelopes and back |
| 4 | Canonical Data Model gives FM prompts a single, consistent vocabulary | All systems output isbn, title, quantity_available regardless of source format |
| 5 | Character encoding normalization is essential for JP content | Shift-JIS and EUC-JP decoded at adapter boundary; everything downstream is NFC-normalized UTF-8 |
| 6 | Circuit breakers protect chatbot latency when legacy degrades | 5 failures -> 30s open -> half-open probe -> auto-recovery, with Redis cache fallback |
| 7 | SQS FIFO queues add backpressure against rate-limited legacy systems | 50 req/s ERP limit enforced via message groups + content deduplication |
| 8 | PII masking at the adapter boundary prevents sensitive data from reaching the FM | Email addresses masked before entering Redis cache or Bedrock prompt context |
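Takeaway 8 can be illustrated with a minimal boundary-masking sketch. This shows one illustrative approach (regex-based email masking); a production adapter would cover more identifier types (names, phone numbers) and might delegate to Amazon Comprehend PII detection or Bedrock Guardrails instead.

```python
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")

def mask_pii(text: str) -> str:
    """Mask email addresses before text enters the Redis cache or a prompt.
    Keeps the first character so logs stay loosely correlatable."""
    return EMAIL_RE.sub(lambda m: m.group()[0] + "***@***", text)

print(mask_pii("Contact tanaka@example.co.jp for order 1234."))
# Contact t***@*** for order 1234.
```

Masking at the adapter boundary, rather than in the prompt builder, guarantees that no downstream component (cache, logs, FM context) ever receives the raw value.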
Exam-Relevant Connections
- API Gateway mapping templates (VTL) can perform lightweight payload transformation, including simple XML-to-JSON field remapping, without invoking Lambda.
- AWS AppSync pipeline resolvers chain multiple data sources (DynamoDB, OpenSearch, legacy adapter) in a single GraphQL query.
- Step Functions orchestrate multi-step legacy integrations with built-in error handling, retries, and timeout management.
- EventBridge Pipes connect SQS queues to processing targets with built-in filtering and enrichment, reducing Lambda glue code.
- AWS DMS is the managed CDC solution for replicating legacy Oracle/MySQL data to DynamoDB, avoiding custom polling adapters.