MCP Server Scenarios and Runbooks: Troubleshooting Model Extension Frameworks
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Detail |
|---|---|
| Skill | 2.1.7 |
| Description | Develop model extension frameworks to enhance FM capabilities |
| Sub-focus | Operational troubleshooting for Lambda MCP and ECS MCP servers |
| AWS Services | Lambda, ECS Fargate, API Gateway, CloudWatch, ElastiCache Redis, X-Ray |
| MangaAssist Relevance | Maintaining <3s response time and 99.9% availability for tool-augmented FM |
Mind Map
mindmap
  root((MCP Troubleshooting))
    Scenario 1
      Lambda Cold Start Timeout
        Provisioned Concurrency
        Module Optimization
        Layer Caching
    Scenario 2
      ECS Memory Leak
        Session Cache Growth
        Connection Pool Leak
        Container Restart
    Scenario 3
      Protocol Version Mismatch
        Version Negotiation
        Graceful Degradation
        Client Compatibility
    Scenario 4
      Connection Pool Exhaustion
        Pool Sizing
        Leak Detection
        Backpressure
    Scenario 5
      Capability Drift After Update
        Tool Schema Changes
        Client Cache Invalidation
        Blue-Green Deploy
Scenario 1: Lambda MCP Cold Start Timeout
Problem
The MangaAssist catalog search Lambda MCP server experiences intermittent timeouts when users first interact after a period of inactivity. The tool call fails with a 504 Gateway Timeout after 10 seconds, and the Bedrock agent falls back to answering without catalog data, providing stale or hallucinated manga information.
Impact: During traffic spikes (e.g., after a new manga release announcement at 10:00 JST), 15-20% of first requests per cold container fail. Users see messages like "I don't have current inventory information" instead of actual search results.
Symptoms observed:
- CloudWatch Lambda Duration metric shows bimodal distribution: p50 = 200ms, p99 = 9,800ms
- API Gateway 504 errors spike at 08:00-10:00 JST (start of business hours)
- X-Ray traces show 7-8 seconds spent in "Initialization" segment
- Bedrock agent logs show tool_call_failed: timeout for manga_catalog_search
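One way to confirm the cold-start pattern from logs is a CloudWatch Logs Insights query over the function's log group; `@initDuration` only appears on REPORT lines for cold starts, so counting it against total invocations gives the cold-start rate directly (a sketch using the standard Lambda REPORT fields):

```
filter @type = "REPORT"
| stats count(*) as invocations,
        count(@initDuration) as cold_starts,
        avg(@initDuration) as avg_init_ms,
        max(@initDuration) as max_init_ms
  by bin(5m)
```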
Detection
flowchart TD
A[CloudWatch Alarm:<br/>Lambda Duration p99 > 8000ms<br/>for 3 consecutive periods] --> B{Check Lambda Metrics}
B --> C[ColdStart Init Duration > 5000ms?]
C -->|Yes| D[Cold Start Issue Confirmed]
C -->|No| E[Check downstream latency<br/>OpenSearch / Redis]
D --> F{Which phase is slow?}
F -->|Import time| G["Heavy imports:<br/>opensearch-py, boto3,<br/>numpy (if present)"]
F -->|Connection setup| H["Backend connections:<br/>OpenSearch TLS handshake,<br/>Redis connect"]
F -->|Both| I[Combined: imports + connections]
G --> J[Resolution: Optimize imports]
H --> K[Resolution: Connection caching]
I --> L[Resolution: Provisioned concurrency]
style A fill:#c7131b,color:#fff
style D fill:#d86613,color:#fff
style J fill:#1a8c1a,color:#fff
style K fill:#1a8c1a,color:#fff
style L fill:#1a8c1a,color:#fff
Root Cause
The Lambda function's cold start is slow due to three compounding factors:
- Heavy dependency imports (3-4 seconds): the `opensearch-py`, `boto3`, `requests`, and `requests-aws4auth` import chains load dozens of submodules
- TLS connection establishment (2-3 seconds): OpenSearch Serverless requires TLS 1.2 with SigV4 authentication, and the first connection includes certificate verification
- Redis connection (0.5-1 second): ElastiCache Redis connection establishment with encryption in transit
Total cold start: 6-8 seconds, leaving only 2-4 seconds of the 10-second API Gateway timeout for actual work.
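The import cost can be measured directly before optimizing. A minimal, self-contained sketch (stdlib modules stand in for the heavy dependencies, which may not be installed locally):

```python
import importlib
import time

def time_import(module_name: str) -> float:
    """Wall-clock time (ms) to import a module; near zero if already cached."""
    start = time.perf_counter()
    importlib.import_module(module_name)
    return (time.perf_counter() - start) * 1000.0

# In the Lambda container you would time the real offenders,
# e.g. time_import("opensearchpy"); stdlib modules stand in here.
for name in ("json", "decimal", "email.parser"):
    print(f"{name}: {time_import(name):.2f}ms")
```

Running this at the top of the handler module in a test invocation shows which imports dominate the Initialization segment seen in X-Ray.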
Resolution
"""
Optimized Lambda MCP server with cold start mitigation.
Applies lazy imports, connection warming, and Provisioned Concurrency support.
"""
import json
import logging
import os
import time
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# ---------- OPTIMIZATION 1: Lazy imports ----------
# Heavy libraries are imported only when first needed, not at module load.
# This shaves 1-2 seconds off cold start for requests that don't need them.
_opensearch_client = None
_redis_client = None
REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT", "")
REDIS_ENDPOINT = os.environ.get("REDIS_ENDPOINT", "")
def _get_opensearch_client():
"""Lazy-initialize OpenSearch client on first use."""
global _opensearch_client
if _opensearch_client is None:
start = time.time()
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
REGION,
"aoss",
session_token=credentials.token,
)
_opensearch_client = OpenSearch(
hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
http_auth=aws_auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=5,
)
elapsed = (time.time() - start) * 1000
logger.info(f"OpenSearch client initialized in {elapsed:.0f}ms")
return _opensearch_client
def _get_redis_client():
"""Lazy-initialize Redis client on first use."""
global _redis_client
if _redis_client is None and REDIS_ENDPOINT:
start = time.time()
import redis
_redis_client = redis.Redis(
host=REDIS_ENDPOINT, port=6379,
decode_responses=True, socket_timeout=1,
socket_connect_timeout=2,
retry_on_timeout=True,
)
# Warm the connection immediately
try:
_redis_client.ping()
except Exception as e:
logger.warning(f"Redis warm ping failed: {e}")
elapsed = (time.time() - start) * 1000
logger.info(f"Redis client initialized in {elapsed:.0f}ms")
return _redis_client
# ---------- OPTIMIZATION 2: Provisioned Concurrency Warm-Up ----------
# When Provisioned Concurrency is enabled, Lambda calls the handler
# initialization code BEFORE any request arrives. We pre-warm connections.
def _warm_connections():
"""Called during Provisioned Concurrency initialization.
Pre-establishes connections so the first real request is fast."""
logger.info("Pre-warming connections for Provisioned Concurrency...")
try:
os_client = _get_opensearch_client()
os_client.info() # Force actual connection
logger.info("OpenSearch connection warmed")
except Exception as e:
logger.warning(f"OpenSearch warm failed: {e}")
try:
r = _get_redis_client()
if r:
r.ping()
logger.info("Redis connection warmed")
except Exception as e:
logger.warning(f"Redis warm failed: {e}")
# Trigger warm-up during init phase
if os.environ.get("AWS_LAMBDA_INITIALIZATION_TYPE") == "provisioned-concurrency":
_warm_connections()
# ---------- OPTIMIZATION 3: Minimal response path for simple requests ----------
# MCP protocol handlers (initialize, tools/list) don't need backend connections.
# Serve them immediately without touching OpenSearch or Redis.
PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-catalog-search"
SERVER_VERSION = "1.1.0"
TOOL_DEFINITIONS = [
{
"name": "manga_catalog_search",
"description": "Search manga catalog by title, author, genre, or ISBN.",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"genre": {"type": "string", "default": "all"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"]
}
}
]
def lambda_handler(event, context):
"""Optimized Lambda entry point. Protocol messages served instantly;
only tool/call triggers backend connections."""
try:
body = json.loads(event.get("body", "{}")) if isinstance(event.get("body"), str) else event
method = body.get("method", "")
msg_id = body.get("id")
params = body.get("params", {})
# FAST PATH: Protocol messages need no backend access
if method == "initialize":
return _respond(200, msg_id, {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}
})
if method == "notifications/initialized":
return {"statusCode": 204, "body": ""}
if method == "tools/list":
return _respond(200, msg_id, {"tools": TOOL_DEFINITIONS})
# SLOW PATH: Tool execution needs backends
if method == "tools/call":
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
if tool_name == "manga_catalog_search":
result = _execute_search(arguments)
return _respond(200, msg_id, result)
else:
return _respond(400, msg_id, None, {
"code": -32601, "message": f"Unknown tool: {tool_name}"
})
return _respond(400, msg_id, None, {
"code": -32601, "message": f"Unknown method: {method}"
})
except Exception as e:
logger.error(f"Handler error: {e}", exc_info=True)
return _respond(500, None, None, {
"code": -32603, "message": "Internal error"
})
def _execute_search(arguments: dict) -> dict:
"""Execute catalog search -- only called for tools/call."""
query = arguments.get("query", "")
start = time.time()
# Try cache first
redis = _get_redis_client()
cache_key = f"search:{query}:{arguments.get('genre', 'all')}"
if redis:
try:
cached = redis.get(cache_key)
if cached:
return {
"content": [{"type": "text", "text": cached}],
"isError": False
}
except Exception:
pass
# Search OpenSearch
os_client = _get_opensearch_client()
search_body = {
"size": min(arguments.get("max_results", 5), 20),
"query": {"multi_match": {
"query": query,
"fields": ["title^3", "title_jp^3", "author^2", "description"]
}}
}
try:
response = os_client.search(
index=os.environ.get("OPENSEARCH_INDEX", "manga-catalog"),
body=search_body
)
hits = response.get("hits", {}).get("hits", [])
results = [h["_source"] for h in hits]
elapsed = round((time.time() - start) * 1000, 1)
payload = json.dumps({
"results": results,
"metadata": {"query": query, "latency_ms": elapsed}
}, ensure_ascii=False, default=str)
# Cache result
if redis:
try:
redis.setex(cache_key, 300, payload)
except Exception:
pass
return {
"content": [{"type": "text", "text": payload}],
"isError": False
}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Search error: {e}"}],
"isError": True
}
def _respond(status, msg_id, result=None, error=None):
body = {"jsonrpc": "2.0", "id": msg_id}
if error:
body["error"] = error
else:
body["result"] = result
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body, default=str)
}
Prevention
| Prevention Measure | Implementation | Impact |
|---|---|---|
| Provisioned Concurrency | Set 10 warm instances for catalog-search | Eliminates cold starts for base traffic; ~$30/month at 10 instances |
| Warm-up on init | Pre-establish OpenSearch and Redis connections during PC init | Saves 3-4s on first request |
| Lazy imports | Only import opensearch-py when tools/call is received | Protocol messages (initialize, tools/list) respond in <50ms even cold |
| Lambda SnapStart | Enable for Java/Python runtimes (when available) | Checkpoint-restore eliminates JVM/Python init overhead |
| Scheduled warmer | EventBridge rule pings function every 5 minutes | Keeps at least 1 container warm outside Provisioned Concurrency |
| Connection timeout tuning | Redis socket_connect_timeout=2s, OpenSearch timeout=5s | Fail fast on connection issues instead of hanging to Lambda timeout |
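The scheduled-warmer row can be realized as a tiny second Lambda invoked by an EventBridge schedule. A sketch under assumptions: the endpoint URL is hypothetical, and the ping uses the MCP initialize fast path so warming never touches OpenSearch or Redis:

```python
import json
import urllib.request

# Hypothetical API Gateway endpoint for the MCP Lambda; substitute your own.
WARM_URL = "https://example.execute-api.ap-northeast-1.amazonaws.com/prod/mcp"

def build_warm_ping() -> bytes:
    """A JSON-RPC initialize message -- served by the fast path,
    so it warms the container without backend connections."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": "warmer",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "clientInfo": {"name": "eventbridge-warmer", "version": "1.0"},
        },
    }).encode("utf-8")

def handler(event, context):
    """Invoked by an EventBridge schedule, e.g. rate(5 minutes)."""
    req = urllib.request.Request(
        WARM_URL,
        data=build_warm_ping(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return {"warmed": resp.status == 200}
```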
Scenario 2: ECS MCP Memory Leak
Problem
The MangaAssist recommendation MCP server running on ECS Fargate gradually consumes all available memory (2 GB) over 3-4 days, eventually triggering an OOM kill and task restart. During the restart (30-60 seconds), recommendation requests fail, and users see "Recommendations are temporarily unavailable."
Impact: Every 3-4 days, recommendations go offline for 30-60 seconds. The restart also causes loss of the in-memory session cache (~8,000 active sessions), forcing Redis fallback reads and degrading latency from 50ms to 200ms for the next hour as the cache re-warms.
Symptoms observed:
- ECS task memory utilization climbs linearly: Day 1 = 45%, Day 2 = 60%, Day 3 = 78%, Day 4 = OOM
- CloudWatch MemoryUtilization alarm fires at 85% threshold
- ECS task STOPPED reason: OutOfMemoryError: Container killed due to memory usage
- Post-restart: Redis GET requests spike 10x as in-memory cache is empty
Detection
flowchart TD
A[CloudWatch Alarm:<br/>ECS MemoryUtilization > 85%<br/>for 5 minutes] --> B{Task restarted recently?}
B -->|Yes - OOM Kill| C[Check ECS Task<br/>Stopped Reason]
B -->|No - Still running| D[Capture memory profile<br/>before OOM]
C --> E[Confirm:<br/>OutOfMemoryError]
D --> F{Memory growth pattern?}
F -->|Linear growth| G[Likely: unbounded cache<br/>or accumulating data structure]
F -->|Sudden spike| H[Likely: large query result<br/>or payload in memory]
F -->|Step function| I[Likely: connection leak<br/>or thread leak]
G --> J["Investigate:<br/>1. Session cache size<br/>2. Query result caching<br/>3. Logging buffer<br/>4. Metric accumulation"]
H --> K["Investigate:<br/>1. Large OpenSearch response<br/>2. Unbounded list in result<br/>3. Binary data in memory"]
I --> L["Investigate:<br/>1. Redis connection pool<br/>2. OpenSearch connection pool<br/>3. Asyncio task leak"]
J --> M[Resolution: Add LRU eviction<br/>+ periodic cache flush]
K --> N[Resolution: Limit result sizes<br/>+ streaming responses]
L --> O[Resolution: Fix pool config<br/>+ connection cleanup]
style A fill:#c7131b,color:#fff
style E fill:#d86613,color:#fff
style M fill:#1a8c1a,color:#fff
style N fill:#1a8c1a,color:#fff
style O fill:#1a8c1a,color:#fff
Root Cause
The memory leak has two contributing causes:
- Unbounded session cache: the `active_sessions` dictionary grows without limit. Each session stores conversation history, viewed items, and preference vectors. At 1M messages/day, ~50K unique sessions accumulate, each averaging 40KB, for roughly 2GB in total.
- OpenSearch response accumulation: large search results (20 items with full metadata) are kept in Python memory even after being serialized into the response, and the garbage collector reclaims them slowly because of circular references in the OpenSearch client response objects.
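The first cause is easy to reproduce in isolation. A toy sketch contrasting the unbounded dictionary with a size-capped variant (illustrative names, not the server's actual code):

```python
def simulate_sessions(n: int, bounded: bool, max_items: int = 1000) -> int:
    """Store n fake sessions; the bounded variant evicts the oldest entry
    once max_items is reached (dicts preserve insertion order in 3.7+)."""
    sessions: dict[str, bytes] = {}
    for i in range(n):
        if bounded and len(sessions) >= max_items:
            sessions.pop(next(iter(sessions)))  # evict oldest insertion
        sessions[f"session-{i}"] = b"x" * 1024  # ~1KB stand-in per session
    return len(sessions)

print(simulate_sessions(5000, bounded=False))  # 5000 -- grows without limit
print(simulate_sessions(5000, bounded=True))   # 1000 -- capped
```

At the real ~40KB per session, the unbounded variant is exactly the linear climb seen in the MemoryUtilization graphs.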
Resolution
"""
Memory-safe ECS MCP server with bounded caches,
periodic cleanup, and memory monitoring.
"""
import asyncio
import gc
import json
import logging
import os
import sys
import time
import tracemalloc
from collections import OrderedDict
from typing import Optional
logger = logging.getLogger("mcp-recommendation")
class BoundedLRUCache:
"""Memory-bounded LRU cache with automatic eviction and size tracking.
This replaces the unbounded dict that caused the memory leak."""
def __init__(self, max_items: int = 10000, max_memory_mb: float = 256):
self._cache: OrderedDict[str, dict] = OrderedDict()
self._max_items = max_items
self._max_memory_bytes = int(max_memory_mb * 1024 * 1024)
self._current_memory_estimate = 0
self._eviction_count = 0
def get(self, key: str) -> Optional[dict]:
if key in self._cache:
self._cache.move_to_end(key)
return self._cache[key]
return None
def put(self, key: str, value: dict):
estimated_size = self._estimate_size(value)
# Evict by item count
while len(self._cache) >= self._max_items:
self._evict_oldest()
# Evict by memory
while self._current_memory_estimate + estimated_size > self._max_memory_bytes:
if not self._cache:
break
self._evict_oldest()
self._cache[key] = value
self._current_memory_estimate += estimated_size
def _evict_oldest(self):
if self._cache:
key, value = self._cache.popitem(last=False)
self._current_memory_estimate -= self._estimate_size(value)
self._current_memory_estimate = max(0, self._current_memory_estimate)
self._eviction_count += 1
@staticmethod
def _estimate_size(obj) -> int:
"""Rough estimate of object memory footprint."""
try:
return len(json.dumps(obj, default=str).encode("utf-8"))
except Exception:
return 1024 # Default 1KB estimate
def clear_expired(self, max_age_seconds: int = 1800):
"""Remove entries older than max_age_seconds."""
now = time.time()
expired_keys = []
for key, value in self._cache.items():
if isinstance(value, dict):
last_access = value.get("_last_access", value.get("created_at", 0))
if now - last_access > max_age_seconds:
expired_keys.append(key)
for key in expired_keys:
value = self._cache.pop(key, None)
if value:
self._current_memory_estimate -= self._estimate_size(value)
if expired_keys:
logger.info(f"Expired {len(expired_keys)} cache entries")
def stats(self) -> dict:
return {
"items": len(self._cache),
"max_items": self._max_items,
"estimated_memory_mb": round(self._current_memory_estimate / (1024 * 1024), 2),
"max_memory_mb": round(self._max_memory_bytes / (1024 * 1024), 2),
"total_evictions": self._eviction_count,
}
class MemoryMonitor:
"""Monitors process memory and triggers cleanup when thresholds are reached."""
def __init__(self, warning_pct: float = 75, critical_pct: float = 85):
self.warning_pct = warning_pct
self.critical_pct = critical_pct
self._container_memory_limit = self._detect_memory_limit()
self._snapshots: list[dict] = []
self._tracemalloc_enabled = False
    @staticmethod
    def _detect_memory_limit() -> int:
        """Detect the container memory limit from cgroup (v2 first, then v1)."""
        for path in (
            "/sys/fs/cgroup/memory.max",                    # cgroup v2
            "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        ):
            try:
                with open(path) as f:
                    raw = f.read().strip()
                if raw != "max":  # cgroup v2 reports "max" when unlimited
                    return int(raw)
            except (FileNotFoundError, ValueError):
                continue
        # Fallback: assume 2GB (common Fargate config)
        return 2 * 1024 * 1024 * 1024
def get_current_usage(self) -> dict:
"""Get current memory usage stats."""
import resource
rusage = resource.getrusage(resource.RUSAGE_SELF)
rss_bytes = rusage.ru_maxrss * 1024 # maxrss is in KB on Linux
# Also check /proc/self/status for more accurate numbers
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("VmRSS:"):
rss_bytes = int(line.split()[1]) * 1024
break
except (FileNotFoundError, ValueError):
pass
limit = self._container_memory_limit
usage_pct = (rss_bytes / limit) * 100 if limit > 0 else 0
return {
"rss_mb": round(rss_bytes / (1024 * 1024), 1),
"limit_mb": round(limit / (1024 * 1024), 1),
"usage_pct": round(usage_pct, 1),
"gc_counts": gc.get_count(),
"gc_thresholds": gc.get_threshold(),
}
def check_and_cleanup(self, session_cache: BoundedLRUCache) -> dict:
"""Check memory and perform cleanup if needed."""
usage = self.get_current_usage()
actions_taken = []
if usage["usage_pct"] >= self.critical_pct:
# CRITICAL: Aggressive cleanup
logger.warning(f"Memory CRITICAL: {usage['usage_pct']}%")
# Force garbage collection
collected = gc.collect()
actions_taken.append(f"gc.collect freed {collected} objects")
# Evict 50% of session cache
items_before = session_cache.stats()["items"]
target = items_before // 2
while session_cache.stats()["items"] > target:
session_cache._evict_oldest()
actions_taken.append(
f"Evicted {items_before - session_cache.stats()['items']} cache entries"
)
elif usage["usage_pct"] >= self.warning_pct:
# WARNING: Moderate cleanup
logger.warning(f"Memory WARNING: {usage['usage_pct']}%")
# Expire old sessions
session_cache.clear_expired(max_age_seconds=900) # 15 min
gc.collect()
actions_taken.append("Cleared sessions > 15min, ran gc.collect")
# Record snapshot
self._snapshots.append({
"timestamp": time.time(),
"rss_mb": usage["rss_mb"],
"usage_pct": usage["usage_pct"],
"actions": actions_taken,
})
# Keep last 100 snapshots
self._snapshots = self._snapshots[-100:]
usage["actions_taken"] = actions_taken
return usage
def enable_tracemalloc(self):
"""Enable tracemalloc for debugging memory leaks in development."""
if not self._tracemalloc_enabled:
tracemalloc.start(25) # 25 frames deep
self._tracemalloc_enabled = True
logger.info("tracemalloc enabled")
def get_top_allocations(self, count: int = 10) -> list[str]:
"""Get top memory allocators (requires tracemalloc enabled)."""
if not self._tracemalloc_enabled:
return ["tracemalloc not enabled"]
snapshot = tracemalloc.take_snapshot()
top = snapshot.statistics("lineno")[:count]
return [str(stat) for stat in top]
async def periodic_memory_check(
monitor: MemoryMonitor,
session_cache: BoundedLRUCache,
interval_s: int = 60,
):
"""Background task that periodically checks memory and cleans up."""
while True:
try:
result = monitor.check_and_cleanup(session_cache)
logger.info(
f"Memory check: {result['rss_mb']}MB "
f"({result['usage_pct']}%), "
f"cache={session_cache.stats()['items']} items"
)
except Exception as e:
logger.error(f"Memory check failed: {e}")
await asyncio.sleep(interval_s)
async def periodic_cache_cleanup(
session_cache: BoundedLRUCache,
interval_s: int = 300,
max_age_s: int = 1800,
):
"""Background task that expires old cache entries every 5 minutes."""
while True:
try:
session_cache.clear_expired(max_age_seconds=max_age_s)
except Exception as e:
logger.error(f"Cache cleanup failed: {e}")
await asyncio.sleep(interval_s)
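The two background loops above share one pattern: run, wait, repeat, and never let a failing check kill the task. A stripped-down, testable version of that pattern (names are illustrative, not from the server code); using an `asyncio.Event` instead of a bare `sleep` also lets shutdown interrupt the wait promptly:

```python
import asyncio

async def periodic(interval_s: float, fn, stop: asyncio.Event):
    """Run fn() every interval_s seconds until stop is set, surviving errors."""
    while not stop.is_set():
        try:
            fn()
        except Exception:
            pass  # a failing check must not kill the loop
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # normal case: interval elapsed, run again

async def demo() -> int:
    calls = []
    stop = asyncio.Event()
    task = asyncio.create_task(periodic(0.01, lambda: calls.append(1), stop))
    await asyncio.sleep(0.05)
    stop.set()   # clean shutdown: wait_for returns immediately
    await task
    return len(calls)

print(asyncio.run(demo()))
```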
Prevention
| Prevention Measure | Implementation | Impact |
|---|---|---|
| BoundedLRUCache | Max 10K items and 256MB memory limit with automatic eviction | Caps memory growth at a predictable ceiling |
| Periodic cache expiration | Background task every 5 min removes sessions > 30 min old | Prevents stale session accumulation |
| Memory monitor | Checks /proc/self/status every 60s, triggers cleanup at 75%/85% | Early warning before OOM |
| Aggressive GC at critical | gc.collect() + 50% cache eviction when memory > 85% | Emergency pressure relief |
| ECS task memory headroom | Set task memory to 2.5GB (was 2GB) | Extra 500MB buffer for spikes |
| CloudWatch alarm | Alert at 70% memory utilization (15 min sustained) | Human notification before automated mitigation |
| tracemalloc in staging | Enable in staging environment to profile top allocators | Identify new leaks before production |
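The 70% alarm from the table maps onto boto3's `put_metric_alarm`. A sketch of the parameters only (the cluster/service names are MangaAssist-specific assumptions; `AWS/ECS` `MemoryUtilization` with `ClusterName`/`ServiceName` dimensions is the standard metric, and the boto3 call is omitted so this stays runnable offline):

```python
# Parameters for cloudwatch_client.put_metric_alarm(**alarm_params).
alarm_params = {
    "AlarmName": "mcp-recommendation-memory-70pct",
    "Namespace": "AWS/ECS",
    "MetricName": "MemoryUtilization",
    "Dimensions": [
        {"Name": "ClusterName", "Value": "mangaassist"},        # assumed name
        {"Name": "ServiceName", "Value": "mcp-recommendation"}, # assumed name
    ],
    "Statistic": "Average",
    "Period": 300,            # 5-minute datapoints
    "EvaluationPeriods": 3,   # 3 x 5 min = 15 minutes sustained, per the table
    "Threshold": 70.0,
    "ComparisonOperator": "GreaterThanThreshold",
    "TreatMissingData": "notBreaching",
}
print(alarm_params["AlarmName"])
```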
Scenario 3: Protocol Version Mismatch
Problem
After updating the MangaAssist Bedrock agent to use a newer MCP client library (which defaults to protocol version 2025-01-15), the Lambda MCP servers that still advertise 2024-11-05 begin returning initialization errors. The agent cannot establish MCP sessions, and all tool calls fail silently -- the agent proceeds without tools, producing answers without real catalog data.
Impact: 100% of tool-augmented responses degrade to pure LLM generation without real data. Users asking "Is Attack on Titan volume 34 in stock?" get a generic response instead of a live inventory check.
Symptoms observed:
- Agent logs: MCP initialization failed: unsupported protocol version
- Lambda CloudWatch: all initialize requests return error response
- No tools/call invocations in Lambda metrics (session never established)
- Bedrock agent traces show tool skipped with initialization_failed status
Detection
flowchart TD
A[Alert: MCP tool call<br/>success rate drops to 0%] --> B{Check agent logs}
B --> C[Agent log: 'MCP initialization failed:<br/>unsupported protocol version']
C --> D{Which side is wrong?}
D --> E[Client sends:<br/>protocolVersion: 2025-01-15]
D --> F[Server supports:<br/>protocolVersion: 2024-11-05]
E --> G{Server version negotiation<br/>handles mismatch?}
G -->|No - strict check| H["Bug: Server rejects<br/>unknown versions instead<br/>of negotiating down"]
G -->|Yes - negotiates| I["Bug: Client rejects<br/>server's lower version"]
H --> J["Fix: Update server to<br/>negotiate compatible version"]
I --> K["Fix: Update client to<br/>accept server's version"]
style A fill:#c7131b,color:#fff
style H fill:#d86613,color:#fff
style I fill:#d86613,color:#fff
style J fill:#1a8c1a,color:#fff
style K fill:#1a8c1a,color:#fff
Root Cause
The Lambda MCP server's handle_initialize function performs a strict equality check on the protocol version:
# BUG: Strict version check rejects any version we don't recognize
def handle_initialize(params):
client_version = params.get("protocolVersion", "")
if client_version != "2024-11-05":
return {"error": {"code": -32602, "message": f"Unsupported version: {client_version}"}}
# ...
The MCP specification requires servers to negotiate downward: if the client requests a newer version, the server should respond with its highest supported version and let the client decide if it can work with that.
Resolution
"""
Robust MCP protocol version negotiation.
Handles forward and backward compatibility gracefully.
"""
import logging
from typing import Optional
logger = logging.getLogger("mcp-version")
# All versions this server can speak, ordered newest-first
SUPPORTED_VERSIONS = [
"2024-11-05", # Current production version
"2024-09-01", # Previous version (backward compat)
]
# Minimum version we require clients to support
MINIMUM_CLIENT_VERSION = "2024-09-01"
def negotiate_protocol_version(
client_requested_version: str,
) -> tuple[Optional[str], Optional[dict]]:
"""
Negotiate protocol version per MCP specification.
Rules:
1. If client requests a version we support, use it.
2. If client requests a newer version we don't support,
offer our highest version (client decides compatibility).
3. If client requests an older version below our minimum,
return an error.
Returns:
(negotiated_version, error_or_none)
"""
if not client_requested_version:
# No version specified -- use our default
logger.warning("Client sent no protocolVersion, using default")
return SUPPORTED_VERSIONS[0], None
# Case 1: Exact match -- ideal case
if client_requested_version in SUPPORTED_VERSIONS:
logger.info(f"Exact version match: {client_requested_version}")
return client_requested_version, None
# Case 2: Client wants newer version than we support
# Offer our highest version; client can accept or reject
if client_requested_version > SUPPORTED_VERSIONS[0]:
offered = SUPPORTED_VERSIONS[0]
logger.info(
f"Client requested {client_requested_version}, "
f"offering {offered} (our highest)"
)
return offered, None
# Case 3: Client wants older version than our minimum
if client_requested_version < MINIMUM_CLIENT_VERSION:
error = {
"code": -32602,
"message": (
f"Protocol version {client_requested_version} is too old. "
f"Minimum supported: {MINIMUM_CLIENT_VERSION}. "
f"Supported versions: {SUPPORTED_VERSIONS}"
)
}
logger.warning(f"Rejecting old client version: {client_requested_version}")
return None, error
# Case 4: Version between our min and max but not in our list
# Use the nearest lower version we support
for version in SUPPORTED_VERSIONS:
if version <= client_requested_version:
logger.info(
f"Client requested {client_requested_version}, "
f"using nearest lower: {version}"
)
return version, None
# Fallback: offer our highest
return SUPPORTED_VERSIONS[0], None
def handle_initialize(params: dict) -> dict:
"""
Fixed initialize handler with proper version negotiation.
Replaces the strict equality check that caused the outage.
"""
client_version = params.get("protocolVersion", "")
client_info = params.get("clientInfo", {})
negotiated_version, error = negotiate_protocol_version(client_version)
if error:
logger.error(f"Version negotiation failed: {error}")
return {"error": error}
logger.info(
f"MCP session initialized: client={client_info.get('name', 'unknown')} "
f"requested={client_version} negotiated={negotiated_version}"
)
return {
"protocolVersion": negotiated_version,
"capabilities": {
"tools": {"listChanged": False},
},
"serverInfo": {
"name": "mangaassist-catalog-search",
"version": "1.2.0",
}
}
# ---------- Client-side version handling ----------
def validate_server_version(
requested_version: str,
server_response_version: str,
acceptable_versions: list[str],
) -> tuple[bool, str]:
"""
Client-side validation of the server's negotiated version.
The client should accept any version it can speak, even if
the server offered a lower version than requested.
"""
if server_response_version in acceptable_versions:
return True, ""
    # Check if it's a version we can work with (same calendar year)
req_parts = requested_version.split("-")
srv_parts = server_response_version.split("-")
if len(req_parts) >= 2 and len(srv_parts) >= 2:
# Same year = likely compatible
if req_parts[0] == srv_parts[0]:
return True, ""
return False, (
f"Server offered {server_response_version} which is not in "
f"our acceptable versions: {acceptable_versions}"
)
Prevention
| Prevention Measure | Implementation | Impact |
|---|---|---|
| Negotiation, not rejection | Server offers its highest compatible version instead of erroring | Graceful degradation across versions |
| Client version tolerance | Client accepts any version from same calendar year | Forward-compatible with new server releases |
| Version in health check | /mcp/health returns protocol_versions array | Monitoring can detect version skew before outages |
| Canary deployment | Update one Lambda alias first, test client compatibility | Catch mismatches before full rollout |
| Integration tests | CI pipeline tests client against all supported server versions | Prevent regressions in version negotiation |
| CloudWatch metric | Emit VersionNegotiationResult metric (success/failure/downgrade) | Dashboard visibility into version health |
Scenario 4: Connection Pool Exhaustion
Problem
The ECS MCP recommendation server's Redis connection pool becomes exhausted under sustained load (>200 concurrent requests). New tool calls block waiting for a connection and eventually timeout. The server remains "healthy" (liveness check passes) but cannot process tool requests, causing the agent to receive timeout errors.
Impact: During peak traffic (12:00-13:00 JST lunch break, 18:00-20:00 JST evening peak), 30-40% of recommendation requests fail. The circuit breaker opens after 5 failures, blocking all recommendation requests for 30 seconds. Users see generic responses instead of personalized recommendations.
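The breaker behavior described above (open after 5 consecutive failures, block everything for 30 seconds) can be sketched minimally; this is an illustrative toy, not the production implementation:

```python
import time

class CircuitBreaker:
    """Opens after `threshold` consecutive failures; stays open
    for `cooldown_s`, then allows one half-open retry."""
    def __init__(self, threshold: int = 5, cooldown_s: float = 30.0):
        self.threshold, self.cooldown_s = threshold, cooldown_s
        self.failures, self.opened_at = 0, None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown_s:
            self.opened_at, self.failures = None, 0  # half-open: try again
            return True
        return False

    def record(self, success: bool):
        if success:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()

cb = CircuitBreaker()
for _ in range(5):
    cb.record(False)
print(cb.allow())  # False -- breaker is open for the next 30s
```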
Symptoms observed:
- Redis client logs: redis.exceptions.ConnectionError: Too many connections
- MCP tool latency p99 jumps from 200ms to 10,000ms (connection wait timeout)
- ECS task CPU drops to near-zero (blocked on connection acquisition)
- Circuit breaker state flips to OPEN repeatedly
- Redis INFO clients shows connected_clients near maxclients limit
Detection
flowchart TD
A[Alert: MCP tool latency<br/>p99 > 5000ms for 5 minutes] --> B{Check connection metrics}
B --> C[Redis INFO clients]
C --> D{connected_clients near maxclients?}
D -->|Yes| E[Connection Pool Exhaustion]
D -->|No| F[Check other bottlenecks<br/>OpenSearch, CPU, network]
E --> G{Where are connections going?}
G --> H[Leak: Connections opened<br/>but not returned to pool]
G --> I[Sizing: Pool too small<br/>for concurrent load]
G --> J[Starvation: Slow operations<br/>holding connections too long]
H --> K["Fix:<br/>1. Ensure async with/context managers<br/>2. Add connection timeout<br/>3. Add pool health check"]
I --> L["Fix:<br/>1. Increase pool size to 50<br/>2. Add connection queuing<br/>3. Add backpressure"]
J --> M["Fix:<br/>1. Add per-operation timeout<br/>2. Pipeline Redis commands<br/>3. Optimize slow queries"]
style A fill:#c7131b,color:#fff
style E fill:#d86613,color:#fff
style K fill:#1a8c1a,color:#fff
style L fill:#1a8c1a,color:#fff
style M fill:#1a8c1a,color:#fff
Root Cause
Three factors combine to exhaust the connection pool:
- Pool too small: the Redis connection pool was configured with `max_connections=20`, but the ECS task handles up to 200 concurrent requests, each needing 1-3 Redis operations
- Connection leak in error paths: when an OpenSearch query fails after acquiring a Redis connection for caching, the connection is not returned to the pool because the exception skips the cleanup code
- No backpressure: the server accepts unlimited concurrent requests, even when all connections are occupied
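The second factor is the classic acquire-without-finally bug. A self-contained toy showing how exceptions leak connections, and the `try/finally` fix (`FakePool` is illustrative, standing in for the real pool):

```python
class FakePool:
    """Stand-in for a connection pool with a fixed number of slots."""
    def __init__(self, size: int):
        self.free = size
    def acquire(self):
        if self.free == 0:
            raise RuntimeError("Too many connections")
        self.free -= 1
    def release(self):
        self.free += 1

def leaky_handler(pool: FakePool):
    pool.acquire()
    raise ValueError("OpenSearch query failed")  # release below never runs
    pool.release()

def safe_handler(pool: FakePool):
    pool.acquire()
    try:
        raise ValueError("OpenSearch query failed")
    finally:
        pool.release()  # always returned, even on error

pool = FakePool(2)
for _ in range(2):
    try:
        leaky_handler(pool)
    except ValueError:
        pass
print(pool.free)  # 0 -- both connections leaked; the next acquire fails
```

The resolution below generalizes this fix with an async context manager, which guarantees the release path the same way `finally` does.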
Resolution
"""
Connection pool management with proper sizing, leak prevention,
backpressure, and monitoring for ECS MCP servers.
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from typing import Optional
from dataclasses import dataclass, field
import redis.asyncio as aioredis
logger = logging.getLogger("mcp-connection-pool")
@dataclass
class PoolMetrics:
    """Tracks connection pool health metrics."""
    total_acquisitions: int = 0
    total_releases: int = 0
    total_timeouts: int = 0
    total_errors: int = 0
    peak_concurrent: int = 0
    current_active: int = 0
    _acquisition_times: list = field(default_factory=list)

    def record_acquisition(self, elapsed_ms: float):
        self.total_acquisitions += 1
        self.current_active += 1
        self.peak_concurrent = max(self.peak_concurrent, self.current_active)
        self._acquisition_times.append(elapsed_ms)
        if len(self._acquisition_times) > 1000:
            self._acquisition_times = self._acquisition_times[-1000:]

    def record_release(self):
        self.total_releases += 1
        self.current_active = max(0, self.current_active - 1)

    def record_timeout(self):
        self.total_timeouts += 1

    def stats(self) -> dict:
        times = self._acquisition_times
        avg_wait = sum(times) / len(times) if times else 0
        p99_wait = sorted(times)[int(len(times) * 0.99)] if times else 0
        return {
            "total_acquisitions": self.total_acquisitions,
            "total_releases": self.total_releases,
            "leaked": self.total_acquisitions - self.total_releases,
            "total_timeouts": self.total_timeouts,
            "total_errors": self.total_errors,
            "peak_concurrent": self.peak_concurrent,
            "current_active": self.current_active,
            "avg_wait_ms": round(avg_wait, 1),
            "p99_wait_ms": round(p99_wait, 1),
        }
class ManagedRedisPool:
    """Redis connection pool with leak prevention, monitoring,
    and backpressure."""

    def __init__(
        self,
        host: str,
        port: int = 6379,
        max_connections: int = 50,
        acquire_timeout_s: float = 3.0,
        operation_timeout_s: float = 2.0,
    ):
        self._pool = aioredis.ConnectionPool(
            host=host,
            port=port,
            max_connections=max_connections,
            decode_responses=True,
            socket_timeout=operation_timeout_s,
            socket_connect_timeout=3,
            retry_on_timeout=False,  # We handle retries at the MCP level
        )
        self._client = aioredis.Redis(connection_pool=self._pool)
        self._semaphore = asyncio.Semaphore(max_connections)
        self._acquire_timeout = acquire_timeout_s
        self._metrics = PoolMetrics()
        self._max_connections = max_connections

    @asynccontextmanager
    async def connection(self):
        """
        Acquire a connection from the pool with backpressure.
        Uses a semaphore to prevent over-acquisition and ensures
        connections are ALWAYS returned, even on exceptions.
        """
        start = time.time()
        try:
            # Backpressure: wait for an available connection slot
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self._acquire_timeout,
            )
        except asyncio.TimeoutError:
            self._metrics.record_timeout()
            logger.warning(
                f"Connection pool timeout after {self._acquire_timeout}s "
                f"(active={self._metrics.current_active}/{self._max_connections})"
            )
            raise ConnectionError(
                f"Redis connection pool exhausted "
                f"({self._metrics.current_active} active connections)"
            )
        elapsed = (time.time() - start) * 1000
        self._metrics.record_acquisition(elapsed)
        try:
            yield self._client
        finally:
            # CRITICAL: Always release the semaphore, even on exception.
            # This is the fix for the connection leak.
            self._semaphore.release()
            self._metrics.record_release()

    async def execute(self, operation: str, *args, **kwargs):
        """Execute a Redis operation with a managed connection."""
        async with self.connection() as client:
            method = getattr(client, operation)
            return await method(*args, **kwargs)

    async def ping(self) -> bool:
        """Health check that tests an actual connection."""
        try:
            async with self.connection() as client:
                return await client.ping()
        except Exception:
            return False

    async def close(self):
        """Close all connections in the pool."""
        await self._client.close()
        await self._pool.disconnect()

    @property
    def stats(self) -> dict:
        pool_info = {
            "max_connections": self._max_connections,
            "pool_created_connections": self._pool._created_connections
            if hasattr(self._pool, "_created_connections") else "unknown",
        }
        return {**self._metrics.stats(), **pool_info}
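The semaphore-with-timeout behavior at the heart of `connection()` can be exercised in isolation. The sketch below is a standalone toy, not part of the pool class: it shows the intended contract, namely that a free slot is granted immediately while an exhausted pool fails fast after the timeout instead of hanging.

```python
import asyncio

async def try_acquire(sem: asyncio.Semaphore, timeout_s: float) -> bool:
    """Return True if a slot was acquired before the timeout, False otherwise."""
    try:
        await asyncio.wait_for(sem.acquire(), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        return False

async def main() -> tuple:
    sem = asyncio.Semaphore(1)             # a "pool" with a single slot
    first = await try_acquire(sem, 0.05)   # slot free: acquired immediately
    second = await try_acquire(sem, 0.05)  # slot held: fails fast after 50 ms
    sem.release()
    return first, second

print(asyncio.run(main()))  # (True, False)
```

The `ManagedRedisPool` above wraps exactly this pattern, converting the `TimeoutError` into a `ConnectionError` so callers get a clear "pool exhausted" signal.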
# ---------- Usage in MCP tool handler (with leak prevention) ----------
import json  # stdlib dependency for cache (de)serialization below

async def execute_recommendation_with_safe_pool(
    arguments: dict,
    redis_pool: ManagedRedisPool,
    opensearch_client,
) -> dict:
    """
    Recommendation handler that properly manages Redis connections.
    The async-with block ensures connections are returned even if
    OpenSearch or other code raises an exception.
    """
    user_id = arguments["user_id"]
    count = arguments.get("count", 5)

    # Phase 1: Check the Redis cache (connection acquired and released here)
    cached_result = None
    try:
        async with redis_pool.connection() as redis:
            cached = await redis.get(f"reco:{user_id}")
            if cached:
                cached_result = json.loads(cached)
    except ConnectionError:
        # Pool exhausted -- proceed without cache
        logger.warning("Redis pool exhausted, skipping cache check")
    except Exception as e:
        logger.warning(f"Redis cache check failed: {e}")

    if cached_result:
        return {
            "content": [{"type": "text", "text": json.dumps(cached_result, ensure_ascii=False)}],
            "isError": False
        }

    # Phase 2: Query OpenSearch (no Redis connection held!)
    try:
        response = await opensearch_client.search(
            index="manga-catalog",
            body={"size": count, "query": {"match_all": {}}}
        )
        results = [h["_source"] for h in response["hits"]["hits"]]
    except Exception as e:
        # If OpenSearch fails here, we have NOT leaked a Redis connection
        # because the Redis connection was already released in Phase 1
        return {
            "content": [{"type": "text", "text": f"Search failed: {e}"}],
            "isError": True
        }

    result_payload = {"recommendations": results, "user_id": user_id}

    # Phase 3: Cache the result in Redis (separate connection acquisition)
    try:
        async with redis_pool.connection() as redis:
            await redis.setex(
                f"reco:{user_id}", 600,
                json.dumps(result_payload, ensure_ascii=False, default=str)
            )
    except ConnectionError:
        logger.warning("Redis pool exhausted, skipping cache write")
    except Exception as e:
        logger.warning(f"Redis cache write failed: {e}")

    return {
        "content": [{"type": "text", "text": json.dumps(result_payload, ensure_ascii=False)}],
        "isError": False
    }
Prevention
| Prevention Measure | Implementation | Impact |
|---|---|---|
| Semaphore-based backpressure | `asyncio.Semaphore(max_connections)` gates pool access | Prevents over-acquisition; callers wait instead of erroring |
| Acquire timeout | 3-second timeout on semaphore acquisition | Fast failure with a clear error message instead of hanging |
| Context manager pattern | `async with pool.connection()` ensures release in `finally` | Eliminates connection leaks on exceptions |
| Phase separation | Acquire/release the Redis connection per phase, not per request | Connections are never held during slow OpenSearch queries |
| Pool metrics | Track acquisitions, releases, leaked count, wait times | Dashboard shows pool health and enables leak detection |
| Sizing formula | `max_connections = max_concurrent_requests * redis_ops_per_request * 1.5` | Right-sized pool: 200 * 1.5 * 1.5 = 450 service-wide, capped at 50 per task (10 tasks) |
| ElastiCache maxclients | Set to `pool_size_per_task * task_count * 2` = 50 * 10 * 2 = 1000 | Headroom on the Redis side |
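The two sizing rows in the table can be sanity-checked with a few lines of arithmetic (the numbers are the ones from this scenario):

```python
# Back-of-envelope check of the sizing formula and the maxclients headroom rule.
concurrent_requests = 200       # peak concurrent requests across the service
redis_ops_per_request = 1.5     # midpoint of the observed 1-3 Redis operations
headroom = 1.5

service_wide = concurrent_requests * redis_ops_per_request * headroom
per_task, task_count = 50, 10   # capped per-task pool size x ECS task count
maxclients = per_task * task_count * 2

print(int(service_wide), per_task * task_count, maxclients)  # 450 500 1000
```

With 10 tasks at 50 connections each, the fleet offers 500 connections against a computed need of 450, and the ElastiCache `maxclients` of 1000 leaves 2x headroom on the Redis side.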
Scenario 5: Capability Drift After Update
Problem
After deploying a new version of the manga_catalog_search Lambda MCP server that adds a new language parameter and renames genre to category, the MCP client (Bedrock agent orchestrator) continues using the old tool schema. Tool calls fail because the agent sends genre (which no longer exists) instead of category, and never sends the new required language parameter.
Impact: All catalog search requests return validation errors. The agent retries with the same incorrect arguments, wastes token budget on retries, and eventually gives up. Users cannot search the manga catalog.
Symptoms observed:
- Lambda logs: `Missing required argument: 'language'` and `Unknown argument: 'genre'`
- Agent retry count increases (3 retries per tool call, all failing)
- Token usage spikes (each retry consumes input/output tokens)
- Bedrock model invocation cost increases ~3x due to retries
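The ~3x cost spike in the last symptom follows directly from the retry count. A rough sketch, using the Sonnet pricing from the MangaAssist stack ($3/$15 per 1M input/output tokens); the per-attempt token counts are assumptions for illustration, not measured values:

```python
# Illustrative token counts per agent tool-call attempt -- assumptions, not measurements.
INPUT_TOKENS_PER_ATTEMPT = 2_000   # prompt + tool definitions + conversation history
OUTPUT_TOKENS_PER_ATTEMPT = 300    # the model's tool-call attempt
PRICE_IN, PRICE_OUT = 3.00, 15.00  # USD per 1M tokens, Claude 3 Sonnet

def invocation_cost(attempts: int) -> float:
    return (attempts * INPUT_TOKENS_PER_ATTEMPT / 1e6 * PRICE_IN
            + attempts * OUTPUT_TOKENS_PER_ATTEMPT / 1e6 * PRICE_OUT)

baseline = invocation_cost(1)     # one successful tool call
failing = invocation_cost(1 + 3)  # original attempt + 3 failed retries
print(failing / baseline)         # 4.0: three retries add ~3x spend on top of baseline
```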
Detection
flowchart TD
A[Alert: manga_catalog_search<br/>error rate > 95% for 10 min] --> B{Check error messages}
B --> C["'Missing required argument: language'<br/>'Unknown argument: genre'"]
C --> D{Recent deployment?}
D -->|Yes| E[Schema Changed in New Version]
D -->|No| F[Check for config drift]
E --> G{Client using cached schema?}
G -->|Yes| H["Root Cause:<br/>Client cached old tool definitions<br/>from previous tools/list call"]
G -->|No| I["Root Cause:<br/>Server schema changed without<br/>backward compatibility"]
H --> J["Fix 1: Force client<br/>to re-discover tools"]
I --> K["Fix 2: Restore backward<br/>compatibility in server"]
J --> L["Long-term: Implement<br/>listChanged notification"]
K --> L
style A fill:#c7131b,color:#fff
style E fill:#d86613,color:#fff
style J fill:#1a8c1a,color:#fff
style K fill:#1a8c1a,color:#fff
style L fill:#1a8c1a,color:#fff
Root Cause
The server deployment introduced a breaking schema change without backward compatibility:
- Renamed parameter: `genre` was renamed to `category` (breaking: old clients send `genre`)
- New required parameter: `language` was added as required (breaking: old clients don't know about it)
- No `listChanged` notification: The server didn't signal that its tool definitions changed
- Client caches stale schema: The MCP client cached tool definitions from the initial `tools/list` call and never refreshes them
Resolution
"""
Backward-compatible MCP tool schema evolution.
Demonstrates how to add, rename, and deprecate parameters
without breaking existing clients.
"""
import json
import logging
import time
from typing import Any
logger = logging.getLogger("mcp-schema-evolution")
# ---------- Strategy 1: Backward-Compatible Schema ----------
TOOL_DEFINITIONS_V2 = [
    {
        "name": "manga_catalog_search",
        "description": (
            "Search the MangaAssist catalog by title, author, category/genre, "
            "or ISBN. Returns matching manga with prices, ratings, and availability. "
            "Supports Japanese and English language results."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query: title, author, keywords, or ISBN"
                },
                # NEW parameter: category (replaces genre)
                "category": {
                    "type": "string",
                    "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
                    "default": "all",
                    "description": "Filter by manga category (preferred over 'genre')"
                },
                # DEPRECATED but still accepted: genre (alias for category)
                "genre": {
                    "type": "string",
                    "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
                    "default": "all",
                    "description": "[DEPRECATED: use 'category'] Filter by manga genre"
                },
                # NEW parameter: language (optional with default, NOT required)
                "language": {
                    "type": "string",
                    "enum": ["ja", "en", "both"],
                    "default": "both",
                    "description": "Language for results (ja=Japanese, en=English, both=all)"
                },
                "max_results": {
                    "type": "integer",
                    "default": 5,
                    "minimum": 1,
                    "maximum": 20
                },
                "in_stock_only": {
                    "type": "boolean",
                    "default": False
                }
            },
            # NOTE: Only 'query' is required. 'language' has a default.
            # 'genre' is still accepted for backward compatibility.
            "required": ["query"]
        }
    }
]
def normalize_arguments(tool_name: str, arguments: dict) -> dict:
    """
    Normalize tool arguments to handle deprecated parameter names
    and missing optional parameters with defaults.
    This adapter layer sits between the MCP protocol handler and
    the actual tool implementation.
    """
    if tool_name != "manga_catalog_search":
        return arguments

    normalized = dict(arguments)

    # Handle genre -> category migration
    if "genre" in normalized and "category" not in normalized:
        normalized["category"] = normalized.pop("genre")
        logger.info("Migrated deprecated 'genre' parameter to 'category'")
    elif "genre" in normalized and "category" in normalized:
        # Both provided: prefer category, discard genre
        normalized.pop("genre")
        logger.info("Both 'genre' and 'category' provided, using 'category'")
    elif "genre" not in normalized and "category" not in normalized:
        normalized["category"] = "all"

    # Apply defaults for new optional parameters
    if "language" not in normalized:
        normalized["language"] = "both"

    return normalized
# ---------- Strategy 2: Schema Version Tracking ----------
class SchemaVersionManager:
    """Tracks tool schema versions and detects drift between
    client expectations and server definitions."""

    def __init__(self):
        self._schema_versions: dict[str, str] = {}  # tool_name -> schema_hash
        self._version_history: list[dict] = []

    def register_schema(self, tool_name: str, schema: dict) -> str:
        """Register a tool schema and return its version hash."""
        import hashlib
        schema_str = json.dumps(schema, sort_keys=True)
        schema_hash = hashlib.sha256(schema_str.encode()).hexdigest()[:12]
        old_hash = self._schema_versions.get(tool_name)
        if old_hash and old_hash != schema_hash:
            logger.warning(
                f"Schema change detected for {tool_name}: "
                f"{old_hash} -> {schema_hash}"
            )
            self._version_history.append({
                "tool": tool_name,
                "old_version": old_hash,
                "new_version": schema_hash,
                "timestamp": time.time(),
            })
        self._schema_versions[tool_name] = schema_hash
        return schema_hash

    def get_schema_version(self, tool_name: str) -> str:
        return self._schema_versions.get(tool_name, "unknown")

    def check_client_compatibility(
        self,
        tool_name: str,
        client_schema_hash: str,
    ) -> dict:
        """Check whether the client's cached schema matches the current server schema."""
        current = self._schema_versions.get(tool_name, "unknown")
        is_compatible = client_schema_hash == current or client_schema_hash == "unknown"
        return {
            "compatible": is_compatible,
            "client_version": client_schema_hash,
            "server_version": current,
            "action": "none" if is_compatible else "refresh_tools_list"
        }
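The reason `register_schema` serializes with `sort_keys=True` can be shown in a few lines. A standalone sketch (the `v1`/`v2` schemas are toy examples):

```python
import hashlib
import json

def schema_hash(schema: dict) -> str:
    """12-hex-char fingerprint of a tool schema, stable across dict key order."""
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()[:12]

v1 = {"type": "object", "properties": {"query": {"type": "string"}}}
v1_reordered = {"properties": {"query": {"type": "string"}}, "type": "object"}
v2 = {"type": "object",
      "properties": {"query": {"type": "string"}, "language": {"type": "string"}}}

print(schema_hash(v1) == schema_hash(v1_reordered))  # True: key order is irrelevant
print(schema_hash(v1) == schema_hash(v2))            # False: a real change flips the hash
```

Without `sort_keys=True`, two semantically identical schemas could hash differently and trigger false drift alerts.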
# ---------- Strategy 3: Capability Drift Detection ----------
class CapabilityDriftDetector:
    """Detects when client and server tool definitions are out of sync.
    Runs as a periodic check in the MCP client."""

    def __init__(self, refresh_interval_s: int = 300):
        self._cached_tools: dict[str, list] = {}  # server -> tools/list result
        self._last_refresh: dict[str, float] = {}
        self._refresh_interval = refresh_interval_s
        self._drift_events: list[dict] = []

    async def check_and_refresh(
        self,
        server_name: str,
        server_endpoint: str,
        http_client,
    ) -> dict:
        """Check whether tool definitions have changed on the server."""
        last = self._last_refresh.get(server_name, 0)
        now = time.time()
        if now - last < self._refresh_interval:
            return {"action": "none", "reason": "within refresh interval"}

        # Fetch the current tool definitions from the server
        try:
            response = await http_client.post(
                server_endpoint,
                json={
                    "jsonrpc": "2.0",
                    "id": int(now),
                    "method": "tools/list",
                    "params": {}
                },
                timeout=5
            )
            result = response.json().get("result", {})
            new_tools = result.get("tools", [])
        except Exception as e:
            logger.warning(f"Failed to refresh tools from {server_name}: {e}")
            return {"action": "error", "reason": str(e)}

        self._last_refresh[server_name] = now

        # Compare with the cached version
        old_tools = self._cached_tools.get(server_name, [])
        drift = self._detect_drift(old_tools, new_tools)
        self._cached_tools[server_name] = new_tools

        if drift["has_drift"]:
            self._drift_events.append({
                "server": server_name,
                "timestamp": now,
                "changes": drift["changes"],
            })
            logger.warning(
                f"Capability drift detected on {server_name}: "
                f"{json.dumps(drift['changes'])}"
            )
        return drift

    @staticmethod
    def _detect_drift(old_tools: list, new_tools: list) -> dict:
        """Compare old and new tool definitions to find changes."""
        old_map = {t["name"]: t for t in old_tools}
        new_map = {t["name"]: t for t in new_tools}
        changes = []

        # Tools added
        for name in new_map:
            if name not in old_map:
                changes.append({"type": "tool_added", "tool": name})

        # Tools removed
        for name in old_map:
            if name not in new_map:
                changes.append({"type": "tool_removed", "tool": name})

        # Tools modified
        for name in set(old_map) & set(new_map):
            old_schema = json.dumps(old_map[name].get("inputSchema", {}), sort_keys=True)
            new_schema = json.dumps(new_map[name].get("inputSchema", {}), sort_keys=True)
            if old_schema != new_schema:
                # Identify specific parameter changes
                old_props = old_map[name].get("inputSchema", {}).get("properties", {})
                new_props = new_map[name].get("inputSchema", {}).get("properties", {})
                added_params = set(new_props) - set(old_props)
                removed_params = set(old_props) - set(new_props)
                modified_params = [
                    p for p in set(old_props) & set(new_props)
                    if json.dumps(old_props[p], sort_keys=True) != json.dumps(new_props[p], sort_keys=True)
                ]
                changes.append({
                    "type": "schema_changed",
                    "tool": name,
                    "added_params": list(added_params),
                    "removed_params": list(removed_params),
                    "modified_params": modified_params,
                })

            # Check whether the required fields changed
            old_req = set(old_map[name].get("inputSchema", {}).get("required", []))
            new_req = set(new_map[name].get("inputSchema", {}).get("required", []))
            if old_req != new_req:
                changes.append({
                    "type": "required_fields_changed",
                    "tool": name,
                    "added_required": list(new_req - old_req),
                    "removed_required": list(old_req - new_req),
                })

        return {
            "has_drift": len(changes) > 0,
            "changes": changes,
        }
# ---------- Strategy 4: Blue-Green Deployment for Schema Changes ----------
def create_blue_green_tool_definitions(
    blue_tools: list[dict],
    green_tools: list[dict],
    active_color: str = "green",
) -> list[dict]:
    """
    During a blue-green deployment, serve both old and new tool
    definitions to allow gradual client migration.
    Phase 1: Deploy green with a backward-compatible schema
    Phase 2: Verify all clients work with green
    Phase 3: Remove blue (old schema) support
    """
    if active_color == "green":
        return green_tools
    elif active_color == "blue":
        return blue_tools
    else:
        # Canary: merge both, preferring green where names overlap
        merged = {}
        for tool in blue_tools:
            merged[tool["name"]] = tool
        for tool in green_tools:
            merged[tool["name"]] = tool  # Green overwrites blue
        return list(merged.values())
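A usage sketch for the canary branch. The tool entries here are hypothetical, and the `rev` field exists only to make the merge visible; the `merge_canary` helper mirrors the canary logic above in standalone form:

```python
def merge_canary(blue: list, green: list) -> list:
    """Mirror of the canary branch above: green entries win on name collisions."""
    merged = {t["name"]: t for t in blue}
    merged.update({t["name"]: t for t in green})
    return list(merged.values())

blue = [{"name": "manga_catalog_search", "rev": "blue"},
        {"name": "manga_recommendations", "rev": "blue"}]
green = [{"name": "manga_catalog_search", "rev": "green"}]

tools = merge_canary(blue, green)
# Both tools are still served; the overlapping one now comes from green.
print(sorted((t["name"], t["rev"]) for t in tools))
```

Clients that have not yet re-discovered tools keep seeing every tool name they cached, while re-discovery picks up the green definitions.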
Prevention
| Prevention Measure | Implementation | Impact |
|---|---|---|
| Never make parameters required in updates | New parameters always have defaults; `language` defaults to `"both"` | Old clients work without changes |
| Alias deprecated parameters | `normalize_arguments()` maps `genre` to `category` transparently | Smooth migration without client coordination |
| Schema version tracking | SHA-256 hash of the tool schema included in the health endpoint | Monitoring detects schema changes immediately |
| Periodic tools/list refresh | Client re-fetches tool definitions every 5 minutes | Stale-cache window limited to 5 minutes max |
| Drift detection | Compare old vs. new tool definitions field by field | Automated alerting on breaking changes |
| Blue-green deployment | Serve both old and new schemas during the transition | Zero-downtime schema evolution |
| Integration test matrix | Test old client against new server AND new client against old server | Catches incompatibilities in the CI pipeline |
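Part of the integration-test-matrix row can be automated with a schema diff in CI. A sketch with a deliberately minimal rule set (the two rules shown are illustrative, not exhaustive, and the example schemas are simplified versions of this scenario's old and new tool definitions):

```python
def breaking_changes(old: dict, new: dict) -> list:
    """Flag schema changes that would break an old client calling the new server."""
    problems = []
    old_props = set(old.get("properties", {}))
    new_props = new.get("properties", {})
    # A newly required parameter breaks callers that have never heard of it...
    for name in set(new.get("required", [])) - set(old.get("required", [])):
        if "default" not in new_props.get(name, {}):
            problems.append(f"new required param without default: {name}")
    # ...and a removed parameter breaks callers that still send it.
    for name in old_props - set(new_props):
        problems.append(f"param removed without alias: {name}")
    return problems

old = {"properties": {"query": {}, "genre": {}}, "required": ["query"]}
bad = {"properties": {"query": {}, "category": {}, "language": {}},
       "required": ["query", "language"]}
good = {"properties": {"query": {}, "category": {}, "genre": {},
                       "language": {"default": "both"}}, "required": ["query"]}

print(breaking_changes(old, bad))   # two findings: new required 'language', removed 'genre'
print(breaking_changes(old, good))  # []
```

Run against this scenario's deployment, such a check would have failed the pipeline: the `bad` schema reproduces the breaking change, while the backward-compatible `good` schema (keeping the `genre` alias and defaulting `language`) passes.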
Scenario Comparison Summary
| # | Scenario | Severity | Detection Time | Resolution Time | Root Cause Category |
|---|---|---|---|---|---|
| 1 | Lambda MCP Cold Start Timeout | Medium | 5 min (CloudWatch alarm) | 1 hour (Provisioned Concurrency) | Performance / Configuration |
| 2 | ECS MCP Memory Leak | High | 1-4 days (gradual) | 30 min (code fix + deploy) | Resource Management |
| 3 | Protocol Version Mismatch | Critical | Immediate (100% failure) | 15 min (rollback or fix) | Compatibility / Protocol |
| 4 | Connection Pool Exhaustion | High | 5 min (latency spike) | 2 hours (pool redesign) | Resource Management |
| 5 | Capability Drift After Update | Critical | 10 min (error rate alert) | 30 min (backward compat fix) | Deployment / Schema |
Cross-Scenario Prevention Checklist
flowchart LR
subgraph "Before Deployment"
A[Integration test matrix<br/>client x server versions]
B[Schema backward<br/>compat check]
C[Load test with<br/>production traffic shape]
end
subgraph "During Deployment"
D[Canary deployment<br/>5% traffic first]
E[Blue-green for<br/>schema changes]
F[Real-time error rate<br/>monitoring]
end
subgraph "After Deployment"
G[Periodic drift detection<br/>every 5 minutes]
H[Memory monitoring<br/>every 60 seconds]
I[Connection pool metrics<br/>continuous]
end
A --> D
B --> E
C --> F
D --> G
E --> H
F --> I
style A fill:#3b48cc,color:#fff
style B fill:#3b48cc,color:#fff
style C fill:#3b48cc,color:#fff
style D fill:#d86613,color:#fff
style E fill:#d86613,color:#fff
style F fill:#d86613,color:#fff
style G fill:#1a8c1a,color:#fff
style H fill:#1a8c1a,color:#fff
style I fill:#1a8c1a,color:#fff
Key Takeaways
| # | Takeaway |
|---|---|
| 1 | Lambda cold starts are the top MCP latency killer -- Provisioned Concurrency, lazy imports, and connection pre-warming reduce cold start from 8s to <1s for MangaAssist's catalog search. |
| 2 | Unbounded in-memory caches cause predictable OOM on ECS -- the BoundedLRUCache with item count AND memory byte limits prevents the 3-4 day memory leak cycle. |
| 3 | Protocol version negotiation must be permissive -- servers should offer their highest compatible version, not reject unknown versions. A strict equality check caused 100% tool failure. |
| 4 | Connection pools need semaphore-based backpressure -- without it, 200 concurrent requests overwhelm a 20-connection Redis pool. The async with pool.connection() pattern prevents leaks in error paths. |
| 5 | Tool schema changes must be backward compatible -- never add required parameters (use defaults), alias renamed parameters (genre -> category), and run drift detection every 5 minutes. |
| 6 | Phase-separated connection usage prevents compound failures -- acquire Redis for cache check, release, then query OpenSearch, then acquire Redis again for cache write. Never hold connections across slow operations. |
| 7 | Three-tier health checks catch different failure modes -- liveness catches process death, readiness catches initialization failures, deep checks catch dependency degradation. |
| 8 | Every scenario has a monitoring signal that precedes the outage -- cold start duration bimodal distribution, linear memory growth, version negotiation failures, pool wait time increase, and schema hash mismatches are all detectable before user impact. |