
MCP Client Patterns and Consistent Tool Access

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Certification: AWS Certified AI Practitioner (AIF-C01)
Domain: 2 — Implementation and Integration of Foundation Models
Task: 2.1 — Select and implement appropriate FM integration approaches
Skill: 2.1.7 — Develop model extension frameworks to enhance FM capabilities (Lambda for stateless MCP servers, ECS for complex MCP servers, MCP client libraries for consistent access patterns)

MCP Client Architecture Mindmap

mindmap
  root((MCP Client<br/>Patterns))
    MCPClient Core
      Connection Lifecycle
        Initialize Handshake
        Capability Negotiation
        Session Management
        Graceful Shutdown
      Transport Abstraction
        Lambda Direct Invoke
        HTTP REST Calls
        WebSocket Persistent
        gRPC Internal
      Request Pipeline
        Request ID Generation
        Envelope Construction
        Timeout Management
        Response Correlation
    Tool Discovery
      ToolDiscoveryClient
        Server Enumeration
        Schema Fetching
        Capability Caching
        Refresh Scheduling
      Registry Integration
        DynamoDB Backed
        Redis Cached
        Version Tracking
        Health Filtering
      Schema Validation
        JSON Schema Checking
        Parameter Type Coercion
        Required Field Checks
        Default Injection
    Request Building
      MCPRequestBuilder
        Method Selection
        Parameter Marshaling
        Argument Validation
        Envelope Sealing
      Batch Operations
        Parallel Tool Calls
        Dependency Resolution
        Fan-Out Patterns
        Result Aggregation
    Response Parsing
      MCPResponseParser
        Content Extraction
        Error Classification
        Type Deserialization
        Metadata Extraction
      Error Handling
        Retryable Errors
        Fatal Errors
        Partial Failures
        Timeout Handling
    Resilience Patterns
      Circuit Breaker
        Per-Server State
        Failure Thresholds
        Half-Open Probing
        Recovery Timing
      Retry Logic
        Exponential Backoff
        Jitter Addition
        Max Attempt Limits
        Idempotency Guards
      Fallback Strategies
        Cached Results
        Degraded Responses
        Alternative Tools
        Graceful Omission
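The circuit-breaker branch above (per-server state, failure thresholds, half-open probing, recovery timing) maps onto a small state machine. The `CircuitBreaker` class that the `MCPClient` code in section 1.2 depends on is not shown in this chunk; the following is a minimal sketch that implements the same interface the client calls (`allow_request`, `record_success`, `record_failure`, `failure_count`, `time_until_recovery_seconds`, `state.value`) with assumed thresholds, not the production implementation.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # normal operation: requests flow through
    OPEN = "open"            # failing fast: requests rejected immediately
    HALF_OPEN = "half_open"  # probing: allow a trial request after the recovery timeout


class CircuitBreaker:
    """Minimal per-server circuit breaker sketch (illustrative, not production)."""

    def __init__(self, name: str, failure_threshold: int = 5,
                 recovery_timeout_seconds: float = 30.0):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self._opened_at = 0.0

    @property
    def time_until_recovery_seconds(self) -> float:
        """Seconds until an OPEN circuit will admit a half-open probe."""
        return max(0.0, self.recovery_timeout_seconds - (time.time() - self._opened_at))

    def allow_request(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Recovery window elapsed: transition to HALF_OPEN and let a probe through.
            if self.time_until_recovery_seconds <= 0:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        # HALF_OPEN: allow probes (a production version would limit to one in flight).
        return True

    def record_success(self) -> None:
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self) -> None:
        self.failure_count += 1
        # A failed probe, or crossing the threshold, (re)opens the circuit.
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self._opened_at = time.time()
```

A failed half-open probe reopens the circuit immediately, which is what keeps a still-broken server from absorbing a thundering herd of retries.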

MCP Client Request Flow in MangaAssist

flowchart TB
    subgraph Orchestrator["ECS Fargate Orchestrator"]
        BedrockCall[Bedrock Claude 3<br/>Returns tool_use Block]
        MCPClient[MCPClient<br/>Unified Interface]
        ReqBuilder[MCPRequestBuilder<br/>Envelope Construction]
        Discovery[ToolDiscoveryClient<br/>Server Resolution]
        RespParser[MCPResponseParser<br/>Content Extraction]
        CircuitBkr[Circuit Breaker<br/>Per-Server]
        RetryMgr[Retry Manager<br/>Exponential Backoff]
    end

    subgraph Transport["Transport Layer"]
        LambdaInvoke[Lambda Direct Invoke<br/>boto3 lambda.invoke]
        HTTPClient[HTTP Client<br/>aiohttp POST]
        WSClient[WebSocket Client<br/>aiohttp ws_connect]
    end

    subgraph MCPServers["MCP Servers"]
        LambdaSearch[Lambda: manga_search]
        LambdaPrice[Lambda: price_lookup]
        ECSOrder[ECS: order_processing]
        ECSTranslate[ECS: translation]
    end

    BedrockCall -->|tool_use: manga_search| MCPClient
    MCPClient -->|Resolve server| Discovery
    Discovery -->|Server endpoint + transport| MCPClient
    MCPClient -->|Check circuit| CircuitBkr
    CircuitBkr -->|CLOSED: proceed| ReqBuilder
    CircuitBkr -->|OPEN: fail fast, structured error| MCPClient
    ReqBuilder -->|JSON-RPC envelope| RetryMgr

    RetryMgr -->|Lambda transport| LambdaInvoke
    RetryMgr -->|HTTP transport| HTTPClient
    RetryMgr -->|WS transport| WSClient

    LambdaInvoke --> LambdaSearch
    LambdaInvoke --> LambdaPrice
    HTTPClient --> ECSOrder
    WSClient --> ECSTranslate

    LambdaSearch -->|MCP response| RespParser
    LambdaPrice -->|MCP response| RespParser
    ECSOrder -->|MCP response| RespParser
    ECSTranslate -->|MCP response| RespParser

    RespParser -->|Extracted content| BedrockCall

    style Orchestrator fill:#e3f2fd,stroke:#1565c0
    style Transport fill:#fff3e0,stroke:#e65100
    style MCPServers fill:#e8f5e9,stroke:#2e7d32

1. MCPClient — Unified Tool Access Interface

1.1 Design Philosophy

The MCPClient is the single entry point for all MCP tool interactions in MangaAssist. The ECS Fargate orchestrator calls client.call_tool(tool_name, arguments) and the client handles everything: discovering which server hosts the tool, selecting the right transport, building the MCP envelope, executing with retries, parsing the response, and managing circuit breakers.

This abstraction is critical because the orchestrator should never need to know that manga_search runs on Lambda while order_processing runs on ECS. When the team migrates a tool from Lambda to ECS (for example, to add streaming support to search), the orchestrator code does not change.
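That migration claim can be demonstrated with a stripped-down routing table. The names here (`MiniRouter`, `Endpoint`) are hypothetical; the real `MCPClient` below performs the same resolution via its `_tool_to_server` and `_endpoints` maps. The point is that the caller's `call_tool` line is identical before and after the tool moves, and only the registration changes.

```python
from dataclasses import dataclass


@dataclass
class Endpoint:
    name: str
    transport: str  # "lambda" or "http"


class MiniRouter:
    """Toy resolver: tool name -> server name -> endpoint."""

    def __init__(self):
        self._tool_to_server = {}
        self._endpoints = {}

    def register(self, tool: str, endpoint: Endpoint) -> None:
        self._endpoints[endpoint.name] = endpoint
        self._tool_to_server[tool] = endpoint.name

    def call_tool(self, tool: str, arguments: dict) -> str:
        ep = self._endpoints[self._tool_to_server[tool]]
        # A real client would now build the MCP envelope and send via ep.transport.
        return f"{tool} via {ep.transport}"


router = MiniRouter()
router.register("manga_search", Endpoint("search-lambda", "lambda"))
print(router.call_tool("manga_search", {"query": "One Piece"}))  # manga_search via lambda

# Migration to ECS: re-register the tool; the caller's code does not change.
router.register("manga_search", Endpoint("search-ecs", "http"))
print(router.call_tool("manga_search", {"query": "One Piece"}))  # manga_search via http
```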

1.2 MCPClient Implementation

"""
MCPClient: The unified MCP client library for MangaAssist.
Provides consistent tool access regardless of server type or transport.
"""

import json
import time
import asyncio
import logging
import hashlib
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
from dataclasses import dataclass, field

import aiohttp
import boto3

logger = logging.getLogger("mcp-client")
logger.setLevel(logging.INFO)


class TransportType(Enum):
    """Supported MCP transport types."""
    LAMBDA = "lambda"
    HTTP = "http"
    WEBSOCKET = "websocket"


class ToolCallStatus(Enum):
    """Outcome status for a tool call."""
    SUCCESS = "success"
    ERROR = "error"
    TIMEOUT = "timeout"
    CIRCUIT_OPEN = "circuit_open"
    TOOL_NOT_FOUND = "tool_not_found"
    TRANSPORT_ERROR = "transport_error"


@dataclass
class ToolCallResult:
    """Structured result from an MCP tool call."""
    tool_name: str
    status: ToolCallStatus
    content: List[Dict[str, Any]] = field(default_factory=list)
    is_error: bool = False
    latency_ms: float = 0.0
    server_name: str = ""
    attempt_count: int = 1
    request_id: str = ""
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_text(self) -> str:
        """Extract the text content from the first content block."""
        for block in self.content:
            if block.get("type") == "text":
                return block.get("text", "")
        return ""

    def get_parsed_json(self) -> Optional[Dict[str, Any]]:
        """Parse text content as JSON, returning None on failure."""
        text = self.get_text()
        if text:
            try:
                return json.loads(text)
            except (json.JSONDecodeError, TypeError):
                return None
        return None


@dataclass
class ServerEndpoint:
    """Configuration for an MCP server endpoint."""
    name: str
    transport: TransportType
    url: Optional[str] = None
    lambda_function_name: Optional[str] = None
    lambda_qualifier: Optional[str] = None
    timeout_seconds: float = 5.0
    max_retries: int = 2
    retry_base_delay_ms: float = 100.0
    retry_max_delay_ms: float = 2000.0


class MCPClient:
    """
    Unified MCP client for the MangaAssist orchestrator.

    Responsibilities:
    1. Server and tool discovery via ToolDiscoveryClient.
    2. Request construction via MCPRequestBuilder.
    3. Transport-agnostic invocation with retry and circuit breaking.
    4. Response parsing via MCPResponseParser.
    5. Metrics emission for observability.

    Usage:
        client = MCPClient(region="ap-northeast-1")
        await client.initialize()
        result = await client.call_tool("manga_search", {"query": "One Piece"})
        print(result.get_text())
        await client.close()
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self._endpoints: Dict[str, ServerEndpoint] = {}
        self._tool_to_server: Dict[str, str] = {}
        self._tool_definitions: Dict[str, Dict[str, Any]] = {}
        self._circuit_breakers: Dict[str, "CircuitBreaker"] = {}
        self._request_builder = MCPRequestBuilder()
        self._response_parser = MCPResponseParser()
        self._lambda_client: Optional[Any] = None  # boto3 "lambda" client
        self._http_session: Optional[aiohttp.ClientSession] = None
        self._ws_connections: Dict[str, aiohttp.ClientWebSocketResponse] = {}
        self._metrics: Dict[str, Any] = {
            "total_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
            "total_latency_ms": 0.0,
        }
        self._initialized = False

    async def initialize(self) -> None:
        """Initialize transport clients and connections."""
        self._lambda_client = boto3.client("lambda", region_name=self.region)
        self._http_session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(
                limit=100,
                ttl_dns_cache=300,
                keepalive_timeout=60,
            ),
        )
        self._initialized = True
        logger.info("MCPClient initialized for region %s", self.region)

    async def close(self) -> None:
        """Clean up all connections and sessions."""
        for ws in self._ws_connections.values():
            if not ws.closed:
                await ws.close()
        self._ws_connections.clear()
        if self._http_session:
            await self._http_session.close()
        self._initialized = False
        logger.info("MCPClient closed")

    def register_server(self, endpoint: ServerEndpoint) -> None:
        """Register an MCP server endpoint."""
        self._endpoints[endpoint.name] = endpoint
        self._circuit_breakers[endpoint.name] = CircuitBreaker(
            name=endpoint.name,
            failure_threshold=5,
            recovery_timeout_seconds=30.0,
        )
        logger.info(
            "Registered server: %s (%s)",
            endpoint.name, endpoint.transport.value,
        )

    def register_tool_mapping(
        self, tool_name: str, server_name: str,
        definition: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Map a tool name to its hosting server."""
        self._tool_to_server[tool_name] = server_name
        if definition:
            self._tool_definitions[tool_name] = definition

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        timeout_override: Optional[float] = None,
    ) -> ToolCallResult:
        """
        Call an MCP tool by name.

        This is the primary API. It:
        1. Resolves the tool to a server endpoint.
        2. Checks the circuit breaker for that server.
        3. Builds the MCP JSON-RPC request envelope.
        4. Sends via the appropriate transport with retries.
        5. Parses the response and returns a structured result.
        """
        if not self._initialized:
            raise RuntimeError("MCPClient not initialized. Call initialize() first.")

        start_time = time.time()
        self._metrics["total_calls"] += 1

        # Step 1: Resolve server
        server_name = self._tool_to_server.get(tool_name)
        if not server_name:
            return ToolCallResult(
                tool_name=tool_name,
                status=ToolCallStatus.TOOL_NOT_FOUND,
                is_error=True,
                error_message=f"No server registered for tool: {tool_name}",
            )

        endpoint = self._endpoints.get(server_name)
        if not endpoint:
            return ToolCallResult(
                tool_name=tool_name,
                status=ToolCallStatus.TOOL_NOT_FOUND,
                is_error=True,
                error_message=f"Server endpoint not found: {server_name}",
            )

        # Step 2: Check circuit breaker
        cb = self._circuit_breakers.get(server_name)
        if cb and not cb.allow_request():
            self._metrics["failed_calls"] += 1
            return ToolCallResult(
                tool_name=tool_name,
                status=ToolCallStatus.CIRCUIT_OPEN,
                is_error=True,
                server_name=server_name,
                error_message=(
                    f"Circuit breaker OPEN for {server_name}. "
                    f"Failures: {cb.failure_count}, "
                    f"Recovery in {cb.time_until_recovery_seconds:.0f}s"
                ),
            )

        # Step 3: Build request
        request_id = self._request_builder.next_request_id()
        mcp_request = self._request_builder.build_tool_call(
            request_id=request_id,
            tool_name=tool_name,
            arguments=arguments,
        )

        # Step 4: Execute with retries
        timeout = timeout_override or endpoint.timeout_seconds
        last_exception = None
        attempt_count = 0

        for attempt in range(endpoint.max_retries + 1):
            attempt_count = attempt + 1
            try:
                raw_response = await self._send_request(
                    endpoint, mcp_request, timeout
                )

                # Step 5: Parse response
                result = self._response_parser.parse(
                    raw_response, tool_name, request_id
                )

                elapsed_ms = (time.time() - start_time) * 1000
                result.latency_ms = elapsed_ms
                result.server_name = server_name
                result.attempt_count = attempt_count
                result.request_id = request_id

                if cb:
                    if result.is_error:
                        cb.record_failure()
                    else:
                        cb.record_success()

                if result.is_error:
                    self._metrics["failed_calls"] += 1
                else:
                    self._metrics["successful_calls"] += 1
                self._metrics["total_latency_ms"] += elapsed_ms

                return result

            except asyncio.TimeoutError:
                last_exception = TimeoutError(
                    f"Tool {tool_name} timed out after {timeout}s"
                )
                logger.warning(
                    "Timeout on attempt %d/%d for %s/%s",
                    attempt + 1, endpoint.max_retries + 1,
                    server_name, tool_name,
                )
            except Exception as exc:
                last_exception = exc
                logger.warning(
                    "Error on attempt %d/%d for %s/%s: %s",
                    attempt + 1, endpoint.max_retries + 1,
                    server_name, tool_name, exc,
                )

            if attempt < endpoint.max_retries:
                delay = min(
                    endpoint.retry_base_delay_ms * (2 ** attempt),
                    endpoint.retry_max_delay_ms,
                ) / 1000.0
                await asyncio.sleep(delay)

        # All retries exhausted
        if cb:
            cb.record_failure()
        elapsed_ms = (time.time() - start_time) * 1000
        self._metrics["failed_calls"] += 1

        return ToolCallResult(
            tool_name=tool_name,
            status=(
                ToolCallStatus.TIMEOUT
                if isinstance(last_exception, TimeoutError)
                else ToolCallStatus.TRANSPORT_ERROR
            ),
            is_error=True,
            latency_ms=elapsed_ms,
            server_name=server_name,
            attempt_count=attempt_count,
            request_id=request_id,
            error_message=str(last_exception),
        )

    async def call_tools_parallel(
        self,
        calls: List[Tuple[str, Dict[str, Any]]],
    ) -> List[ToolCallResult]:
        """
        Execute multiple tool calls in parallel.
        Returns results in the same order as the input calls.
        """
        tasks = [
            self.call_tool(tool_name, arguments)
            for tool_name, arguments in calls
        ]
        return await asyncio.gather(*tasks)

    async def _send_request(
        self,
        endpoint: ServerEndpoint,
        request: Dict[str, Any],
        timeout: float,
    ) -> Dict[str, Any]:
        """Send an MCP request via the appropriate transport."""
        if endpoint.transport == TransportType.LAMBDA:
            return await self._send_lambda(endpoint, request, timeout)
        elif endpoint.transport == TransportType.HTTP:
            return await self._send_http(endpoint, request, timeout)
        elif endpoint.transport == TransportType.WEBSOCKET:
            return await self._send_websocket(endpoint, request, timeout)
        else:
            raise ValueError(f"Unsupported transport: {endpoint.transport}")

    async def _send_lambda(
        self, endpoint: ServerEndpoint,
        request: Dict, timeout: float,
    ) -> Dict[str, Any]:
        """Invoke a Lambda MCP server via boto3."""
        loop = asyncio.get_running_loop()
        invoke_params = {
            "FunctionName": endpoint.lambda_function_name,
            "InvocationType": "RequestResponse",
            "Payload": json.dumps(request),
        }
        if endpoint.lambda_qualifier:
            invoke_params["Qualifier"] = endpoint.lambda_qualifier

        response = await loop.run_in_executor(
            None,
            lambda: self._lambda_client.invoke(**invoke_params),
        )

        payload = json.loads(response["Payload"].read())

        # Handle API Gateway proxy response format
        if isinstance(payload, dict) and "body" in payload:
            return json.loads(payload["body"])
        return payload

    async def _send_http(
        self, endpoint: ServerEndpoint,
        request: Dict, timeout: float,
    ) -> Dict[str, Any]:
        """Send to an HTTP MCP server via aiohttp."""
        url = f"{endpoint.url}/mcp/http"
        async with self._http_session.post(
            url,
            json=request,
            timeout=aiohttp.ClientTimeout(total=timeout),
            headers={"X-MCP-Client": "mangaassist-orchestrator"},
        ) as response:
            response.raise_for_status()
            return await response.json()

    async def _send_websocket(
        self, endpoint: ServerEndpoint,
        request: Dict, timeout: float,
    ) -> Dict[str, Any]:
        """Send to a WebSocket MCP server."""
        ws = self._ws_connections.get(endpoint.name)
        if not ws or ws.closed:
            ws = await self._http_session.ws_connect(
                f"{endpoint.url}/mcp",
                heartbeat=30.0,
                max_msg_size=10 * 1024 * 1024,
            )
            self._ws_connections[endpoint.name] = ws

            # Initialize handshake
            init_req = self._request_builder.build_initialize()
            await ws.send_json(init_req)
            init_resp = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
            if init_resp.get("error"):
                raise RuntimeError(
                    f"MCP initialize failed for {endpoint.name}: {init_resp['error']}"
                )
            logger.info(
                "WebSocket session initialized for %s", endpoint.name
            )

        await ws.send_json(request)
        response = await asyncio.wait_for(ws.receive_json(), timeout=timeout)
        return response

    def get_metrics(self) -> Dict[str, Any]:
        """Return client-level metrics."""
        total = self._metrics["total_calls"]
        return {
            **self._metrics,
            "success_rate": (
                self._metrics["successful_calls"] / total
                if total > 0 else 0.0
            ),
            "avg_latency_ms": (
                self._metrics["total_latency_ms"] / total
                if total > 0 else 0.0
            ),
            "circuit_breaker_states": {
                name: cb.state.value
                for name, cb in self._circuit_breakers.items()
            },
        }
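The retry schedule inside `call_tool` is capped exponential backoff: `min(base * 2^attempt, max)`. The resilience mindmap also lists jitter addition, which the code above omits. Below is a standalone sketch of how "full jitter" (drawing the delay uniformly from `[0, cap]`) might be layered on; the function name and parameters are illustrative, not part of the client API.

```python
import random
from typing import List, Optional


def backoff_delays_ms(
    base_ms: float = 100.0,
    max_ms: float = 2000.0,
    attempts: int = 5,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Capped exponential backoff schedule, with optional full jitter.

    Without jitter, each delay is min(base * 2^attempt, max).
    With full jitter, each delay is drawn uniformly from [0, that cap],
    which desynchronizes retry storms across many concurrent callers.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        cap = min(base_ms * (2 ** attempt), max_ms)
        delays.append(rng.uniform(0, cap) if jitter else cap)
    return delays


# Without jitter this reproduces the schedule in call_tool above:
print(backoff_delays_ms(jitter=False))  # [100.0, 200.0, 400.0, 800.0, 1600.0]
```

At 1M messages/day, jitter matters: if a server blips, thousands of orchestrator tasks would otherwise retry on the same deterministic schedule and hit it in synchronized waves.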

2. ToolDiscoveryClient — Dynamic Tool Resolution

2.1 Discovery Architecture

The ToolDiscoveryClient resolves tool names to server endpoints at runtime, which means the MangaAssist orchestrator can discover new tools (e.g., a "manga_preview" tool) without redeployment. Discovery reads from a DynamoDB registry, fronted by Redis and an in-process cache so that hot lookups avoid the network entirely.
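That lookup chain (in-process cache, then Redis, then DynamoDB) is a read-through cache. A minimal sketch of the pattern, with plain dicts standing in for Redis and DynamoDB — illustrative only; the real client below adds TTLs, health filtering, and error handling:

```python
from typing import Any, Dict, Optional


class ReadThroughResolver:
    """Three-level read-through lookup: local dict -> 'Redis' -> 'DynamoDB'."""

    def __init__(self, redis_store: Dict[str, Any], dynamo_table: Dict[str, Any]):
        self.local = {}              # L1: in-process, no network hop
        self.redis = redis_store     # L2 stand-in: shared cache, ~1 ms over the network
        self.dynamo = dynamo_table   # L3 stand-in: source of truth, single-digit ms

    def resolve(self, tool_name: str) -> Optional[Any]:
        if tool_name in self.local:
            return self.local[tool_name]
        if tool_name in self.redis:
            self.local[tool_name] = self.redis[tool_name]
            return self.local[tool_name]
        record = self.dynamo.get(tool_name)
        if record is not None:
            # Populate both cache levels on the way back so the next
            # lookup (from any task sharing Redis) is served from cache.
            self.redis[tool_name] = record
            self.local[tool_name] = record
        return record
```

A miss at every level returns `None`, which the caller surfaces as `TOOL_NOT_FOUND` rather than an exception.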

2.2 ToolDiscoveryClient Implementation

"""
ToolDiscoveryClient: Discovers and caches MCP tool definitions
from the central registry and from individual MCP servers.
"""

import json
import time
import asyncio
import logging
import hashlib
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field

import aiohttp
import boto3
import redis

logger = logging.getLogger("mcp-discovery")


@dataclass
class DiscoveredTool:
    """A tool discovered from an MCP server."""
    name: str
    description: str
    input_schema: Dict[str, Any]
    server_name: str
    server_transport: str
    server_url: Optional[str]
    lambda_function_name: Optional[str]
    version: str
    schema_hash: str
    discovered_at: float
    healthy: bool = True

    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema,
            "serverName": self.server_name,
            "serverTransport": self.server_transport,
            "version": self.version,
            "schemaHash": self.schema_hash,
        }


@dataclass
class DiscoveryConfig:
    """Configuration for the discovery client."""
    registry_table: str = "mangaassist-mcp-registry"
    redis_host: str = "mangaassist-redis.xxxxx.apne1.cache.amazonaws.com"
    redis_port: int = 6379
    cache_ttl_seconds: int = 60
    refresh_interval_seconds: int = 300
    region: str = "ap-northeast-1"
    auto_refresh: bool = True


class ToolDiscoveryClient:
    """
    Discovers MCP tools from the central registry and individual servers.

    Discovery flow:
    1. On startup, fetch all tool registrations from DynamoDB.
    2. Cache them in Redis with TTL.
    3. Periodically refresh (every 5 minutes by default).
    4. On cache miss, fetch from DynamoDB and populate cache.
    5. Optionally, call tools/list on individual servers for schema
       verification.

    Thread safety: This client is designed for async usage within
    a single event loop (the ECS Fargate orchestrator).
    """

    def __init__(self, config: Optional[DiscoveryConfig] = None):
        self._config = config or DiscoveryConfig()
        self._dynamodb = boto3.resource(
            "dynamodb", region_name=self._config.region
        )
        self._table = self._dynamodb.Table(self._config.registry_table)
        self._redis = redis.Redis(
            host=self._config.redis_host,
            port=self._config.redis_port,
            decode_responses=True,
            socket_timeout=1,
            socket_connect_timeout=1,
        )
        self._local_cache: Dict[str, DiscoveredTool] = {}
        self._last_full_refresh: float = 0
        self._refresh_task: Optional[asyncio.Task] = None
        self._schema_hashes: Dict[str, str] = {}

    async def start(self) -> None:
        """Start the discovery client with initial fetch and background refresh."""
        await self.refresh_all()
        if self._config.auto_refresh:
            self._refresh_task = asyncio.create_task(
                self._refresh_loop()
            )
            logger.info(
                "Discovery auto-refresh started (every %ds)",
                self._config.refresh_interval_seconds,
            )

    async def stop(self) -> None:
        """Stop background refresh."""
        if self._refresh_task:
            self._refresh_task.cancel()
            try:
                await self._refresh_task
            except asyncio.CancelledError:
                pass

    async def resolve_tool(self, tool_name: str) -> Optional[DiscoveredTool]:
        """
        Resolve a tool name to its full discovery record.
        Checks local cache -> Redis -> DynamoDB.
        """
        # Level 1: Local in-memory cache
        cached = self._local_cache.get(tool_name)
        if cached and cached.healthy:
            return cached

        # Level 2: Redis cache
        try:
            redis_key = f"mcp:tool:{tool_name}"
            redis_data = self._redis.get(redis_key)
            if redis_data:
                data = json.loads(redis_data)
                tool = self._dict_to_discovered_tool(data)
                if tool.healthy:
                    self._local_cache[tool_name] = tool
                    return tool
        except redis.RedisError as exc:
            logger.warning("Redis lookup failed for %s: %s", tool_name, exc)

        # Level 3: DynamoDB
        try:
            response = self._table.get_item(Key={"tool_name": tool_name})
            item = response.get("Item")
            if item and item.get("status") == "active" and item.get("healthy", True):
                tool = self._dynamo_item_to_discovered_tool(item)
                self._local_cache[tool_name] = tool
                # Populate Redis cache
                try:
                    self._redis.setex(
                        f"mcp:tool:{tool_name}",
                        self._config.cache_ttl_seconds,
                        json.dumps(self._discovered_tool_to_dict(tool)),
                    )
                except redis.RedisError:
                    pass
                return tool
        except Exception as exc:
            logger.error("DynamoDB lookup failed for %s: %s", tool_name, exc)

        return None

    async def list_all_tools(self) -> List[DiscoveredTool]:
        """List all active, healthy tools."""
        return [
            tool for tool in self._local_cache.values()
            if tool.healthy
        ]

    async def refresh_all(self) -> int:
        """
        Refresh the full tool registry from DynamoDB.
        Returns the number of tools discovered.
        """
        logger.info("Refreshing full tool registry from DynamoDB...")
        try:
            # A single scan page returns at most 1 MB of items; follow
            # LastEvaluatedKey so large registries are read in full.
            scan_kwargs: Dict[str, Any] = {
                "FilterExpression": boto3.dynamodb.conditions.Attr("status").eq("active"),
            }
            items: List[Dict[str, Any]] = []
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                scan_kwargs["ExclusiveStartKey"] = last_key

            new_cache: Dict[str, DiscoveredTool] = {}
            for item in items:
                tool = self._dynamo_item_to_discovered_tool(item)
                new_cache[tool.name] = tool

                # Check for schema changes
                old_hash = self._schema_hashes.get(tool.name)
                if old_hash and old_hash != tool.schema_hash:
                    logger.warning(
                        "Schema changed for tool %s: %s -> %s",
                        tool.name, old_hash, tool.schema_hash,
                    )

                self._schema_hashes[tool.name] = tool.schema_hash

            self._local_cache = new_cache
            self._last_full_refresh = time.time()

            # Batch update Redis
            try:
                pipe = self._redis.pipeline()
                for tool in new_cache.values():
                    pipe.setex(
                        f"mcp:tool:{tool.name}",
                        self._config.cache_ttl_seconds,
                        json.dumps(self._discovered_tool_to_dict(tool)),
                    )
                pipe.execute()
            except redis.RedisError as exc:
                logger.warning("Redis batch update failed: %s", exc)

            logger.info("Discovered %d active tools", len(new_cache))
            return len(new_cache)

        except Exception as exc:
            logger.error("Full registry refresh failed: %s", exc)
            return len(self._local_cache)

    async def verify_server_tools(
        self,
        server_name: str,
        server_url: str,
        http_session: aiohttp.ClientSession,
    ) -> List[Dict[str, Any]]:
        """
        Call tools/list on a specific server to verify its live tool
        definitions match the registry. Useful for detecting drift.
        """
        request = {
            "jsonrpc": "2.0",
            "id": "verify",
            "method": "tools/list",
            "params": {},
        }
        try:
            async with http_session.post(
                f"{server_url}/mcp/http",
                json=request,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                body = await response.json()
                return body.get("result", {}).get("tools", [])
        except Exception as exc:
            logger.warning(
                "Failed to verify tools on %s: %s", server_name, exc
            )
            return []

    async def detect_drift(
        self,
        server_name: str,
        server_url: str,
        http_session: aiohttp.ClientSession,
    ) -> Dict[str, Any]:
        """
        Compare registry definitions with live server definitions.
        Returns a drift report.
        """
        live_tools = await self.verify_server_tools(
            server_name, server_url, http_session
        )
        live_map = {t["name"]: t for t in live_tools}

        registry_tools = [
            t for t in self._local_cache.values()
            if t.server_name == server_name
        ]
        registry_map = {t.name: t for t in registry_tools}

        drift = {
            "server": server_name,
            "checked_at": time.time(),
            "added": [],
            "removed": [],
            "schema_changed": [],
        }

        # Tools in live but not registry
        for name in live_map:
            if name not in registry_map:
                drift["added"].append(name)

        # Tools in registry but not live
        for name in registry_map:
            if name not in live_map:
                drift["removed"].append(name)

        # Schema changes
        for name in set(live_map) & set(registry_map):
            live_schema = json.dumps(
                live_map[name].get("inputSchema", {}), sort_keys=True
            )
            live_hash = hashlib.sha256(live_schema.encode()).hexdigest()[:12]
            if live_hash != registry_map[name].schema_hash:
                drift["schema_changed"].append({
                    "tool": name,
                    "registry_hash": registry_map[name].schema_hash,
                    "live_hash": live_hash,
                })

        drift["has_drift"] = bool(
            drift["added"] or drift["removed"] or drift["schema_changed"]
        )
        return drift

    # --- Background refresh loop ---

    async def _refresh_loop(self) -> None:
        """Periodically refresh the tool registry."""
        while True:
            await asyncio.sleep(self._config.refresh_interval_seconds)
            try:
                count = await self.refresh_all()
                logger.info("Periodic refresh: %d tools", count)
            except Exception as exc:
                logger.error("Periodic refresh failed: %s", exc)

    # --- Conversion helpers ---

    @staticmethod
    def _compute_schema_hash(schema: Dict[str, Any]) -> str:
        """Compute a deterministic hash of a tool's input schema."""
        schema_str = json.dumps(schema, sort_keys=True)
        return hashlib.sha256(schema_str.encode()).hexdigest()[:12]

    def _dynamo_item_to_discovered_tool(
        self, item: Dict[str, Any]
    ) -> DiscoveredTool:
        """Convert a DynamoDB item to a DiscoveredTool."""
        tool_def = item.get("tool_definition", "{}")
        if isinstance(tool_def, str):
            tool_def = json.loads(tool_def)

        input_schema = tool_def.get("inputSchema", {})

        return DiscoveredTool(
            name=item["tool_name"],
            description=tool_def.get("description", ""),
            input_schema=input_schema,
            server_name=item.get("server_name", ""),
            server_transport=item.get("server_transport", "http"),
            server_url=item.get("server_url"),
            lambda_function_name=item.get("lambda_function_name"),
            version=item.get("version", "1.0.0"),
            schema_hash=self._compute_schema_hash(input_schema),
            discovered_at=time.time(),
            healthy=item.get("healthy", True),
        )

    @staticmethod
    def _dict_to_discovered_tool(data: Dict[str, Any]) -> DiscoveredTool:
        """Convert a cached dict to a DiscoveredTool."""
        return DiscoveredTool(
            name=data["name"],
            description=data.get("description", ""),
            input_schema=data.get("input_schema", {}),
            server_name=data.get("server_name", ""),
            server_transport=data.get("server_transport", "http"),
            server_url=data.get("server_url"),
            lambda_function_name=data.get("lambda_function_name"),
            version=data.get("version", "1.0.0"),
            schema_hash=data.get("schema_hash", ""),
            discovered_at=data.get("discovered_at", 0),
            healthy=data.get("healthy", True),
        )

    @staticmethod
    def _discovered_tool_to_dict(tool: DiscoveredTool) -> Dict[str, Any]:
        """Convert a DiscoveredTool to a cacheable dict."""
        return {
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.input_schema,
            "server_name": tool.server_name,
            "server_transport": tool.server_transport,
            "server_url": tool.server_url,
            "lambda_function_name": tool.lambda_function_name,
            "version": tool.version,
            "schema_hash": tool.schema_hash,
            "discovered_at": tool.discovered_at,
            "healthy": tool.healthy,
        }
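The drift detection above depends on `_compute_schema_hash` producing the same digest for semantically identical schemas. A standalone sketch of the same hashing logic (the `compute_schema_hash` name here is illustrative, not part of the client) shows why `sort_keys=True` matters:

```python
import json
import hashlib


def compute_schema_hash(schema: dict) -> str:
    """Deterministic 12-hex-char digest of a tool input schema."""
    # sort_keys guarantees the same hash regardless of dict insertion order
    schema_str = json.dumps(schema, sort_keys=True)
    return hashlib.sha256(schema_str.encode()).hexdigest()[:12]


# Two schemas that differ only in key order hash identically...
a = {"type": "object", "properties": {"query": {"type": "string"}}}
b = {"properties": {"query": {"type": "string"}}, "type": "object"}
assert compute_schema_hash(a) == compute_schema_hash(b)

# ...while any semantic change produces a different hash, which the
# drift detector reports as "schema_changed"
c = {"type": "object", "properties": {"query": {"type": "integer"}}}
assert compute_schema_hash(a) != compute_schema_hash(c)
```

Without key sorting, two servers serializing the same schema in different property orders would report spurious drift on every comparison.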

3. MCPRequestBuilder — Envelope Construction

3.1 Request Envelope Design

Every MCP call from the orchestrator must be wrapped in a JSON-RPC 2.0 envelope. The MCPRequestBuilder handles request ID generation, method routing, parameter marshaling, and optional schema validation before the request leaves the client.

3.2 MCPRequestBuilder Implementation

"""
MCPRequestBuilder: Constructs MCP JSON-RPC 2.0 request envelopes.
Handles parameter validation, default injection, and envelope sealing.
"""

import time
import logging
from typing import Any, Dict, List

logger = logging.getLogger("mcp-request-builder")


class MCPRequestBuilder:
    """
    Builds well-formed MCP JSON-RPC 2.0 request envelopes.

    Features:
    - Monotonically increasing request IDs for correlation.
    - Optional schema validation before sending.
    - Default parameter injection from tool definitions.
    - Request metadata for tracing.
    """

    def __init__(self, client_name: str = "mangaassist-orchestrator"):
        self._counter = 0
        self._client_name = client_name

    def next_request_id(self) -> str:
        """Generate the next unique request ID."""
        self._counter += 1
        return f"mcp-{self._counter:08d}-{int(time.time() * 1000) % 100000}"

    def build_initialize(
        self,
        protocol_version: str = "2024-11-05",
    ) -> Dict[str, Any]:
        """Build an MCP initialize request."""
        return {
            "jsonrpc": "2.0",
            "id": self.next_request_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": protocol_version,
                "capabilities": {
                    "tools": {},
                },
                "clientInfo": {
                    "name": self._client_name,
                    "version": "1.0.0",
                },
            },
        }

    def build_tools_list(self) -> Dict[str, Any]:
        """Build a tools/list request to discover available tools."""
        return {
            "jsonrpc": "2.0",
            "id": self.next_request_id(),
            "method": "tools/list",
            "params": {},
        }

    def build_tool_call(
        self,
        request_id: str,
        tool_name: str,
        arguments: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Build a tools/call request envelope.

        Args:
            request_id: Pre-generated unique ID for this request.
            tool_name: Name of the MCP tool to invoke.
            arguments: Tool-specific arguments.

        Returns:
            Complete MCP JSON-RPC 2.0 request envelope.
        """
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments,
            },
        }

    def build_ping(self) -> Dict[str, Any]:
        """Build a ping request for health checking."""
        return {
            "jsonrpc": "2.0",
            "id": self.next_request_id(),
            "method": "ping",
        }

    def validate_arguments(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        schema: Dict[str, Any],
    ) -> List[str]:
        """
        Validate tool arguments against the input schema.
        Returns a list of validation error messages (empty if valid).
        """
        errors = []

        # Check required fields
        required = schema.get("required", [])
        for field_name in required:
            if field_name not in arguments:
                errors.append(
                    f"Missing required argument: '{field_name}' "
                    f"for tool '{tool_name}'"
                )

        # Check property types
        properties = schema.get("properties", {})
        for arg_name, arg_value in arguments.items():
            if arg_name not in properties:
                logger.debug(
                    "Unknown argument '%s' for tool '%s' (may be accepted by server)",
                    arg_name, tool_name,
                )
                continue

            prop_def = properties[arg_name]
            expected_type = prop_def.get("type")

            if expected_type == "string" and not isinstance(arg_value, str):
                errors.append(
                    f"Argument '{arg_name}' should be string, "
                    f"got {type(arg_value).__name__}"
                )
            elif expected_type == "integer" and (
                # bool is a subclass of int in Python, so exclude it explicitly
                not isinstance(arg_value, int) or isinstance(arg_value, bool)
            ):
                errors.append(
                    f"Argument '{arg_name}' should be integer, "
                    f"got {type(arg_value).__name__}"
                )
            elif expected_type == "boolean" and not isinstance(arg_value, bool):
                errors.append(
                    f"got {type(arg_value).__name__}"
                )
            elif expected_type == "array" and not isinstance(arg_value, list):
                errors.append(
                    f"Argument '{arg_name}' should be array, "
                    f"got {type(arg_value).__name__}"
                )

            # Check enum constraints
            enum_values = prop_def.get("enum")
            if enum_values and arg_value not in enum_values:
                errors.append(
                    f"Argument '{arg_name}' value '{arg_value}' "
                    f"not in allowed values: {enum_values}"
                )

            # Check numeric constraints
            if expected_type == "integer":
                minimum = prop_def.get("minimum")
                maximum = prop_def.get("maximum")
                if minimum is not None and isinstance(arg_value, (int, float)):
                    if arg_value < minimum:
                        errors.append(
                            f"Argument '{arg_name}' value {arg_value} "
                            f"below minimum {minimum}"
                        )
                if maximum is not None and isinstance(arg_value, (int, float)):
                    if arg_value > maximum:
                        errors.append(
                            f"Argument '{arg_name}' value {arg_value} "
                            f"above maximum {maximum}"
                        )

        return errors

    def inject_defaults(
        self,
        arguments: Dict[str, Any],
        schema: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Inject default values for missing optional arguments.
        Returns a new dict with defaults applied.
        """
        result = dict(arguments)
        properties = schema.get("properties", {})

        for prop_name, prop_def in properties.items():
            if prop_name not in result and "default" in prop_def:
                result[prop_name] = prop_def["default"]

        return result
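To see validation and default injection working together, here is a self-contained sketch against the `manga_search` schema from Section 5. The `validate` and `inject_defaults` helpers below are condensed stand-ins for the builder's methods, not the class itself:

```python
SCHEMA = {
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "language": {"type": "string", "enum": ["ja", "en"], "default": "ja"},
        "max_results": {"type": "integer", "minimum": 1, "maximum": 20,
                        "default": 5},
    },
    "required": ["query"],
}


def validate(args: dict, schema: dict) -> list:
    """Condensed version of MCPRequestBuilder.validate_arguments."""
    errors = [f"Missing required argument: '{f}'"
              for f in schema.get("required", []) if f not in args]
    for name, value in args.items():
        prop = schema.get("properties", {}).get(name)
        if prop is None:
            continue  # unknown args are logged, not rejected
        enum = prop.get("enum")
        if enum and value not in enum:
            errors.append(f"'{name}' value '{value}' not in {enum}")
    return errors


def inject_defaults(args: dict, schema: dict) -> dict:
    """Condensed version of MCPRequestBuilder.inject_defaults."""
    out = dict(args)
    for name, prop in schema.get("properties", {}).items():
        if name not in out and "default" in prop:
            out[name] = prop["default"]
    return out


# Defaults are injected first, then the completed arguments validate cleanly
args = inject_defaults({"query": "ワンピース"}, SCHEMA)
assert args == {"query": "ワンピース", "language": "ja", "max_results": 5}
assert validate(args, SCHEMA) == []

# A bad request surfaces both the missing field and the enum violation
assert validate({"language": "fr"}, SCHEMA) == [
    "Missing required argument: 'query'",
    "'language' value 'fr' not in ['ja', 'en']",
]
```

Injecting defaults before validation means the model only has to supply arguments the user actually specified; everything else falls back to the tool definition.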

4. MCPResponseParser — Content Extraction and Error Handling

4.1 Response Parsing Strategy

MCP responses carry tool results as an array of content blocks. Each block can be text (JSON-encoded results), images, or resource references. The MCPResponseParser extracts and classifies responses, converting raw JSON-RPC envelopes into structured ToolCallResult objects.
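The parser returns `ToolCallResult` objects whose definition lives elsewhere (in a shared types module alongside `MCPClient`). A minimal sketch of the assumed shape, with field names inferred from their usage in `parse`, makes the code below easier to follow:

```python
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class ToolCallStatus(Enum):
    """Outcome classification for a single tool call."""
    SUCCESS = "success"
    ERROR = "error"
    TOOL_NOT_FOUND = "tool_not_found"


@dataclass
class ToolCallResult:
    """Structured result of one MCP tools/call round trip."""
    tool_name: str
    status: ToolCallStatus
    request_id: str
    content: List[Dict[str, Any]] = field(default_factory=list)
    is_error: bool = False
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
```

Keeping these types in a shared module lets the client, parser, and orchestrator exchange results without importing each other directly.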

4.2 MCPResponseParser Implementation

"""
MCPResponseParser: Parses MCP JSON-RPC 2.0 response envelopes.
Extracts content, classifies errors, and provides structured access.
"""

import json
import logging
from typing import Any, Dict, List, Optional
from enum import Enum

logger = logging.getLogger("mcp-response-parser")


class MCPErrorCategory(Enum):
    """Classification of MCP errors for retry/handling decisions."""
    RETRYABLE = "retryable"         # Transient; retry may succeed
    FATAL = "fatal"                 # Permanent; retry will not help
    CLIENT_ERROR = "client_error"   # Bad request from client side
    SERVER_ERROR = "server_error"   # Bug or crash in server
    RATE_LIMITED = "rate_limited"   # Back off and retry later
    NOT_FOUND = "not_found"         # Tool or resource missing


# JSON-RPC 2.0 error code ranges
_ERROR_CLASSIFICATION = {
    -32700: MCPErrorCategory.CLIENT_ERROR,    # Parse error
    -32600: MCPErrorCategory.CLIENT_ERROR,    # Invalid request
    -32601: MCPErrorCategory.NOT_FOUND,       # Method not found
    -32602: MCPErrorCategory.CLIENT_ERROR,    # Invalid params
    -32603: MCPErrorCategory.SERVER_ERROR,    # Internal error
    -32000: MCPErrorCategory.RETRYABLE,       # Tool execution error
    -32001: MCPErrorCategory.RETRYABLE,       # Tool timeout
    -32002: MCPErrorCategory.NOT_FOUND,       # Tool not found
    -32003: MCPErrorCategory.RATE_LIMITED,    # Rate limit exceeded
}


class MCPResponseParser:
    """
    Parses MCP response envelopes into structured ToolCallResult objects.

    Handles:
    - Successful results with content blocks.
    - MCP-level errors (isError: true in result).
    - JSON-RPC level errors (error field in response).
    - Malformed responses (missing fields, invalid JSON).
    """

    def parse(
        self,
        raw_response: Dict[str, Any],
        tool_name: str,
        request_id: str,
    ) -> "ToolCallResult":
        """
        Parse a raw MCP JSON-RPC response into a ToolCallResult.

        The response may contain:
        1. result.content[] — success case
        2. result.isError: true — tool-level error
        3. error{} — protocol-level error
        """
        # ToolCallResult and ToolCallStatus are referenced directly here;
        # in practice they live in a shared types module alongside MCPClient.

        # Check for JSON-RPC error
        if "error" in raw_response:
            error = raw_response["error"]
            error_code = error.get("code", -32603)
            error_message = error.get("message", "Unknown error")
            category = self.classify_error(error_code)

            return ToolCallResult(
                tool_name=tool_name,
                status=(
                    ToolCallStatus.TOOL_NOT_FOUND
                    if category == MCPErrorCategory.NOT_FOUND
                    else ToolCallStatus.ERROR
                ),
                is_error=True,
                request_id=request_id,
                error_message=f"[{error_code}] {error_message}",
                metadata={
                    "error_code": error_code,
                    "error_category": category.value,
                    "retryable": category in (
                        MCPErrorCategory.RETRYABLE,
                        MCPErrorCategory.RATE_LIMITED,
                    ),
                },
            )

        # Extract result
        result = raw_response.get("result", {})

        # Check for tool-level error (isError flag)
        is_error = result.get("isError", False)
        content = result.get("content", [])

        status = ToolCallStatus.ERROR if is_error else ToolCallStatus.SUCCESS

        return ToolCallResult(
            tool_name=tool_name,
            status=status,
            content=content,
            is_error=is_error,
            request_id=request_id,
            error_message=(
                self._extract_error_text(content) if is_error else None
            ),
        )

    def classify_error(self, error_code: int) -> MCPErrorCategory:
        """Classify a JSON-RPC error code into an actionable category."""
        if error_code in _ERROR_CLASSIFICATION:
            return _ERROR_CLASSIFICATION[error_code]

        # Implementation-defined server errors (-32000 to -32099 in JSON-RPC 2.0)
        if -32099 <= error_code <= -32000:
            return MCPErrorCategory.RETRYABLE

        # Remaining reserved JSON-RPC error range
        if -32768 <= error_code <= -32100:
            return MCPErrorCategory.SERVER_ERROR

        return MCPErrorCategory.FATAL

    def is_retryable(self, raw_response: Dict[str, Any]) -> bool:
        """Check if a failed response is worth retrying."""
        if "error" in raw_response:
            code = raw_response["error"].get("code", 0)
            category = self.classify_error(code)
            return category in (
                MCPErrorCategory.RETRYABLE,
                MCPErrorCategory.RATE_LIMITED,
                MCPErrorCategory.SERVER_ERROR,
            )

        # Tool-level errors (isError: true) usually stem from bad arguments
        # or missing data, so they are not worth retrying; a response without
        # the flag is not a failure at all.
        result = raw_response.get("result", {})
        return not result.get("isError", False)

    @staticmethod
    def _extract_error_text(content: List[Dict[str, Any]]) -> str:
        """Extract error text from content blocks."""
        for block in content:
            if block.get("type") == "text":
                return block.get("text", "Unknown error")
        return "Unknown error (no text content)"

    def extract_all_text(
        self, content: List[Dict[str, Any]]
    ) -> str:
        """Concatenate all text content blocks."""
        texts = []
        for block in content:
            if block.get("type") == "text":
                text = block.get("text", "")
                if text:
                    texts.append(text)
        return "\n".join(texts)

    def extract_json(
        self, content: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Extract and parse the first JSON text block."""
        for block in content:
            if block.get("type") == "text":
                text = block.get("text", "")
                try:
                    return json.loads(text)
                except (json.JSONDecodeError, TypeError):
                    continue
        return None

5. MangaAssist MCP Tool Definitions — JSON Examples

5.1 Complete Tool Definition Catalog

These JSON definitions show exactly what the MangaAssist MCP servers advertise via tools/list. The Bedrock Claude 3 model receives these definitions and uses them to decide which tools to call and how to construct arguments.

{
  "tools": [
    {
      "name": "manga_search",
      "description": "Search the MangaAssist catalog for manga titles, volumes, and series. Supports Japanese and English queries with hybrid vector and keyword search. Returns matching manga with titles, prices, availability, cover images, and relevance scores.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "query": {
            "type": "string",
            "description": "Search query in Japanese or English (e.g., 'ワンピース 100巻' or 'One Piece volume 100')"
          },
          "language": {
            "type": "string",
            "enum": ["ja", "en"],
            "default": "ja",
            "description": "Query language for search optimization"
          },
          "max_results": {
            "type": "integer",
            "minimum": 1,
            "maximum": 20,
            "default": 5,
            "description": "Maximum number of results to return"
          },
          "genre": {
            "type": "string",
            "enum": ["shonen", "shojo", "seinen", "josei", "kodomo"],
            "description": "Optional genre filter"
          },
          "in_stock_only": {
            "type": "boolean",
            "default": false,
            "description": "Filter to only in-stock items"
          }
        },
        "required": ["query"]
      }
    },
    {
      "name": "price_lookup",
      "description": "Look up current prices for specific manga by ID or ISBN. Returns retail price, any active discounts, bundle deals, and price history. Prices are in JPY.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "manga_id": {
            "type": "string",
            "description": "MangaAssist product ID (e.g., 'MNG-00123456')"
          },
          "isbn": {
            "type": "string",
            "description": "ISBN-13 of the manga volume"
          },
          "include_history": {
            "type": "boolean",
            "default": false,
            "description": "Include 30-day price history"
          },
          "include_bundles": {
            "type": "boolean",
            "default": true,
            "description": "Include available bundle deals"
          }
        },
        "oneOf": [
          {"required": ["manga_id"]},
          {"required": ["isbn"]}
        ]
      }
    },
    {
      "name": "inventory_check",
      "description": "Check real-time stock availability for one or more manga items. Returns stock count, warehouse location, and estimated restock date if out of stock.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "manga_ids": {
            "type": "array",
            "items": {"type": "string"},
            "minItems": 1,
            "maxItems": 50,
            "description": "List of MangaAssist product IDs to check"
          },
          "warehouse": {
            "type": "string",
            "enum": ["tokyo", "osaka", "all"],
            "default": "all",
            "description": "Specific warehouse to check"
          }
        },
        "required": ["manga_ids"]
      }
    },
    {
      "name": "add_to_cart",
      "description": "Add a manga item to the customer's shopping cart. Validates stock availability before adding. Returns updated cart summary with totals.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "manga_id": {
            "type": "string",
            "description": "MangaAssist product ID"
          },
          "quantity": {
            "type": "integer",
            "minimum": 1,
            "maximum": 10,
            "default": 1,
            "description": "Quantity to add"
          }
        },
        "required": ["manga_id"]
      }
    },
    {
      "name": "checkout",
      "description": "Process checkout for the current cart. Validates stock, calculates total with 10% consumption tax, and creates an order. Returns order confirmation with estimated delivery date.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "shipping_address_id": {
            "type": "string",
            "description": "Saved shipping address ID from user profile"
          },
          "payment_method_id": {
            "type": "string",
            "description": "Saved payment method ID from user profile"
          },
          "gift_wrap": {
            "type": "boolean",
            "default": false,
            "description": "Add gift wrapping (+200 JPY)"
          }
        },
        "required": ["shipping_address_id", "payment_method_id"]
      }
    },
    {
      "name": "manga_recommendations",
      "description": "Generate personalized manga recommendations based on user reading history, preferences, and current trends. Uses collaborative filtering and content-based similarity through OpenSearch kNN.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "user_id": {
            "type": "string",
            "description": "User ID for personalized recommendations"
          },
          "seed_titles": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Manga titles the user likes (for cold start users)"
          },
          "genre_preference": {
            "type": "string",
            "enum": ["shonen", "shojo", "seinen", "josei", "any"],
            "default": "any",
            "description": "Preferred genre filter"
          },
          "count": {
            "type": "integer",
            "minimum": 1,
            "maximum": 20,
            "default": 5,
            "description": "Number of recommendations"
          },
          "exclude_owned": {
            "type": "boolean",
            "default": true,
            "description": "Exclude manga already purchased"
          }
        },
        "required": ["user_id"]
      }
    }
  ]
}

5.2 Example MCP Request/Response for MangaAssist

Request: Search for a manga

{
  "jsonrpc": "2.0",
  "id": "mcp-00000042-78231",
  "method": "tools/call",
  "params": {
    "name": "manga_search",
    "arguments": {
      "query": "鬼滅の刃",
      "language": "ja",
      "max_results": 3,
      "in_stock_only": true
    }
  }
}

Response: Search results

{
  "jsonrpc": "2.0",
  "id": "mcp-00000042-78231",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"query\":\"鬼滅の刃\",\"language\":\"ja\",\"total_results\":3,\"results\":[{\"manga_id\":\"MNG-00045678\",\"title_ja\":\"鬼滅の刃 1\",\"title_en\":\"Demon Slayer Vol. 1\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":1,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-01.jpg\",\"relevance_score\":9.82},{\"manga_id\":\"MNG-00045679\",\"title_ja\":\"鬼滅の刃 2\",\"title_en\":\"Demon Slayer Vol. 2\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":2,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-02.jpg\",\"relevance_score\":9.45},{\"manga_id\":\"MNG-00045680\",\"title_ja\":\"鬼滅の刃 3\",\"title_en\":\"Demon Slayer Vol. 3\",\"author\":\"吾峠呼世晴\",\"genre\":\"shonen\",\"volume\":3,\"price_jpy\":484,\"in_stock\":true,\"cover_url\":\"https://cdn.mangaassist.jp/covers/kimetsu-03.jpg\",\"relevance_score\":9.21}]}"
      }
    ],
    "isError": false
  }
}

Error Response Example: Tool not found

{
  "jsonrpc": "2.0",
  "id": "mcp-00000043-78232",
  "error": {
    "code": -32002,
    "message": "Tool not found: manga_translate"
  }
}

Error Response Example: Tool execution failed

{
  "jsonrpc": "2.0",
  "id": "mcp-00000044-78233",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Manga MNG-99999999 not found in catalog"
      }
    ],
    "isError": true
  }
}
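Feeding these two failures through the classification table yields different handling: the `-32002` protocol error maps to NOT_FOUND (never retried), while the second example carries no error code at all and is a tool-level failure (`isError: true`) handled by `is_retryable()` instead. The sketch below condenses `classify_error` into a standalone function (the `classify` name is illustrative) to show the outcomes:

```python
# Condensed version of the _ERROR_CLASSIFICATION table in the parser
CLASSIFICATION = {
    -32700: "client_error", -32600: "client_error", -32601: "not_found",
    -32602: "client_error", -32603: "server_error",
    -32000: "retryable", -32001: "retryable",
    -32002: "not_found", -32003: "rate_limited",
}


def classify(code: int) -> str:
    """Condensed version of MCPResponseParser.classify_error."""
    if code in CLASSIFICATION:
        return CLASSIFICATION[code]
    if -32099 <= code <= -32000:    # implementation-defined server errors
        return "retryable"
    if -32768 <= code <= -32100:    # remaining reserved JSON-RPC range
        return "server_error"
    return "fatal"


# The 'Tool not found' example above: do not retry, surface to the model
assert classify(-32002) == "not_found"

# An unlisted code in the custom server range is treated as transient
assert classify(-32050) == "retryable"

# Anything outside the reserved ranges is treated as permanent
assert classify(1) == "fatal"
```

This mapping is what the retry layer consumes: only "retryable" and "rate_limited" outcomes trigger another attempt.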

6. Circuit Breaker Pattern at the MCP Level

6.1 Circuit Breaker Design

Each MCP server gets its own circuit breaker. When a server fails repeatedly, the circuit opens and subsequent calls fail immediately without consuming Lambda invocations or ECS capacity. This prevents cascade failures at MangaAssist's 1M messages/day scale.

6.2 Circuit Breaker Implementation

"""
CircuitBreaker: Per-server circuit breaker for MCP tool calls.
Prevents cascade failures when an MCP server becomes unhealthy.
"""

import time
import logging
from enum import Enum
from dataclasses import dataclass, field

logger = logging.getLogger("mcp-circuit-breaker")


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"         # Normal operation — requests flow
    OPEN = "open"             # Server unhealthy — requests blocked
    HALF_OPEN = "half_open"   # Probing — limited requests allowed


@dataclass
class CircuitBreaker:
    """
    Circuit breaker with configurable thresholds and recovery timing.

    State transitions:
    - CLOSED -> OPEN: When failure_count >= failure_threshold
    - OPEN -> HALF_OPEN: After recovery_timeout_seconds elapsed
    - HALF_OPEN -> CLOSED: On successful probe request
    - HALF_OPEN -> OPEN: On failed probe request

    For MangaAssist, each MCP server (manga_search Lambda, order_processing
    ECS, etc.) gets its own circuit breaker. This means if the recommendation
    engine is down, search and checkout still work.
    """

    name: str
    failure_threshold: int = 5
    recovery_timeout_seconds: float = 30.0
    half_open_max_calls: int = 1
    success_threshold: int = 2

    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _last_failure_time: float = field(default=0.0, init=False)
    _last_state_change: float = field(default_factory=time.time, init=False)
    _half_open_calls: int = field(default=0, init=False)
    _total_blocked: int = field(default=0, init=False)

    @property
    def state(self) -> CircuitState:
        """Get current state, with automatic OPEN -> HALF_OPEN transition."""
        if self._state == CircuitState.OPEN:
            elapsed = time.time() - self._last_failure_time
            if elapsed >= self.recovery_timeout_seconds:
                self._transition_to(CircuitState.HALF_OPEN)
        return self._state

    @property
    def failure_count(self) -> int:
        return self._failure_count

    @property
    def time_until_recovery_seconds(self) -> float:
        """Time until circuit moves from OPEN to HALF_OPEN."""
        if self._state != CircuitState.OPEN:
            return 0.0
        elapsed = time.time() - self._last_failure_time
        remaining = self.recovery_timeout_seconds - elapsed
        return max(0.0, remaining)

    def allow_request(self) -> bool:
        """Check if a request is allowed through the circuit."""
        current_state = self.state  # Triggers auto-transition

        if current_state == CircuitState.CLOSED:
            return True

        if current_state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self.half_open_max_calls:
                self._half_open_calls += 1
                logger.info(
                    "Circuit %s HALF_OPEN: allowing probe request %d/%d",
                    self.name, self._half_open_calls, self.half_open_max_calls,
                )
                return True
            return False

        # OPEN
        self._total_blocked += 1
        return False

    def record_success(self) -> None:
        """Record a successful request."""
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._transition_to(CircuitState.CLOSED)
                logger.info(
                    "Circuit %s CLOSED after %d successful probes",
                    self.name, self._success_count,
                )
        elif self._state == CircuitState.CLOSED:
            # Reset failure count on success in closed state
            self._failure_count = 0

    def record_failure(self) -> None:
        """Record a failed request."""
        self._failure_count += 1
        self._last_failure_time = time.time()

        if self._state == CircuitState.HALF_OPEN:
            # Probe failed — go back to OPEN
            self._transition_to(CircuitState.OPEN)
            logger.warning(
                "Circuit %s re-OPENED: probe request failed", self.name
            )
        elif self._state == CircuitState.CLOSED:
            if self._failure_count >= self.failure_threshold:
                self._transition_to(CircuitState.OPEN)
                logger.warning(
                    "Circuit %s OPENED after %d consecutive failures",
                    self.name, self._failure_count,
                )

    def force_open(self) -> None:
        """Manually open the circuit (e.g., for maintenance)."""
        self._transition_to(CircuitState.OPEN)
        self._last_failure_time = time.time()
        logger.warning("Circuit %s FORCE OPENED", self.name)

    def force_close(self) -> None:
        """Manually close the circuit (e.g., after maintenance)."""
        self._transition_to(CircuitState.CLOSED)
        logger.info("Circuit %s FORCE CLOSED", self.name)

    def get_stats(self) -> dict:
        """Return circuit breaker statistics."""
        return {
            "name": self.name,
            "state": self.state.value,
            "failure_count": self._failure_count,
            "failure_threshold": self.failure_threshold,
            "success_count": self._success_count,
            "total_blocked": self._total_blocked,
            "time_until_recovery_s": round(
                self.time_until_recovery_seconds, 1
            ),
            "last_state_change": self._last_state_change,
        }

    def _transition_to(self, new_state: CircuitState) -> None:
        """Transition to a new circuit state."""
        old_state = self._state
        self._state = new_state
        self._last_state_change = time.time()

        if new_state == CircuitState.CLOSED:
            self._failure_count = 0
            self._success_count = 0
        elif new_state == CircuitState.HALF_OPEN:
            self._half_open_calls = 0
            self._success_count = 0
        elif new_state == CircuitState.OPEN:
            self._success_count = 0

        logger.info(
            "Circuit %s: %s -> %s",
            self.name, old_state.value, new_state.value,
        )
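The full implementation above adds probe limits and statistics; a condensed model of the same state machine, with an injectable clock so the recovery timeout can be simulated without sleeping, walks the transitions end to end (`MiniBreaker` is an illustrative sketch, not part of MangaAssist):

```python
class MiniBreaker:
    """Condensed CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(self, threshold=3, recovery=30.0, clock=None):
        self.threshold, self.recovery = threshold, recovery
        self.clock = clock or (lambda: 0.0)  # injectable time source
        self.failures, self.opened_at, self._state = 0, 0.0, "closed"

    @property
    def state(self):
        # OPEN -> HALF_OPEN happens lazily once the timeout elapses
        if self._state == "open" and self.clock() - self.opened_at >= self.recovery:
            self._state = "half_open"
        return self._state

    def record_failure(self):
        self.failures += 1
        # A failed probe, or crossing the threshold, (re)opens the circuit
        if self._state == "half_open" or self.failures >= self.threshold:
            self._state, self.opened_at = "open", self.clock()

    def record_success(self):
        if self.state == "half_open":
            self._state, self.failures = "closed", 0


now = [0.0]                      # fake clock we can advance manually
cb = MiniBreaker(threshold=3, recovery=30.0, clock=lambda: now[0])

for _ in range(3):
    cb.record_failure()          # three consecutive failures...
assert cb.state == "open"        # ...open the circuit

now[0] += 31.0                   # recovery timeout elapses
assert cb.state == "half_open"   # next check allows a probe

cb.record_success()              # probe succeeds
assert cb.state == "closed"
```

Making the clock injectable is also how the full breaker's transitions can be unit-tested deterministically instead of sleeping through real recovery windows.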

7. Retry and Backoff Patterns

7.1 Retry Strategy for MCP Tool Calls

"""
RetryManager: Configurable retry logic for MCP tool calls.
Uses exponential backoff with jitter and respects idempotency.
"""

import asyncio
import random
import time
import logging
from typing import Any, Callable, Dict, Optional
from dataclasses import dataclass

logger = logging.getLogger("mcp-retry")


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 2
    base_delay_ms: float = 100.0
    max_delay_ms: float = 5000.0
    jitter_factor: float = 0.25
    retry_on_timeout: bool = True
    retry_on_server_error: bool = True
    retry_on_rate_limit: bool = True


class RetryManager:
    """
    Manages retry logic for MCP tool calls with exponential backoff.

    Retry decisions are based on error classification:
    - RETRYABLE errors: retry with exponential backoff.
    - RATE_LIMITED errors: retry with longer backoff.
    - CLIENT_ERROR errors: do not retry (bad request).
    - FATAL errors: do not retry (permanent failure).

    Jitter prevents thundering herd when many clients retry simultaneously
    after an MCP server recovers from an outage.
    """

    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()

    def compute_delay_ms(self, attempt: int) -> float:
        """
        Compute the delay before the next retry.
        Uses exponential backoff with decorrelated jitter.
        """
        base = self.config.base_delay_ms * (2 ** attempt)
        capped = min(base, self.config.max_delay_ms)

        # Add jitter: +/- jitter_factor of the delay
        jitter_range = capped * self.config.jitter_factor
        jitter = random.uniform(-jitter_range, jitter_range)
        delay = max(0, capped + jitter)

        return delay

    def should_retry(
        self,
        attempt: int,
        error_category: str,
        is_idempotent: bool = False,
    ) -> bool:
        """
        Decide whether to retry based on attempt number,
        error type, and idempotency.
        """
        if attempt >= self.config.max_retries:
            return False

        if error_category == "retryable":
            # Timeouts and transient connection errors are classified
            # as "retryable", so gate them on retry_on_timeout.
            return self.config.retry_on_timeout
        elif error_category == "rate_limited":
            return self.config.retry_on_rate_limit
        elif error_category == "server_error":
            return self.config.retry_on_server_error
        elif error_category == "client_error":
            return False
        elif error_category == "fatal":
            return False
        elif error_category == "not_found":
            return False

        # Default: retry if idempotent
        return is_idempotent

    async def execute_with_retry(
        self,
        operation: Callable,
        tool_name: str,
        is_idempotent: bool = False,
    ) -> Any:
        """
        Execute an async operation with retry logic.
        Returns the result or raises the last exception.
        """
        last_exception = None

        for attempt in range(self.config.max_retries + 1):
            try:
                return await operation()
            except Exception as exc:
                last_exception = exc
                error_category = self._classify_exception(exc)

                if not self.should_retry(
                    attempt, error_category, is_idempotent
                ):
                    raise

                delay = self.compute_delay_ms(attempt)
                logger.warning(
                    "Retry %d/%d for tool %s (delay=%.0fms, error=%s): %s",
                    attempt + 1,
                    self.config.max_retries,
                    tool_name,
                    delay,
                    error_category,
                    exc,
                )
                await asyncio.sleep(delay / 1000.0)

        raise last_exception

    @staticmethod
    def _classify_exception(exc: Exception) -> str:
        """Classify an exception into a retry category."""
        if isinstance(exc, asyncio.TimeoutError):
            return "retryable"
        elif isinstance(exc, ConnectionError):
            return "retryable"
        elif isinstance(exc, RuntimeError) and "rate limit" in str(exc).lower():
            return "rate_limited"
        elif isinstance(exc, ValueError):
            return "client_error"
        else:
            return "server_error"
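To make the backoff schedule concrete, here is a standalone sketch of the same delay math, re-implemented with the default `RetryConfig` values (`base_delay_ms=100`, `max_delay_ms=5000`, `jitter_factor=0.25`); the function mirrors `compute_delay_ms` above but is self-contained for illustration:

```python
import random

# Standalone re-implementation of the delay math in compute_delay_ms,
# using the RetryConfig defaults from the class above.
def compute_delay_ms(attempt, base=100.0, cap=5000.0, jitter_factor=0.25):
    capped = min(base * (2 ** attempt), cap)
    jitter = random.uniform(-capped * jitter_factor, capped * jitter_factor)
    return max(0.0, capped + jitter)

# Without jitter the schedule doubles each attempt, capped at 5000 ms:
deterministic = [min(100.0 * 2 ** a, 5000.0) for a in range(7)]
print(deterministic)  # [100.0, 200.0, 400.0, 800.0, 1600.0, 3200.0, 5000.0]

# With jitter, each delay lands within +/-25% of the deterministic value,
# so concurrent retries spread out instead of arriving in lockstep.
for attempt in range(7):
    d = compute_delay_ms(attempt)
    assert 0.75 * deterministic[attempt] <= d <= 1.25 * deterministic[attempt]
```

Note that the cap applies before jitter, so late attempts cluster around 5000 ms rather than growing without bound.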

8. Integration Example — Complete MangaAssist Orchestrator Flow

8.1 End-to-End Tool Call in the Orchestrator

"""
Complete example: MangaAssist orchestrator using the MCP client library
to handle a user query that requires multiple tool calls.
"""

import asyncio
import json
import logging

# Assume all MCP client classes are imported from the modules above.

logger = logging.getLogger("mangaassist-orchestrator")


async def handle_user_message(user_message: str, user_id: str):
    """
    Full orchestrator flow for a user message:
    1. Call Bedrock Claude 3 with user message + tool definitions.
    2. If Claude returns tool_use, invoke MCP tools via client library.
    3. Feed tool results back to Claude for final response.
    """

    # --- Initialize MCP client ---
    client = MCPClient(region="ap-northeast-1")
    await client.initialize()

    # Register MCP server endpoints
    client.register_server(ServerEndpoint(
        name="manga-search-lambda",
        transport=TransportType.LAMBDA,
        lambda_function_name="mangaassist-mcp-catalog-search",
        lambda_qualifier="prod",
        timeout_seconds=5.0,
        max_retries=2,
    ))
    client.register_server(ServerEndpoint(
        name="price-lookup-lambda",
        transport=TransportType.LAMBDA,
        lambda_function_name="mangaassist-mcp-price-lookup",
        timeout_seconds=3.0,
        max_retries=1,
    ))
    client.register_server(ServerEndpoint(
        name="order-processing-ecs",
        transport=TransportType.WEBSOCKET,
        url="ws://order-mcp.mangaassist.internal:8080",
        timeout_seconds=10.0,
        max_retries=1,
    ))

    # Register tool-to-server mappings
    client.register_tool_mapping("manga_search", "manga-search-lambda")
    client.register_tool_mapping("price_lookup", "price-lookup-lambda")
    client.register_tool_mapping("add_to_cart", "order-processing-ecs")
    client.register_tool_mapping("checkout", "order-processing-ecs")

    # --- Step 1: Call Bedrock with tools ---
    # (Simplified — in production, use the Bedrock Converse API)
    tool_use_block = {
        "type": "tool_use",
        "id": "toolu_01abc",
        "name": "manga_search",
        "input": {
            "query": user_message,
            "language": "ja",
            "max_results": 5,
        },
    }

    # --- Step 2: Invoke MCP tool via client library ---
    result = await client.call_tool(
        tool_name=tool_use_block["name"],
        arguments=tool_use_block["input"],
    )

    logger.info(
        "Tool call: %s, status=%s, latency=%.0fms, attempts=%d",
        result.tool_name,
        result.status.value,
        result.latency_ms,
        result.attempt_count,
    )

    if result.is_error:
        logger.error("Tool error: %s", result.error_message)
        tool_result_content = f"Tool {result.tool_name} failed: {result.error_message}"
    else:
        tool_result_content = result.get_text()

    # --- Step 3: Feed results back to Claude ---
    # The tool_result_content is passed as a tool_result message
    # back to Bedrock Converse API for Claude to generate
    # the final user-facing response.

    logger.info(
        "Client metrics: %s", json.dumps(client.get_metrics(), indent=2)
    )

    await client.close()

    return tool_result_content


# --- Run the example ---
if __name__ == "__main__":
    result = asyncio.run(
        handle_user_message("鬼滅の刃は在庫ありますか?", "user-12345")
    )
    print(result)
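
Step 3 above is elided in the example. As a sketch of what it involves, the helper below builds the message that carries a tool result back to Claude, assuming the Bedrock Converse API's `toolResult` content-block shape; `build_tool_result_message` is a hypothetical helper, and the `converse()` call itself is omitted:

```python
# Sketch: packaging an MCP tool result as a Converse-API message
# (assumes the Bedrock Converse toolResult content-block shape).
def build_tool_result_message(tool_use_id: str, text: str,
                              is_error: bool = False) -> dict:
    """Build a user-role message carrying one tool result back to Claude."""
    return {
        "role": "user",
        "content": [
            {
                "toolResult": {
                    # Must match the id from Claude's tool_use block.
                    "toolUseId": tool_use_id,
                    "content": [{"text": text}],
                    "status": "error" if is_error else "success",
                }
            }
        ],
    }

msg = build_tool_result_message("toolu_01abc", "In stock: 23 volumes")
# This message is appended to the conversation history and sent back via
# bedrock_runtime.converse(modelId=..., messages=..., toolConfig=...).
```

Marking failed calls with `status: "error"` lets Claude apologize or try a different tool instead of hallucinating a result.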

9. Error Handling Decision Matrix

| Error Type | MCP Error Code | Category | Retry? | Circuit Breaker Impact | Fallback Strategy |
|---|---|---|---|---|---|
| Parse error | -32700 | Client | No | None | Fix request format |
| Invalid request | -32600 | Client | No | None | Fix request structure |
| Method not found | -32601 | Not found | No | None | Update tool registry |
| Invalid params | -32602 | Client | No | None | Fix arguments |
| Internal error | -32603 | Server | Yes (2x) | +1 failure | Retry with backoff |
| Tool execution error | -32000 | Retryable | Yes (2x) | +1 failure | Retry, then serve cached result |
| Tool timeout | -32001 | Retryable | Yes (1x) | +1 failure | Return partial data |
| Tool not found | -32002 | Not found | No | None | Refresh tool registry |
| Rate limit exceeded | -32003 | Rate limited | Yes (3x) | None | Exponential backoff |
| Network timeout | N/A | Transport | Yes (2x) | +1 failure | Switch transport |
| Connection refused | N/A | Transport | Yes (1x) | +1 failure | Surface failure to user |
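
The code-to-category portion of this matrix can be expressed as a simple lookup that a response parser might use for retry decisions; this is an illustrative sketch, and the document's actual `MCPResponseParser` may organize it differently:

```python
# Sketch: JSON-RPC error code -> retry category, mirroring the matrix
# above. Unknown codes default to "server_error" (retryable with backoff).
JSONRPC_ERROR_CATEGORIES = {
    -32700: "client_error",   # Parse error
    -32600: "client_error",   # Invalid request
    -32601: "not_found",      # Method not found
    -32602: "client_error",   # Invalid params
    -32603: "server_error",   # Internal error
    -32000: "retryable",      # Tool execution error
    -32001: "retryable",      # Tool timeout
    -32002: "not_found",      # Tool not found
    -32003: "rate_limited",   # Rate limit exceeded
}

def categorize(code: int) -> str:
    """Return the retry category for a JSON-RPC error code."""
    return JSONRPC_ERROR_CATEGORIES.get(code, "server_error")
```

The returned category strings match those consumed by `RetryManager.should_retry`, so the table, the parser, and the retry logic stay in agreement.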

10. Key Takeaways

  1. MCPClient is the single entry point: The orchestrator never directly invokes Lambda or calls ECS endpoints. It calls client.call_tool("manga_search", args) and the client handles everything: transport selection, retries, circuit breaking, and response parsing.

  2. ToolDiscoveryClient enables dynamic tool addition: New MCP servers register in DynamoDB, the discovery client picks them up within 5 minutes, and the orchestrator can invoke new tools without redeployment. This decouples tool development from orchestrator releases.

  3. MCPRequestBuilder enforces protocol correctness: Every request gets a unique ID, proper JSON-RPC 2.0 structure, and optional schema validation before it leaves the client. This catches bad arguments before they consume Lambda invocations.

  4. MCPResponseParser classifies errors for retry decisions: Not all errors are retryable. Client errors (bad arguments) should not be retried. Server errors and timeouts should. Rate limit errors need longer backoff. The parser makes this classification automatic.

  5. Per-server circuit breakers isolate failures: If the recommendation MCP server is down, the circuit opens for that server only. Search, price lookup, and ordering continue to work. At 1M messages/day, this isolation is the difference between a partial outage and a total outage.

  6. Exponential backoff with jitter prevents thundering herds: When an MCP server recovers from an outage, without jitter all waiting clients would retry simultaneously. Randomized jitter spreads retries over time, giving the server a chance to warm up.

  7. JSON tool definitions drive the FM's tool selection: The quality of tool descriptions and parameter documentation directly affects Claude 3's ability to select the right tool and construct valid arguments. Investing in clear, detailed tool definitions reduces failed tool calls and wasted tokens.

  8. Three-level caching (local memory -> Redis -> DynamoDB) balances speed and freshness: Tool resolution hits the local cache in microseconds, Redis in ~1ms, and DynamoDB in ~5ms. The 60-second TTL ensures tools are fresh enough while avoiding per-request DynamoDB reads at scale.
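
Takeaway 7 is worth making concrete. Below is an illustrative tool definition for `manga_search`, assuming the Bedrock Converse API's `toolSpec` format; the field values are assumptions consistent with the orchestrator example above, not the project's actual definition:

```python
# Illustrative toolSpec for manga_search (Bedrock Converse toolConfig
# format assumed). Rich descriptions guide Claude's tool selection.
MANGA_SEARCH_TOOL = {
    "toolSpec": {
        "name": "manga_search",
        "description": (
            "Search the manga catalog by title, author, or genre. "
            "Returns up to max_results matching titles with stock status."
        ),
        "inputSchema": {
            "json": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search text, Japanese or English.",
                    },
                    "language": {
                        "type": "string",
                        "enum": ["ja", "en"],
                        "description": "Language of the query text.",
                    },
                    "max_results": {
                        "type": "integer",
                        "minimum": 1,
                        "maximum": 20,
                        "default": 5,
                    },
                },
                "required": ["query"],
            }
        },
    }
}
```

Note how every parameter carries a description and constraints: the `enum` and `minimum`/`maximum` bounds let the client's schema validation reject bad arguments before a Lambda invocation is spent.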