MCP Server Architecture: Lambda MCP vs ECS MCP for FM Extension
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Detail |
|---|---|
| Skill | 2.1.7 |
| Description | Develop model extension frameworks to enhance FM capabilities |
| Sub-focus | Lambda MCP servers for lightweight tools, ECS MCP servers for complex tools |
| AWS Services | Lambda, ECS Fargate, Bedrock, API Gateway, ElastiCache Redis |
| MangaAssist Relevance | Extending Claude 3 with manga catalog search, order tracking, recommendation tools |
Mind Map
mindmap
root((MCP Server Architecture))
Protocol Fundamentals
JSON-RPC 2.0 Transport
Capability Negotiation
Tool Registration
Resource Exposure
Prompt Templates
Lambda MCP Servers
Lightweight Tools
Stateless Execution
Cold Start Management
Cost per Invocation
Auto-Scaling
ECS MCP Servers
Complex Tools
Stateful Connections
Persistent Processes
WebSocket Support
Connection Pooling
Server Lifecycle
Initialize
Capability Advertisement
Request Handling
Graceful Shutdown
MangaAssist Integration
Catalog Search Tool
Order Tracking Tool
Recommendation Engine
Session Context Tool
1. Model Context Protocol (MCP) Fundamentals
The Model Context Protocol (MCP) is an open standard that defines how foundation models communicate with external tools, data sources, and services. For MangaAssist, MCP enables Claude 3 to call structured tools like catalog search, order lookup, and recommendation generation without baking tool logic into prompt engineering.
1.1 MCP Protocol Structure
sequenceDiagram
participant Client as MCP Client<br/>(Bedrock Agent)
participant Transport as Transport Layer<br/>(HTTP/SSE or WebSocket)
participant Server as MCP Server<br/>(Lambda or ECS)
participant Backend as Backend Service<br/>(DynamoDB/OpenSearch)
Client->>Transport: initialize (protocol version, capabilities)
Transport->>Server: Forward initialize request
Server->>Transport: initialize response (server capabilities)
Transport->>Client: Capability advertisement
Note over Client,Server: Session Established
Client->>Transport: tools/list
Transport->>Server: Forward tools/list
Server->>Transport: Tool definitions (name, schema, description)
Transport->>Client: Available tools
Client->>Transport: tools/call (tool_name, arguments)
Transport->>Server: Forward tool invocation
Server->>Backend: Execute tool logic
Backend->>Server: Result data
Server->>Transport: Tool result (content array)
Transport->>Client: Structured response
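The handshake and invocation above reduce to plain JSON-RPC 2.0 payloads on the wire. A minimal sketch of the tools/call leg (the id and argument values are illustrative, not normative):
"""Illustrative MCP wire payloads for one tool invocation (sketch)."""
import json
# Client -> Server: invoke the catalog search tool
request = {
    "jsonrpc": "2.0",
    "id": 42,
    "method": "tools/call",
    "params": {
        "name": "manga_catalog_search",
        "arguments": {"query": "One Piece", "max_results": 3},
    },
}
# Server -> Client: tool output wrapped in an MCP content array
response = {
    "jsonrpc": "2.0",
    "id": 42,
    "result": {
        "content": [{"type": "text", "text": "{\"results\": [...]}"}],
        "isError": False,
    },
}
print(json.dumps(request, indent=2))
print(json.dumps(response, indent=2))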
1.2 MCP Message Format
Every MCP message uses JSON-RPC 2.0. Understanding the wire format is critical for debugging and for building compliant servers.
"""
MCP message format definitions for MangaAssist tool servers.
These dataclasses define the JSON-RPC 2.0 messages exchanged
between the Bedrock agent (client) and our tool servers.
"""
from dataclasses import dataclass, field, asdict
from typing import Optional
import json
@dataclass
class MCPRequest:
"""JSON-RPC 2.0 request from MCP client to server."""
jsonrpc: str = "2.0"
id: Optional[int] = None
method: str = ""
params: dict = field(default_factory=dict)
def to_json(self) -> str:
return json.dumps(asdict(self), default=str)
@dataclass
class MCPResponse:
"""JSON-RPC 2.0 response from MCP server to client."""
jsonrpc: str = "2.0"
id: Optional[int] = None
result: Optional[dict] = None
error: Optional[dict] = None
def to_json(self) -> str:
payload = {"jsonrpc": self.jsonrpc, "id": self.id}
if self.error:
payload["error"] = self.error
else:
payload["result"] = self.result
return json.dumps(payload, default=str)
@dataclass
class MCPToolDefinition:
"""Definition of a single tool exposed by an MCP server."""
name: str
description: str
inputSchema: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class MCPToolResult:
"""Result returned after tool invocation."""
content: list = field(default_factory=list)
isError: bool = False
def to_dict(self) -> dict:
return asdict(self)
# --- Example: MangaAssist tool definitions ---
MANGA_SEARCH_TOOL = MCPToolDefinition(
name="manga_catalog_search",
description="Search the manga catalog by title, author, genre, or ISBN. "
"Returns matching manga with prices, ratings, and availability.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (title, author name, or keywords)"
},
"genre": {
"type": "string",
"enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
"description": "Filter by manga genre"
},
"max_results": {
"type": "integer",
"default": 5,
"minimum": 1,
"maximum": 20,
"description": "Maximum number of results to return"
}
},
"required": ["query"]
}
)
ORDER_TRACKING_TOOL = MCPToolDefinition(
name="order_tracking",
description="Look up order status and tracking information by order ID "
"or customer email. Returns shipping status and ETA.",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "Order ID (format: MNG-XXXXXXXX)"
},
"customer_email": {
"type": "string",
"description": "Customer email for order lookup"
}
},
"oneOf": [
{"required": ["order_id"]},
{"required": ["customer_email"]}
]
}
)
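A short usage sketch showing how these dataclasses serialize on the wire (the id value is arbitrary):
# Advertise both tools in a tools/list response
list_response = MCPResponse(
    id=7,
    result={"tools": [MANGA_SEARCH_TOOL.to_dict(), ORDER_TRACKING_TOOL.to_dict()]},
)
print(list_response.to_json())
# Wrap a tool outcome in the standard content-array envelope
search_result = MCPToolResult(
    content=[{"type": "text", "text": "{\"results\": []}"}]
)
print(json.dumps(search_result.to_dict()))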
2. Lambda MCP Server Architecture
Lambda MCP servers are the right choice for stateless, lightweight tools that can execute within Lambda's constraints (15 min timeout, 10 GB memory, ephemeral storage). For MangaAssist, catalog search and order tracking are ideal Lambda MCP candidates.
2.1 Architecture Overview
graph TB
subgraph "MCP Client Layer"
BA[Bedrock Agent<br/>Claude 3 Sonnet]
CL[MCP Client Library]
BA --> CL
end
subgraph "Transport Layer"
APIGW[API Gateway<br/>REST + Lambda Proxy]
FURL[Lambda Function URL<br/>Streaming Response]
end
subgraph "Lambda MCP Servers"
L1[Lambda: manga_catalog_search<br/>256 MB / 10s timeout]
L2[Lambda: order_tracking<br/>256 MB / 5s timeout]
L3[Lambda: price_check<br/>128 MB / 3s timeout]
L4[Lambda: inventory_status<br/>128 MB / 3s timeout]
end
subgraph "Backend Services"
OS[OpenSearch Serverless<br/>Vector + Keyword Search]
DDB[DynamoDB<br/>Orders + Products]
RC[ElastiCache Redis<br/>Cached Results]
end
CL --> APIGW
CL --> FURL
APIGW --> L1
APIGW --> L2
FURL --> L3
FURL --> L4
L1 --> OS
L1 --> RC
L2 --> DDB
L2 --> RC
L3 --> DDB
L4 --> DDB
style BA fill:#ff9900,color:#000
style L1 fill:#d86613,color:#fff
style L2 fill:#d86613,color:#fff
style L3 fill:#d86613,color:#fff
style L4 fill:#d86613,color:#fff
style OS fill:#c7131b,color:#fff
style DDB fill:#3b48cc,color:#fff
style RC fill:#1a8c1a,color:#fff
2.2 Production Lambda MCP Server
"""
Lambda MCP Server for MangaAssist catalog search.
Deployed as a Lambda function behind API Gateway.
Handles MCP protocol messages for the manga_catalog_search tool.
"""
import json
import logging
import time
import os
from typing import Any
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# ---------- Module-level initialization (reused across warm invocations) ----------
REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX", "manga-catalog")
REDIS_ENDPOINT = os.environ.get("REDIS_ENDPOINT", "")
CACHE_TTL = int(os.environ.get("CACHE_TTL", "300"))
# OpenSearch client initialized once per container
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
REGION,
"aoss",
session_token=credentials.token,
)
opensearch_client = OpenSearch(
hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
http_auth=aws_auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=5,
)
# Redis client for caching (optional)
redis_client = None
if REDIS_ENDPOINT:
import redis
redis_client = redis.Redis(
host=REDIS_ENDPOINT, port=6379, decode_responses=True, socket_timeout=1
)
# ---------- MCP Protocol Constants ----------
PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-catalog-search"
SERVER_VERSION = "1.0.0"
TOOL_DEFINITIONS = [
{
"name": "manga_catalog_search",
"description": (
"Search the MangaAssist catalog by title, author, genre, or ISBN. "
"Returns matching manga with prices, ratings, and stock availability. "
"Supports both keyword and semantic search."
),
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query: title, author, keywords, or ISBN"
},
"genre": {
"type": "string",
"enum": [
"shonen", "shojo", "seinen", "josei", "kodomo", "all"
],
"default": "all",
"description": "Filter by manga genre category"
},
"max_results": {
"type": "integer",
"default": 5,
"minimum": 1,
"maximum": 20,
"description": "Maximum results to return"
},
"in_stock_only": {
"type": "boolean",
"default": False,
"description": "Filter to only in-stock items"
}
},
"required": ["query"]
}
}
]
# ---------- MCP Request Handlers ----------
def handle_initialize(params: dict) -> dict:
"""Handle MCP initialize request. Validate protocol version and
return server capabilities."""
client_version = params.get("protocolVersion", "")
logger.info(f"MCP initialize: client version={client_version}")
return {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"tools": {"listChanged": False},
},
"serverInfo": {
"name": SERVER_NAME,
"version": SERVER_VERSION
}
}
def handle_tools_list(params: dict) -> dict:
"""Return all tools this server exposes."""
return {"tools": TOOL_DEFINITIONS}
def handle_tools_call(params: dict) -> dict:
"""Dispatch tool invocation to the appropriate handler."""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
if tool_name == "manga_catalog_search":
return _execute_catalog_search(arguments)
else:
return {
"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
"isError": True
}
def _execute_catalog_search(arguments: dict) -> dict:
"""Execute manga catalog search against OpenSearch with Redis caching."""
query = arguments.get("query", "")
genre = arguments.get("genre", "all")
max_results = min(arguments.get("max_results", 5), 20)
in_stock_only = arguments.get("in_stock_only", False)
start_time = time.time()
# --- Check Redis cache ---
cache_key = f"search:{query}:{genre}:{max_results}:{in_stock_only}"
if redis_client:
try:
cached = redis_client.get(cache_key)
if cached:
logger.info(f"Cache hit for query='{query}'")
elapsed = (time.time() - start_time) * 1000
result = json.loads(cached)
result["metadata"]["cache_hit"] = True
result["metadata"]["latency_ms"] = round(elapsed, 1)
return {
"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
"isError": False
}
except Exception as e:
logger.warning(f"Redis cache read failed: {e}")
# --- Build OpenSearch query ---
must_clauses = [
{
"multi_match": {
"query": query,
"fields": [
"title^3", "title_jp^3", "author^2",
"description", "tags"
],
"type": "best_fields",
"fuzziness": "AUTO"
}
}
]
filter_clauses = []
if genre and genre != "all":
filter_clauses.append({"term": {"genre": genre}})
if in_stock_only:
filter_clauses.append({"range": {"stock_count": {"gt": 0}}})
search_body = {
"size": max_results,
"query": {
"bool": {
"must": must_clauses,
"filter": filter_clauses
}
},
"_source": [
"title", "title_jp", "author", "genre", "price_jpy",
"rating", "stock_count", "isbn", "cover_url", "volumes"
],
"sort": [{"_score": "desc"}, {"rating": "desc"}]
}
# --- Execute search ---
try:
response = opensearch_client.search(
index=OPENSEARCH_INDEX, body=search_body
)
except Exception as e:
logger.error(f"OpenSearch query failed: {e}")
return {
"content": [{"type": "text", "text": f"Search failed: {str(e)}"}],
"isError": True
}
# --- Format results ---
hits = response.get("hits", {}).get("hits", [])
results = []
for hit in hits:
src = hit["_source"]
results.append({
"title": src.get("title", ""),
"title_jp": src.get("title_jp", ""),
"author": src.get("author", ""),
"genre": src.get("genre", ""),
"price_jpy": src.get("price_jpy", 0),
"rating": src.get("rating", 0),
"in_stock": src.get("stock_count", 0) > 0,
"stock_count": src.get("stock_count", 0),
"isbn": src.get("isbn", ""),
"volumes": src.get("volumes", 1),
"relevance_score": round(hit.get("_score", 0), 2)
})
elapsed = (time.time() - start_time) * 1000
result_payload = {
"results": results,
"total_found": response.get("hits", {}).get("total", {}).get("value", 0),
"metadata": {
"query": query,
"genre_filter": genre,
"latency_ms": round(elapsed, 1),
"cache_hit": False,
"source": "opensearch"
}
}
# --- Cache result ---
if redis_client:
try:
redis_client.setex(cache_key, CACHE_TTL, json.dumps(result_payload, ensure_ascii=False))
except Exception as e:
logger.warning(f"Redis cache write failed: {e}")
return {
"content": [
{"type": "text", "text": json.dumps(result_payload, ensure_ascii=False)}
],
"isError": False
}
# ---------- MCP Method Router ----------
MCP_HANDLERS = {
"initialize": handle_initialize,
"notifications/initialized": lambda p: None, # Acknowledge, no response
"tools/list": handle_tools_list,
"tools/call": handle_tools_call,
}
# ---------- Lambda Entry Point ----------
def lambda_handler(event: dict, context: Any) -> dict:
"""
AWS Lambda handler for MCP protocol messages.
Expects JSON-RPC 2.0 messages via API Gateway proxy integration.
"""
    body: dict = {}
    try:
        # Parse incoming MCP message (API Gateway proxy delivers a string body)
        if isinstance(event.get("body"), str):
            body = json.loads(event["body"])
        else:
            body = event.get("body") or event
method = body.get("method", "")
msg_id = body.get("id")
params = body.get("params", {})
logger.info(f"MCP request: method={method}, id={msg_id}")
handler = MCP_HANDLERS.get(method)
if handler is None:
error_response = {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
return _api_response(400, error_response)
result = handler(params)
# Notifications have no id and expect no response
if msg_id is None:
return _api_response(204, "")
response = {
"jsonrpc": "2.0",
"id": msg_id,
"result": result
}
return _api_response(200, response)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
return _api_response(400, {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": "Parse error"}
})
except Exception as e:
logger.error(f"Internal error: {e}", exc_info=True)
return _api_response(500, {
"jsonrpc": "2.0",
"id": body.get("id") if "body" in dir() else None,
"error": {"code": -32603, "message": "Internal error"}
})
def _api_response(status_code: int, body: Any) -> dict:
"""Format Lambda proxy integration response."""
return {
"statusCode": status_code,
"headers": {
"Content-Type": "application/json",
"X-MCP-Server": SERVER_NAME,
"X-MCP-Version": SERVER_VERSION,
},
"body": json.dumps(body, default=str) if body else ""
}
2.3 Lambda MCP Infrastructure (CDK)
"""
CDK stack for deploying Lambda MCP servers for MangaAssist.
Defines the Lambda functions, API Gateway, and IAM roles.
"""
from aws_cdk import (
Stack, Duration, RemovalPolicy, CfnOutput,
aws_lambda as _lambda,
aws_apigateway as apigw,
aws_iam as iam,
aws_logs as logs,
)
from constructs import Construct
class LambdaMCPStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# --- Shared Lambda layer with MCP dependencies ---
mcp_layer = _lambda.LayerVersion(
self, "MCPDependenciesLayer",
code=_lambda.Code.from_asset("layers/mcp-deps"),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_12],
description="opensearch-py, redis, requests-aws4auth",
)
# --- Catalog search Lambda MCP server ---
catalog_search_fn = _lambda.Function(
self, "CatalogSearchMCP",
function_name="mangaassist-mcp-catalog-search",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="catalog_search_mcp.lambda_handler",
code=_lambda.Code.from_asset("src/mcp_servers/catalog_search"),
memory_size=256,
timeout=Duration.seconds(10),
layers=[mcp_layer],
environment={
"OPENSEARCH_ENDPOINT": "search-manga-xxxxx.ap-northeast-1.aoss.amazonaws.com",
"OPENSEARCH_INDEX": "manga-catalog",
"REDIS_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
"CACHE_TTL": "300",
"LOG_LEVEL": "INFO",
},
tracing=_lambda.Tracing.ACTIVE,
log_retention=logs.RetentionDays.TWO_WEEKS,
)
# --- Order tracking Lambda MCP server ---
order_tracking_fn = _lambda.Function(
self, "OrderTrackingMCP",
function_name="mangaassist-mcp-order-tracking",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="order_tracking_mcp.lambda_handler",
code=_lambda.Code.from_asset("src/mcp_servers/order_tracking"),
memory_size=256,
timeout=Duration.seconds(5),
layers=[mcp_layer],
environment={
"ORDERS_TABLE": "mangaassist-orders",
"REDIS_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
"CACHE_TTL": "60",
},
tracing=_lambda.Tracing.ACTIVE,
log_retention=logs.RetentionDays.TWO_WEEKS,
)
# --- API Gateway for MCP endpoints ---
api = apigw.RestApi(
self, "MCPApi",
rest_api_name="mangaassist-mcp-api",
deploy_options=apigw.StageOptions(
stage_name="prod",
throttling_rate_limit=1000,
throttling_burst_limit=500,
logging_level=apigw.MethodLoggingLevel.INFO,
metrics_enabled=True,
),
)
mcp_resource = api.root.add_resource("mcp")
catalog_resource = mcp_resource.add_resource("catalog-search")
order_resource = mcp_resource.add_resource("order-tracking")
catalog_resource.add_method(
"POST",
apigw.LambdaIntegration(catalog_search_fn, proxy=True),
)
order_resource.add_method(
"POST",
apigw.LambdaIntegration(order_tracking_fn, proxy=True),
)
# --- Grant permissions ---
catalog_search_fn.add_to_role_policy(iam.PolicyStatement(
actions=["aoss:APIAccessAll"],
resources=["arn:aws:aoss:ap-northeast-1:*:collection/*"],
))
order_tracking_fn.add_to_role_policy(iam.PolicyStatement(
actions=["dynamodb:GetItem", "dynamodb:Query"],
resources=["arn:aws:dynamodb:ap-northeast-1:*:table/mangaassist-orders*"],
))
CfnOutput(self, "MCPApiUrl", value=api.url)
3. ECS MCP Server Architecture
ECS MCP servers handle complex, stateful, or long-running tools. For MangaAssist, the recommendation engine (which loads ML models, maintains user preference caches, and streams results) runs on ECS Fargate.
3.1 Architecture Overview
graph TB
subgraph "MCP Client Layer"
BA2[Bedrock Agent<br/>Claude 3 Sonnet]
CL2[MCP Client Library]
BA2 --> CL2
end
subgraph "Load Balancing"
ALB[Application Load Balancer<br/>WebSocket + HTTP/2]
TG[Target Group<br/>Health: /mcp/health]
end
subgraph "ECS Fargate Cluster"
subgraph "Service: Recommendation MCP"
T1[Task 1<br/>1 vCPU / 2 GB]
T2[Task 2<br/>1 vCPU / 2 GB]
T3[Task 3<br/>1 vCPU / 2 GB]
end
subgraph "Service: Session Context MCP"
T4[Task 4<br/>0.5 vCPU / 1 GB]
T5[Task 5<br/>0.5 vCPU / 1 GB]
end
end
subgraph "Backend Services"
OS2[OpenSearch Serverless<br/>Vector Similarity]
DDB2[DynamoDB<br/>User Preferences]
RC2[ElastiCache Redis<br/>Session State + Model Cache]
S3[S3<br/>Model Artifacts]
end
CL2 --> ALB
ALB --> TG
TG --> T1
TG --> T2
TG --> T3
TG --> T4
TG --> T5
T1 --> OS2
T1 --> DDB2
T1 --> RC2
T2 --> OS2
T2 --> RC2
T3 --> S3
T4 --> DDB2
T4 --> RC2
T5 --> DDB2
T5 --> RC2
style BA2 fill:#ff9900,color:#000
style ALB fill:#8c4fff,color:#fff
style T1 fill:#d86613,color:#fff
style T2 fill:#d86613,color:#fff
style T3 fill:#d86613,color:#fff
style T4 fill:#1a8c1a,color:#fff
style T5 fill:#1a8c1a,color:#fff
3.2 Production ECS MCP Server
"""
ECS MCP Server for MangaAssist recommendation engine.
Runs as a persistent Fargate task exposing an HTTP JSON-RPC transport (SSE streaming can be layered on later).
Maintains in-memory model cache and connection state.
"""
import json
import logging
import os
import time
from contextlib import asynccontextmanager
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
import redis.asyncio as aioredis
from opensearchpy import AsyncOpenSearch, AsyncHttpConnection
logger = logging.getLogger("mcp-recommendation")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
# ---------- Configuration ----------
REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
REDIS_ENDPOINT = os.environ["REDIS_ENDPOINT"]
PORT = int(os.environ.get("PORT", "8080"))
PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-recommendation-engine"
SERVER_VERSION = "2.0.0"
# ---------- Stateful Resources (initialized at startup, reused) ----------
class ServerState:
"""Holds long-lived connections and cached resources."""
opensearch: AsyncOpenSearch = None
redis: aioredis.Redis = None
genre_embeddings: dict = {} # Preloaded genre centroids
startup_time: float = 0
request_count: int = 0
active_sessions: dict = {} # session_id -> last_access
state = ServerState()
async def initialize_resources():
"""Initialize backend connections and preload model data."""
logger.info("Initializing server resources...")
state.startup_time = time.time()
# Async OpenSearch client
state.opensearch = AsyncOpenSearch(
hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
use_ssl=True,
verify_certs=True,
connection_class=AsyncHttpConnection,
timeout=10,
)
# Async Redis client
state.redis = aioredis.Redis(
host=REDIS_ENDPOINT, port=6379,
decode_responses=True, socket_timeout=2
)
# Preload genre embeddings for faster similarity search
try:
genres = ["shonen", "shojo", "seinen", "josei", "kodomo"]
for genre in genres:
cached = await state.redis.get(f"genre_embedding:{genre}")
if cached:
state.genre_embeddings[genre] = json.loads(cached)
logger.info(f"Preloaded {len(state.genre_embeddings)} genre embeddings")
except Exception as e:
logger.warning(f"Genre embedding preload failed: {e}")
logger.info("Server resources initialized")
async def cleanup_resources():
"""Gracefully close all connections."""
logger.info("Cleaning up server resources...")
if state.opensearch:
await state.opensearch.close()
if state.redis:
await state.redis.close()
logger.info("Cleanup complete")
# ---------- Tool Definitions ----------
TOOL_DEFINITIONS = [
{
"name": "manga_recommendations",
"description": (
"Generate personalized manga recommendations based on user "
"reading history, preferences, and current trends. Uses "
"collaborative filtering and content-based similarity."
),
"inputSchema": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID for personalized recommendations"
},
"seed_titles": {
"type": "array",
"items": {"type": "string"},
"description": "Manga titles the user likes (for cold start)"
},
"genre_preference": {
"type": "string",
"enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "any"],
"default": "any"
},
"count": {
"type": "integer",
"default": 5,
"minimum": 1,
"maximum": 20
},
"exclude_owned": {
"type": "boolean",
"default": True,
"description": "Exclude manga already purchased by user"
}
},
"required": ["user_id"]
}
},
{
"name": "reading_list_analysis",
"description": (
"Analyze a user's reading list to identify patterns, "
"preferred genres, favorite authors, and suggest "
"curated collections."
),
"inputSchema": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID to analyze"
},
"include_trends": {
"type": "boolean",
"default": True,
"description": "Include trending manga in same genres"
}
},
"required": ["user_id"]
}
}
]
# ---------- Tool Execution ----------
async def execute_manga_recommendations(arguments: dict) -> dict:
"""Generate personalized manga recommendations."""
user_id = arguments["user_id"]
seed_titles = arguments.get("seed_titles", [])
genre_pref = arguments.get("genre_preference", "any")
count = min(arguments.get("count", 5), 20)
exclude_owned = arguments.get("exclude_owned", True)
start = time.time()
# Fetch user reading history from Redis
history_key = f"user_history:{user_id}"
user_history = []
try:
raw_history = await state.redis.lrange(history_key, 0, 49)
user_history = [json.loads(h) for h in raw_history]
except Exception as e:
logger.warning(f"Failed to fetch user history: {e}")
# Build recommendation query using user history or seed titles
if user_history:
# Use collaborative filtering via OpenSearch kNN
recent_isbns = [h["isbn"] for h in user_history[:10]]
query_vector = await _get_aggregate_vector(recent_isbns)
elif seed_titles:
query_vector = await _get_title_vector(seed_titles)
else:
# Cold start: use popular items in genre
query_vector = state.genre_embeddings.get(genre_pref)
if query_vector is None:
return {
"content": [{"type": "text", "text": json.dumps({
"recommendations": [],
"reason": "Insufficient data for personalization. Ask the user about their preferences."
}, ensure_ascii=False)}],
"isError": False
}
# OpenSearch kNN search
owned_isbns = set()
if exclude_owned and user_history:
owned_isbns = {h["isbn"] for h in user_history}
filter_clauses = []
if genre_pref and genre_pref != "any":
filter_clauses.append({"term": {"genre": genre_pref}})
if owned_isbns:
filter_clauses.append({"bool": {"must_not": [
{"terms": {"isbn": list(owned_isbns)}}
]}})
    knn_params = {"vector": query_vector, "k": count * 2}
    if filter_clauses:
        # Only attach the filter when present; a null filter is rejected
        knn_params["filter"] = {"bool": {"filter": filter_clauses}}
    search_body = {
        "size": count,
        "query": {"knn": {"embedding": knn_params}},
        "_source": [
            "title", "title_jp", "author", "genre", "price_jpy",
            "rating", "stock_count", "isbn", "volumes", "synopsis"
        ]
    }
try:
response = await state.opensearch.search(
index="manga-catalog", body=search_body
)
except Exception as e:
logger.error(f"OpenSearch kNN search failed: {e}")
return {
"content": [{"type": "text", "text": f"Recommendation search failed: {e}"}],
"isError": True
}
hits = response.get("hits", {}).get("hits", [])
recommendations = []
for hit in hits:
src = hit["_source"]
recommendations.append({
"title": src.get("title", ""),
"title_jp": src.get("title_jp", ""),
"author": src.get("author", ""),
"genre": src.get("genre", ""),
"price_jpy": src.get("price_jpy", 0),
"rating": src.get("rating", 0),
"volumes": src.get("volumes", 1),
"synopsis": src.get("synopsis", "")[:200],
"similarity_score": round(hit.get("_score", 0), 3),
"in_stock": src.get("stock_count", 0) > 0
})
elapsed = (time.time() - start) * 1000
return {
"content": [{
"type": "text",
"text": json.dumps({
"recommendations": recommendations,
"personalization": "history" if user_history else ("seeds" if seed_titles else "popular"),
"metadata": {
"user_id": user_id,
"latency_ms": round(elapsed, 1),
"history_depth": len(user_history),
}
}, ensure_ascii=False)
}],
"isError": False
}
async def _get_aggregate_vector(isbns: list) -> list | None:
"""Compute average embedding vector from multiple ISBNs."""
vectors = []
for isbn in isbns:
cached = await state.redis.get(f"embedding:{isbn}")
if cached:
vectors.append(json.loads(cached))
if not vectors:
return None
# Element-wise average
dim = len(vectors[0])
avg = [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]
return avg
async def _get_title_vector(titles: list) -> list | None:
    """Get an embedding vector for seed title text.
    Placeholder: in production, call a Bedrock embedding model."""
    return None
async def execute_reading_list_analysis(arguments: dict) -> dict:
    """Placeholder for the reading-list analysis tool (full logic omitted)."""
    return {
        "content": [{"type": "text", "text": "Analysis placeholder"}],
        "isError": False,
    }
# ---------- MCP Route Handlers ----------
TOOL_HANDLERS = {
    "manga_recommendations": execute_manga_recommendations,
    "reading_list_analysis": execute_reading_list_analysis,
}
async def mcp_endpoint(request: Request) -> JSONResponse:
"""Main MCP JSON-RPC endpoint."""
state.request_count += 1
try:
body = await request.json()
except Exception:
return JSONResponse(
{"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}},
status_code=400
)
method = body.get("method", "")
msg_id = body.get("id")
params = body.get("params", {})
logger.info(f"MCP request: method={method} id={msg_id}")
if method == "initialize":
result = {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"tools": {"listChanged": False},
},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}
}
elif method == "notifications/initialized":
return JSONResponse(status_code=204, content=None)
elif method == "tools/list":
result = {"tools": TOOL_DEFINITIONS}
elif method == "tools/call":
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
handler = TOOL_HANDLERS.get(tool_name)
if handler is None:
return JSONResponse({
"jsonrpc": "2.0", "id": msg_id,
"error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}
}, status_code=400)
result = await handler(arguments)
else:
return JSONResponse({
"jsonrpc": "2.0", "id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"}
}, status_code=400)
return JSONResponse({"jsonrpc": "2.0", "id": msg_id, "result": result})
async def health_endpoint(request: Request) -> JSONResponse:
"""Health check for ALB target group."""
checks = {"server": "ok", "uptime_s": round(time.time() - state.startup_time)}
try:
await state.redis.ping()
checks["redis"] = "ok"
except Exception:
checks["redis"] = "degraded"
try:
await state.opensearch.info()
checks["opensearch"] = "ok"
except Exception:
checks["opensearch"] = "degraded"
all_ok = all(v == "ok" for k, v in checks.items() if k != "uptime_s")
status = 200 if all_ok else 503
checks["status"] = "healthy" if all_ok else "degraded"
checks["request_count"] = state.request_count
return JSONResponse(checks, status_code=status)
# ---------- Application Setup ----------
@asynccontextmanager
async def lifespan(app):
"""Manage startup and shutdown of server resources."""
await initialize_resources()
yield
await cleanup_resources()
app = Starlette(
routes=[
Route("/mcp", mcp_endpoint, methods=["POST"]),
Route("/mcp/health", health_endpoint, methods=["GET"]),
],
lifespan=lifespan,
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")
4. Lambda MCP vs ECS MCP Comparison
| Dimension | Lambda MCP Server | ECS MCP Server |
|---|---|---|
| Use Case | Lightweight, stateless tools (search, lookup) | Complex, stateful tools (recommendations, analysis) |
| Startup | Cold start 200-800ms; warm <50ms | Container start 30-60s; then always warm |
| Max Duration | 15 minutes | Unlimited (persistent process) |
| Memory | 128 MB - 10 GB | 512 MB - 30 GB per task |
| Connections | New per invocation (or reused in warm) | Persistent connection pools |
| State | Ephemeral (module-level caching in warm) | In-memory caches, preloaded models |
| Transport | HTTP request/response | HTTP, WebSocket, SSE |
| Scaling | Near-instant (concurrent executions, subject to burst limits) | Slower (ECS task provisioning ~30s) |
| Cost Model | Per-invocation (cheap at low volume) | Per-hour (cheaper at high volume) |
| MangaAssist Tool | Catalog search, order tracking, price check | Recommendations, reading analysis, session context |
| Latency Budget | <500ms per tool call | <1000ms per tool call |
| Cost at 1M calls/day | ~$200-400/month (256MB, 500ms avg) | ~$150-250/month (2 tasks, 1 vCPU each) |
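A back-of-envelope model makes the cost rows concrete. The rates below are approximate on-demand prices (region-dependent; free tiers, ElastiCache, ALB, and data transfer are excluded) and should be re-checked before relying on them:
"""Directional cost model: Lambda + API Gateway vs Fargate (compute only)."""
LAMBDA_GB_SECOND = 0.0000166667    # USD per GB-second (approx.)
LAMBDA_REQUEST = 0.20 / 1_000_000  # USD per request (approx.)
APIGW_REQUEST = 3.50 / 1_000_000   # USD per REST API request (approx.)
FARGATE_VCPU_HOUR = 0.05           # USD per vCPU-hour (approx.)
FARGATE_GB_HOUR = 0.0055           # USD per GB-hour (approx.)

def lambda_monthly(calls_per_day: int, memory_gb: float = 0.25,
                   avg_duration_s: float = 0.5) -> float:
    calls = calls_per_day * 30
    compute = calls * memory_gb * avg_duration_s * LAMBDA_GB_SECOND
    return compute + calls * (LAMBDA_REQUEST + APIGW_REQUEST)

def fargate_monthly(tasks: int = 2, vcpu: float = 1.0, mem_gb: float = 2.0) -> float:
    hours = 24 * 30
    return tasks * hours * (vcpu * FARGATE_VCPU_HOUR + mem_gb * FARGATE_GB_HOUR)

for daily in (100_000, 500_000, 1_000_000):
    print(f"{daily:>9,}/day  lambda+apigw=${lambda_monthly(daily):8.2f}"
          f"  fargate=${fargate_monthly():8.2f}")
With these rates the curves cross near 500K calls/day; the exact point shifts with memory size, average duration, and the front door (Lambda Function URLs carry no per-request charge beyond Lambda itself).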
4.1 Decision Framework
flowchart TD
START[New MCP Tool Needed] --> Q1{Stateful?<br/>Needs persistent<br/>connections or cache?}
Q1 -->|No| Q2{Execution time<br/> under 10 seconds?}
Q1 -->|Yes| ECS[Deploy as ECS MCP Server]
Q2 -->|Yes| Q3{Memory under 512 MB?}
Q2 -->|No| ECS
Q3 -->|Yes| LAMBDA[Deploy as Lambda MCP Server]
Q3 -->|No| Q4{Cold start<br/>acceptable?}
Q4 -->|Yes| LAMBDA
Q4 -->|No| ECS
LAMBDA --> L_DETAIL["Lambda MCP<br/>- API Gateway or Function URL<br/>- Provisioned concurrency for latency<br/>- Module-level caching<br/>- 256-512 MB memory"]
ECS --> E_DETAIL["ECS MCP<br/>- ALB with health checks<br/>- Auto-scaling on CPU/connections<br/>- Persistent connection pools<br/>- Preloaded models/embeddings"]
style START fill:#232f3e,color:#fff
style LAMBDA fill:#d86613,color:#fff
style ECS fill:#1a8c1a,color:#fff
style L_DETAIL fill:#d86613,color:#fff
style E_DETAIL fill:#1a8c1a,color:#fff
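The flowchart condenses to a small helper usable in design reviews; the thresholds mirror the diagram and are heuristics, not hard limits:
def choose_mcp_runtime(stateful: bool, exec_seconds: float,
                       memory_mb: int, cold_start_ok: bool = True) -> str:
    """Return 'ecs' or 'lambda' per the section 4.1 decision framework."""
    if stateful or exec_seconds > 10:
        return "ecs"
    if memory_mb <= 512:
        return "lambda"
    return "lambda" if cold_start_ok else "ecs"

# MangaAssist placements from the comparison table:
assert choose_mcp_runtime(False, 0.3, 256) == "lambda"  # catalog search
assert choose_mcp_runtime(True, 1.0, 2048) == "ecs"     # recommendations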
5. MCP Server Lifecycle
stateDiagram-v2
[*] --> Initializing: Server starts
Initializing --> Ready: Resources loaded,<br/>capabilities registered
Ready --> Processing: tools/call received
Processing --> Ready: Response sent
Ready --> Draining: Shutdown signal<br/>(SIGTERM)
Processing --> Draining: Finish current,<br/>reject new
Draining --> ShuttingDown: Active requests = 0
ShuttingDown --> [*]: Connections closed,<br/>cleanup complete
note right of Initializing
Lambda: module-level init
ECS: lifespan startup
end note
note right of Draining
Lambda: N/A (per-invocation)
ECS: ALB deregistration delay
end note
5.1 Lifecycle Differences
| Phase | Lambda MCP | ECS MCP |
|---|---|---|
| Initialize | Module-level code (first cold start) | lifespan startup coroutine |
| Capability Advertisement | On each initialize call | On each initialize call (same protocol) |
| Request Handling | Single request per invocation | Concurrent requests via async handlers |
| Connection Reuse | Warm container reuses module-level clients | Persistent connection pools across requests |
| Shutdown | Container frozen/destroyed by Lambda | SIGTERM -> drain -> close connections |
| Health Checks | N/A (managed by Lambda service) | ALB health check endpoint /mcp/health |
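On the ECS side, two settings have to agree for the shutdown row to work: uvicorn's graceful-shutdown window must fit inside the ECS stopTimeout (ECS sends SIGTERM, waits stopTimeout -- 30 s by default -- then SIGKILL), and the ALB deregistration delay should be short since tool calls finish in well under a second. A hedged sketch of both, assuming the Fargate service from the earlier CDK sketch:
# Container entrypoint: bound graceful shutdown below the ECS stopTimeout
uvicorn.run(app, host="0.0.0.0", port=PORT, timeout_graceful_shutdown=25)

# CDK stack: shorten the ALB deregistration delay from its 300 s default
service.target_group.set_attribute(
    "deregistration_delay.timeout_seconds", "30"
)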
6. MCP Client Library for Consistent Access
The MCP client library abstracts whether a tool runs on Lambda or ECS, giving the Bedrock orchestrator a uniform interface.
"""
MCP Client Library for MangaAssist.
Provides a consistent interface to call MCP tools regardless
of whether they run on Lambda or ECS.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
import httpx
logger = logging.getLogger("mcp-client")
class MCPServerType(Enum):
LAMBDA = "lambda"
ECS = "ecs"
@dataclass
class MCPServerConfig:
"""Configuration for an MCP server endpoint."""
name: str
endpoint: str
server_type: MCPServerType
timeout_ms: int = 5000
retry_count: int = 2
retry_delay_ms: int = 100
@dataclass
class MCPToolCall:
"""Result of an MCP tool invocation."""
tool_name: str
content: list = field(default_factory=list)
is_error: bool = False
latency_ms: float = 0
server_name: str = ""
protocol_version: str = ""
class MCPClient:
"""Unified MCP client that routes tool calls to the correct server."""
def __init__(self, servers: list[MCPServerConfig]):
self._servers = {s.name: s for s in servers}
self._tool_registry: dict[str, str] = {} # tool_name -> server_name
self._http = httpx.AsyncClient(timeout=30)
self._initialized_servers: set = set()
self._request_id = 0
async def discover_tools(self) -> dict[str, list]:
"""Initialize all servers and discover available tools."""
all_tools = {}
for name, config in self._servers.items():
try:
# Initialize
init_result = await self._send_request(config, "initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "mangaassist-orchestrator", "version": "1.0.0"}
})
self._initialized_servers.add(name)
logger.info(f"Initialized MCP server: {name}")
# Send initialized notification
await self._send_notification(config, "notifications/initialized", {})
# List tools
tools_result = await self._send_request(config, "tools/list", {})
tools = tools_result.get("tools", [])
all_tools[name] = tools
# Register tool -> server mapping
for tool in tools:
self._tool_registry[tool["name"]] = name
logger.info(f"Registered tool: {tool['name']} -> {name}")
except Exception as e:
logger.error(f"Failed to initialize MCP server {name}: {e}")
return all_tools
async def call_tool(self, tool_name: str, arguments: dict) -> MCPToolCall:
"""Call a tool by name, routing to the correct MCP server."""
server_name = self._tool_registry.get(tool_name)
if not server_name:
return MCPToolCall(
tool_name=tool_name,
content=[{"type": "text", "text": f"Tool not found: {tool_name}"}],
is_error=True
)
config = self._servers[server_name]
start = time.time()
for attempt in range(config.retry_count + 1):
try:
result = await self._send_request(config, "tools/call", {
"name": tool_name,
"arguments": arguments
})
elapsed = (time.time() - start) * 1000
return MCPToolCall(
tool_name=tool_name,
content=result.get("content", []),
is_error=result.get("isError", False),
latency_ms=round(elapsed, 1),
server_name=server_name
)
except Exception as e:
if attempt < config.retry_count:
logger.warning(f"Retry {attempt+1} for {tool_name}: {e}")
await asyncio.sleep(config.retry_delay_ms / 1000)
else:
elapsed = (time.time() - start) * 1000
return MCPToolCall(
tool_name=tool_name,
content=[{"type": "text", "text": f"Tool call failed: {e}"}],
is_error=True,
latency_ms=round(elapsed, 1),
server_name=server_name
)
async def _send_request(self, config: MCPServerConfig, method: str, params: dict) -> dict:
"""Send a JSON-RPC request to an MCP server."""
self._request_id += 1
payload = {
"jsonrpc": "2.0",
"id": self._request_id,
"method": method,
"params": params
}
response = await self._http.post(
config.endpoint,
json=payload,
timeout=config.timeout_ms / 1000
)
response.raise_for_status()
body = response.json()
if "error" in body:
raise Exception(f"MCP error: {body['error']}")
return body.get("result", {})
async def _send_notification(self, config: MCPServerConfig, method: str, params: dict):
"""Send a JSON-RPC notification (no id, no response expected)."""
payload = {"jsonrpc": "2.0", "method": method, "params": params}
await self._http.post(config.endpoint, json=payload, timeout=5)
async def close(self):
await self._http.aclose()
# --- Usage Example ---
async def main():
client = MCPClient(servers=[
MCPServerConfig(
name="catalog-search",
endpoint="https://api.mangaassist.example.com/mcp/catalog-search",
server_type=MCPServerType.LAMBDA,
timeout_ms=5000
),
MCPServerConfig(
name="recommendation-engine",
endpoint="https://mcp-ecs.mangaassist.internal/mcp",
server_type=MCPServerType.ECS,
timeout_ms=10000
),
])
await client.discover_tools()
result = await client.call_tool("manga_catalog_search", {
"query": "Attack on Titan",
"genre": "shonen",
"max_results": 5
})
print(f"Search result ({result.latency_ms}ms): {result.content}")
result = await client.call_tool("manga_recommendations", {
"user_id": "user-12345",
"count": 3
})
print(f"Recommendations ({result.latency_ms}ms): {result.content}")
    await client.close()

if __name__ == "__main__":
    asyncio.run(main())
Key Takeaways
| # | Takeaway |
|---|---|
| 1 | MCP is JSON-RPC 2.0 based -- every request follows the {jsonrpc, id, method, params} format (responses carry result or error), with structured capability negotiation at initialization. |
| 2 | Lambda MCP servers excel at stateless tools -- catalog search and order tracking run in <500ms with module-level caching across warm invocations. |
| 3 | ECS MCP servers handle complex stateful tools -- the recommendation engine maintains persistent OpenSearch/Redis connections and preloaded embeddings. |
| 4 | Decision is driven by state, duration, and memory -- if a tool needs persistent connections, >10s execution, or >512MB memory, choose ECS. |
| 5 | The MCP client library abstracts deployment topology -- the orchestrator calls call_tool("name", args) without knowing if it hits Lambda or ECS. |
| 6 | Cost crossover at ~500K calls/day -- below that, Lambda per-invocation pricing wins; above it, ECS per-hour pricing is cheaper for MangaAssist's 1M/day target. |
| 7 | Health checks differ fundamentally -- Lambda relies on the Lambda service; ECS needs an explicit /mcp/health endpoint for ALB target group monitoring. |
| 8 | Server lifecycle management is critical -- ECS servers must handle SIGTERM gracefully (drain, close connections); Lambda handles this automatically via container freeze. |