
MCP Server Scenarios and Runbooks: Troubleshooting Model Extension Frameworks

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Detail |
| --- | --- |
| Skill | 2.1.7 |
| Description | Develop model extension frameworks to enhance FM capabilities |
| Sub-focus | Operational troubleshooting for Lambda MCP and ECS MCP servers |
| AWS Services | Lambda, ECS Fargate, API Gateway, CloudWatch, ElastiCache Redis, X-Ray |
| MangaAssist Relevance | Maintaining <3s response time and 99.9% availability for tool-augmented FM |

Mind Map

mindmap
  root((MCP Troubleshooting))
    Scenario 1
      Lambda Cold Start Timeout
      Provisioned Concurrency
      Module Optimization
      Layer Caching
    Scenario 2
      ECS Memory Leak
      Session Cache Growth
      Connection Pool Leak
      Container Restart
    Scenario 3
      Protocol Version Mismatch
      Version Negotiation
      Graceful Degradation
      Client Compatibility
    Scenario 4
      Connection Pool Exhaustion
      Pool Sizing
      Leak Detection
      Backpressure
    Scenario 5
      Capability Drift After Update
      Tool Schema Changes
      Client Cache Invalidation
      Blue-Green Deploy

Scenario 1: Lambda MCP Cold Start Timeout

Problem

The MangaAssist catalog search Lambda MCP server experiences intermittent timeouts when users first interact after a period of inactivity. The tool call fails with a 504 Gateway Timeout after 10 seconds, and the Bedrock agent falls back to answering without catalog data, providing stale or hallucinated manga information.

Impact: During traffic spikes (e.g., after a new manga release announcement at 10:00 JST), 15-20% of first requests per cold container fail. Users see messages like "I don't have current inventory information" instead of actual search results.

Symptoms observed:

- CloudWatch Lambda Duration metric shows a bimodal distribution: p50 = 200ms, p99 = 9,800ms
- API Gateway 504 errors spike at 08:00-10:00 JST (start of business hours)
- X-Ray traces show 7-8 seconds spent in the "Initialization" segment
- Bedrock agent logs show tool_call_failed: timeout for manga_catalog_search

Detection

flowchart TD
    A[CloudWatch Alarm:<br/>Lambda Duration p99 > 8000ms<br/>for 3 consecutive periods] --> B{Check Lambda Metrics}
    B --> C[ColdStart Init Duration > 5000ms?]
    C -->|Yes| D[Cold Start Issue Confirmed]
    C -->|No| E[Check downstream latency<br/>OpenSearch / Redis]

    D --> F{Which phase is slow?}
    F -->|Import time| G["Heavy imports:<br/>opensearch-py, boto3,<br/>numpy (if present)"]
    F -->|Connection setup| H["Backend connections:<br/>OpenSearch TLS handshake,<br/>Redis connect"]
    F -->|Both| I[Combined: imports + connections]

    G --> J[Resolution: Optimize imports]
    H --> K[Resolution: Connection caching]
    I --> L[Resolution: Provisioned concurrency]

    style A fill:#c7131b,color:#fff
    style D fill:#d86613,color:#fff
    style J fill:#1a8c1a,color:#fff
    style K fill:#1a8c1a,color:#fff
    style L fill:#1a8c1a,color:#fff
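
The alarm at the top of this flowchart can be provisioned in code. A minimal boto3 sketch follows; the alarm name, function name, and SNS topic ARN are placeholders, not values taken from this runbook.

"""
Hedged sketch: create the Duration p99 alarm from the flowchart.
All resource names below are placeholders.
"""

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-catalog-search-p99-duration",
    Namespace="AWS/Lambda",
    MetricName="Duration",
    Dimensions=[{"Name": "FunctionName", "Value": "mangaassist-catalog-search"}],
    ExtendedStatistic="p99",       # percentile statistic, not a plain average
    Period=60,                     # 1-minute evaluation periods
    EvaluationPeriods=3,           # 3 consecutive periods, per the flowchart
    Threshold=8000,                # milliseconds
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:ap-northeast-1:123456789012:mcp-oncall"],  # placeholder
)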

Root Cause

The Lambda function's cold start is slow due to three compounding factors:

  1. Heavy dependency imports (3-4 seconds): opensearch-py, boto3, requests, and requests-aws4auth import chains load dozens of submodules
  2. TLS connection establishment (2-3 seconds): OpenSearch Serverless requires TLS 1.2 with SigV4 authentication, and the first connection includes certificate verification
  3. Redis connection (0.5-1 second): ElastiCache Redis connection establishment with encryption in transit

Total cold start: 6-8 seconds, leaving only 2-4 seconds of the 10-second API Gateway timeout for actual work.
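
To confirm which phase dominates, the import costs can be measured directly. A throwaway sketch, assuming opensearch-py and boto3 are installed; run it locally or as a one-off Lambda:

"""
Throwaway cold-start profiler: time each suspect import to see where
the initialization seconds go.
"""

import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Print how long the wrapped block took, in milliseconds."""
    start = time.time()
    yield
    print(f"{label}: {(time.time() - start) * 1000:.0f}ms")


with timed("import boto3"):
    import boto3  # noqa: F401

with timed("import opensearchpy"):
    import opensearchpy  # noqa: F401  -- heavy import chain

with timed("import requests_aws4auth"):
    import requests_aws4auth  # noqa: F401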

Resolution

"""
Optimized Lambda MCP server with cold start mitigation.
Applies lazy imports, connection warming, and Provisioned Concurrency support.
"""

import json
import logging
import os
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# ---------- OPTIMIZATION 1: Lazy imports ----------
# Heavy libraries are imported only when first needed, not at module load.
# This shaves 1-2 seconds off cold start for requests that don't need them.

_opensearch_client = None
_redis_client = None

REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT", "")
REDIS_ENDPOINT = os.environ.get("REDIS_ENDPOINT", "")


def _get_opensearch_client():
    """Lazy-initialize OpenSearch client on first use."""
    global _opensearch_client
    if _opensearch_client is None:
        start = time.time()
        from opensearchpy import OpenSearch, RequestsHttpConnection
        from requests_aws4auth import AWS4Auth
        import boto3

        credentials = boto3.Session().get_credentials()
        aws_auth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            REGION,
            "aoss",
            session_token=credentials.token,
        )
        _opensearch_client = OpenSearch(
            hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
            http_auth=aws_auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=5,
        )
        elapsed = (time.time() - start) * 1000
        logger.info(f"OpenSearch client initialized in {elapsed:.0f}ms")
    return _opensearch_client


def _get_redis_client():
    """Lazy-initialize Redis client on first use."""
    global _redis_client
    if _redis_client is None and REDIS_ENDPOINT:
        start = time.time()
        import redis
        _redis_client = redis.Redis(
            host=REDIS_ENDPOINT, port=6379,
            decode_responses=True, socket_timeout=1,
            socket_connect_timeout=2,
            retry_on_timeout=True,
        )
        # Warm the connection immediately
        try:
            _redis_client.ping()
        except Exception as e:
            logger.warning(f"Redis warm ping failed: {e}")
        elapsed = (time.time() - start) * 1000
        logger.info(f"Redis client initialized in {elapsed:.0f}ms")
    return _redis_client


# ---------- OPTIMIZATION 2: Provisioned Concurrency Warm-Up ----------
# When Provisioned Concurrency is enabled, Lambda calls the handler
# initialization code BEFORE any request arrives. We pre-warm connections.

def _warm_connections():
    """Called during Provisioned Concurrency initialization.
    Pre-establishes connections so the first real request is fast."""
    logger.info("Pre-warming connections for Provisioned Concurrency...")
    try:
        os_client = _get_opensearch_client()
        os_client.info()  # Force actual connection
        logger.info("OpenSearch connection warmed")
    except Exception as e:
        logger.warning(f"OpenSearch warm failed: {e}")

    try:
        r = _get_redis_client()
        if r:
            r.ping()
            logger.info("Redis connection warmed")
    except Exception as e:
        logger.warning(f"Redis warm failed: {e}")


# Trigger warm-up during init phase
if os.environ.get("AWS_LAMBDA_INITIALIZATION_TYPE") == "provisioned-concurrency":
    _warm_connections()


# ---------- OPTIMIZATION 3: Minimal response path for simple requests ----------

# MCP protocol handlers (initialize, tools/list) don't need backend connections.
# Serve them immediately without touching OpenSearch or Redis.

PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-catalog-search"
SERVER_VERSION = "1.1.0"

TOOL_DEFINITIONS = [
    {
        "name": "manga_catalog_search",
        "description": "Search manga catalog by title, author, genre, or ISBN.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "genre": {"type": "string", "default": "all"},
                "max_results": {"type": "integer", "default": 5},
            },
            "required": ["query"]
        }
    }
]


def lambda_handler(event, context):
    """Optimized Lambda entry point. Protocol messages served instantly;
    only tool/call triggers backend connections."""
    try:
        body = json.loads(event.get("body", "{}")) if isinstance(event.get("body"), str) else event
        method = body.get("method", "")
        msg_id = body.get("id")
        params = body.get("params", {})

        # FAST PATH: Protocol messages need no backend access
        if method == "initialize":
            return _respond(200, msg_id, {
                "protocolVersion": PROTOCOL_VERSION,
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}
            })

        if method == "notifications/initialized":
            return {"statusCode": 204, "body": ""}

        if method == "tools/list":
            return _respond(200, msg_id, {"tools": TOOL_DEFINITIONS})

        # SLOW PATH: Tool execution needs backends
        if method == "tools/call":
            tool_name = params.get("name", "")
            arguments = params.get("arguments", {})

            if tool_name == "manga_catalog_search":
                result = _execute_search(arguments)
                return _respond(200, msg_id, result)
            else:
                return _respond(400, msg_id, None, {
                    "code": -32601, "message": f"Unknown tool: {tool_name}"
                })

        return _respond(400, msg_id, None, {
            "code": -32601, "message": f"Unknown method: {method}"
        })

    except Exception as e:
        logger.error(f"Handler error: {e}", exc_info=True)
        return _respond(500, None, None, {
            "code": -32603, "message": "Internal error"
        })


def _execute_search(arguments: dict) -> dict:
    """Execute catalog search -- only called for tools/call."""
    query = arguments.get("query", "")
    start = time.time()

    # Try cache first
    redis = _get_redis_client()
    cache_key = f"search:{query}:{arguments.get('genre', 'all')}"
    if redis:
        try:
            cached = redis.get(cache_key)
            if cached:
                return {
                    "content": [{"type": "text", "text": cached}],
                    "isError": False
                }
        except Exception:
            pass

    # Search OpenSearch
    os_client = _get_opensearch_client()
    search_body = {
        "size": min(arguments.get("max_results", 5), 20),
        "query": {"multi_match": {
            "query": query,
            "fields": ["title^3", "title_jp^3", "author^2", "description"]
        }}
    }

    try:
        response = os_client.search(
            index=os.environ.get("OPENSEARCH_INDEX", "manga-catalog"),
            body=search_body
        )
        hits = response.get("hits", {}).get("hits", [])
        results = [h["_source"] for h in hits]
        elapsed = round((time.time() - start) * 1000, 1)

        payload = json.dumps({
            "results": results,
            "metadata": {"query": query, "latency_ms": elapsed}
        }, ensure_ascii=False, default=str)

        # Cache result
        if redis:
            try:
                redis.setex(cache_key, 300, payload)
            except Exception:
                pass

        return {
            "content": [{"type": "text", "text": payload}],
            "isError": False
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Search error: {e}"}],
            "isError": True
        }


def _respond(status, msg_id, result=None, error=None):
    body = {"jsonrpc": "2.0", "id": msg_id}
    if error:
        body["error"] = error
    else:
        body["result"] = result
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body, default=str)
    }

Prevention

| Prevention Measure | Implementation | Impact |
| --- | --- | --- |
| Provisioned Concurrency | Set 10 warm instances for catalog-search (see the sketch below) | Eliminates cold starts for base traffic; ~$30/month at 10 instances |
| Warm-up on init | Pre-establish OpenSearch and Redis connections during PC init | Saves 3-4s on first request |
| Lazy imports | Only import opensearch-py when tools/call is received | Protocol messages (initialize, tools/list) respond in <50ms even cold |
| Lambda SnapStart | Enable for Java/Python runtimes (where available) | Checkpoint-restore eliminates JVM/Python init overhead |
| Scheduled warmer | EventBridge rule pings the function every 5 minutes | Keeps at least 1 container warm outside Provisioned Concurrency |
| Connection timeout tuning | Redis socket_connect_timeout=2s, OpenSearch timeout=5s | Fail fast on connection issues instead of hanging until the Lambda timeout |
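
The first row can be applied with boto3. A hedged sketch; the alias name "live" is an assumption, and Provisioned Concurrency must target a published version or alias, never $LATEST:

"""
Hedged sketch: apply the 10 warm instances from the table above.
"""

import boto3

lam = boto3.client("lambda", region_name="ap-northeast-1")

lam.put_provisioned_concurrency_config(
    FunctionName="mangaassist-catalog-search",
    Qualifier="live",                    # alias pointing at a published version
    ProvisionedConcurrentExecutions=10,  # base traffic stays on warm instances
)

# Poll until the warm instances are allocated
status = lam.get_provisioned_concurrency_config(
    FunctionName="mangaassist-catalog-search",
    Qualifier="live",
)
print(status["Status"])  # ALLOCATING -> READY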

Scenario 2: ECS MCP Memory Leak

Problem

The MangaAssist recommendation MCP server running on ECS Fargate gradually consumes all available memory (2 GB) over 3-4 days, eventually triggering an OOM kill and task restart. During the restart (30-60 seconds), recommendation requests fail, and users see "Recommendations are temporarily unavailable."

Impact: Every 3-4 days, recommendations go offline for 30-60 seconds. The restart also causes loss of the in-memory session cache (~8,000 active sessions), forcing Redis fallback reads and degrading latency from 50ms to 200ms for the next hour as the cache re-warms.

Symptoms observed:

- ECS task memory utilization climbs linearly: Day 1 = 45%, Day 2 = 60%, Day 3 = 78%, Day 4 = OOM
- CloudWatch MemoryUtilization alarm fires at the 85% threshold
- ECS task STOPPED reason: OutOfMemoryError: Container killed due to memory usage
- Post-restart: Redis GET requests spike 10x because the in-memory cache is empty

Detection

flowchart TD
    A[CloudWatch Alarm:<br/>ECS MemoryUtilization > 85%<br/>for 5 minutes] --> B{Task restarted recently?}

    B -->|Yes - OOM Kill| C[Check ECS Task<br/>Stopped Reason]
    B -->|No - Still running| D[Capture memory profile<br/>before OOM]

    C --> E[Confirm:<br/>OutOfMemoryError]
    D --> F{Memory growth pattern?}

    F -->|Linear growth| G[Likely: unbounded cache<br/>or accumulating data structure]
    F -->|Sudden spike| H[Likely: large query result<br/>or payload in memory]
    F -->|Step function| I[Likely: connection leak<br/>or thread leak]

    G --> J["Investigate:<br/>1. Session cache size<br/>2. Query result caching<br/>3. Logging buffer<br/>4. Metric accumulation"]
    H --> K["Investigate:<br/>1. Large OpenSearch response<br/>2. Unbounded list in result<br/>3. Binary data in memory"]
    I --> L["Investigate:<br/>1. Redis connection pool<br/>2. OpenSearch connection pool<br/>3. Asyncio task leak"]

    J --> M[Resolution: Add LRU eviction<br/>+ periodic cache flush]
    K --> N[Resolution: Limit result sizes<br/>+ streaming responses]
    L --> O[Resolution: Fix pool config<br/>+ connection cleanup]

    style A fill:#c7131b,color:#fff
    style E fill:#d86613,color:#fff
    style M fill:#1a8c1a,color:#fff
    style N fill:#1a8c1a,color:#fff
    style O fill:#1a8c1a,color:#fff

Root Cause

The memory leak has two contributing causes:

  1. Unbounded session cache: The active_sessions dictionary grows without limit. Each session stores conversation history, viewed items, and preference vectors. At 1M messages/day, ~50K unique sessions accumulate, each averaging 40KB = 2GB.

  2. OpenSearch response accumulation: Large search results (20 items with full metadata) are kept in Python memory even after being serialized to the response. Python's garbage collector doesn't reclaim memory promptly due to circular references in the OpenSearch client response objects.

Resolution

"""
Memory-safe ECS MCP server with bounded caches,
periodic cleanup, and memory monitoring.
"""

import asyncio
import gc
import json
import logging
import time
import tracemalloc
from collections import OrderedDict
from typing import Optional

logger = logging.getLogger("mcp-recommendation")


class BoundedLRUCache:
    """Memory-bounded LRU cache with automatic eviction and size tracking.
    This replaces the unbounded dict that caused the memory leak."""

    def __init__(self, max_items: int = 10000, max_memory_mb: float = 256):
        self._cache: OrderedDict[str, dict] = OrderedDict()
        self._max_items = max_items
        self._max_memory_bytes = int(max_memory_mb * 1024 * 1024)
        self._current_memory_estimate = 0
        self._eviction_count = 0

    def get(self, key: str) -> Optional[dict]:
        if key in self._cache:
            self._cache.move_to_end(key)
            value = self._cache[key]
            if isinstance(value, dict):
                value["_last_access"] = time.time()  # refresh for expiry tracking
            return value
        return None

    def put(self, key: str, value: dict):
        # Stamp access time so clear_expired() can age entries out
        # (mutates the caller's dict, which is acceptable for session objects)
        if isinstance(value, dict):
            value["_last_access"] = time.time()

        estimated_size = self._estimate_size(value)

        # Replacing an existing key: subtract the old entry's size first
        # so the memory estimate doesn't double-count
        if key in self._cache:
            old = self._cache.pop(key)
            self._current_memory_estimate = max(
                0, self._current_memory_estimate - self._estimate_size(old)
            )

        # Evict by item count
        while len(self._cache) >= self._max_items:
            self._evict_oldest()

        # Evict by memory
        while self._current_memory_estimate + estimated_size > self._max_memory_bytes:
            if not self._cache:
                break
            self._evict_oldest()

        self._cache[key] = value
        self._current_memory_estimate += estimated_size

    def _evict_oldest(self):
        if self._cache:
            key, value = self._cache.popitem(last=False)
            self._current_memory_estimate -= self._estimate_size(value)
            self._current_memory_estimate = max(0, self._current_memory_estimate)
            self._eviction_count += 1

    @staticmethod
    def _estimate_size(obj) -> int:
        """Rough estimate of object memory footprint."""
        try:
            return len(json.dumps(obj, default=str).encode("utf-8"))
        except Exception:
            return 1024  # Default 1KB estimate

    def clear_expired(self, max_age_seconds: int = 1800):
        """Remove entries older than max_age_seconds."""
        now = time.time()
        expired_keys = []
        for key, value in self._cache.items():
            if isinstance(value, dict):
                last_access = value.get("_last_access", value.get("created_at", 0))
                if now - last_access > max_age_seconds:
                    expired_keys.append(key)

        for key in expired_keys:
            value = self._cache.pop(key, None)
            if value:
                self._current_memory_estimate -= self._estimate_size(value)

        if expired_keys:
            logger.info(f"Expired {len(expired_keys)} cache entries")

    def stats(self) -> dict:
        return {
            "items": len(self._cache),
            "max_items": self._max_items,
            "estimated_memory_mb": round(self._current_memory_estimate / (1024 * 1024), 2),
            "max_memory_mb": round(self._max_memory_bytes / (1024 * 1024), 2),
            "total_evictions": self._eviction_count,
        }


class MemoryMonitor:
    """Monitors process memory and triggers cleanup when thresholds are reached."""

    def __init__(self, warning_pct: float = 75, critical_pct: float = 85):
        self.warning_pct = warning_pct
        self.critical_pct = critical_pct
        self._container_memory_limit = self._detect_memory_limit()
        self._snapshots: list[dict] = []
        self._tracemalloc_enabled = False

    @staticmethod
    def _detect_memory_limit() -> int:
        """Detect the container memory limit from cgroup v1 or v2."""
        for path in (
            "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
            "/sys/fs/cgroup/memory.max",                    # cgroup v2
        ):
            try:
                with open(path) as f:
                    raw = f.read().strip()
                if raw != "max":
                    return int(raw)
            except (FileNotFoundError, ValueError):
                continue
        # Fallback: assume 2GB (this task's Fargate config)
        return 2 * 1024 * 1024 * 1024

    def get_current_usage(self) -> dict:
        """Get current memory usage stats."""
        import resource
        rusage = resource.getrusage(resource.RUSAGE_SELF)
        rss_bytes = rusage.ru_maxrss * 1024  # maxrss is in KB on Linux

        # Also check /proc/self/status for more accurate numbers
        try:
            with open("/proc/self/status") as f:
                for line in f:
                    if line.startswith("VmRSS:"):
                        rss_bytes = int(line.split()[1]) * 1024
                        break
        except (FileNotFoundError, ValueError):
            pass

        limit = self._container_memory_limit
        usage_pct = (rss_bytes / limit) * 100 if limit > 0 else 0

        return {
            "rss_mb": round(rss_bytes / (1024 * 1024), 1),
            "limit_mb": round(limit / (1024 * 1024), 1),
            "usage_pct": round(usage_pct, 1),
            "gc_counts": gc.get_count(),
            "gc_thresholds": gc.get_threshold(),
        }

    def check_and_cleanup(self, session_cache: BoundedLRUCache) -> dict:
        """Check memory and perform cleanup if needed."""
        usage = self.get_current_usage()
        actions_taken = []

        if usage["usage_pct"] >= self.critical_pct:
            # CRITICAL: Aggressive cleanup
            logger.warning(f"Memory CRITICAL: {usage['usage_pct']}%")

            # Force garbage collection
            collected = gc.collect()
            actions_taken.append(f"gc.collect freed {collected} objects")

            # Evict 50% of session cache
            items_before = session_cache.stats()["items"]
            target = items_before // 2
            while session_cache.stats()["items"] > target:
                session_cache._evict_oldest()
            actions_taken.append(
                f"Evicted {items_before - session_cache.stats()['items']} cache entries"
            )

        elif usage["usage_pct"] >= self.warning_pct:
            # WARNING: Moderate cleanup
            logger.warning(f"Memory WARNING: {usage['usage_pct']}%")

            # Expire old sessions
            session_cache.clear_expired(max_age_seconds=900)  # 15 min
            gc.collect()
            actions_taken.append("Cleared sessions > 15min, ran gc.collect")

        # Record snapshot
        self._snapshots.append({
            "timestamp": time.time(),
            "rss_mb": usage["rss_mb"],
            "usage_pct": usage["usage_pct"],
            "actions": actions_taken,
        })
        # Keep last 100 snapshots
        self._snapshots = self._snapshots[-100:]

        usage["actions_taken"] = actions_taken
        return usage

    def enable_tracemalloc(self):
        """Enable tracemalloc for debugging memory leaks in development."""
        if not self._tracemalloc_enabled:
            tracemalloc.start(25)  # 25 frames deep
            self._tracemalloc_enabled = True
            logger.info("tracemalloc enabled")

    def get_top_allocations(self, count: int = 10) -> list[str]:
        """Get top memory allocators (requires tracemalloc enabled)."""
        if not self._tracemalloc_enabled:
            return ["tracemalloc not enabled"]
        snapshot = tracemalloc.take_snapshot()
        top = snapshot.statistics("lineno")[:count]
        return [str(stat) for stat in top]


async def periodic_memory_check(
    monitor: MemoryMonitor,
    session_cache: BoundedLRUCache,
    interval_s: int = 60,
):
    """Background task that periodically checks memory and cleans up."""
    while True:
        try:
            result = monitor.check_and_cleanup(session_cache)
            logger.info(
                f"Memory check: {result['rss_mb']}MB "
                f"({result['usage_pct']}%), "
                f"cache={session_cache.stats()['items']} items"
            )
        except Exception as e:
            logger.error(f"Memory check failed: {e}")
        await asyncio.sleep(interval_s)


async def periodic_cache_cleanup(
    session_cache: BoundedLRUCache,
    interval_s: int = 300,
    max_age_s: int = 1800,
):
    """Background task that expires old cache entries every 5 minutes."""
    while True:
        try:
            session_cache.clear_expired(max_age_seconds=max_age_s)
        except Exception as e:
            logger.error(f"Cache cleanup failed: {e}")
        await asyncio.sleep(interval_s)
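
Wiring these pieces together at server startup might look like the following sketch; the MCP server itself is omitted, and only the maintenance tasks are shown:

"""
Minimal startup wiring sketch: create the shared cache and monitor,
then launch both maintenance tasks alongside the (omitted) MCP server.
"""

import asyncio


async def main():
    session_cache = BoundedLRUCache(max_items=10_000, max_memory_mb=256)
    monitor = MemoryMonitor(warning_pct=75, critical_pct=85)

    # Hold references so the background tasks are not garbage-collected
    maintenance = [
        asyncio.create_task(periodic_memory_check(monitor, session_cache, interval_s=60)),
        asyncio.create_task(periodic_cache_cleanup(session_cache, interval_s=300)),
    ]

    # ... start the MCP server here ...
    await asyncio.Event().wait()  # keep the sketch alive


if __name__ == "__main__":
    asyncio.run(main())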

Prevention

| Prevention Measure | Implementation | Impact |
| --- | --- | --- |
| BoundedLRUCache | Max 10K items and a 256MB memory limit with automatic eviction | Caps memory growth at a predictable ceiling |
| Periodic cache expiration | Background task every 5 min removes sessions idle > 30 min | Prevents stale session accumulation |
| Memory monitor | Checks /proc/self/status every 60s, triggers cleanup at 75%/85% | Early warning before OOM |
| Aggressive GC at critical | gc.collect() + 50% cache eviction when memory > 85% | Emergency pressure relief |
| ECS task memory headroom | Set task memory to 2.5GB (was 2GB) | Extra 500MB buffer for spikes |
| CloudWatch alarm | Alert at 70% memory utilization (15 min sustained) | Human notification before automated mitigation |
| tracemalloc in staging | Enable in staging environment to profile top allocators | Identify new leaks before production |
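
For the 70% CloudWatch alarm in the table to have data to evaluate, the monitor and cache stats need to be published as custom metrics. A hedged sketch, assuming it runs in the same process as the resolution code above; the MangaAssist/MCP namespace and metric names are assumptions:

"""
Hedged sketch: publish cache and memory stats as custom CloudWatch
metrics; namespace and metric names are assumptions.
"""

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")


def publish_memory_metrics(monitor: MemoryMonitor, cache: BoundedLRUCache):
    """Call from the periodic memory check to emit one datapoint per cycle."""
    usage = monitor.get_current_usage()
    cache_stats = cache.stats()
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/MCP",
        MetricData=[
            {"MetricName": "ProcessMemoryPct", "Value": usage["usage_pct"], "Unit": "Percent"},
            {"MetricName": "SessionCacheItems", "Value": cache_stats["items"], "Unit": "Count"},
            {"MetricName": "SessionCacheEvictions", "Value": cache_stats["total_evictions"], "Unit": "Count"},
        ],
    )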

Scenario 3: Protocol Version Mismatch

Problem

After updating the MangaAssist Bedrock agent to use a newer MCP client library (which defaults to protocol version 2025-01-15), the Lambda MCP servers that still advertise 2024-11-05 begin returning initialization errors. The agent cannot establish MCP sessions, and all tool calls fail silently -- the agent proceeds without tools, producing answers without real catalog data.

Impact: 100% of tool-augmented responses degrade to pure LLM generation without real data. Users asking "Is Attack on Titan volume 34 in stock?" get a generic response instead of a live inventory check.

Symptoms observed:

- Agent logs: MCP initialization failed: unsupported protocol version
- Lambda CloudWatch: all initialize requests return an error response
- No tools/call invocations in Lambda metrics (session never established)
- Bedrock agent traces show the tool skipped with initialization_failed status

Detection

flowchart TD
    A[Alert: MCP tool call<br/>success rate drops to 0%] --> B{Check agent logs}

    B --> C[Agent log: 'MCP initialization failed:<br/>unsupported protocol version']
    C --> D{Which side is wrong?}

    D --> E[Client sends:<br/>protocolVersion: 2025-01-15]
    D --> F[Server supports:<br/>protocolVersion: 2024-11-05]

    E --> G{Server version negotiation<br/>handles mismatch?}
    G -->|No - strict check| H["Bug: Server rejects<br/>unknown versions instead<br/>of negotiating down"]
    G -->|Yes - negotiates| I["Bug: Client rejects<br/>server's lower version"]

    H --> J["Fix: Update server to<br/>negotiate compatible version"]
    I --> K["Fix: Update client to<br/>accept server's version"]

    style A fill:#c7131b,color:#fff
    style H fill:#d86613,color:#fff
    style I fill:#d86613,color:#fff
    style J fill:#1a8c1a,color:#fff
    style K fill:#1a8c1a,color:#fff

Root Cause

The Lambda MCP server's handle_initialize function performs a strict equality check on the protocol version:

# BUG: Strict version check rejects any version we don't recognize
def handle_initialize(params):
    client_version = params.get("protocolVersion", "")
    if client_version != "2024-11-05":
        return {"error": {"code": -32602, "message": f"Unsupported version: {client_version}"}}
    # ...

The MCP specification requires servers to negotiate downward: if the client requests a newer version, the server should respond with its highest supported version and let the client decide if it can work with that.

Resolution

"""
Robust MCP protocol version negotiation.
Handles forward and backward compatibility gracefully.
"""

import logging
from typing import Optional

logger = logging.getLogger("mcp-version")

# All versions this server can speak, ordered newest-first
SUPPORTED_VERSIONS = [
    "2024-11-05",  # Current production version
    "2024-09-01",  # Previous version (backward compat)
]

# Minimum version we require clients to support
MINIMUM_CLIENT_VERSION = "2024-09-01"


def negotiate_protocol_version(
    client_requested_version: str,
) -> tuple[Optional[str], Optional[dict]]:
    """
    Negotiate protocol version per MCP specification.

    Rules:
    1. If client requests a version we support, use it.
    2. If client requests a newer version we don't support,
       offer our highest version (client decides compatibility).
    3. If client requests an older version below our minimum,
       return an error.

    Returns:
        (negotiated_version, error_or_none)
    """
    if not client_requested_version:
        # No version specified -- use our default
        logger.warning("Client sent no protocolVersion, using default")
        return SUPPORTED_VERSIONS[0], None

    # Case 1: Exact match -- ideal case
    if client_requested_version in SUPPORTED_VERSIONS:
        logger.info(f"Exact version match: {client_requested_version}")
        return client_requested_version, None

    # Case 2: Client wants a newer version than we support.
    # MCP protocol versions are ISO dates, so plain string comparison
    # orders them correctly. Offer our highest; client can accept or reject.
    if client_requested_version > SUPPORTED_VERSIONS[0]:
        offered = SUPPORTED_VERSIONS[0]
        logger.info(
            f"Client requested {client_requested_version}, "
            f"offering {offered} (our highest)"
        )
        return offered, None

    # Case 3: Client wants older version than our minimum
    if client_requested_version < MINIMUM_CLIENT_VERSION:
        error = {
            "code": -32602,
            "message": (
                f"Protocol version {client_requested_version} is too old. "
                f"Minimum supported: {MINIMUM_CLIENT_VERSION}. "
                f"Supported versions: {SUPPORTED_VERSIONS}"
            )
        }
        logger.warning(f"Rejecting old client version: {client_requested_version}")
        return None, error

    # Case 4: Version between our min and max but not in our list
    # Use the nearest lower version we support
    for version in SUPPORTED_VERSIONS:
        if version <= client_requested_version:
            logger.info(
                f"Client requested {client_requested_version}, "
                f"using nearest lower: {version}"
            )
            return version, None

    # Fallback: offer our highest
    return SUPPORTED_VERSIONS[0], None


def handle_initialize(params: dict) -> dict:
    """
    Fixed initialize handler with proper version negotiation.
    Replaces the strict equality check that caused the outage.
    """
    client_version = params.get("protocolVersion", "")
    client_info = params.get("clientInfo", {})

    negotiated_version, error = negotiate_protocol_version(client_version)

    if error:
        logger.error(f"Version negotiation failed: {error}")
        return {"error": error}

    logger.info(
        f"MCP session initialized: client={client_info.get('name', 'unknown')} "
        f"requested={client_version} negotiated={negotiated_version}"
    )

    return {
        "protocolVersion": negotiated_version,
        "capabilities": {
            "tools": {"listChanged": False},
        },
        "serverInfo": {
            "name": "mangaassist-catalog-search",
            "version": "1.2.0",
        }
    }


# ---------- Client-side version handling ----------

def validate_server_version(
    requested_version: str,
    server_response_version: str,
    acceptable_versions: list[str],
) -> tuple[bool, str]:
    """
    Client-side validation of the server's negotiated version.

    The client should accept any version it can speak, even if
    the server offered a lower version than requested.
    """
    if server_response_version in acceptable_versions:
        return True, ""

    # Check whether it's a version we can likely work with: versions are
    # dated (YYYY-MM-DD), so treat the same calendar year as compatible
    req_parts = requested_version.split("-")
    srv_parts = server_response_version.split("-")

    if len(req_parts) >= 2 and len(srv_parts) >= 2:
        # Same year = likely compatible
        if req_parts[0] == srv_parts[0]:
            return True, ""

    return False, (
        f"Server offered {server_response_version} which is not in "
        f"our acceptable versions: {acceptable_versions}"
    )
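
A few plain asserts exercise the negotiation rules and document the expected behavior; they are runnable when appended below the negotiation code:

assert negotiate_protocol_version("2024-11-05") == ("2024-11-05", None)  # exact match
assert negotiate_protocol_version("2025-01-15") == ("2024-11-05", None)  # newer: offer our highest
assert negotiate_protocol_version("2024-10-01") == ("2024-09-01", None)  # in-between: nearest lower
assert negotiate_protocol_version("") == ("2024-11-05", None)            # missing: our default

version, error = negotiate_protocol_version("2023-05-01")                # too old: reject
assert version is None and error["code"] == -32602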

Prevention

| Prevention Measure | Implementation | Impact |
| --- | --- | --- |
| Negotiation, not rejection | Server offers its highest compatible version instead of erroring | Graceful degradation across versions |
| Client version tolerance | Client accepts any version from the same calendar year | Forward-compatible with new server releases |
| Version in health check | /mcp/health returns a protocol_versions array | Monitoring can detect version skew before outages |
| Canary deployment | Update one Lambda alias first, test client compatibility | Catch mismatches before full rollout |
| Integration tests | CI pipeline tests the client against all supported server versions | Prevent regressions in version negotiation |
| CloudWatch metric | Emit a VersionNegotiationResult metric (success/failure/downgrade) | Dashboard visibility into version health |

Scenario 4: Connection Pool Exhaustion

Problem

The ECS MCP recommendation server's Redis connection pool becomes exhausted under sustained load (>200 concurrent requests). New tool calls block waiting for a connection and eventually time out. The server remains "healthy" (its liveness check passes) but cannot process tool requests, so the agent receives timeout errors.

Impact: During peak traffic (12:00-13:00 JST lunch break, 18:00-20:00 JST evening peak), 30-40% of recommendation requests fail. The circuit breaker opens after 5 failures, blocking all recommendation requests for 30 seconds. Users see generic responses instead of personalized recommendations.

Symptoms observed:

- Redis client logs: redis.exceptions.ConnectionError: Too many connections
- MCP tool latency p99 jumps from 200ms to 10,000ms (connection wait timeout)
- ECS task CPU drops to near zero (blocked on connection acquisition)
- Circuit breaker state flips to OPEN repeatedly
- Redis INFO clients shows connected_clients near the maxclients limit

Detection

flowchart TD
    A[Alert: MCP tool latency<br/>p99 > 5000ms for 5 minutes] --> B{Check connection metrics}

    B --> C[Redis INFO clients]
    C --> D{connected_clients near maxclients?}

    D -->|Yes| E[Connection Pool Exhaustion]
    D -->|No| F[Check other bottlenecks<br/>OpenSearch, CPU, network]

    E --> G{Where are connections going?}

    G --> H[Leak: Connections opened<br/>but not returned to pool]
    G --> I[Sizing: Pool too small<br/>for concurrent load]
    G --> J[Starvation: Slow operations<br/>holding connections too long]

    H --> K["Fix:<br/>1. Ensure async with/context managers<br/>2. Add connection timeout<br/>3. Add pool health check"]
    I --> L["Fix:<br/>1. Increase pool size to 50<br/>2. Add connection queuing<br/>3. Add backpressure"]
    J --> M["Fix:<br/>1. Add per-operation timeout<br/>2. Pipeline Redis commands<br/>3. Optimize slow queries"]

    style A fill:#c7131b,color:#fff
    style E fill:#d86613,color:#fff
    style K fill:#1a8c1a,color:#fff
    style L fill:#1a8c1a,color:#fff
    style M fill:#1a8c1a,color:#fff

Root Cause

Three factors combine to exhaust the connection pool:

  1. Pool too small: The Redis connection pool was configured with max_connections=20, but the ECS task handles up to 200 concurrent requests (each needing 1-3 Redis operations)
  2. Connection leak in error paths: When an OpenSearch query fails after acquiring a Redis connection for caching, the Redis connection is not returned to the pool because the exception skips the cleanup code
  3. No backpressure: The server accepts unlimited concurrent requests, even when all connections are occupied

Resolution

"""
Connection pool management with proper sizing, leak prevention,
backpressure, and monitoring for ECS MCP servers.
"""

import asyncio
import json
import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field

import redis.asyncio as aioredis

logger = logging.getLogger("mcp-connection-pool")


@dataclass
class PoolMetrics:
    """Tracks connection pool health metrics."""
    total_acquisitions: int = 0
    total_releases: int = 0
    total_timeouts: int = 0
    total_errors: int = 0
    peak_concurrent: int = 0
    current_active: int = 0
    _acquisition_times: list = field(default_factory=list)

    def record_acquisition(self, elapsed_ms: float):
        self.total_acquisitions += 1
        self.current_active += 1
        self.peak_concurrent = max(self.peak_concurrent, self.current_active)
        self._acquisition_times.append(elapsed_ms)
        if len(self._acquisition_times) > 1000:
            self._acquisition_times = self._acquisition_times[-1000:]

    def record_release(self):
        self.total_releases += 1
        self.current_active = max(0, self.current_active - 1)

    def record_timeout(self):
        self.total_timeouts += 1

    def stats(self) -> dict:
        times = self._acquisition_times
        avg_wait = sum(times) / len(times) if times else 0
        p99_wait = sorted(times)[int(len(times) * 0.99)] if times else 0
        return {
            "total_acquisitions": self.total_acquisitions,
            "total_releases": self.total_releases,
            "leaked": self.total_acquisitions - self.total_releases,
            "total_timeouts": self.total_timeouts,
            "total_errors": self.total_errors,
            "peak_concurrent": self.peak_concurrent,
            "current_active": self.current_active,
            "avg_wait_ms": round(avg_wait, 1),
            "p99_wait_ms": round(p99_wait, 1),
        }


class ManagedRedisPool:
    """Redis connection pool with leak prevention, monitoring,
    and backpressure."""

    def __init__(
        self,
        host: str,
        port: int = 6379,
        max_connections: int = 50,
        acquire_timeout_s: float = 3.0,
        operation_timeout_s: float = 2.0,
    ):
        self._pool = aioredis.ConnectionPool(
            host=host,
            port=port,
            max_connections=max_connections,
            decode_responses=True,
            socket_timeout=operation_timeout_s,
            socket_connect_timeout=3,
            retry_on_timeout=False,  # We handle retries at the MCP level
        )
        self._client = aioredis.Redis(connection_pool=self._pool)
        self._semaphore = asyncio.Semaphore(max_connections)
        self._acquire_timeout = acquire_timeout_s
        self._metrics = PoolMetrics()
        self._max_connections = max_connections

    @asynccontextmanager
    async def connection(self):
        """
        Acquire a connection from the pool with backpressure.
        Uses a semaphore to prevent over-acquisition and ensures
        connections are ALWAYS returned, even on exceptions.
        """
        start = time.time()
        try:
            # Backpressure: wait for available connection slot
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self._acquire_timeout
            )
        except asyncio.TimeoutError:
            self._metrics.record_timeout()
            logger.warning(
                f"Connection pool timeout after {self._acquire_timeout}s "
                f"(active={self._metrics.current_active}/{self._max_connections})"
            )
            raise ConnectionError(
                f"Redis connection pool exhausted "
                f"({self._metrics.current_active} active connections)"
            )

        elapsed = (time.time() - start) * 1000
        self._metrics.record_acquisition(elapsed)

        try:
            yield self._client
        finally:
            # CRITICAL: Always release the semaphore, even on exception.
            # This is the fix for the connection leak.
            self._semaphore.release()
            self._metrics.record_release()

    async def execute(self, operation: str, *args, **kwargs):
        """Execute a Redis operation with managed connection."""
        async with self.connection() as client:
            method = getattr(client, operation)
            return await method(*args, **kwargs)

    async def ping(self) -> bool:
        """Health check that tests an actual connection."""
        try:
            async with self.connection() as client:
                return await client.ping()
        except Exception:
            return False

    async def close(self):
        """Close all connections in the pool."""
        await self._client.close()
        await self._pool.disconnect()

    @property
    def stats(self) -> dict:
        pool_info = {
            "max_connections": self._max_connections,
            "pool_created_connections": self._pool._created_connections
            if hasattr(self._pool, '_created_connections') else "unknown",
        }
        return {**self._metrics.stats(), **pool_info}


# ---------- Usage in MCP tool handler (with leak prevention) ----------

async def execute_recommendation_with_safe_pool(
    arguments: dict,
    redis_pool: ManagedRedisPool,
    opensearch_client,
) -> dict:
    """
    Recommendation handler that properly manages Redis connections.
    The async-with block ensures connections are returned even if
    OpenSearch or other code raises an exception.
    """
    user_id = arguments["user_id"]
    count = arguments.get("count", 5)

    # Phase 1: Check Redis cache (connection acquired and released here)
    cached_result = None
    try:
        async with redis_pool.connection() as redis:
            cached = await redis.get(f"reco:{user_id}")
            if cached:
                cached_result = json.loads(cached)
    except ConnectionError:
        # Pool exhausted -- proceed without cache
        logger.warning("Redis pool exhausted, skipping cache check")
    except Exception as e:
        logger.warning(f"Redis cache check failed: {e}")

    if cached_result:
        return {
            "content": [{"type": "text", "text": json.dumps(cached_result, ensure_ascii=False)}],
            "isError": False
        }

    # Phase 2: Query OpenSearch (no Redis connection held!)
    try:
        response = await opensearch_client.search(
            index="manga-catalog",
            body={"size": count, "query": {"match_all": {}}}
        )
        results = [h["_source"] for h in response["hits"]["hits"]]
    except Exception as e:
        # If OpenSearch fails here, we have NOT leaked a Redis connection
        # because the Redis connection was already released in Phase 1
        return {
            "content": [{"type": "text", "text": f"Search failed: {e}"}],
            "isError": True
        }

    result_payload = {"recommendations": results, "user_id": user_id}

    # Phase 3: Cache result in Redis (separate connection acquisition)
    try:
        async with redis_pool.connection() as redis:
            await redis.setex(
                f"reco:{user_id}", 600,
                json.dumps(result_payload, ensure_ascii=False, default=str)
            )
    except ConnectionError:
        logger.warning("Redis pool exhausted, skipping cache write")
    except Exception as e:
        logger.warning(f"Redis cache write failed: {e}")

    return {
        "content": [{"type": "text", "text": json.dumps(result_payload, ensure_ascii=False)}],
        "isError": False
    }
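A minimal usage sketch for the managed pool; the ElastiCache endpoint is a placeholder:

"""
Minimal usage sketch for ManagedRedisPool.
"""

import asyncio


async def demo():
    pool = ManagedRedisPool(
        host="mangaassist-cache.example.apne1.cache.amazonaws.com",  # placeholder
        max_connections=50,    # per the sizing row in the table below
        acquire_timeout_s=3.0,
    )
    try:
        print("Redis reachable:", await pool.ping())
        print(pool.stats)      # acquisitions, releases, leaked count, wait times
    finally:
        await pool.close()


if __name__ == "__main__":
    asyncio.run(demo())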

Prevention

| Prevention Measure | Implementation | Impact |
| --- | --- | --- |
| Semaphore-based backpressure | asyncio.Semaphore(max_connections) gates pool access | Prevents over-acquisition; callers wait instead of erroring |
| Acquire timeout | 3-second timeout on semaphore acquisition | Fast failure with a clear error message instead of hanging |
| Context manager pattern | async with pool.connection() ensures release in finally | Eliminates connection leaks on exceptions |
| Phase separation | Acquire/release the Redis connection per phase, not per request | Connections are never held during slow OpenSearch queries |
| Pool metrics | Track acquisitions, releases, leaked count, wait times | Dashboard shows pool health and leak detection |
| Sizing formula | max_connections = max_concurrent_requests x redis_ops_per_request x 1.5 headroom; 200 x 1.5 x 1.5 = 450 cluster-wide, covered by 50 per task across 10 tasks | Right-sized pool for peak concurrency |
| ElastiCache maxclients | Set to pool_size_per_task x task_count x 2 = 50 x 10 x 2 = 1000 | Headroom on the Redis side |

Scenario 5: Capability Drift After Update

Problem

After deploying a new version of the manga_catalog_search Lambda MCP server that adds a new language parameter and renames genre to category, the MCP client (Bedrock agent orchestrator) continues using the old tool schema. Tool calls fail because the agent sends genre (which no longer exists) instead of category, and never sends the new required language parameter.

Impact: All catalog search requests return validation errors. The agent retries with the same incorrect arguments, wastes token budget on retries, and eventually gives up. Users cannot search the manga catalog.

Symptoms observed:

- Lambda logs: Missing required argument: 'language' and Unknown argument: 'genre'
- Agent retry count increases (3 retries per tool call, all failing)
- Token usage spikes (each retry consumes input and output tokens)
- Bedrock model invocation cost increases ~3x due to retries

Detection

flowchart TD
    A[Alert: manga_catalog_search<br/>error rate > 95% for 10 min] --> B{Check error messages}

    B --> C["'Missing required argument: language'<br/>'Unknown argument: genre'"]
    C --> D{Recent deployment?}

    D -->|Yes| E[Schema Changed in New Version]
    D -->|No| F[Check for config drift]

    E --> G{Client using cached schema?}
    G -->|Yes| H["Root Cause:<br/>Client cached old tool definitions<br/>from previous tools/list call"]
    G -->|No| I["Root Cause:<br/>Server schema changed without<br/>backward compatibility"]

    H --> J["Fix 1: Force client<br/>to re-discover tools"]
    I --> K["Fix 2: Restore backward<br/>compatibility in server"]

    J --> L["Long-term: Implement<br/>listChanged notification"]
    K --> L

    style A fill:#c7131b,color:#fff
    style E fill:#d86613,color:#fff
    style J fill:#1a8c1a,color:#fff
    style K fill:#1a8c1a,color:#fff
    style L fill:#1a8c1a,color:#fff

Root Cause

The server deployment introduced a breaking schema change without backward compatibility:

  1. Renamed parameter: genre was renamed to category (breaking: old clients send genre)
  2. New required parameter: language was added as required (breaking: old clients don't know about it)
  3. No listChanged notification: The server didn't signal that its tool definitions changed
  4. Client caches stale schema: The MCP client cached tool definitions from the initial tools/list call and never refreshes them

Resolution

"""
Backward-compatible MCP tool schema evolution.
Demonstrates how to add, rename, and deprecate parameters
without breaking existing clients.
"""

import hashlib
import json
import logging
import time

logger = logging.getLogger("mcp-schema-evolution")


# ---------- Strategy 1: Backward-Compatible Schema ----------

TOOL_DEFINITIONS_V2 = [
    {
        "name": "manga_catalog_search",
        "description": (
            "Search the MangaAssist catalog by title, author, category/genre, "
            "or ISBN. Returns matching manga with prices, ratings, and availability. "
            "Supports Japanese and English language results."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query: title, author, keywords, or ISBN"
                },
                # NEW parameter: category (replaces genre)
                "category": {
                    "type": "string",
                    "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
                    "default": "all",
                    "description": "Filter by manga category (preferred over 'genre')"
                },
                # DEPRECATED but still accepted: genre (alias for category)
                "genre": {
                    "type": "string",
                    "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
                    "default": "all",
                    "description": "[DEPRECATED: use 'category'] Filter by manga genre"
                },
                # NEW parameter: language (optional with default, NOT required)
                "language": {
                    "type": "string",
                    "enum": ["ja", "en", "both"],
                    "default": "both",
                    "description": "Language for results (ja=Japanese, en=English, both=all)"
                },
                "max_results": {
                    "type": "integer",
                    "default": 5,
                    "minimum": 1,
                    "maximum": 20
                },
                "in_stock_only": {
                    "type": "boolean",
                    "default": False
                }
            },
            # NOTE: Only 'query' is required. 'language' has a default.
            # 'genre' is still accepted for backward compatibility.
            "required": ["query"]
        }
    }
]


def normalize_arguments(tool_name: str, arguments: dict) -> dict:
    """
    Normalize tool arguments to handle deprecated parameter names
    and missing optional parameters with defaults.

    This adapter layer sits between the MCP protocol handler and
    the actual tool implementation.
    """
    if tool_name != "manga_catalog_search":
        return arguments

    normalized = dict(arguments)

    # Handle genre -> category migration
    if "genre" in normalized and "category" not in normalized:
        normalized["category"] = normalized.pop("genre")
        logger.info("Migrated deprecated 'genre' parameter to 'category'")
    elif "genre" in normalized and "category" in normalized:
        # Both provided: prefer category, discard genre
        normalized.pop("genre")
        logger.info("Both 'genre' and 'category' provided, using 'category'")
    elif "genre" not in normalized and "category" not in normalized:
        normalized["category"] = "all"

    # Apply defaults for new optional parameters
    if "language" not in normalized:
        normalized["language"] = "both"

    return normalized


# ---------- Strategy 2: Schema Version Tracking ----------

class SchemaVersionManager:
    """Tracks tool schema versions and detects drift between
    client expectations and server definitions."""

    def __init__(self):
        self._schema_versions: dict[str, str] = {}  # tool_name -> schema_hash
        self._version_history: list[dict] = []

    def register_schema(self, tool_name: str, schema: dict) -> str:
        """Register a tool schema and return its version hash."""
        schema_str = json.dumps(schema, sort_keys=True)
        schema_hash = hashlib.sha256(schema_str.encode()).hexdigest()[:12]

        old_hash = self._schema_versions.get(tool_name)
        if old_hash and old_hash != schema_hash:
            logger.warning(
                f"Schema change detected for {tool_name}: "
                f"{old_hash} -> {schema_hash}"
            )
            self._version_history.append({
                "tool": tool_name,
                "old_version": old_hash,
                "new_version": schema_hash,
                "timestamp": time.time(),
            })

        self._schema_versions[tool_name] = schema_hash
        return schema_hash

    def get_schema_version(self, tool_name: str) -> str:
        return self._schema_versions.get(tool_name, "unknown")

    def check_client_compatibility(
        self,
        tool_name: str,
        client_schema_hash: str,
    ) -> dict:
        """Check if client's cached schema matches current server schema."""
        current = self._schema_versions.get(tool_name, "unknown")
        is_compatible = client_schema_hash == current or client_schema_hash == "unknown"

        return {
            "compatible": is_compatible,
            "client_version": client_schema_hash,
            "server_version": current,
            "action": "none" if is_compatible else "refresh_tools_list"
        }


# ---------- Strategy 3: Capability Drift Detection ----------

class CapabilityDriftDetector:
    """Detects when client and server tool definitions are out of sync.
    Runs as a periodic check in the MCP client."""

    def __init__(self, refresh_interval_s: int = 300):
        self._cached_tools: dict[str, list] = {}  # server -> cached tools/list result
        self._last_refresh: dict[str, float] = {}
        self._refresh_interval = refresh_interval_s
        self._drift_events: list[dict] = []

    async def check_and_refresh(
        self,
        server_name: str,
        server_endpoint: str,
        http_client,
    ) -> dict:
        """Check if tool definitions have changed on the server."""
        last = self._last_refresh.get(server_name, 0)
        now = time.time()

        if now - last < self._refresh_interval:
            return {"has_drift": False, "action": "none", "reason": "within refresh interval"}

        # Fetch current tool definitions from server
        try:
            response = await http_client.post(
                server_endpoint,
                json={
                    "jsonrpc": "2.0",
                    "id": int(now),
                    "method": "tools/list",
                    "params": {}
                },
                timeout=5
            )
            result = response.json().get("result", {})
            new_tools = result.get("tools", [])
        except Exception as e:
            logger.warning(f"Failed to refresh tools from {server_name}: {e}")
            return {"action": "error", "reason": str(e)}

        self._last_refresh[server_name] = now

        # Compare with cached version
        old_tools = self._cached_tools.get(server_name, [])
        drift = self._detect_drift(old_tools, new_tools)

        self._cached_tools[server_name] = new_tools

        if drift["has_drift"]:
            self._drift_events.append({
                "server": server_name,
                "timestamp": now,
                "changes": drift["changes"],
            })
            logger.warning(
                f"Capability drift detected on {server_name}: "
                f"{json.dumps(drift['changes'])}"
            )

        return drift

    @staticmethod
    def _detect_drift(old_tools: list, new_tools: list) -> dict:
        """Compare old and new tool definitions to find changes."""
        old_map = {t["name"]: t for t in old_tools}
        new_map = {t["name"]: t for t in new_tools}

        changes = []

        # Tools added
        for name in new_map:
            if name not in old_map:
                changes.append({"type": "tool_added", "tool": name})

        # Tools removed
        for name in old_map:
            if name not in new_map:
                changes.append({"type": "tool_removed", "tool": name})

        # Tools modified
        for name in set(old_map) & set(new_map):
            old_schema = json.dumps(old_map[name].get("inputSchema", {}), sort_keys=True)
            new_schema = json.dumps(new_map[name].get("inputSchema", {}), sort_keys=True)
            if old_schema != new_schema:
                # Identify specific parameter changes
                old_props = old_map[name].get("inputSchema", {}).get("properties", {})
                new_props = new_map[name].get("inputSchema", {}).get("properties", {})

                added_params = set(new_props) - set(old_props)
                removed_params = set(old_props) - set(new_props)
                modified_params = [
                    p for p in set(old_props) & set(new_props)
                    if json.dumps(old_props[p], sort_keys=True) != json.dumps(new_props[p], sort_keys=True)
                ]

                changes.append({
                    "type": "schema_changed",
                    "tool": name,
                    "added_params": list(added_params),
                    "removed_params": list(removed_params),
                    "modified_params": modified_params,
                })

                # Check if required fields changed
                old_req = set(old_map[name].get("inputSchema", {}).get("required", []))
                new_req = set(new_map[name].get("inputSchema", {}).get("required", []))
                if old_req != new_req:
                    changes.append({
                        "type": "required_fields_changed",
                        "tool": name,
                        "added_required": list(new_req - old_req),
                        "removed_required": list(old_req - new_req),
                    })

        return {
            "has_drift": len(changes) > 0,
            "changes": changes,
        }


# ---------- Strategy 4: Blue-Green Deployment for Schema Changes ----------

def create_blue_green_tool_definitions(
    blue_tools: list[dict],
    green_tools: list[dict],
    active_color: str = "green",
) -> list[dict]:
    """
    During a blue-green deployment, serve both old and new tool
    definitions to allow gradual client migration.

    Phase 1: Deploy green with backward-compatible schema
    Phase 2: Verify all clients work with green
    Phase 3: Remove blue (old schema) support
    """
    if active_color == "green":
        return green_tools
    elif active_color == "blue":
        return blue_tools
    else:
        # Canary: merge both, preferring green where names overlap
        merged = {}
        for tool in blue_tools:
            merged[tool["name"]] = tool
        for tool in green_tools:
            merged[tool["name"]] = tool  # Green overwrites blue
        return list(merged.values())
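
A quick demonstration of the adapter layer: a legacy client still sending genre and no language is transparently migrated. Plain asserts, runnable below the code above:

legacy_call = {"query": "attack on titan", "genre": "shonen"}  # old client payload
migrated = normalize_arguments("manga_catalog_search", legacy_call)

assert migrated == {"query": "attack on titan", "category": "shonen", "language": "both"}
assert "genre" not in migrated  # the deprecated name never reaches the tool implementation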

Prevention

| Prevention Measure | Implementation | Impact |
| --- | --- | --- |
| Never make parameters required in updates | New parameters always have defaults; language defaults to "both" | Old clients work without changes |
| Alias deprecated parameters | normalize_arguments() maps genre to category transparently | Smooth migration without client coordination |
| Schema version tracking | SHA-256 hash of the tool schema included in the health endpoint | Monitoring detects schema changes immediately |
| Periodic tools/list refresh | Client re-fetches tool definitions every 5 minutes | Stale cache window limited to 5 minutes max |
| Drift detection | Compare old vs new tool definitions field by field | Automated alerting on breaking changes |
| Blue-green deployment | Serve both old and new schemas during the transition | Zero-downtime schema evolution |
| Integration test matrix | Test old client against new server AND new client against old server | Catch incompatibilities in the CI pipeline |

Scenario Comparison Summary

| # | Scenario | Severity | Detection Time | Resolution Time | Root Cause Category |
| --- | --- | --- | --- | --- | --- |
| 1 | Lambda MCP Cold Start Timeout | Medium | 5 min (CloudWatch alarm) | 1 hour (Provisioned Concurrency) | Performance / Configuration |
| 2 | ECS MCP Memory Leak | High | 1-4 days (gradual) | 30 min (code fix + deploy) | Resource Management |
| 3 | Protocol Version Mismatch | Critical | Immediate (100% failure) | 15 min (rollback or fix) | Compatibility / Protocol |
| 4 | Connection Pool Exhaustion | High | 5 min (latency spike) | 2 hours (pool redesign) | Resource Management |
| 5 | Capability Drift After Update | Critical | 10 min (error rate alert) | 30 min (backward compat fix) | Deployment / Schema |

Cross-Scenario Prevention Checklist

flowchart LR
    subgraph "Before Deployment"
        A[Integration test matrix<br/>client x server versions]
        B[Schema backward<br/>compat check]
        C[Load test with<br/>production traffic shape]
    end

    subgraph "During Deployment"
        D[Canary deployment<br/>5% traffic first]
        E[Blue-green for<br/>schema changes]
        F[Real-time error rate<br/>monitoring]
    end

    subgraph "After Deployment"
        G[Periodic drift detection<br/>every 5 minutes]
        H[Memory monitoring<br/>every 60 seconds]
        I[Connection pool metrics<br/>continuous]
    end

    A --> D
    B --> E
    C --> F
    D --> G
    E --> H
    F --> I

    style A fill:#3b48cc,color:#fff
    style B fill:#3b48cc,color:#fff
    style C fill:#3b48cc,color:#fff
    style D fill:#d86613,color:#fff
    style E fill:#d86613,color:#fff
    style F fill:#d86613,color:#fff
    style G fill:#1a8c1a,color:#fff
    style H fill:#1a8c1a,color:#fff
    style I fill:#1a8c1a,color:#fff

Key Takeaways

| # | Takeaway |
| --- | --- |
| 1 | Lambda cold starts are the top MCP latency killer -- Provisioned Concurrency, lazy imports, and connection pre-warming reduce cold start from 8s to <1s for MangaAssist's catalog search. |
| 2 | Unbounded in-memory caches cause predictable OOM on ECS -- the BoundedLRUCache with item count AND memory byte limits prevents the 3-4 day memory leak cycle. |
| 3 | Protocol version negotiation must be permissive -- servers should offer their highest compatible version, not reject unknown versions. A strict equality check caused 100% tool failure. |
| 4 | Connection pools need semaphore-based backpressure -- without it, 200 concurrent requests overwhelm a 20-connection Redis pool. The async with pool.connection() pattern prevents leaks in error paths. |
| 5 | Tool schema changes must be backward compatible -- never add required parameters (use defaults), alias renamed parameters (genre -> category), and run drift detection every 5 minutes. |
| 6 | Phase-separated connection usage prevents compound failures -- acquire Redis for the cache check, release, query OpenSearch, then acquire Redis again for the cache write. Never hold connections across slow operations. |
| 7 | Three-tier health checks catch different failure modes -- liveness catches process death, readiness catches initialization failures, deep checks catch dependency degradation (a minimal sketch follows below). |
| 8 | Every scenario has a monitoring signal that precedes the outage -- a bimodal cold start distribution, linear memory growth, version negotiation failures, rising pool wait times, and schema hash mismatches are all detectable before user impact. |
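
Takeaway 7's three-tier health checks are not shown elsewhere in this runbook, so here is a minimal, framework-neutral sketch. The endpoint paths (e.g., /health/live) are assumptions, and the deep check reuses Scenario 4's ManagedRedisPool plus an async OpenSearch client purely as examples:

"""
Framework-neutral sketch of three-tier health checks; endpoint paths
and dependency clients are assumptions.
"""


def liveness() -> dict:
    """Tier 1: process is up. Never touches dependencies."""
    return {"status": "ok"}


def readiness(server_initialized: bool) -> dict:
    """Tier 2: initialization finished (config loaded, clients constructed)."""
    return {"status": "ok" if server_initialized else "initializing"}


async def deep_check(redis_pool, opensearch_client) -> dict:
    """Tier 3: dependencies respond. Used for alerting, not for
    load-balancer eviction (a slow dependency should not kill tasks)."""
    checks = {}
    try:
        checks["redis"] = "ok" if await redis_pool.ping() else "degraded"
    except Exception:
        checks["redis"] = "down"
    try:
        await opensearch_client.info()
        checks["opensearch"] = "ok"
    except Exception:
        checks["opensearch"] = "down"
    overall = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": overall, "checks": checks}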