
MCP Protocol for Agent-Tool Interactions

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Dimension Detail
Certification AWS AIP-C01 — AI Practitioner
Domain 2 — Development and Implementation of GenAI Solutions
Task 2.1 — Develop agentic AI solutions using AWS services
Skill 2.1.1 — Develop intelligent autonomous systems with appropriate memory and state management capabilities (for example, by using Strands Agents and AWS Agent Squad for multi-agent systems, MCP for agent-tool interactions)
This File Deep-dive into MCP protocol for agent-tool interactions: session memory patterns, tool registration/discovery, parameter validation, and production implementation

Skill Scope Statement

This file focuses on the Model Context Protocol (MCP) as the standardized interface between agents and tools. It covers the MCP transport layer (stdio/SSE), the JSON-RPC message format for tool registration and invocation, parameter validation schemas, session memory patterns that persist across MCP tool calls, and error propagation strategies. MangaAssist uses MCP to connect its Strands Agents to OpenSearch, DynamoDB, and Bedrock tools, providing a consistent contract that decouples agent reasoning from tool implementation.


Mind Map — MCP Agent-Tool Interactions

mindmap
  root((MCP Protocol<br/>Agent-Tool Interactions))
    MCP Architecture
      Transport Layer
        stdio (local processes)
        SSE (Server-Sent Events over HTTP)
        WebSocket (bidirectional streaming)
      Message Format
        JSON-RPC 2.0
        Request / Response / Notification
        Batch Operations
      Connection Lifecycle
        Initialize → List Tools → Invoke → Disconnect
    Tool Registration and Discovery
      Tool Schema Definition
        JSON Schema for Parameters
        Required vs Optional Fields
        Type Constraints and Enums
      Discovery Endpoint
        tools/list Method
        Capability Negotiation
        Version Compatibility
      Dynamic Registration
        Runtime Tool Addition
        Hot Reload Without Restart
        Feature Flag Gating
    Parameter Validation
      Schema-Based Validation
        JSON Schema Draft 2020-12
        Type Checking
        Range and Pattern Constraints
      Pre-Invocation Checks
        Required Field Presence
        Cross-Field Dependency Rules
        Input Sanitization
      Error Responses
        Validation Error Codes
        Human-Readable Messages
        Retry Guidance
    Session Memory Patterns
      MCP Session Context
        Session ID Propagation
        Turn-Scoped Tool State
        Cross-Tool Result Sharing
      Memory Integration
        Redis Short-Term Context
        DynamoDB Long-Term State
        Tool Call History Tracking
      Context Window Management
        Tool Result Summarization
        Selective Memory Injection
        Token Budget Enforcement
    Error Handling
      Timeout Propagation
        Per-Tool Timeout Budgets
        Cascading Timeout Reduction
        Partial Result Return
      Retry Strategies
        Idempotency Keys
        Exponential Backoff
        Circuit Breaker Integration
      Graceful Degradation
        Fallback Tool Selection
        Cached Result Return
        Partial Answer Synthesis

Architecture — MCP Message Flow in MangaAssist

flowchart TB
    subgraph Agent["Strands Agent (MCP Client)"]
        REASON["Reasoning Engine<br/>Claude 3 Sonnet"]
        MCP_CLIENT["MCP Client<br/>JSON-RPC Dispatcher"]
    end

    subgraph MCPServer["MCP Tool Server (ECS Sidecar)"]
        ROUTER["JSON-RPC Router"]
        VALIDATOR["Parameter Validator<br/>JSON Schema"]
        REGISTRY["Tool Registry<br/>5 tools registered"]
    end

    subgraph Tools["Tool Implementations"]
        T1["OpenSearch Vector Search"]
        T2["DynamoDB Product Lookup"]
        T3["ElastiCache Rating Cache"]
        T4["Bedrock Genre Classifier"]
        T5["Order Status Lookup"]
    end

    subgraph Memory["Session Memory"]
        REDIS["ElastiCache Redis<br/>Tool call history<br/>Result cache"]
        DDB["DynamoDB<br/>Cross-session context"]
    end

    REASON -->|"tools/call JSON-RPC"| MCP_CLIENT
    MCP_CLIENT -->|"HTTP/SSE transport"| ROUTER
    ROUTER -->|"Validate params"| VALIDATOR
    VALIDATOR -->|"Route to tool"| REGISTRY
    REGISTRY --> T1 & T2 & T3 & T4 & T5
    T1 & T2 & T3 & T4 & T5 -->|"Tool result"| ROUTER
    ROUTER -->|"JSON-RPC response"| MCP_CLIENT
    MCP_CLIENT -->|"Observation"| REASON

    MCP_CLIENT -->|"Cache result"| REDIS
    MCP_CLIENT -->|"Persist context"| DDB
    REDIS -->|"Check cache"| MCP_CLIENT

1. MCP Protocol Fundamentals

1.1 What Is the Model Context Protocol?

The Model Context Protocol (MCP) is an open standard that defines how AI agents communicate with external tools and data sources. It carries JSON-RPC 2.0 messages over pluggable transports: stdio for local child processes, SSE (Server-Sent Events over HTTP) for networked servers, and custom transports such as WebSocket. The protocol defines three core capabilities: tools (callable functions), resources (readable data), and prompts (reusable templates).

For MangaAssist, MCP provides a uniform interface that lets any Strands Agent call any tool without knowing the underlying implementation (OpenSearch, DynamoDB, Redis, or Bedrock).
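The wire format is plain JSON-RPC 2.0. A hypothetical `tools/call` exchange for the `search_manga_catalog` tool defined later in this file might look like this, sketched as Python dicts rather than raw bytes (the argument values are illustrative):

```python
import json

# JSON-RPC 2.0 request the MCP client sends to invoke a tool.
request = {
    "jsonrpc": "2.0",
    "id": 42,  # correlates the response back to this request
    "method": "tools/call",
    "params": {
        "name": "search_manga_catalog",
        "arguments": {"query": "dark fantasy with a revenge plot", "max_results": 3},
    },
}

# Successful JSON-RPC 2.0 response from the MCP tool server.
response = {
    "jsonrpc": "2.0",
    "id": 42,  # must echo the request id
    "result": {
        "content": [{"type": "text", "text": "[ranked manga results as JSON]"}],
        "isError": False,
    },
}

wire_request = json.dumps(request)  # what actually crosses the transport
assert json.loads(wire_request)["method"] == "tools/call"
```

Errors travel the same way, either as a JSON-RPC `error` object (protocol-level failures) or as a `result` with `isError: true` (tool-level failures) — a distinction Section 5 exploits.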

1.2 MCP Transport Configuration

"""
MangaAssist MCP Server — Tool server running as an ECS Fargate sidecar.
Exposes manga-specific tools over SSE transport for Strands Agent consumption.
"""

import asyncio
import json
import logging
from dataclasses import dataclass

from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import TextContent, Tool

logger = logging.getLogger("manga_mcp_server")
logger.setLevel(logging.INFO)


# ---------------------------------------------------------------------------
# MCP Server Initialization
# ---------------------------------------------------------------------------

app = Server("manga-tools")

# Transport: SSE for HTTP-based communication between ECS containers
# The agent container connects to the tool container on port 8080
transport = SseServerTransport("/mcp")


@dataclass
class MCPServerConfig:
    """Configuration for the MangaAssist MCP tool server."""
    host: str = "0.0.0.0"
    port: int = 8080
    max_concurrent_requests: int = 50
    request_timeout_seconds: float = 5.0
    enable_caching: bool = True
    cache_ttl_seconds: int = 300
    log_tool_calls: bool = True


config = MCPServerConfig()


# ---------------------------------------------------------------------------
# Tool Registry — All MangaAssist tools registered here
# ---------------------------------------------------------------------------

TOOL_DEFINITIONS: list[Tool] = [
    Tool(
        name="search_manga_catalog",
        description=(
            "Search the MangaAssist product catalog using semantic vector search. "
            "Returns ranked manga titles matching the query with metadata including "
            "title, author, genre, price, and stock status."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language search query",
                    "minLength": 1,
                    "maxLength": 500,
                },
                "genre": {
                    "type": "string",
                    "description": "Genre filter; empty string applies no filter",
                    "enum": ["shonen", "seinen", "shojo", "josei", "kodomo", ""],
                },
                "author": {
                    "type": "string",
                    "description": "Author name filter",
                    "maxLength": 200,
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum results to return",
                    "minimum": 1,
                    "maximum": 20,
                    "default": 5,
                },
                "min_rating": {
                    "type": "number",
                    "description": "Minimum rating filter (0.0-5.0)",
                    "minimum": 0.0,
                    "maximum": 5.0,
                },
            },
            "required": ["query"],
        },
    ),
    Tool(
        name="get_manga_details",
        description=(
            "Retrieve complete details for a specific manga title including price, "
            "availability, volume count, synopsis, and related titles."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "title_id": {
                    "type": "string",
                    "description": "Unique manga title identifier",
                    "pattern": "^manga-[a-z0-9]{6,12}$",
                },
            },
            "required": ["title_id"],
        },
    ),
    Tool(
        name="lookup_order",
        description=(
            "Look up a customer order by order ID. Returns status, items, "
            "shipping info, and estimated delivery date."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "Order ID in format MNG-XXXXXXXX",
                    "pattern": "^MNG-[A-Z0-9]{8}$",
                },
            },
            "required": ["order_id"],
        },
    ),
    Tool(
        name="get_recommendations",
        description=(
            "Generate personalized manga recommendations based on user preferences, "
            "reading history, and specified criteria."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "Authenticated user ID",
                },
                "criteria": {
                    "type": "string",
                    "description": "Natural language criteria for recommendations",
                },
                "exclude_titles": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Title IDs to exclude from results",
                    "maxItems": 50,
                },
                "count": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 10,
                    "default": 3,
                },
            },
            "required": ["criteria"],
        },
    ),
    Tool(
        name="check_inventory",
        description=(
            "Check real-time inventory status and pricing for one or more manga titles."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "title_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of manga title IDs to check",
                    "minItems": 1,
                    "maxItems": 20,
                },
            },
            "required": ["title_ids"],
        },
    ),
]


@app.list_tools()
async def list_tools() -> list[Tool]:
    """MCP tools/list handler — returns all registered tools."""
    logger.info("Tools listed: %d tools available", len(TOOL_DEFINITIONS))
    return TOOL_DEFINITIONS

2. Tool Invocation and Parameter Validation

2.1 MCP Call Handler with Validation

"""
MCP tools/call handler — validates parameters against JSON Schema,
dispatches to the correct tool implementation, and returns structured results.
"""

import time
import jsonschema
from jsonschema import validate, ValidationError


# ---------------------------------------------------------------------------
# Parameter Validator
# ---------------------------------------------------------------------------

class MCPParameterValidator:
    """
    Validates tool call parameters against the registered JSON Schema.
    Runs before any tool execution to catch invalid inputs early.
    """

    def __init__(self, tools: list[Tool]):
        self._schemas = {tool.name: tool.inputSchema for tool in tools}

    def validate(self, tool_name: str, arguments: dict) -> list[str]:
        """
        Validate arguments against the tool's input schema.
        Returns a list of error messages (empty = valid).
        """
        schema = self._schemas.get(tool_name)
        if not schema:
            return [f"Unknown tool: {tool_name}"]

        try:
            jsonschema.Draft202012Validator.check_schema(schema)
        except jsonschema.SchemaError as e:
            return [f"Schema error: {e.message}"]

        # Collect every violation (not just the first) so the agent can
        # correct all invalid fields in a single retry.
        schema_validator = jsonschema.Draft202012Validator(schema)
        return [
            f"Validation error at {e.json_path}: {e.message}"
            for e in schema_validator.iter_errors(arguments)
        ]


validator = MCPParameterValidator(TOOL_DEFINITIONS)


# ---------------------------------------------------------------------------
# Tool Call Dispatcher
# ---------------------------------------------------------------------------

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """
    MCP tools/call handler.

    Flow:
    1. Validate parameters against JSON Schema
    2. Check session context and cache
    3. Dispatch to tool implementation
    4. Cache result if cacheable
    5. Return structured result
    """
    start_time = time.monotonic()
    request_id = f"mcp-{int(time.time() * 1000)}"

    logger.info(
        "Tool call: name=%s request_id=%s args_keys=%s",
        name, request_id, list(arguments.keys()),
    )

    # Step 1: Validate parameters
    validation_errors = validator.validate(name, arguments)
    if validation_errors:
        logger.warning("Validation failed for %s: %s", name, validation_errors)
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "VALIDATION_ERROR",
                "messages": validation_errors,
                "tool": name,
                "request_id": request_id,
            }),
        )]

    # Step 2: Check cache for idempotent tools
    if config.enable_caching and name in CACHEABLE_TOOLS:
        cache_key = _build_cache_key(name, arguments)
        cached = await _check_cache(cache_key)
        if cached:
            logger.info("Cache hit for %s (key=%s)", name, cache_key[:16])
            return [TextContent(type="text", text=cached)]

    # Step 3: Dispatch to implementation
    tool_handlers = {
        "search_manga_catalog": _handle_search,
        "get_manga_details": _handle_details,
        "lookup_order": _handle_order_lookup,
        "get_recommendations": _handle_recommendations,
        "check_inventory": _handle_inventory,
    }

    handler = tool_handlers.get(name)
    if not handler:
        return [TextContent(
            type="text",
            text=json.dumps({"error": "TOOL_NOT_FOUND", "tool": name}),
        )]

    try:
        result = await asyncio.wait_for(
            handler(arguments, request_id),
            timeout=config.request_timeout_seconds,
        )
    except asyncio.TimeoutError:
        elapsed_ms = (time.monotonic() - start_time) * 1000
        logger.error("Tool %s timed out after %.0fms", name, elapsed_ms)
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "TIMEOUT",
                "tool": name,
                "timeout_ms": config.request_timeout_seconds * 1000,
                "request_id": request_id,
            }),
        )]
    except Exception as e:
        elapsed_ms = (time.monotonic() - start_time) * 1000
        logger.error("Tool %s failed after %.0fms: %s", name, elapsed_ms, str(e))
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "TOOL_EXECUTION_ERROR",
                "tool": name,
                "message": str(e),
                "request_id": request_id,
            }),
        )]

    # Step 4: Cache the result
    result_json = json.dumps(result, default=str)
    if config.enable_caching and name in CACHEABLE_TOOLS:
        await _set_cache(cache_key, result_json)

    elapsed_ms = (time.monotonic() - start_time) * 1000
    logger.info("Tool %s completed in %.0fms", name, elapsed_ms)

    return [TextContent(type="text", text=result_json)]


CACHEABLE_TOOLS = {"search_manga_catalog", "get_manga_details", "check_inventory"}

3. Session Memory Patterns for MCP

3.1 Session Context Propagation Across Tool Calls

"""
MangaAssist MCP session memory — tracks tool call history, caches intermediate
results, and provides cross-tool context sharing within a single session.
"""

import json
import time
import hashlib
import logging
from dataclasses import dataclass, field, asdict
from typing import Any, Optional

import redis

logger = logging.getLogger("manga_mcp_session")


@dataclass
class MCPToolCallRecord:
    """Record of a single MCP tool invocation within a session."""
    tool_name: str
    arguments: dict
    result_summary: str
    success: bool
    latency_ms: float
    timestamp: float = field(default_factory=time.time)
    cache_hit: bool = False
    token_cost_estimate: int = 0


@dataclass
class MCPSessionContext:
    """
    Session-scoped context that flows across MCP tool calls.
    Stored in Redis with the session ID as the key.
    """
    session_id: str
    user_id: str
    tool_call_history: list[MCPToolCallRecord] = field(default_factory=list)
    shared_results: dict[str, Any] = field(default_factory=dict)
    accumulated_latency_ms: float = 0.0
    remaining_budget_ms: float = 3000.0
    total_tool_calls: int = 0
    created_at: float = field(default_factory=time.time)

    def record_tool_call(self, record: MCPToolCallRecord) -> None:
        """Record a tool call and update session metrics."""
        self.tool_call_history.append(record)
        self.accumulated_latency_ms += record.latency_ms
        self.remaining_budget_ms -= record.latency_ms
        self.total_tool_calls += 1

    def share_result(self, key: str, value: Any) -> None:
        """Share a result across tools within the session."""
        self.shared_results[key] = value

    def get_shared_result(self, key: str) -> Optional[Any]:
        """Retrieve a shared result from a previous tool call."""
        return self.shared_results.get(key)

    def get_history_summary(self, max_entries: int = 5) -> str:
        """Generate a summary of recent tool calls for context injection."""
        recent = self.tool_call_history[-max_entries:]
        lines = []
        for record in recent:
            status = "OK" if record.success else "FAILED"
            lines.append(
                f"  [{record.tool_name}] {status} in {record.latency_ms:.0f}ms"
                f" — {record.result_summary[:100]}"
            )
        return "\n".join(lines)

    def has_budget(self, estimated_ms: float = 500.0) -> bool:
        """Check if there is enough latency budget for another tool call."""
        return self.remaining_budget_ms >= estimated_ms


class MCPSessionManager:
    """
    Manages MCP session context in ElastiCache Redis.
    Provides cross-tool result sharing and latency budget tracking.
    """

    SESSION_TTL = 1800  # 30 minutes
    KEY_PREFIX = "manga:mcp:session:"

    def __init__(self, redis_client: redis.Redis):
        # Note: the sync redis client blocks the event loop when called from
        # async methods; production code should use redis.asyncio.Redis.
        self._redis = redis_client

    async def get_or_create(
        self, session_id: str, user_id: str
    ) -> MCPSessionContext:
        """Load existing session context or create a new one."""
        key = f"{self.KEY_PREFIX}{session_id}"
        raw = self._redis.get(key)

        if raw:
            data = json.loads(raw)
            ctx = MCPSessionContext(
                session_id=data["session_id"],
                user_id=data["user_id"],
                shared_results=data.get("shared_results", {}),
                accumulated_latency_ms=data.get("accumulated_latency_ms", 0.0),
                remaining_budget_ms=data.get("remaining_budget_ms", 3000.0),
                total_tool_calls=data.get("total_tool_calls", 0),
                created_at=data.get("created_at", time.time()),
            )
            # Rebuild tool call history from stored records
            for rec in data.get("tool_call_history", []):
                ctx.tool_call_history.append(MCPToolCallRecord(**rec))
            return ctx

        # Create new session
        ctx = MCPSessionContext(session_id=session_id, user_id=user_id)
        await self.save(ctx)
        return ctx

    async def save(self, ctx: MCPSessionContext) -> None:
        """Persist session context to Redis."""
        key = f"{self.KEY_PREFIX}{ctx.session_id}"
        data = {
            "session_id": ctx.session_id,
            "user_id": ctx.user_id,
            "tool_call_history": [asdict(r) for r in ctx.tool_call_history[-20:]],
            "shared_results": ctx.shared_results,
            "accumulated_latency_ms": ctx.accumulated_latency_ms,
            "remaining_budget_ms": ctx.remaining_budget_ms,
            "total_tool_calls": ctx.total_tool_calls,
            "created_at": ctx.created_at,
        }
        self._redis.setex(key, self.SESSION_TTL, json.dumps(data, default=str))

    async def clear(self, session_id: str) -> None:
        """Clear session context."""
        self._redis.delete(f"{self.KEY_PREFIX}{session_id}")

3.2 Cross-Tool Result Sharing

"""
Demonstrates cross-tool result sharing within an MCP session.
When the search tool finds manga IDs, the inventory tool can reuse them
without the agent needing to extract and re-pass the IDs.
"""


async def _handle_search_with_sharing(
    arguments: dict,
    request_id: str,
    session_ctx: MCPSessionContext,
) -> dict:
    """
    Search handler that shares results with the session context.
    Subsequent tools (inventory, ratings) can pick up the found IDs.
    """
    # Execute the search
    results = await _execute_opensearch_query(arguments)

    # Share the found manga IDs with the session
    found_ids = [r["title_id"] for r in results.get("results", [])]
    session_ctx.share_result("last_search_ids", found_ids)
    session_ctx.share_result("last_search_query", arguments.get("query", ""))
    session_ctx.share_result("last_search_results", results.get("results", []))

    return results


async def _handle_inventory_with_sharing(
    arguments: dict,
    request_id: str,
    session_ctx: MCPSessionContext,
) -> dict:
    """
    Inventory handler that can use shared search results.
    If no title_ids provided, falls back to the last search results.
    """
    title_ids = arguments.get("title_ids", [])

    # Fall back to shared search results if no IDs provided
    if not title_ids:
        title_ids = session_ctx.get_shared_result("last_search_ids") or []
        if title_ids:
            logger.info(
                "Inventory check using %d shared IDs from last search",
                len(title_ids),
            )

    if not title_ids:
        return {"error": "No title IDs provided and no recent search results"}

    return await _execute_inventory_check(title_ids)

4. Tool Registration and Dynamic Discovery

4.1 Dynamic Tool Registration

"""
MangaAssist dynamic tool registration — allows adding or removing tools
at runtime without restarting the MCP server. Useful for A/B testing
new tools or feature-flagged capabilities.
"""

import threading
from typing import Awaitable, Callable, Optional


class DynamicToolRegistry:
    """
    Thread-safe tool registry that supports runtime registration.
    Backed by the MCP server's tool list.
    """

    def __init__(self):
        self._tools: dict[str, Tool] = {}
        self._handlers: dict[str, Callable] = {}
        self._lock = threading.RLock()
        self._version = 0

    def register(
        self,
        tool: Tool,
        handler: Callable[[dict, str], Awaitable[dict]],
    ) -> None:
        """Register a new tool at runtime."""
        with self._lock:
            self._tools[tool.name] = tool
            self._handlers[tool.name] = handler
            self._version += 1
            logger.info(
                "Tool registered: %s (registry v%d, total=%d)",
                tool.name, self._version, len(self._tools),
            )

    def unregister(self, tool_name: str) -> bool:
        """Remove a tool from the registry."""
        with self._lock:
            if tool_name in self._tools:
                del self._tools[tool_name]
                del self._handlers[tool_name]
                self._version += 1
                logger.info("Tool unregistered: %s (registry v%d)", tool_name, self._version)
                return True
            return False

    def list_tools(self) -> list[Tool]:
        """Return all currently registered tools."""
        with self._lock:
            return list(self._tools.values())

    def get_handler(self, tool_name: str) -> Optional[Callable]:
        """Get the handler for a tool."""
        with self._lock:
            return self._handlers.get(tool_name)

    def has_tool(self, tool_name: str) -> bool:
        """Check if a tool is registered."""
        with self._lock:
            return tool_name in self._tools


# Feature-flagged tool registration example
registry = DynamicToolRegistry()


def register_tools_with_feature_flags(feature_flags: dict) -> None:
    """
    Register tools based on feature flags from DynamoDB or AppConfig.
    Called at startup and on flag changes.
    """
    # Always register core tools. Assumes `tool_handlers` is a module-level
    # mapping of tool name -> handler coroutine (the dispatch table from
    # Section 2, hoisted out of `call_tool` so it can be shared here).
    for tool_def in TOOL_DEFINITIONS:
        registry.register(tool_def, tool_handlers[tool_def.name])

    # Conditionally register experimental tools
    if feature_flags.get("enable_manga_preview_tool", False):
        preview_tool = Tool(
            name="preview_manga_pages",
            description="Preview first 3 pages of a manga volume",
            inputSchema={
                "type": "object",
                "properties": {
                    "title_id": {"type": "string"},
                    "volume": {"type": "integer", "minimum": 1},
                },
                "required": ["title_id", "volume"],
            },
        )
        registry.register(preview_tool, _handle_manga_preview)
        logger.info("Experimental tool 'preview_manga_pages' enabled")

    if feature_flags.get("enable_price_comparison_tool", False):
        price_tool = Tool(
            name="compare_prices",
            description="Compare manga prices across formats (physical, digital, bundle)",
            inputSchema={
                "type": "object",
                "properties": {
                    "title_ids": {
                        "type": "array",
                        "items": {"type": "string"},
                        "minItems": 1,
                        "maxItems": 10,
                    },
                },
                "required": ["title_ids"],
            },
        )
        registry.register(price_tool, _handle_price_comparison)
        logger.info("Experimental tool 'compare_prices' enabled")

5. Error Handling and Timeout Propagation

5.1 Cascading Timeout Budget

"""
MangaAssist MCP timeout propagation — each tool call gets a fraction
of the remaining latency budget. If an early tool is slow, later tools
get a tighter timeout to stay within the 3-second SLA.
"""

from dataclasses import dataclass


@dataclass
class TimeoutBudget:
    """Tracks and distributes the remaining time budget across tool calls."""
    total_budget_ms: float = 3000.0
    start_time: float = 0.0
    reserved_overhead_ms: float = 200.0  # Reserve for response formatting

    def __post_init__(self):
        if self.start_time == 0.0:
            self.start_time = time.monotonic()

    @property
    def elapsed_ms(self) -> float:
        return (time.monotonic() - self.start_time) * 1000

    @property
    def remaining_ms(self) -> float:
        return max(0, self.total_budget_ms - self.elapsed_ms - self.reserved_overhead_ms)

    def allocate(self, estimated_calls_remaining: int = 1) -> float:
        """
        Allocate timeout for the next tool call.
        Splits remaining budget evenly across expected remaining calls.
        """
        if estimated_calls_remaining <= 0:
            return 0.0
        per_call = self.remaining_ms / estimated_calls_remaining
        return max(per_call, 100.0)  # Minimum 100ms per call

    def is_exhausted(self) -> bool:
        return self.remaining_ms <= 100.0


class MCPTimeoutManager:
    """
    Manages timeout budgets for MCP tool calls within a session.
    Integrates with the session context to track cumulative latency.
    """

    # Default timeout per tool type (used when budget is ample)
    DEFAULT_TIMEOUTS_MS = {
        "search_manga_catalog": 800,
        "get_manga_details": 300,
        "lookup_order": 300,
        "get_recommendations": 1000,
        "check_inventory": 200,
    }

    def __init__(self):
        self._budgets: dict[str, TimeoutBudget] = {}

    def get_timeout(
        self,
        session_id: str,
        tool_name: str,
        estimated_remaining_calls: int = 1,
    ) -> float:
        """
        Calculate the timeout for a tool call, respecting the session budget.
        Returns timeout in milliseconds.
        """
        budget = self._budgets.get(session_id)
        if not budget:
            budget = TimeoutBudget()
            self._budgets[session_id] = budget

        # Get the default timeout for this tool
        default_timeout = self.DEFAULT_TIMEOUTS_MS.get(tool_name, 500)

        # Get the budget-allocated timeout
        budget_timeout = budget.allocate(estimated_remaining_calls)

        # Use the smaller of the two
        effective_timeout = min(default_timeout, budget_timeout)

        logger.debug(
            "Timeout for %s: default=%dms, budget=%dms, effective=%dms "
            "(remaining=%dms, elapsed=%dms)",
            tool_name, default_timeout, budget_timeout, effective_timeout,
            budget.remaining_ms, budget.elapsed_ms,
        )

        return effective_timeout

    def create_budget(self, session_id: str, total_ms: float = 3000.0) -> TimeoutBudget:
        """Create a new timeout budget for a session."""
        budget = TimeoutBudget(total_budget_ms=total_ms)
        self._budgets[session_id] = budget
        return budget

    def cleanup(self, session_id: str) -> None:
        """Remove the budget for a completed session."""
        self._budgets.pop(session_id, None)
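To see the cascade concretely, here is the same allocation arithmetic with the elapsed time passed in explicitly (a standalone re-derivation of `TimeoutBudget.allocate`, not the production class): after a slow 900 ms search, the two remaining calls split the leftover budget instead of receiving their full default timeouts.

```python
def allocate(total_ms: float, elapsed_ms: float, overhead_ms: float,
             calls_remaining: int) -> float:
    """Mirror of TimeoutBudget.allocate with elapsed time as a parameter."""
    remaining = max(0.0, total_ms - elapsed_ms - overhead_ms)
    return max(remaining / calls_remaining, 100.0)  # 100ms floor per call


# 3000ms SLA, 200ms reserved for response formatting, 900ms already spent
# on a slow search -> 1900ms remain, split across two follow-up calls.
per_call = allocate(3000.0, 900.0, 200.0, calls_remaining=2)
assert per_call == 950.0

# get_recommendations defaults to 1000ms, but the budget caps it at 950ms.
effective = min(1000.0, per_call)
assert effective == 950.0
```

When the budget is nearly gone, the 100 ms floor keeps a final cheap call (e.g., a cache read) possible rather than allocating an unusable sub-100 ms timeout.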

5.2 Error Propagation Through MCP

"""
MangaAssist MCP error codes and propagation — standardized error
handling that flows from tool to agent to user.
"""

from enum import IntEnum

# Tools with a degraded fallback path (e.g., cached or keyword-based results).
# Membership here is illustrative.
FALLBACK_TOOLS = {"search_manga_catalog", "get_recommendations", "check_inventory"}


class CircuitOpenError(Exception):
    """Raised by a tool client when its circuit breaker has opened."""


class MCPErrorCode(IntEnum):
    """Standard MCP error codes used by MangaAssist tools."""
    VALIDATION_ERROR = -32602     # Invalid params (JSON-RPC standard)
    TOOL_NOT_FOUND = -32601       # Method not found (JSON-RPC standard)
    INTERNAL_ERROR = -32603       # Internal error (JSON-RPC standard)
    TIMEOUT = -32001              # Tool execution timeout
    RATE_LIMITED = -32002         # Upstream service rate limited
    CIRCUIT_OPEN = -32003         # Circuit breaker is open
    CACHE_MISS = -32004           # Expected cache hit missed
    DEPENDENCY_FAILURE = -32005   # Downstream service failure
    BUDGET_EXHAUSTED = -32006     # Latency or token budget exhausted


@dataclass
class MCPError:
    """Structured error for MCP tool responses."""
    code: MCPErrorCode
    message: str
    tool_name: str
    retryable: bool = False
    retry_after_ms: int = 0
    fallback_available: bool = False
    details: Optional[dict] = None

    def to_json_rpc(self) -> dict:
        """Convert to JSON-RPC error format."""
        return {
            "code": int(self.code),
            "message": self.message,
            "data": {
                "tool": self.tool_name,
                "retryable": self.retryable,
                "retry_after_ms": self.retry_after_ms,
                "fallback_available": self.fallback_available,
                "details": self.details or {},
            },
        }
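On the wire, the object produced by `to_json_rpc` sits inside a standard JSON-RPC 2.0 response envelope whose `id` echoes the originating `tools/call` request. A literal example of the resulting payload for a timeout (values here are illustrative):

```python
import json

# Example wire format: a TIMEOUT error from to_json_rpc() wrapped in the
# JSON-RPC 2.0 response envelope. The id echoes the tools/call request id.
response = {
    "jsonrpc": "2.0",
    "id": 42,
    "error": {
        "code": -32001,  # MCPErrorCode.TIMEOUT
        "message": "Tool 'search_manga_catalog' timed out",
        "data": {
            "tool": "search_manga_catalog",
            "retryable": True,
            "retry_after_ms": 100,
            "fallback_available": True,
            "details": {},
        },
    },
}

# Per JSON-RPC 2.0, an error response carries "error", never "result"
assert "result" not in response
print(json.dumps(response["error"], indent=2))
```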


def handle_tool_error(
    tool_name: str,
    error: Exception,
    budget: Optional[TimeoutBudget] = None,
) -> MCPError:
    """
    Convert a tool exception to a structured MCP error.
    The agent uses the error metadata to decide whether to retry,
    use a fallback, or report to the user.
    """
    if isinstance(error, asyncio.TimeoutError):
        return MCPError(
            code=MCPErrorCode.TIMEOUT,
            message=f"Tool '{tool_name}' timed out",
            tool_name=tool_name,
            retryable=bool(budget and budget.remaining_ms > 200),
            retry_after_ms=100,
            fallback_available=tool_name in FALLBACK_TOOLS,
        )

    if "ThrottlingException" in str(type(error).__name__):
        return MCPError(
            code=MCPErrorCode.RATE_LIMITED,
            message=f"Tool '{tool_name}' is rate limited",
            tool_name=tool_name,
            retryable=True,
            retry_after_ms=1000,
            fallback_available=True,
        )

    if isinstance(error, CircuitOpenError):
        return MCPError(
            code=MCPErrorCode.CIRCUIT_OPEN,
            message=f"Circuit breaker for '{tool_name}' is open",
            tool_name=tool_name,
            retryable=False,
            fallback_available=True,
        )

    return MCPError(
        code=MCPErrorCode.INTERNAL_ERROR,
        message=f"Tool '{tool_name}' failed: {str(error)}",
        tool_name=tool_name,
        retryable=False,
        fallback_available=tool_name in FALLBACK_TOOLS,
        details={"error_type": type(error).__name__},
    )


# Fallback mapping referenced by handle_tool_error. The name is resolved
# at call time, so defining it after the function is safe at module scope.
FALLBACK_TOOLS = {
    "search_manga_catalog": "cached_popular_manga",
    "get_recommendations": "cached_trending_manga",
    "check_inventory": "cached_inventory_snapshot",
}
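How an agent consumes these errors is a policy choice, not part of the MCP spec. A hypothetical agent-side decision function, operating on the JSON-RPC error dict emitted by `to_json_rpc`, might look like this:

```python
# Hypothetical agent-side retry policy: interpret the structured error
# metadata (retryable, retry_after_ms, fallback_available) and pick the
# next action. Names and thresholds here are illustrative assumptions.
def decide_next_action(error: dict, budget_remaining_ms: float) -> str:
    data = error["data"]
    if data["retryable"] and budget_remaining_ms > data["retry_after_ms"]:
        return "retry"        # same tool, after backing off retry_after_ms
    if data["fallback_available"]:
        return "fallback"     # e.g. cached_popular_manga for catalog search
    return "synthesize"       # answer from session context without this tool


# A timeout with 1200ms of budget left → retry the same tool
timeout_error = {
    "code": -32001,
    "message": "Tool 'search_manga_catalog' timed out",
    "data": {"retryable": True, "retry_after_ms": 100, "fallback_available": True},
}
print(decide_next_action(timeout_error, budget_remaining_ms=1200))  # retry

# A circuit-open error is never retryable → switch to the cached fallback
circuit_error = {
    "code": -32003,
    "message": "Circuit breaker for 'check_inventory' is open",
    "data": {"retryable": False, "retry_after_ms": 0, "fallback_available": True},
}
print(decide_next_action(circuit_error, budget_remaining_ms=1200))  # fallback
```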

6. MCP Client Integration in Strands Agent

6.1 Strands Agent with MCP Client

"""
MangaAssist Strands Agent configured to use the MCP tool server.
The agent connects to the MCP server over SSE and discovers tools dynamically.
"""

from strands import Agent
from strands.models.bedrock import BedrockModel
from strands.tools.mcp import MCPClient
from mcp.client.sse import sse_client


# ---------------------------------------------------------------------------
# MCP Client connecting to the tool server sidecar
# ---------------------------------------------------------------------------

# MCPClient takes a factory that returns the transport context manager;
# sse_client is the SSE transport from the MCP Python SDK.
mcp_client = MCPClient(
    lambda: sse_client(
        url="http://localhost:8080/mcp",
        headers={"X-Session-ID": "will-be-set-per-request"},
        timeout=5.0,
    )
)

# ---------------------------------------------------------------------------
# Agent using MCP-provided tools
# ---------------------------------------------------------------------------

SYSTEM_PROMPT = """You are MangaAssist, a helpful JP manga store chatbot.
You have access to tools via MCP for searching the catalog, looking up orders,
checking inventory, and making recommendations.

## Tool Usage Guidelines
1. Always search before answering product questions — never guess.
2. Use check_inventory before confirming availability.
3. Limit to 3 tool calls per turn to stay within 3-second budget.
4. If a tool fails, explain what happened and offer alternatives.
5. Share tool results naturally — do not dump raw JSON to the user.
"""

sonnet_model = BedrockModel(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1",
    temperature=0.3,
    max_tokens=1024,
)

# Tool discovery and agent use must happen while the MCP connection is open.
# list_tools_sync() performs the tools/list call and returns the discovered
# tools in a form the agent can invoke directly.
with mcp_client:
    tools = mcp_client.list_tools_sync()
    manga_agent = Agent(
        model=sonnet_model,
        system_prompt=SYSTEM_PROMPT,
        tools=tools,
    )

7. Comparison — MCP vs Direct Tool Integration vs API Gateway

| Dimension | MCP Protocol | Direct Tool Integration | API Gateway Proxy |
|---|---|---|---|
| Interface Contract | JSON-RPC 2.0 with JSON Schema validation | Custom Python function signatures | REST/HTTP with OpenAPI spec |
| Tool Discovery | Dynamic via tools/list — agent discovers at runtime | Static imports — hardcoded at build time | Static via API spec |
| Parameter Validation | Built-in JSON Schema validation before dispatch | Manual validation in each tool function | Request validators in APIGW |
| Transport | stdio, SSE, WebSocket — protocol-level flexibility | In-process function calls — zero network overhead | HTTP only — adds network latency |
| Session Context | Session ID propagated in protocol headers | Passed as function arguments | Custom headers or query params |
| Error Handling | Standardized JSON-RPC error codes | Custom exception handling per tool | HTTP status codes |
| Latency Overhead | ~5-15ms per call (SSE transport) | ~0ms (in-process) | ~20-50ms (HTTP + Lambda cold start) |
| Cross-Language | Yes — any language implementing JSON-RPC | Python only (or language-specific) | Yes — HTTP is universal |
| Hot Reload | Register/unregister tools without restart | Requires code deployment | Requires API redeployment |
| MangaAssist Usage | Primary interface between agents and tool server | Used for in-agent utility functions | Used for external partner APIs |
| Best For | Multi-agent systems with shared tool servers | Single-agent, performance-critical paths | External service integration |
| Cost at 1M msgs/day | Minimal — lightweight JSON-RPC over localhost | Zero additional cost | ~$3.50/day for APIGW requests |

8. Key Takeaways

  1. MCP provides a standardized contract between agents and tools — JSON-RPC 2.0 with JSON Schema validation ensures that tool parameters are validated before execution, reducing runtime errors. MangaAssist registers 5 tools with full schema definitions including types, patterns, ranges, and required fields.

  2. Session context propagation is built into the MCP flow — each tool call carries the session ID, enabling cross-tool result sharing (search results flow to inventory checks) and cumulative latency tracking. The session context lives in Redis with a 30-minute TTL.

  3. Dynamic tool registration enables safe experimentation — new tools (manga previews, price comparisons) can be registered at runtime behind feature flags. The agent discovers them via tools/list without any code changes to the agent itself.

  4. Cascading timeout budgets keep the system within the 3-second SLA — each tool call gets a timeout calculated from the remaining budget. If the first tool call is slow (800ms), later calls get tighter timeouts to compensate, ensuring the total stays under 3 seconds.

  5. Structured error codes drive agent retry decisions — MCP errors include retryable, retry_after_ms, and fallback_available fields. The agent uses these to decide whether to retry the same tool, switch to a fallback, or synthesize an answer from what it already knows.

  6. Parameter validation catches bad inputs before they waste latency and tokens — the JSON Schema validation layer rejects malformed requests instantly (e.g., invalid order ID format, out-of-range max_results), saving the 500ms+ that a failed tool execution would cost.

  7. MCP vs direct integration is a latency-cost tradeoff — MCP adds ~5-15ms per call but provides discovery, validation, and cross-language support. For MangaAssist at 1M messages/day, the ~10ms overhead per tool call is acceptable given the benefits of standardization.
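The validation gate in takeaway 6 amounts to cheap pattern and range checks before any dispatch happens. A sketch of that pre-dispatch check — the specific order ID pattern and `max_results` bounds are illustrative assumptions, not the schemas registered earlier in this file:

```python
import re

# Illustrative pre-dispatch parameter checks. The real JSON Schemas are
# attached at tool registration; this pattern and range are assumptions.
ORDER_ID_RE = re.compile(r"^ORD-\d{8}$")

def validate_params(params: dict) -> list[str]:
    errors = []
    oid = params.get("order_id", "")
    if not ORDER_ID_RE.fullmatch(oid):
        errors.append(f"order_id '{oid}' must match ORD-<8 digits>")
    n = params.get("max_results", 5)
    if not (1 <= n <= 20):
        errors.append(f"max_results {n} out of range [1, 20]")
    return errors


print(validate_params({"order_id": "ORD-20240115", "max_results": 5}))  # []
print(validate_params({"order_id": "abc", "max_results": 50}))          # 2 errors
```

Rejecting these requests in microseconds, before a tool runs, is what saves the 500ms+ a failed execution would otherwise cost.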


Next file: 03-scenarios-and-runbooks.md — Five production scenarios with detection, root cause, resolution, and prevention for autonomous agent issues.