# MCP Client Patterns and Consistent Tool Access
MangaAssist context: a Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Targets: a useful answer in under 3 seconds, at a scale of 1M messages/day.
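The published token prices make model choice a first-order cost decision at this scale. A back-of-envelope sketch, assuming illustrative per-message token counts (2,000 input / 300 output — these numbers are assumptions, not from the source):

```python
# Rough daily Bedrock spend at MangaAssist's 1M messages/day, using the
# per-1M-token prices quoted above. Token counts per message are assumed.
PRICES = {  # (input $, output $) per 1M tokens
    "sonnet": (3.00, 15.00),
    "haiku": (0.25, 1.25),
}

def daily_cost(model: str, msgs_per_day: int = 1_000_000,
               in_tokens: int = 2_000, out_tokens: int = 300) -> float:
    """Daily spend in USD for the given model and traffic profile."""
    in_price, out_price = PRICES[model]
    per_msg = (in_tokens / 1e6) * in_price + (out_tokens / 1e6) * out_price
    return per_msg * msgs_per_day

print(f"Sonnet: ${daily_cost('sonnet'):,.0f}/day")  # → $10,500/day
print(f"Haiku:  ${daily_cost('haiku'):,.0f}/day")   # → $875/day
```

Under these assumptions Haiku is roughly 12× cheaper per message, which is why routing simple tool-call turns to Haiku is a common pattern.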
## Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIP-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.1 — Select and implement appropriate FM integration approaches |
| Skill | 2.1.7 — Develop model extension frameworks to enhance FM capabilities (Lambda for stateless MCP servers, ECS for complex MCP servers, MCP client libraries for consistent access patterns) |
## MCP Client Architecture Mindmap
```mermaid
mindmap
  root((MCP Client<br/>Patterns))
    MCPClient Core
      Connection Lifecycle
        Initialize Handshake
        Capability Negotiation
        Session Management
        Graceful Shutdown
      Transport Abstraction
        Lambda Direct Invoke
        HTTP REST Calls
        WebSocket Persistent
        gRPC Internal
      Request Pipeline
        Request ID Generation
        Envelope Construction
        Timeout Management
        Response Correlation
    Tool Discovery
      ToolDiscoveryClient
        Server Enumeration
        Schema Fetching
        Capability Caching
        Refresh Scheduling
      Registry Integration
        DynamoDB Backed
        Redis Cached
        Version Tracking
        Health Filtering
      Schema Validation
        JSON Schema Checking
        Parameter Type Coercion
        Required Field Checks
        Default Injection
    Request Building
      MCPRequestBuilder
        Method Selection
        Parameter Marshaling
        Argument Validation
        Envelope Sealing
      Batch Operations
        Parallel Tool Calls
        Dependency Resolution
        Fan-Out Patterns
        Result Aggregation
    Response Parsing
      MCPResponseParser
        Content Extraction
        Error Classification
        Type Deserialization
        Metadata Extraction
      Error Handling
        Retryable Errors
        Fatal Errors
        Partial Failures
        Timeout Handling
    Resilience Patterns
      Circuit Breaker
        Per-Server State
        Failure Thresholds
        Half-Open Probing
        Recovery Timing
      Retry Logic
        Exponential Backoff
        Jitter Addition
        Max Attempt Limits
        Idempotency Guards
      Fallback Strategies
        Cached Results
        Degraded Responses
        Alternative Tools
        Graceful Omission
```
## MCP Client Request Flow in MangaAssist
```mermaid
flowchart TB
    subgraph Orchestrator["ECS Fargate Orchestrator"]
        BedrockCall[Bedrock Claude 3<br/>Returns tool_use Block]
        MCPClient[MCPClient<br/>Unified Interface]
        ReqBuilder[MCPRequestBuilder<br/>Envelope Construction]
        Discovery[ToolDiscoveryClient<br/>Server Resolution]
        RespParser[MCPResponseParser<br/>Content Extraction]
        CircuitBkr[Circuit Breaker<br/>Per-Server]
        RetryMgr[Retry Manager<br/>Exponential Backoff]
    end
    subgraph Transport["Transport Layer"]
        LambdaInvoke[Lambda Direct Invoke<br/>boto3 lambda.invoke]
        HTTPClient[HTTP Client<br/>aiohttp POST]
        WSClient[WebSocket Client<br/>aiohttp ws_connect]
    end
    subgraph MCPServers["MCP Servers"]
        LambdaSearch[Lambda: manga_search]
        LambdaPrice[Lambda: price_lookup]
        ECSOrder[ECS: order_processing]
        ECSTranslate[ECS: translation]
    end
    BedrockCall -->|tool_use: manga_search| MCPClient
    MCPClient -->|Resolve server| Discovery
    Discovery -->|Server endpoint + transport| MCPClient
    MCPClient -->|Check circuit| CircuitBkr
    CircuitBkr -->|CLOSED: proceed| ReqBuilder
    CircuitBkr -->|OPEN: fail fast| RespParser
    ReqBuilder -->|JSON-RPC envelope| RetryMgr
    RetryMgr -->|Lambda transport| LambdaInvoke
    RetryMgr -->|HTTP transport| HTTPClient
    RetryMgr -->|WS transport| WSClient
    LambdaInvoke --> LambdaSearch
    LambdaInvoke --> LambdaPrice
    HTTPClient --> ECSOrder
    WSClient --> ECSTranslate
    LambdaSearch -->|MCP response| RespParser
    LambdaPrice -->|MCP response| RespParser
    ECSOrder -->|MCP response| RespParser
    ECSTranslate -->|MCP response| RespParser
    RespParser -->|Extracted content| BedrockCall
    style Orchestrator fill:#e3f2fd,stroke:#1565c0
    style Transport fill:#fff3e0,stroke:#e65100
    style MCPServers fill:#e8f5e9,stroke:#2e7d32
```
## 1. MCPClient — Unified Tool Access Interface

### 1.1 Design Philosophy
The MCPClient is the single entry point for all MCP tool interactions in MangaAssist. The ECS Fargate orchestrator calls client.call_tool(tool_name, arguments) and the client handles everything: discovering which server hosts the tool, selecting the right transport, building the MCP envelope, executing with retries, parsing the response, and managing circuit breakers.
This abstraction is critical because the orchestrator should never need to know that manga_search runs on Lambda while order_processing runs on ECS. When the team migrates a tool from Lambda to ECS (for example, to add streaming support to search), the orchestrator code does not change.
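The migration argument can be reduced to a toy routing table (the names below are hypothetical, not from the MangaAssist codebase): the orchestrator resolves tool names through a registry, so moving a tool between compute platforms is a one-entry change.

```python
# Toy sketch of the routing idea behind MCPClient: callers know only tool
# names; a registry maps each tool to a (server, transport) pair.
from typing import Dict, Tuple

TOOL_ROUTES: Dict[str, Tuple[str, str]] = {
    "manga_search": ("search-server", "lambda"),
    "order_processing": ("order-server", "http"),
}

def resolve(tool_name: str) -> Tuple[str, str]:
    """Return (server, transport) for a tool, raising if unknown."""
    try:
        return TOOL_ROUTES[tool_name]
    except KeyError:
        raise LookupError(f"No server registered for tool: {tool_name}")

# Migrating manga_search from Lambda to ECS touches only the registry entry;
# every call site that uses resolve() is unaffected.
TOOL_ROUTES["manga_search"] = ("search-server", "http")
```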
### 1.2 MCPClient Implementation
"""
MCPClient: The unified MCP client library for MangaAssist.
Provides consistent tool access regardless of server type or transport.
"""
import json
import time
import asyncio
import logging
import hashlib
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
from dataclasses import dataclass, field
import aiohttp
import boto3
logger = logging.getLogger("mcp-client")
logger.setLevel(logging.INFO)
class TransportType(Enum):
"""Supported MCP transport types."""
LAMBDA = "lambda"
HTTP = "http"
WEBSOCKET = "websocket"
class ToolCallStatus(Enum):
"""Outcome status for a tool call."""
SUCCESS = "success"
ERROR = "error"
TIMEOUT = "timeout"
CIRCUIT_OPEN = "circuit_open"
TOOL_NOT_FOUND = "tool_not_found"
TRANSPORT_ERROR = "transport_error"
@dataclass
class ToolCallResult:
"""Structured result from an MCP tool call."""
tool_name: str
status: ToolCallStatus
content: List[Dict[str, Any]] = field(default_factory=list)
is_error: bool = False
latency_ms: float = 0.0
server_name: str = ""
attempt_count: int = 1
request_id: str = ""
error_message: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def get_text(self) -> str:
"""Extract the text content from the first content block."""
for block in self.content:
if block.get("type") == "text":
return block.get("text", "")
return ""
def get_parsed_json(self) -> Optional[Dict[str, Any]]:
"""Parse text content as JSON, returning None on failure."""
text = self.get_text()
if text:
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
return None
return None
@dataclass
class ServerEndpoint:
"""Configuration for an MCP server endpoint."""
name: str
transport: TransportType
url: Optional[str] = None
lambda_function_name: Optional[str] = None
lambda_qualifier: Optional[str] = None
timeout_seconds: float = 5.0
max_retries: int = 2
retry_base_delay_ms: float = 100.0
retry_max_delay_ms: float = 2000.0
class MCPClient:
"""
Unified MCP client for the MangaAssist orchestrator.
Responsibilities:
1. Server and tool discovery via ToolDiscoveryClient.
2. Request construction via MCPRequestBuilder.
3. Transport-agnostic invocation with retry and circuit breaking.
4. Response parsing via MCPResponseParser.
5. Metrics emission for observability.
Usage:
client = MCPClient(region="ap-northeast-1")
await client.initialize()
result = await client.call_tool("manga_search", {"query": "One Piece"})
print(result.get_text())
await client.close()
"""
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self._endpoints: Dict[str, ServerEndpoint] = {}
self._tool_to_server: Dict[str, str] = {}
self._tool_definitions: Dict[str, Dict[str, Any]] = {}
self._circuit_breakers: Dict[str, "CircuitBreaker"] = {}
self._request_builder = MCPRequestBuilder()
self._response_parser = MCPResponseParser()
self._lambda_client: Optional[boto3.client] = None
self._http_session: Optional[aiohttp.ClientSession] = None
self._ws_connections: Dict[str, aiohttp.ClientWebSocketResponse] = {}
self._metrics: Dict[str, Any] = {
"total_calls": 0,
"successful_calls": 0,
"failed_calls": 0,
"total_latency_ms": 0.0,
}
self._initialized = False
async def initialize(self) -> None:
"""Initialize transport clients and connections."""
self._lambda_client = boto3.client("lambda", region_name=self.region)
self._http_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
connector=aiohttp.TCPConnector(
limit=100,
ttl_dns_cache=300,
keepalive_timeout=60,
),
)
self._initialized = True
logger.info("MCPClient initialized for region %s", self.region)
async def close(self) -> None:
"""Clean up all connections and sessions."""
for ws in self._ws_connections.values():
if not ws.closed:
await ws.close()
self._ws_connections.clear()
if self._http_session:
await self._http_session.close()
self._initialized = False
logger.info("MCPClient closed")
def register_server(self, endpoint: ServerEndpoint) -> None:
"""Register an MCP server endpoint."""
self._endpoints[endpoint.name] = endpoint
self._circuit_breakers[endpoint.name] = CircuitBreaker(
name=endpoint.name,
failure_threshold=5,
recovery_timeout_seconds=30.0,
)
logger.info(
"Registered server: %s (%s)",
endpoint.name, endpoint.transport.value,
)
def register_tool_mapping(
self, tool_name: str, server_name: str,
definition: Optional[Dict[str, Any]] = None,
) -> None:
"""Map a tool name to its hosting server."""
self._tool_to_server[tool_name] = server_name
if definition:
self._tool_definitions[tool_name] = definition
async def call_tool(
self,
tool_name: str,
arguments: Dict[str, Any],
timeout_override: Optional[float] = None,
) -> ToolCallResult:
"""
Call an MCP tool by name.
This is the primary API. It:
1. Resolves the tool to a server endpoint.
2. Checks the circuit breaker for that server.
3. Builds the MCP JSON-RPC request envelope.
4. Sends via the appropriate transport with retries.
5. Parses the response and returns a structured result.
"""
if not self._initialized:
raise RuntimeError("MCPClient not initialized. Call initialize() first.")
start_time = time.time()
self._metrics["total_calls"] += 1
# Step 1: Resolve server
server_name = self._tool_to_server.get(tool_name)
if not server_name:
return ToolCallResult(
tool_name=tool_name,
status=ToolCallStatus.TOOL_NOT_FOUND,
is_error=True,
error_message=f"No server registered for tool: {tool_name}",
)
endpoint = self._endpoints.get(server_name)
if not endpoint:
return ToolCallResult(
tool_name=tool_name,
status=ToolCallStatus.TOOL_NOT_FOUND,
is_error=True,
error_message=f"Server endpoint not found: {server_name}",
)
# Step 2: Check circuit breaker
cb = self._circuit_breakers.get(server_name)
if cb and not cb.allow_request():
self._metrics["failed_calls"] += 1
return ToolCallResult(
tool_name=tool_name,
status=ToolCallStatus.CIRCUIT_OPEN,
is_error=True,
server_name=server_name,
error_message=(
f"Circuit breaker OPEN for {server_name}. "
f"Failures: {cb.failure_count}, "
f"Recovery in {cb.time_until_recovery_seconds:.0f}s"
),
)
# Step 3: Build request
request_id = self._request_builder.next_request_id()
mcp_request = self._request_builder.build_tool_call(
request_id=request_id,
tool_name=tool_name,
arguments=arguments,
)
# Step 4: Execute with retries
timeout = timeout_override or endpoint.timeout_seconds
last_exception = None
attempt_count = 0
for attempt in range(endpoint.max_retries + 1):
attempt_count = attempt + 1
try:
raw_response = await self._send_request(
endpoint, mcp_request, timeout
)
# Step 5: Parse response
result = self._response_parser.parse(
raw_response, tool_name, request_id
)
elapsed_ms = (time.time() - start_time) * 1000
result.latency_ms = elapsed_ms
result.server_name = server_name
result.attempt_count = attempt_count
result.request_id = request_id
if cb:
if result.is_error:
cb.record_failure()
else:
cb.record_success()
if result.is_error:
self._metrics["failed_calls"] += 1
else:
self._metrics["successful_calls"] += 1
self._metrics["total_latency_ms"] += elapsed_ms
return result
except asyncio.TimeoutError:
last_exception = TimeoutError(
f"Tool {tool_name} timed out after {timeout}s"
)
logger.warning(
"Timeout on attempt %d/%d for %s/%s",
attempt + 1, endpoint.max_retries + 1,
server_name, tool_name,
)
except Exception as exc:
last_exception = exc
logger.warning(
"Error on attempt %d/%d for %s/%s: %s",
attempt + 1, endpoint.max_retries + 1,
server_name, tool_name, exc,
)
if attempt < endpoint.max_retries:
delay = min(
endpoint.retry_base_delay_ms * (2 ** attempt),
endpoint.retry_max_delay_ms,
) / 1000.0
await asyncio.sleep(delay)
# All retries exhausted
if cb:
cb.record_failure()
elapsed_ms = (time.time() - start_time) * 1000
self._metrics["failed_calls"] += 1
return ToolCallResult(
tool_name=tool_name,
status=ToolCallStatus.TIMEOUT if isinstance(last_exception, TimeoutError) else ToolCallStatus.TRANSPORT_ERROR,
is_error=True,
latency_ms=elapsed_ms,
server_name=server_name,
attempt_count=attempt_count,
request_id=request_id,
error_message=str(last_exception),
)
async def call_tools_parallel(
self,
calls: List[Tuple[str, Dict[str, Any]]],
) -> List[ToolCallResult]:
"""
Execute multiple tool calls in parallel.
Returns results in the same order as the input calls.
"""
tasks = [
self.call_tool(tool_name, arguments)
for tool_name, arguments in calls
]
return await asyncio.gather(*tasks)
async def _send_request(
self,
endpoint: ServerEndpoint,
request: Dict[str, Any],
timeout: float,
) -> Dict[str, Any]:
"""Send an MCP request via the appropriate transport."""
if endpoint.transport == TransportType.LAMBDA:
return await self._send_lambda(endpoint, request, timeout)
elif endpoint.transport == TransportType.HTTP:
return await self._send_http(endpoint, request, timeout)
elif endpoint.transport == TransportType.WEBSOCKET:
return await self._send_websocket(endpoint, request, timeout)
else:
raise ValueError(f"Unsupported transport: {endpoint.transport}")
async def _send_lambda(
self, endpoint: ServerEndpoint,
request: Dict, timeout: float,
) -> Dict[str, Any]:
"""Invoke a Lambda MCP server via boto3."""
loop = asyncio.get_event_loop()
invoke_params = {
"FunctionName": endpoint.lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": json.dumps(request),
}
if endpoint.lambda_qualifier:
invoke_params["Qualifier"] = endpoint.lambda_qualifier
response = await loop.run_in_executor(
None,
lambda: self._lambda_client.invoke(**invoke_params),
)
payload = json.loads(response["Payload"].read())
# Handle API Gateway proxy response format
if isinstance(payload, dict) and "body" in payload:
return json.loads(payload["body"])
return payload
async def _send_http(
self, endpoint: ServerEndpoint,
request: Dict, timeout: float,
) -> Dict[str, Any]:
"""Send to an HTTP MCP server via aiohttp."""
url = f"{endpoint.url}/mcp/http"
async with self._http_session.post(
url,
json=request,
timeout=aiohttp.ClientTimeout(total=timeout),
headers={"X-MCP-Client": "mangaassist-orchestrator"},
) as response:
response.raise_for_status()
return await response.json()
async def _send_websocket(
self, endpoint: ServerEndpoint,
request: Dict, timeout: float,
) -> Dict[str, Any]:
"""Send to a WebSocket MCP server."""
ws = self._ws_connections.get(endpoint.name)
if not ws or ws.closed:
ws = await self._http_session.ws_connect(
f"{endpoint.url}/mcp",
heartbeat=30.0,
max_msg_size=10 * 1024 * 1024,
)
self._ws_connections[endpoint.name] = ws
# Initialize handshake
init_req = self._request_builder.build_initialize()
await ws.send_json(init_req)
init_resp = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
logger.info(
"WebSocket session initialized for %s", endpoint.name
)
await ws.send_json(request)
response = await asyncio.wait_for(ws.receive_json(), timeout=timeout)
return response
def get_metrics(self) -> Dict[str, Any]:
"""Return client-level metrics."""
total = self._metrics["total_calls"]
return {
**self._metrics,
"success_rate": (
self._metrics["successful_calls"] / total
if total > 0 else 0.0
),
"avg_latency_ms": (
self._metrics["total_latency_ms"] / total
if total > 0 else 0.0
),
"circuit_breaker_states": {
name: cb.state.value
for name, cb in self._circuit_breakers.items()
},
}
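The `CircuitBreaker` class used above is not shown in this section. A minimal sketch consistent with the attributes the client relies on (`allow_request`, `record_success`, `record_failure`, `failure_count`, `time_until_recovery_seconds`, `state`) might look like the following; the injectable `clock` parameter is an addition for testability, not from the source:

```python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Per-server circuit breaker with half-open probing (minimal sketch)."""

    def __init__(self, name: str, failure_threshold: int = 5,
                 recovery_timeout_seconds: float = 30.0, clock=time.time):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self._opened_at = 0.0
        self._clock = clock  # injectable for deterministic tests

    @property
    def time_until_recovery_seconds(self) -> float:
        if self.state != CircuitState.OPEN:
            return 0.0
        elapsed = self._clock() - self._opened_at
        return max(0.0, self.recovery_timeout_seconds - elapsed)

    def allow_request(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if self.time_until_recovery_seconds <= 0.0:
                self.state = CircuitState.HALF_OPEN
                return True  # let a single probe through
            return False
        return False  # HALF_OPEN: a probe is already in flight

    def record_success(self) -> None:
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self) -> None:
        self.failure_count += 1
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self._opened_at = self._clock()
```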
## 2. ToolDiscoveryClient — Dynamic Tool Resolution

### 2.1 Discovery Architecture
The ToolDiscoveryClient resolves tool names to server endpoints at runtime. In MangaAssist, this means the orchestrator can discover new tools (e.g., a "manga_preview" tool) without redeployment. Discovery uses a DynamoDB registry with Redis caching for sub-millisecond lookups.
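The lookup order (process memory, then Redis, then DynamoDB) is a read-through cache chain. A toy sketch of just that mechanic, with plain dicts standing in for Redis and DynamoDB:

```python
from typing import Any, Dict, Optional

class CacheChain:
    """Local -> Redis-like -> DynamoDB-like read-through lookup."""

    def __init__(self, redis_like: Dict[str, Any], dynamo_like: Dict[str, Any]):
        self.local: Dict[str, Any] = {}
        self.redis = redis_like
        self.dynamo = dynamo_like

    def resolve(self, key: str) -> Optional[Any]:
        if key in self.local:        # level 1: in-process, nanoseconds
            return self.local[key]
        if key in self.redis:        # level 2: shared cache, sub-millisecond
            self.local[key] = self.redis[key]
            return self.redis[key]
        if key in self.dynamo:       # level 3: source of truth
            value = self.dynamo[key]
            self.local[key] = value  # populate both cache levels on miss
            self.redis[key] = value
            return value
        return None
```

The real client adds TTLs, health filtering, and error handling around exactly this shape.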
### 2.2 ToolDiscoveryClient Implementation
"""
ToolDiscoveryClient: Discovers and caches MCP tool definitions
from the central registry and from individual MCP servers.
"""
import json
import time
import asyncio
import logging
import hashlib
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
import aiohttp
import boto3
import redis
logger = logging.getLogger("mcp-discovery")
@dataclass
class DiscoveredTool:
"""A tool discovered from an MCP server."""
name: str
description: str
input_schema: Dict[str, Any]
server_name: str
server_transport: str
server_url: Optional[str]
lambda_function_name: Optional[str]
version: str
schema_hash: str
discovered_at: float
healthy: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
"serverName": self.server_name,
"serverTransport": self.server_transport,
"version": self.version,
"schemaHash": self.schema_hash,
}
@dataclass
class DiscoveryConfig:
"""Configuration for the discovery client."""
registry_table: str = "mangaassist-mcp-registry"
redis_host: str = "mangaassist-redis.xxxxx.apne1.cache.amazonaws.com"
redis_port: int = 6379
cache_ttl_seconds: int = 60
refresh_interval_seconds: int = 300
region: str = "ap-northeast-1"
auto_refresh: bool = True
class ToolDiscoveryClient:
"""
Discovers MCP tools from the central registry and individual servers.
Discovery flow:
1. On startup, fetch all tool registrations from DynamoDB.
2. Cache them in Redis with TTL.
3. Periodically refresh (every 5 minutes by default).
4. On cache miss, fetch from DynamoDB and populate cache.
5. Optionally, call tools/list on individual servers for schema
verification.
Thread safety: This client is designed for async usage within
a single event loop (the ECS Fargate orchestrator).
"""
def __init__(self, config: Optional[DiscoveryConfig] = None):
self._config = config or DiscoveryConfig()
self._dynamodb = boto3.resource(
"dynamodb", region_name=self._config.region
)
self._table = self._dynamodb.Table(self._config.registry_table)
self._redis = redis.Redis(
host=self._config.redis_host,
port=self._config.redis_port,
decode_responses=True,
socket_timeout=1,
socket_connect_timeout=1,
)
self._local_cache: Dict[str, DiscoveredTool] = {}
self._last_full_refresh: float = 0
self._refresh_task: Optional[asyncio.Task] = None
self._schema_hashes: Dict[str, str] = {}
async def start(self) -> None:
"""Start the discovery client with initial fetch and background refresh."""
await self.refresh_all()
if self._config.auto_refresh:
self._refresh_task = asyncio.create_task(
self._refresh_loop()
)
logger.info(
"Discovery auto-refresh started (every %ds)",
self._config.refresh_interval_seconds,
)
async def stop(self) -> None:
"""Stop background refresh."""
if self._refresh_task:
self._refresh_task.cancel()
try:
await self._refresh_task
except asyncio.CancelledError:
pass
async def resolve_tool(self, tool_name: str) -> Optional[DiscoveredTool]:
"""
Resolve a tool name to its full discovery record.
Checks local cache -> Redis -> DynamoDB.
"""
# Level 1: Local in-memory cache
cached = self._local_cache.get(tool_name)
if cached and cached.healthy:
return cached
# Level 2: Redis cache
try:
redis_key = f"mcp:tool:{tool_name}"
redis_data = self._redis.get(redis_key)
if redis_data:
data = json.loads(redis_data)
tool = self._dict_to_discovered_tool(data)
if tool.healthy:
self._local_cache[tool_name] = tool
return tool
except redis.RedisError as exc:
logger.warning("Redis lookup failed for %s: %s", tool_name, exc)
# Level 3: DynamoDB
try:
response = self._table.get_item(Key={"tool_name": tool_name})
item = response.get("Item")
if item and item.get("status") == "active" and item.get("healthy", True):
tool = self._dynamo_item_to_discovered_tool(item)
self._local_cache[tool_name] = tool
# Populate Redis cache
try:
self._redis.setex(
f"mcp:tool:{tool_name}",
self._config.cache_ttl_seconds,
json.dumps(self._discovered_tool_to_dict(tool)),
)
except redis.RedisError:
pass
return tool
except Exception as exc:
logger.error("DynamoDB lookup failed for %s: %s", tool_name, exc)
return None
async def list_all_tools(self) -> List[DiscoveredTool]:
"""List all active, healthy tools."""
return [
tool for tool in self._local_cache.values()
if tool.healthy
]
async def refresh_all(self) -> int:
"""
Refresh the full tool registry from DynamoDB.
Returns the number of tools discovered.
"""
logger.info("Refreshing full tool registry from DynamoDB...")
try:
response = self._table.scan(
FilterExpression=boto3.dynamodb.conditions.Attr("status").eq("active"),
)
items = response.get("Items", [])
new_cache: Dict[str, DiscoveredTool] = {}
for item in items:
tool = self._dynamo_item_to_discovered_tool(item)
new_cache[tool.name] = tool
# Check for schema changes
old_hash = self._schema_hashes.get(tool.name)
if old_hash and old_hash != tool.schema_hash:
logger.warning(
"Schema changed for tool %s: %s -> %s",
tool.name, old_hash, tool.schema_hash,
)
self._schema_hashes[tool.name] = tool.schema_hash
self._local_cache = new_cache
self._last_full_refresh = time.time()
# Batch update Redis
try:
pipe = self._redis.pipeline()
for tool in new_cache.values():
pipe.setex(
f"mcp:tool:{tool.name}",
self._config.cache_ttl_seconds,
json.dumps(self._discovered_tool_to_dict(tool)),
)
pipe.execute()
except redis.RedisError as exc:
logger.warning("Redis batch update failed: %s", exc)
logger.info("Discovered %d active tools", len(new_cache))
return len(new_cache)
except Exception as exc:
logger.error("Full registry refresh failed: %s", exc)
return len(self._local_cache)
async def verify_server_tools(
self,
server_name: str,
server_url: str,
http_session: aiohttp.ClientSession,
) -> List[Dict[str, Any]]:
"""
Call tools/list on a specific server to verify its live tool
definitions match the registry. Useful for detecting drift.
"""
request = {
"jsonrpc": "2.0",
"id": "verify",
"method": "tools/list",
"params": {},
}
try:
async with http_session.post(
f"{server_url}/mcp/http",
json=request,
timeout=aiohttp.ClientTimeout(total=5),
) as response:
body = await response.json()
return body.get("result", {}).get("tools", [])
except Exception as exc:
logger.warning(
"Failed to verify tools on %s: %s", server_name, exc
)
return []
async def detect_drift(
self,
server_name: str,
server_url: str,
http_session: aiohttp.ClientSession,
) -> Dict[str, Any]:
"""
Compare registry definitions with live server definitions.
Returns a drift report.
"""
live_tools = await self.verify_server_tools(
server_name, server_url, http_session
)
live_map = {t["name"]: t for t in live_tools}
registry_tools = [
t for t in self._local_cache.values()
if t.server_name == server_name
]
registry_map = {t.name: t for t in registry_tools}
drift = {
"server": server_name,
"checked_at": time.time(),
"added": [],
"removed": [],
"schema_changed": [],
}
# Tools in live but not registry
for name in live_map:
if name not in registry_map:
drift["added"].append(name)
# Tools in registry but not live
for name in registry_map:
if name not in live_map:
drift["removed"].append(name)
# Schema changes
for name in set(live_map) & set(registry_map):
live_schema = json.dumps(
live_map[name].get("inputSchema", {}), sort_keys=True
)
live_hash = hashlib.sha256(live_schema.encode()).hexdigest()[:12]
if live_hash != registry_map[name].schema_hash:
drift["schema_changed"].append({
"tool": name,
"registry_hash": registry_map[name].schema_hash,
"live_hash": live_hash,
})
drift["has_drift"] = bool(
drift["added"] or drift["removed"] or drift["schema_changed"]
)
return drift
# --- Background refresh loop ---
async def _refresh_loop(self) -> None:
"""Periodically refresh the tool registry."""
while True:
await asyncio.sleep(self._config.refresh_interval_seconds)
try:
count = await self.refresh_all()
logger.info("Periodic refresh: %d tools", count)
except Exception as exc:
logger.error("Periodic refresh failed: %s", exc)
# --- Conversion helpers ---
@staticmethod
def _compute_schema_hash(schema: Dict[str, Any]) -> str:
"""Compute a deterministic hash of a tool's input schema."""
schema_str = json.dumps(schema, sort_keys=True)
return hashlib.sha256(schema_str.encode()).hexdigest()[:12]
def _dynamo_item_to_discovered_tool(
self, item: Dict[str, Any]
) -> DiscoveredTool:
"""Convert a DynamoDB item to a DiscoveredTool."""
tool_def = item.get("tool_definition", "{}")
if isinstance(tool_def, str):
tool_def = json.loads(tool_def)
input_schema = tool_def.get("inputSchema", {})
return DiscoveredTool(
name=item["tool_name"],
description=tool_def.get("description", ""),
input_schema=input_schema,
server_name=item.get("server_name", ""),
server_transport=item.get("server_transport", "http"),
server_url=item.get("server_url"),
lambda_function_name=item.get("lambda_function_name"),
version=item.get("version", "1.0.0"),
schema_hash=self._compute_schema_hash(input_schema),
discovered_at=time.time(),
healthy=item.get("healthy", True),
)
@staticmethod
def _dict_to_discovered_tool(data: Dict[str, Any]) -> DiscoveredTool:
"""Convert a cached dict to a DiscoveredTool."""
return DiscoveredTool(
name=data["name"],
description=data.get("description", ""),
input_schema=data.get("input_schema", {}),
server_name=data.get("server_name", ""),
server_transport=data.get("server_transport", "http"),
server_url=data.get("server_url"),
lambda_function_name=data.get("lambda_function_name"),
version=data.get("version", "1.0.0"),
schema_hash=data.get("schema_hash", ""),
discovered_at=data.get("discovered_at", 0),
healthy=data.get("healthy", True),
)
@staticmethod
def _discovered_tool_to_dict(tool: DiscoveredTool) -> Dict[str, Any]:
"""Convert a DiscoveredTool to a cacheable dict."""
return {
"name": tool.name,
"description": tool.description,
"input_schema": tool.input_schema,
"server_name": tool.server_name,
"server_transport": tool.server_transport,
"server_url": tool.server_url,
"lambda_function_name": tool.lambda_function_name,
"version": tool.version,
"schema_hash": tool.schema_hash,
"discovered_at": tool.discovered_at,
"healthy": tool.healthy,
}
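The drift detection above relies on the schema hash being deterministic: `sort_keys=True` makes the hash independent of key insertion order, so only a real schema change produces a new hash. A standalone check of that property:

```python
import hashlib
import json

def schema_hash(schema: dict) -> str:
    """12-hex-char schema hash, mirroring _compute_schema_hash above."""
    canonical = json.dumps(schema, sort_keys=True)  # key order normalized
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]

a = {"type": "object", "properties": {"query": {"type": "string"}}}
b = {"properties": {"query": {"type": "string"}}, "type": "object"}  # same schema, reordered
c = {"type": "object", "properties": {"query": {"type": "integer"}}}  # changed type
```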
## 3. MCPRequestBuilder — Envelope Construction

### 3.1 Request Envelope Design
Every MCP call from the orchestrator must be wrapped in a JSON-RPC 2.0 envelope. The MCPRequestBuilder handles request ID generation, method routing, parameter marshaling, and optional schema validation before the request leaves the client.
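Concretely, a `tools/call` envelope for `manga_search` looks like this on the wire (the request id and arguments are illustrative):

```python
import json

# The JSON-RPC 2.0 envelope MCPRequestBuilder.build_tool_call produces.
envelope = {
    "jsonrpc": "2.0",
    "id": "mcp-00000001-12345",     # correlation id for response matching
    "method": "tools/call",
    "params": {
        "name": "manga_search",
        "arguments": {"query": "One Piece", "limit": 5},
    },
}
wire = json.dumps(envelope)  # serialized form sent over the transport
```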
### 3.2 MCPRequestBuilder Implementation
"""
MCPRequestBuilder: Constructs MCP JSON-RPC 2.0 request envelopes.
Handles parameter validation, default injection, and envelope sealing.
"""
import json
import time
import logging
import hashlib
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
logger = logging.getLogger("mcp-request-builder")
class MCPRequestBuilder:
"""
Builds well-formed MCP JSON-RPC 2.0 request envelopes.
Features:
- Monotonically increasing request IDs for correlation.
- Optional schema validation before sending.
- Default parameter injection from tool definitions.
- Request metadata for tracing.
"""
def __init__(self, client_name: str = "mangaassist-orchestrator"):
self._counter = 0
self._client_name = client_name
def next_request_id(self) -> str:
"""Generate the next unique request ID."""
self._counter += 1
return f"mcp-{self._counter:08d}-{int(time.time() * 1000) % 100000}"
def build_initialize(
self,
protocol_version: str = "2024-11-05",
) -> Dict[str, Any]:
"""Build an MCP initialize request."""
return {
"jsonrpc": "2.0",
"id": self.next_request_id(),
"method": "initialize",
"params": {
"protocolVersion": protocol_version,
"capabilities": {
"tools": {},
},
"clientInfo": {
"name": self._client_name,
"version": "1.0.0",
},
},
}
def build_tools_list(self) -> Dict[str, Any]:
"""Build a tools/list request to discover available tools."""
return {
"jsonrpc": "2.0",
"id": self.next_request_id(),
"method": "tools/list",
"params": {},
}
def build_tool_call(
self,
request_id: str,
tool_name: str,
arguments: Dict[str, Any],
) -> Dict[str, Any]:
"""
Build a tools/call request envelope.
Args:
request_id: Pre-generated unique ID for this request.
tool_name: Name of the MCP tool to invoke.
arguments: Tool-specific arguments.
Returns:
Complete MCP JSON-RPC 2.0 request envelope.
"""
return {
"jsonrpc": "2.0",
"id": request_id,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments,
},
}
def build_ping(self) -> Dict[str, Any]:
"""Build a ping request for health checking."""
return {
"jsonrpc": "2.0",
"id": self.next_request_id(),
"method": "ping",
}
def validate_arguments(
self,
tool_name: str,
arguments: Dict[str, Any],
schema: Dict[str, Any],
) -> List[str]:
"""
Validate tool arguments against the input schema.
Returns a list of validation error messages (empty if valid).
"""
errors = []
# Check required fields
required = schema.get("required", [])
for field_name in required:
if field_name not in arguments:
errors.append(
f"Missing required argument: '{field_name}' "
f"for tool '{tool_name}'"
)
# Check property types
properties = schema.get("properties", {})
for arg_name, arg_value in arguments.items():
if arg_name not in properties:
logger.debug(
"Unknown argument '%s' for tool '%s' (may be accepted by server)",
arg_name, tool_name,
)
continue
prop_def = properties[arg_name]
expected_type = prop_def.get("type")
if expected_type == "string" and not isinstance(arg_value, str):
errors.append(
f"Argument '{arg_name}' should be string, "
f"got {type(arg_value).__name__}"
)
elif expected_type == "integer" and not isinstance(arg_value, int):
errors.append(
f"Argument '{arg_name}' should be integer, "
f"got {type(arg_value).__name__}"
)
elif expected_type == "boolean" and not isinstance(arg_value, bool):
errors.append(
f"Argument '{arg_name}' should be boolean, "
f"got {type(arg_value).__name__}"
)
elif expected_type == "array" and not isinstance(arg_value, list):
errors.append(
f"Argument '{arg_name}' should be array, "
f"got {type(arg_value).__name__}"
)
# Check enum constraints
enum_values = prop_def.get("enum")
if enum_values and arg_value not in enum_values:
errors.append(
f"Argument '{arg_name}' value '{arg_value}' "
f"not in allowed values: {enum_values}"
)
# Check numeric constraints
if expected_type == "integer":
minimum = prop_def.get("minimum")
maximum = prop_def.get("maximum")
if minimum is not None and isinstance(arg_value, (int, float)):
if arg_value < minimum:
errors.append(
f"Argument '{arg_name}' value {arg_value} "
f"below minimum {minimum}"
)
if maximum is not None and isinstance(arg_value, (int, float)):
if arg_value > maximum:
errors.append(
f"Argument '{arg_name}' value {arg_value} "
f"above maximum {maximum}"
)
return errors
def inject_defaults(
self,
arguments: Dict[str, Any],
schema: Dict[str, Any],
) -> Dict[str, Any]:
"""
Inject default values for missing optional arguments.
Returns a new dict with defaults applied.
"""
result = dict(arguments)
properties = schema.get("properties", {})
for prop_name, prop_def in properties.items():
if prop_name not in result and "default" in prop_def:
result[prop_name] = prop_def["default"]
return result
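The two pre-flight passes the builder performs (inject defaults, then check required fields) can be condensed into a standalone sketch; the `apply_schema` helper and example schema below are illustrative, not part of the builder's API:

```python
from typing import Any, Dict, List, Tuple

def apply_schema(arguments: Dict[str, Any],
                 schema: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """Inject schema defaults, then report missing required fields."""
    merged = dict(arguments)
    for name, prop in schema.get("properties", {}).items():
        if name not in merged and "default" in prop:
            merged[name] = prop["default"]  # fill optional args
    errors = [
        f"Missing required argument: '{name}'"
        for name in schema.get("required", [])
        if name not in merged
    ]
    return merged, errors

search_schema = {
    "properties": {
        "query": {"type": "string"},
        "limit": {"type": "integer", "default": 10},
    },
    "required": ["query"],
}
```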
## 4. MCPResponseParser — Content Extraction and Error Handling

### 4.1 Response Parsing Strategy
MCP responses carry tool results as an array of content blocks. Each block can be text (JSON-encoded results), images, or resource references. The MCPResponseParser extracts and classifies responses, converting raw JSON-RPC envelopes into structured ToolCallResult objects.
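A typical successful `tools/call` response carries the tool's payload JSON-encoded inside a text content block (the values below are illustrative):

```python
import json

# Shape of a successful MCP tools/call response envelope.
raw_response = {
    "jsonrpc": "2.0",
    "id": "mcp-00000001-12345",
    "result": {
        "content": [
            {"type": "text",
             "text": json.dumps({"hits": 3, "titles": ["One Piece"]})},
        ],
        "isError": False,
    },
}

def first_text(response: dict) -> str:
    """Pull the first text block out of a result envelope."""
    for block in response.get("result", {}).get("content", []):
        if block.get("type") == "text":
            return block.get("text", "")
    return ""

payload = json.loads(first_text(raw_response))  # the tool's actual result
```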
### 4.2 MCPResponseParser Implementation
"""
MCPResponseParser: Parses MCP JSON-RPC 2.0 response envelopes.
Extracts content, classifies errors, and provides structured access.
"""
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
logger = logging.getLogger("mcp-response-parser")
class MCPErrorCategory(Enum):
"""Classification of MCP errors for retry/handling decisions."""
RETRYABLE = "retryable" # Transient; retry may succeed
FATAL = "fatal" # Permanent; retry will not help
CLIENT_ERROR = "client_error" # Bad request from client side
SERVER_ERROR = "server_error" # Bug or crash in server
RATE_LIMITED = "rate_limited" # Back off and retry later
NOT_FOUND = "not_found" # Tool or resource missing
# JSON-RPC 2.0 error code ranges
_ERROR_CLASSIFICATION = {
-32700: MCPErrorCategory.CLIENT_ERROR, # Parse error
-32600: MCPErrorCategory.CLIENT_ERROR, # Invalid request
-32601: MCPErrorCategory.NOT_FOUND, # Method not found
-32602: MCPErrorCategory.CLIENT_ERROR, # Invalid params
-32603: MCPErrorCategory.SERVER_ERROR, # Internal error
-32000: MCPErrorCategory.RETRYABLE, # Tool execution error
-32001: MCPErrorCategory.RETRYABLE, # Tool timeout
-32002: MCPErrorCategory.NOT_FOUND, # Tool not found
-32003: MCPErrorCategory.RATE_LIMITED, # Rate limit exceeded
}
class MCPResponseParser:
"""
Parses MCP response envelopes into structured ToolCallResult objects.
Handles:
- Successful results with content blocks.
- MCP-level errors (isError: true in result).
- JSON-RPC level errors (error field in response).
- Malformed responses (missing fields, invalid JSON).
"""
def parse(
self,
raw_response: Dict[str, Any],
tool_name: str,
request_id: str,
) -> "ToolCallResult":
"""
Parse a raw MCP JSON-RPC response into a ToolCallResult.
The response may contain:
1. result.content[] — success case
2. result.isError: true — tool-level error
3. error{} — protocol-level error
"""
# ToolCallResult and ToolCallStatus come from the MCPClient module;
# in practice these would live in a shared types module.
# Check for JSON-RPC error
if "error" in raw_response:
error = raw_response["error"]
error_code = error.get("code", -32603)
error_message = error.get("message", "Unknown error")
category = self.classify_error(error_code)
return ToolCallResult(
tool_name=tool_name,
status=(
ToolCallStatus.TOOL_NOT_FOUND
if category == MCPErrorCategory.NOT_FOUND
else ToolCallStatus.ERROR
),
is_error=True,
request_id=request_id,
error_message=f"[{error_code}] {error_message}",
metadata={
"error_code": error_code,
"error_category": category.value,
"retryable": category in (
MCPErrorCategory.RETRYABLE,
MCPErrorCategory.RATE_LIMITED,
),
},
)
# Extract result
result = raw_response.get("result", {})
# Check for tool-level error (isError flag)
is_error = result.get("isError", False)
content = result.get("content", [])
status = ToolCallStatus.ERROR if is_error else ToolCallStatus.SUCCESS
return ToolCallResult(
tool_name=tool_name,
status=status,
content=content,
is_error=is_error,
request_id=request_id,
error_message=(
self._extract_error_text(content) if is_error else None
),
)
def classify_error(self, error_code: int) -> MCPErrorCategory:
"""Classify a JSON-RPC error code into an actionable category."""
if error_code in _ERROR_CLASSIFICATION:
return _ERROR_CLASSIFICATION[error_code]
# Implementation-defined server error range (-32099 to -32000)
if -32099 <= error_code <= -32000:
return MCPErrorCategory.RETRYABLE
# Remaining reserved JSON-RPC range (-32768 to -32100)
if -32768 <= error_code <= -32000:
return MCPErrorCategory.SERVER_ERROR
return MCPErrorCategory.FATAL
def is_retryable(self, raw_response: Dict[str, Any]) -> bool:
"""Check if a failed response is worth retrying."""
if "error" in raw_response:
code = raw_response["error"].get("code", 0)
category = self.classify_error(code)
return category in (
MCPErrorCategory.RETRYABLE,
MCPErrorCategory.RATE_LIMITED,
MCPErrorCategory.SERVER_ERROR,
)
# Tool-level errors (isError) are generally not retryable
# (bad arguments, missing data, etc.), and successful
# responses need no retry at all.
return False
@staticmethod
def _extract_error_text(content: List[Dict[str, Any]]) -> str:
"""Extract error text from content blocks."""
for block in content:
if block.get("type") == "text":
return block.get("text", "Unknown error")
return "Unknown error (no text content)"
def extract_all_text(
self, content: List[Dict[str, Any]]
) -> str:
"""Concatenate all text content blocks."""
texts = []
for block in content:
if block.get("type") == "text":
text = block.get("text", "")
if text:
texts.append(text)
return "\n".join(texts)
def extract_json(
self, content: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""Extract and parse the first JSON text block."""
for block in content:
if block.get("type") == "text":
text = block.get("text", "")
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
continue
return None
5. MangaAssist MCP Tool Definitions — JSON Examples
5.1 Complete Tool Definition Catalog
These JSON definitions show exactly what the MangaAssist MCP servers advertise via tools/list. The Bedrock Claude 3 model receives these definitions and uses them to decide which tools to call and how to construct arguments.
{
"tools": [
{
"name": "manga_search",
"description": "Search the MangaAssist catalog for manga titles, volumes, and series. Supports Japanese and English queries with hybrid vector and keyword search. Returns matching manga with titles, prices, availability, cover images, and relevance scores.",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query in Japanese or English (e.g., 'ワンピース 100巻' or 'One Piece volume 100')"
},
"language": {
"type": "string",
"enum": ["ja", "en"],
"default": "ja",
"description": "Query language for search optimization"
},
"max_results": {
"type": "integer",
"minimum": 1,
"maximum": 20,
"default": 5,
"description": "Maximum number of results to return"
},
"genre": {
"type": "string",
"enum": ["shonen", "shojo", "seinen", "josei", "kodomo"],
"description": "Optional genre filter"
},
"in_stock_only": {
"type": "boolean",
"default": false,
"description": "Filter to only in-stock items"
}
},
"required": ["query"]
}
},
{
"name": "price_lookup",
"description": "Look up current prices for specific manga by ID or ISBN. Returns retail price, any active discounts, bundle deals, and price history. Prices are in JPY.",
"inputSchema": {
"type": "object",
"properties": {
"manga_id": {
"type": "string",
"description": "MangaAssist product ID (e.g., 'MNG-00123456')"
},
"isbn": {
"type": "string",
"description": "ISBN-13 of the manga volume"
},
"include_history": {
"type": "boolean",
"default": false,
"description": "Include 30-day price history"
},
"include_bundles": {
"type": "boolean",
"default": true,
"description": "Include available bundle deals"
}
},
"oneOf": [
{"required": ["manga_id"]},
{"required": ["isbn"]}
]
}
},
{
"name": "inventory_check",
"description": "Check real-time stock availability for one or more manga items. Returns stock count, warehouse location, and estimated restock date if out of stock.",
"inputSchema": {
"type": "object",
"properties": {
"manga_ids": {
"type": "array",
"items": {"type": "string"},
"minItems": 1,
"maxItems": 50,
"description": "List of MangaAssist product IDs to check"
},
"warehouse": {
"type": "string",
"enum": ["tokyo", "osaka", "all"],
"default": "all",
"description": "Specific warehouse to check"
}
},
"required": ["manga_ids"]
}
},
{
"name": "add_to_cart",
"description": "Add a manga item to the customer's shopping cart. Validates stock availability before adding. Returns updated cart summary with totals.",
"inputSchema": {
"type": "object",
"properties": {
"manga_id": {
"type": "string",
"description": "MangaAssist product ID"
},
"quantity": {
"type": "integer",
"minimum": 1,
"maximum": 10,
"default": 1,
"description": "Quantity to add"
}
},
"required": ["manga_id"]
}
},
{
"name": "checkout",
"description": "Process checkout for the current cart. Validates stock, calculates total with 10% consumption tax, and creates an order. Returns order confirmation with estimated delivery date.",
"inputSchema": {
"type": "object",
"properties": {
"shipping_address_id": {
"type": "string",
"description": "Saved shipping address ID from user profile"
},
"payment_method_id": {
"type": "string",
"description": "Saved payment method ID from user profile"
},
"gift_wrap": {
"type": "boolean",
"default": false,
"description": "Add gift wrapping (+200 JPY)"
}
},
"required": ["shipping_address_id", "payment_method_id"]
}
},
{
"name": "manga_recommendations",
"description": "Generate personalized manga recommendations based on user reading history, preferences, and current trends. Uses collaborative filtering and content-based similarity through OpenSearch kNN.",
"inputSchema": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID for personalized recommendations"
},
"seed_titles": {
"type": "array",
"items": {"type": "string"},
"description": "Manga titles the user likes (for cold start users)"
},
"genre_preference": {
"type": "string",
"enum": ["shonen", "shojo", "seinen", "josei", "any"],
"default": "any",
"description": "Preferred genre filter"
},
"count": {
"type": "integer",
"minimum": 1,
"maximum": 20,
"default": 5,
"description": "Number of recommendations"
},
"exclude_owned": {
"type": "boolean",
"default": true,
"description": "Exclude manga already purchased"
}
},
"required": ["user_id"]
}
}
]
}
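These MCP definitions map almost one-to-one onto the Bedrock Converse API's `toolConfig` format, which wraps each JSON Schema under a `toolSpec`/`inputSchema.json` structure. A small adapter — a sketch with error handling omitted — can translate the `tools/list` payload before each Converse call:

```python
from typing import Any, Dict, List


def mcp_tools_to_converse_config(mcp_tools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Translate MCP tools/list entries into a Bedrock Converse toolConfig."""
    return {
        "tools": [
            {
                "toolSpec": {
                    "name": tool["name"],
                    "description": tool["description"],
                    # Converse nests the JSON Schema under an explicit "json" key
                    "inputSchema": {"json": tool["inputSchema"]},
                }
            }
            for tool in mcp_tools
        ]
    }
```

The result is passed as the `toolConfig=` argument to `bedrock_runtime.converse(...)`, so the model sees exactly the schemas the MCP servers advertise.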
5.2 Example MCP Request/Response for MangaAssist
Request: Search for a manga
{
"jsonrpc": "2.0",
"id": "mcp-00000042-78231",
"method": "tools/call",
"params": {
"name": "manga_search",
"arguments": {
"query": "鬼滅の刃",
"language": "ja",
"max_results": 3,
"in_stock_only": true
}
}
}
Response: Search results
{
"jsonrpc": "2.0",
"id": "mcp-00000042-78231",
"result": {
"content": [
{
"type": "text",
"text": "{\"query\":\"鬼滅の刃\",\"language\":\"ja\",\"total_results\":3,\"results\":[{\"manga_id\":\"MNG-00045678\",\"title_ja\":\"鬼滅の刃 1\",\"title_en\":\"Demon Slayer Vol. 1\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":1,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-01.jpg\",\"relevance_score\":9.82},{\"manga_id\":\"MNG-00045679\",\"title_ja\":\"鬼滅の刃 2\",\"title_en\":\"Demon Slayer Vol. 2\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":2,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-02.jpg\",\"relevance_score\":9.45},{\"manga_id\":\"MNG-00045680\",\"title_ja\":\"鬼滅の刃 3\",\"title_en\":\"Demon Slayer Vol. 3\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":3,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-03.jpg\",\"relevance_score\":9.21}]}"
}
],
"isError": false
}
}
Error Response Example: Tool not found
{
"jsonrpc": "2.0",
"id": "mcp-00000043-78232",
"error": {
"code": -32002,
"message": "Tool not found: manga_translate"
}
}
Error Response Example: Tool execution failed
{
"jsonrpc": "2.0",
"id": "mcp-00000044-78233",
"result": {
"content": [
{
"type": "text",
"text": "Manga MNG-99999999 not found in catalog"
}
],
"isError": true
}
}
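Applying the classification from section 4 to these two examples: the `-32002` protocol error maps to NOT_FOUND (no retry), while the `isError: true` result is a tool-level failure, which the client treats as permanent. A condensed, standalone sketch of that decision (an abbreviated mirror of the parser's logic, not the full class):

```python
from typing import Any, Dict

# Abbreviated mirror of the MCPResponseParser classification table
CODE_CATEGORY = {-32002: "not_found", -32000: "retryable", -32003: "rate_limited"}
RETRYABLE_CATEGORIES = {"retryable", "rate_limited", "server_error"}


def is_retryable(raw: Dict[str, Any]) -> bool:
    if "error" in raw:
        category = CODE_CATEGORY.get(raw["error"].get("code"), "server_error")
        return category in RETRYABLE_CATEGORIES
    # Tool-level errors (isError) are permanent: bad args, missing data
    return False


not_found = {"jsonrpc": "2.0", "id": "mcp-00000043-78232",
             "error": {"code": -32002, "message": "Tool not found: manga_translate"}}
tool_failed = {"jsonrpc": "2.0", "id": "mcp-00000044-78233",
               "result": {"content": [{"type": "text",
                                       "text": "Manga MNG-99999999 not found in catalog"}],
                          "isError": True}}
```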
6. Circuit Breaker Pattern at the MCP Level
6.1 Circuit Breaker Design
Each MCP server gets its own circuit breaker. When a server fails repeatedly, the circuit opens and subsequent calls fail immediately without consuming Lambda invocations or ECS capacity. This prevents cascading failures at MangaAssist's 1M messages/day scale.
6.2 Circuit Breaker Implementation
"""
CircuitBreaker: Per-server circuit breaker for MCP tool calls.
Prevents cascade failures when an MCP server becomes unhealthy.
"""
import time
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger("mcp-circuit-breaker")
class CircuitState(Enum):
"""Circuit breaker states."""
CLOSED = "closed" # Normal operation — requests flow
OPEN = "open" # Server unhealthy — requests blocked
HALF_OPEN = "half_open" # Probing — limited requests allowed
@dataclass
class CircuitBreaker:
"""
Circuit breaker with configurable thresholds and recovery timing.
State transitions:
- CLOSED -> OPEN: When failure_count >= failure_threshold
- OPEN -> HALF_OPEN: After recovery_timeout_seconds elapsed
- HALF_OPEN -> CLOSED: On successful probe request
- HALF_OPEN -> OPEN: On failed probe request
For MangaAssist, each MCP server (manga_search Lambda, order_processing
ECS, etc.) gets its own circuit breaker. This means if the recommendation
engine is down, search and checkout still work.
"""
name: str
failure_threshold: int = 5
recovery_timeout_seconds: float = 30.0
half_open_max_calls: int = 2  # keep >= success_threshold, or HALF_OPEN can stall
success_threshold: int = 2
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_last_failure_time: float = field(default=0.0, init=False)
_last_state_change: float = field(default_factory=time.time, init=False)
_half_open_calls: int = field(default=0, init=False)
_total_blocked: int = field(default=0, init=False)
@property
def state(self) -> CircuitState:
"""Get current state, with automatic OPEN -> HALF_OPEN transition."""
if self._state == CircuitState.OPEN:
elapsed = time.time() - self._last_failure_time
if elapsed >= self.recovery_timeout_seconds:
self._transition_to(CircuitState.HALF_OPEN)
return self._state
@property
def failure_count(self) -> int:
return self._failure_count
@property
def time_until_recovery_seconds(self) -> float:
"""Time until circuit moves from OPEN to HALF_OPEN."""
if self._state != CircuitState.OPEN:
return 0.0
elapsed = time.time() - self._last_failure_time
remaining = self.recovery_timeout_seconds - elapsed
return max(0.0, remaining)
def allow_request(self) -> bool:
"""Check if a request is allowed through the circuit."""
current_state = self.state # Triggers auto-transition
if current_state == CircuitState.CLOSED:
return True
if current_state == CircuitState.HALF_OPEN:
if self._half_open_calls < self.half_open_max_calls:
self._half_open_calls += 1
logger.info(
"Circuit %s HALF_OPEN: allowing probe request %d/%d",
self.name, self._half_open_calls, self.half_open_max_calls,
)
return True
return False
# OPEN
self._total_blocked += 1
return False
def record_success(self) -> None:
"""Record a successful request."""
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._transition_to(CircuitState.CLOSED)
logger.info(
"Circuit %s CLOSED after %d successful probes",
self.name, self._success_count,
)
elif self._state == CircuitState.CLOSED:
# Reset failure count on success in closed state
self._failure_count = 0
def record_failure(self) -> None:
"""Record a failed request."""
self._failure_count += 1
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
# Probe failed — go back to OPEN
self._transition_to(CircuitState.OPEN)
logger.warning(
"Circuit %s re-OPENED: probe request failed", self.name
)
elif self._state == CircuitState.CLOSED:
if self._failure_count >= self.failure_threshold:
self._transition_to(CircuitState.OPEN)
logger.warning(
"Circuit %s OPENED after %d consecutive failures",
self.name, self._failure_count,
)
def force_open(self) -> None:
"""Manually open the circuit (e.g., for maintenance)."""
self._transition_to(CircuitState.OPEN)
self._last_failure_time = time.time()
logger.warning("Circuit %s FORCE OPENED", self.name)
def force_close(self) -> None:
"""Manually close the circuit (e.g., after maintenance)."""
self._transition_to(CircuitState.CLOSED)
logger.info("Circuit %s FORCE CLOSED", self.name)
def get_stats(self) -> dict:
"""Return circuit breaker statistics."""
return {
"name": self.name,
"state": self.state.value,
"failure_count": self._failure_count,
"failure_threshold": self.failure_threshold,
"success_count": self._success_count,
"total_blocked": self._total_blocked,
"time_until_recovery_s": round(
self.time_until_recovery_seconds, 1
),
"last_state_change": self._last_state_change,
}
def _transition_to(self, new_state: CircuitState) -> None:
"""Transition to a new circuit state."""
old_state = self._state
self._state = new_state
self._last_state_change = time.time()
if new_state == CircuitState.CLOSED:
self._failure_count = 0
self._success_count = 0
elif new_state == CircuitState.HALF_OPEN:
self._half_open_calls = 0
self._success_count = 0
elif new_state == CircuitState.OPEN:
self._success_count = 0
logger.info(
"Circuit %s: %s -> %s",
self.name, old_state.value, new_state.value,
)
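To make the state transitions concrete, here is a compressed, standalone version of the same state machine, driven through a failure burst and a recovery probe. Thresholds are shrunk for brevity, and the production class above adds half-open call limits and statistics that this sketch omits:

```python
import time


class MiniBreaker:
    """Minimal CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 0.05):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self) -> bool:
        if self.state == "open" and time.time() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"  # lazily start probing after the timeout
        return self.state != "open"

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            self.state = "closed"
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()


b = MiniBreaker()
for _ in range(3):          # three consecutive failures open the circuit
    b.allow()
    b.record(False)
assert b.state == "open" and not b.allow()
time.sleep(0.06)            # recovery timeout elapses
assert b.allow() and b.state == "half_open"
b.record(True)              # successful probe closes the circuit
assert b.state == "closed"
```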
7. Retry and Backoff Patterns
7.1 Retry Strategy for MCP Tool Calls
"""
RetryManager: Configurable retry logic for MCP tool calls.
Uses exponential backoff with jitter and respects idempotency.
"""
import asyncio
import random
import time
import logging
from typing import Any, Callable, Dict, Optional
from dataclasses import dataclass
logger = logging.getLogger("mcp-retry")
@dataclass
class RetryConfig:
"""Configuration for retry behavior."""
max_retries: int = 2
base_delay_ms: float = 100.0
max_delay_ms: float = 5000.0
jitter_factor: float = 0.25
retry_on_timeout: bool = True
retry_on_server_error: bool = True
retry_on_rate_limit: bool = True
class RetryManager:
"""
Manages retry logic for MCP tool calls with exponential backoff.
Retry decisions are based on error classification:
- RETRYABLE errors: retry with exponential backoff.
- RATE_LIMITED errors: retry with longer backoff.
- CLIENT_ERROR errors: do not retry (bad request).
- FATAL errors: do not retry (permanent failure).
Jitter prevents thundering herd when many clients retry simultaneously
after an MCP server recovers from an outage.
"""
def __init__(self, config: Optional[RetryConfig] = None):
self.config = config or RetryConfig()
def compute_delay_ms(self, attempt: int) -> float:
"""
Compute the delay before the next retry.
Uses exponential backoff with symmetric jitter.
"""
base = self.config.base_delay_ms * (2 ** attempt)
capped = min(base, self.config.max_delay_ms)
# Add jitter: +/- jitter_factor of the delay
jitter_range = capped * self.config.jitter_factor
jitter = random.uniform(-jitter_range, jitter_range)
delay = max(0, capped + jitter)
return delay
def should_retry(
self,
attempt: int,
error_category: str,
is_idempotent: bool = False,
) -> bool:
"""
Decide whether to retry based on attempt number,
error type, and idempotency.
"""
if attempt >= self.config.max_retries:
return False
if error_category == "retryable":
# Covers timeouts and transient tool errors
return self.config.retry_on_timeout or self.config.retry_on_server_error
elif error_category == "rate_limited":
return self.config.retry_on_rate_limit
elif error_category == "server_error":
return self.config.retry_on_server_error
elif error_category == "client_error":
return False
elif error_category == "fatal":
return False
elif error_category == "not_found":
return False
# Default: retry if idempotent
return is_idempotent
async def execute_with_retry(
self,
operation: Callable,
tool_name: str,
is_idempotent: bool = False,
) -> Any:
"""
Execute an async operation with retry logic.
Returns the result or raises the last exception.
"""
last_exception = None
for attempt in range(self.config.max_retries + 1):
try:
return await operation()
except Exception as exc:
last_exception = exc
error_category = self._classify_exception(exc)
if not self.should_retry(
attempt, error_category, is_idempotent
):
raise
delay = self.compute_delay_ms(attempt)
logger.warning(
"Retry %d/%d for tool %s (delay=%.0fms, error=%s): %s",
attempt + 1,
self.config.max_retries,
tool_name,
delay,
error_category,
exc,
)
await asyncio.sleep(delay / 1000.0)
raise last_exception
@staticmethod
def _classify_exception(exc: Exception) -> str:
"""Classify an exception into a retry category."""
if isinstance(exc, asyncio.TimeoutError):
return "retryable"
elif isinstance(exc, ConnectionError):
return "retryable"
elif isinstance(exc, RuntimeError) and "rate limit" in str(exc).lower():
return "rate_limited"
elif isinstance(exc, ValueError):
return "client_error"
else:
return "server_error"
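The growth of `compute_delay_ms` is easy to check by hand. With the defaults above (base 100 ms, cap 5000 ms, ±25% jitter), a standalone restatement of the same formula shows the expected delay range per attempt:

```python
import random


def delay_ms(attempt: int, base: float = 100.0, cap: float = 5000.0,
             jitter: float = 0.25) -> float:
    """Exponential backoff with symmetric jitter (same formula as RetryManager)."""
    capped = min(base * (2 ** attempt), cap)
    return max(0.0, capped + random.uniform(-capped * jitter, capped * jitter))


# Attempt 0: 100 +/- 25 ms; attempt 1: 200 +/- 50 ms; attempt 6+: capped at 5000 +/- 1250 ms
for attempt, lo, hi in [(0, 75, 125), (1, 150, 250), (2, 300, 500), (6, 3750, 6250)]:
    d = delay_ms(attempt)
    assert lo <= d <= hi, (attempt, d)
```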
8. Integration Example — Complete MangaAssist Orchestrator Flow
8.1 End-to-End Tool Call in the Orchestrator
"""
Complete example: MangaAssist orchestrator using the MCP client library
to handle a user query that requires multiple tool calls.
"""
import asyncio
import json
import logging
# Assume all MCP client classes are imported from the modules above.
logger = logging.getLogger("mangaassist-orchestrator")
async def handle_user_message(user_message: str, user_id: str):
"""
Full orchestrator flow for a user message:
1. Call Bedrock Claude 3 with user message + tool definitions.
2. If Claude returns tool_use, invoke MCP tools via client library.
3. Feed tool results back to Claude for final response.
"""
# --- Initialize MCP client ---
client = MCPClient(region="ap-northeast-1")
await client.initialize()
# Register MCP server endpoints
client.register_server(ServerEndpoint(
name="manga-search-lambda",
transport=TransportType.LAMBDA,
lambda_function_name="mangaassist-mcp-catalog-search",
lambda_qualifier="prod",
timeout_seconds=5.0,
max_retries=2,
))
client.register_server(ServerEndpoint(
name="price-lookup-lambda",
transport=TransportType.LAMBDA,
lambda_function_name="mangaassist-mcp-price-lookup",
timeout_seconds=3.0,
max_retries=1,
))
client.register_server(ServerEndpoint(
name="order-processing-ecs",
transport=TransportType.WEBSOCKET,
url="ws://order-mcp.mangaassist.internal:8080",
timeout_seconds=10.0,
max_retries=1,
))
# Register tool-to-server mappings
client.register_tool_mapping("manga_search", "manga-search-lambda")
client.register_tool_mapping("price_lookup", "price-lookup-lambda")
client.register_tool_mapping("add_to_cart", "order-processing-ecs")
client.register_tool_mapping("checkout", "order-processing-ecs")
# --- Step 1: Call Bedrock with tools ---
# (Simplified — in production, use the Bedrock Converse API)
tool_use_block = {
"type": "tool_use",
"id": "toolu_01abc",
"name": "manga_search",
"input": {
"query": user_message,
"language": "ja",
"max_results": 5,
},
}
# --- Step 2: Invoke MCP tool via client library ---
result = await client.call_tool(
tool_name=tool_use_block["name"],
arguments=tool_use_block["input"],
)
logger.info(
"Tool call: %s, status=%s, latency=%.0fms, attempts=%d",
result.tool_name,
result.status.value,
result.latency_ms,
result.attempt_count,
)
if result.is_error:
logger.error("Tool error: %s", result.error_message)
tool_result_content = f"Tool {result.tool_name} failed: {result.error_message}"
else:
tool_result_content = result.get_text()
# --- Step 3: Feed results back to Claude ---
# The tool_result_content is passed as a tool_result message
# back to Bedrock Converse API for Claude to generate
# the final user-facing response.
logger.info(
"Client metrics: %s", json.dumps(client.get_metrics(), indent=2)
)
await client.close()
return tool_result_content
# --- Run the example ---
if __name__ == "__main__":
result = asyncio.run(
handle_user_message("鬼滅の刃は在庫ありますか?", "user-12345")
)
print(result)
9. Error Handling Decision Matrix
| Error Type | MCP Error Code | Category | Retry? | Circuit Breaker Impact | Fallback Strategy |
|---|---|---|---|---|---|
| Parse error | -32700 | Client | No | None | Fix request format |
| Invalid request | -32600 | Client | No | None | Fix request structure |
| Method not found | -32601 | Not found | No | None | Update tool registry |
| Invalid params | -32602 | Client | No | None | Fix arguments |
| Internal error | -32603 | Server | Yes (2x) | +1 failure | Retry with backoff |
| Tool execution error | -32000 | Retryable | Yes (2x) | +1 failure | Retry, then serve cached data |
| Tool timeout | -32001 | Retryable | Yes (1x) | +1 failure | Return partial data |
| Tool not found | -32002 | Not found | No | None | Refresh tool registry |
| Rate limit exceeded | -32003 | Rate limited | Yes (3x) | None | Exponential backoff |
| Network timeout | N/A | Transport | Yes (2x) | +1 failure | Switch transport |
| Connection refused | N/A | Transport | Yes (1x) | +1 failure | Surface error to user |
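The matrix above can also be encoded directly as data, which keeps the retry policy reviewable in one place instead of scattered across client code. A sketch — the `max_retries` values mirror the table, and the names are illustrative:

```python
from typing import Dict, NamedTuple, Optional


class ErrorPolicy(NamedTuple):
    category: str
    max_retries: int       # 0 means never retry
    trips_breaker: bool    # counts toward the circuit breaker failure threshold


# Keyed by JSON-RPC error code, mirroring the decision matrix
POLICIES: Dict[int, ErrorPolicy] = {
    -32700: ErrorPolicy("client", 0, False),        # Parse error
    -32600: ErrorPolicy("client", 0, False),        # Invalid request
    -32601: ErrorPolicy("not_found", 0, False),     # Method not found
    -32602: ErrorPolicy("client", 0, False),        # Invalid params
    -32603: ErrorPolicy("server", 2, True),         # Internal error
    -32000: ErrorPolicy("retryable", 2, True),      # Tool execution error
    -32001: ErrorPolicy("retryable", 1, True),      # Tool timeout
    -32002: ErrorPolicy("not_found", 0, False),     # Tool not found
    -32003: ErrorPolicy("rate_limited", 3, False),  # Rate limit exceeded
}


def policy_for(code: Optional[int]) -> ErrorPolicy:
    # Transport-level failures carry no JSON-RPC code (code is None) and,
    # like unknown codes, fall through to a conservative single-retry default
    return POLICIES.get(code, ErrorPolicy("server", 1, True))
```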
10. Key Takeaways
- MCPClient is the single entry point: The orchestrator never directly invokes Lambda or calls ECS endpoints. It calls client.call_tool("manga_search", args) and the client handles everything: transport selection, retries, circuit breaking, and response parsing.
- ToolDiscoveryClient enables dynamic tool addition: New MCP servers register in DynamoDB, the discovery client picks them up within 5 minutes, and the orchestrator can invoke new tools without redeployment. This decouples tool development from orchestrator releases.
- MCPRequestBuilder enforces protocol correctness: Every request gets a unique ID, proper JSON-RPC 2.0 structure, and optional schema validation before it leaves the client. This catches bad arguments before they consume Lambda invocations.
- MCPResponseParser classifies errors for retry decisions: Not all errors are retryable. Client errors (bad arguments) should not be retried; server errors and timeouts should; rate-limit errors need longer backoff. The parser makes this classification automatic.
- Per-server circuit breakers isolate failures: If the recommendation MCP server is down, the circuit opens for that server only; search, price lookup, and ordering continue to work. At 1M messages/day, this isolation is the difference between a partial outage and a total one.
- Exponential backoff with jitter prevents thundering herds: When an MCP server recovers from an outage, all waiting clients would otherwise retry simultaneously. Randomized jitter spreads retries over time, giving the server a chance to warm up.
- JSON tool definitions drive the FM's tool selection: The quality of tool descriptions and parameter documentation directly affects Claude 3's ability to select the right tool and construct valid arguments. Clear, detailed tool definitions reduce failed tool calls and wasted tokens.
- Three-level caching (local memory -> Redis -> DynamoDB) balances speed and freshness: Tool resolution hits the local cache in microseconds, Redis in ~1 ms, and DynamoDB in ~5 ms. The 60-second TTL keeps tools fresh enough while avoiding per-request DynamoDB reads at scale.
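The read-through pattern behind that last point can be sketched with stub backends standing in for Redis and DynamoDB. A production client would use redis-py and boto3; everything here, including the injected callables, is illustrative:

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TieredToolCache:
    """Local-memory -> Redis -> DynamoDB read-through cache with a short TTL."""

    def __init__(self, redis_get: Callable, redis_set: Callable,
                 dynamo_get: Callable, ttl_seconds: float = 60.0):
        self._local: Dict[str, Tuple[float, Any]] = {}
        self._redis_get, self._redis_set = redis_get, redis_set
        self._dynamo_get = dynamo_get
        self.ttl = ttl_seconds

    def get(self, tool_name: str) -> Optional[Any]:
        entry = self._local.get(tool_name)           # 1. local memory (~microseconds)
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]
        value = self._redis_get(tool_name)           # 2. Redis (~1 ms)
        if value is None:
            value = self._dynamo_get(tool_name)      # 3. DynamoDB (~5 ms), source of truth
            if value is not None:
                self._redis_set(tool_name, value, self.ttl)
        if value is not None:
            self._local[tool_name] = (time.time(), value)
        return value


# In-memory stubs standing in for ElastiCache Redis and DynamoDB
redis_store: Dict[str, Any] = {}
dynamo_store = {"manga_search": {"server": "manga-search-lambda"}}
cache = TieredToolCache(
    redis_get=redis_store.get,
    redis_set=lambda k, v, ttl: redis_store.__setitem__(k, v),  # stub ignores TTL
    dynamo_get=dynamo_store.get,
)
```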