
MCP Server Architecture: Lambda MCP vs ECS MCP for FM Extension

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Detail |
|---|---|
| Skill | 2.1.7 |
| Description | Develop model extension frameworks to enhance FM capabilities |
| Sub-focus | Lambda MCP servers for lightweight tools, ECS MCP servers for complex tools |
| AWS Services | Lambda, ECS Fargate, Bedrock, API Gateway, ElastiCache Redis |
| MangaAssist Relevance | Extending Claude 3 with manga catalog search, order tracking, recommendation tools |

Mind Map

mindmap
  root((MCP Server Architecture))
    Protocol Fundamentals
      JSON-RPC 2.0 Transport
      Capability Negotiation
      Tool Registration
      Resource Exposure
      Prompt Templates
    Lambda MCP Servers
      Lightweight Tools
      Stateless Execution
      Cold Start Management
      Cost per Invocation
      Auto-Scaling
    ECS MCP Servers
      Complex Tools
      Stateful Connections
      Persistent Processes
      WebSocket Support
      Connection Pooling
    Server Lifecycle
      Initialize
      Capability Advertisement
      Request Handling
      Graceful Shutdown
    MangaAssist Integration
      Catalog Search Tool
      Order Tracking Tool
      Recommendation Engine
      Session Context Tool

1. Model Context Protocol (MCP) Fundamentals

The Model Context Protocol (MCP) is an open standard that defines how foundation models communicate with external tools, data sources, and services. For MangaAssist, MCP enables Claude 3 to call structured tools like catalog search, order lookup, and recommendation generation without baking tool logic into prompt engineering.

1.1 MCP Protocol Structure

sequenceDiagram
    participant Client as MCP Client<br/>(Bedrock Agent)
    participant Transport as Transport Layer<br/>(HTTP/SSE or WebSocket)
    participant Server as MCP Server<br/>(Lambda or ECS)
    participant Backend as Backend Service<br/>(DynamoDB/OpenSearch)

    Client->>Transport: initialize (protocol version, capabilities)
    Transport->>Server: Forward initialize request
    Server->>Transport: initialize response (server capabilities)
    Transport->>Client: Capability advertisement

    Note over Client,Server: Session Established

    Client->>Transport: tools/list
    Transport->>Server: Forward tools/list
    Server->>Transport: Tool definitions (name, schema, description)
    Transport->>Client: Available tools

    Client->>Transport: tools/call (tool_name, arguments)
    Transport->>Server: Forward tool invocation
    Server->>Backend: Execute tool logic
    Backend->>Server: Result data
    Server->>Transport: Tool result (content array)
    Transport->>Client: Structured response
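
On the wire, each hop in this handshake is a JSON-RPC 2.0 envelope. A minimal sketch of the three client-to-server messages above (ids and argument values are illustrative, not prescribed by the protocol):

```python
import json

# The three client->server messages from the sequence above,
# expressed as JSON-RPC 2.0 envelopes.
initialize_req = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "mangaassist-client", "version": "1.0.0"},
    },
}

tools_list_req = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}

tools_call_req = {
    "jsonrpc": "2.0",
    "id": 3,
    "method": "tools/call",
    "params": {
        "name": "manga_catalog_search",
        "arguments": {"query": "One Piece", "genre": "shonen", "max_results": 3},
    },
}

for msg in (initialize_req, tools_list_req, tools_call_req):
    print(json.dumps(msg))
```

Note that `id` increments per request and that notifications (messages with no `id`) expect no response at all.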

1.2 MCP Message Format

Every MCP message uses JSON-RPC 2.0. Understanding the wire format is critical for debugging and for building compliant servers.

"""
MCP message format definitions for MangaAssist tool servers.
These dataclasses define the JSON-RPC 2.0 messages exchanged
between the Bedrock agent (client) and our tool servers.
"""

from dataclasses import dataclass, field, asdict
from typing import Any, Optional
import json


@dataclass
class MCPRequest:
    """JSON-RPC 2.0 request from MCP client to server."""
    jsonrpc: str = "2.0"
    id: Optional[int | str] = None  # JSON-RPC 2.0 allows string or number ids
    method: str = ""
    params: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self), default=str)


@dataclass
class MCPResponse:
    """JSON-RPC 2.0 response from MCP server to client."""
    jsonrpc: str = "2.0"
    id: Optional[int | str] = None  # JSON-RPC 2.0 allows string or number ids
    result: Optional[dict] = None
    error: Optional[dict] = None

    def to_json(self) -> str:
        payload = {"jsonrpc": self.jsonrpc, "id": self.id}
        if self.error:
            payload["error"] = self.error
        else:
            payload["result"] = self.result
        return json.dumps(payload, default=str)


@dataclass
class MCPToolDefinition:
    """Definition of a single tool exposed by an MCP server."""
    name: str
    description: str
    inputSchema: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return asdict(self)


@dataclass
class MCPToolResult:
    """Result returned after tool invocation."""
    content: list = field(default_factory=list)
    isError: bool = False

    def to_dict(self) -> dict:
        return asdict(self)


# --- Example: MangaAssist tool definitions ---

MANGA_SEARCH_TOOL = MCPToolDefinition(
    name="manga_catalog_search",
    description="Search the manga catalog by title, author, genre, or ISBN. "
                "Returns matching manga with prices, ratings, and availability.",
    inputSchema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query (title, author name, or keywords)"
            },
            "genre": {
                "type": "string",
                "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"],
                "description": "Filter by manga genre"
            },
            "max_results": {
                "type": "integer",
                "default": 5,
                "minimum": 1,
                "maximum": 20,
                "description": "Maximum number of results to return"
            }
        },
        "required": ["query"]
    }
)

ORDER_TRACKING_TOOL = MCPToolDefinition(
    name="order_tracking",
    description="Look up order status and tracking information by order ID "
                "or customer email. Returns shipping status and ETA.",
    inputSchema={
        "type": "object",
        "properties": {
            "order_id": {
                "type": "string",
                "description": "Order ID (format: MNG-XXXXXXXX)"
            },
            "customer_email": {
                "type": "string",
                "description": "Customer email for order lookup"
            }
        },
        "oneOf": [
            {"required": ["order_id"]},
            {"required": ["customer_email"]}
        ]
    }
)
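
Because inputSchema is plain JSON Schema, a server can reject malformed arguments before touching any backend. A minimal hand-rolled check is sketched below; it covers only the schema subset used above (required, type, enum), and a production server would likely use a full validator such as the jsonschema package instead:

```python
def check_arguments(schema: dict, arguments: dict) -> list[str]:
    """Return a list of validation errors (empty list = valid).
    Covers only required, type, and enum on top-level properties."""
    type_map = {"string": str, "integer": int, "boolean": bool}
    errors = []
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        prop = schema.get("properties", {}).get(name)
        if prop is None:
            errors.append(f"unknown argument: {name}")
            continue
        expected = type_map.get(prop.get("type"))
        if expected and not isinstance(value, expected):
            errors.append(f"{name}: expected {prop['type']}")
        if "enum" in prop and value not in prop["enum"]:
            errors.append(f"{name}: must be one of {prop['enum']}")
    return errors

# Example against the manga_catalog_search schema defined above
schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "genre": {"type": "string",
                  "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "all"]},
        "max_results": {"type": "integer"},
    },
    "required": ["query"],
}
print(check_arguments(schema, {"query": "Naruto", "genre": "shonen"}))  # []
print(check_arguments(schema, {"genre": "isekai"}))  # missing query + bad enum
```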

2. Lambda MCP Server Architecture

Lambda MCP servers are the right choice for stateless, lightweight tools that can execute within Lambda's constraints (15-minute timeout, up to 10 GB memory, up to 10 GB ephemeral storage). For MangaAssist, catalog search and order tracking are ideal Lambda MCP candidates.
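
A back-of-envelope cost check shows why Lambda fits this tier. The rates below are assumptions based on published on-demand Lambda pricing (roughly $0.20 per 1M requests and ~$0.0000167 per GB-second; verify current regional pricing before relying on this):

```python
# Rough daily Lambda cost model for the catalog-search tool.
# All rates and the average duration are assumptions.
REQUESTS_PER_DAY = 1_000_000           # MangaAssist scale target
MEMORY_GB = 256 / 1024                 # 256 MB function
AVG_DURATION_S = 0.120                 # assumed warm-path average
PRICE_PER_GB_SECOND = 0.0000166667
PRICE_PER_MILLION_REQUESTS = 0.20

gb_seconds = REQUESTS_PER_DAY * MEMORY_GB * AVG_DURATION_S
compute_cost = gb_seconds * PRICE_PER_GB_SECOND
request_cost = REQUESTS_PER_DAY / 1_000_000 * PRICE_PER_MILLION_REQUESTS
print(f"{gb_seconds:,.0f} GB-s/day -> ${compute_cost + request_cost:.2f}/day")
```

Under these assumptions the tool costs well under a dollar a day at 1M messages/day; the dominant cost driver becomes cold starts and backend latency, not compute.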

2.1 Architecture Overview

graph TB
    subgraph "MCP Client Layer"
        BA[Bedrock Agent<br/>Claude 3 Sonnet]
        CL[MCP Client Library]
        BA --> CL
    end

    subgraph "Transport Layer"
        APIGW[API Gateway<br/>REST + Lambda Proxy]
        FURL[Lambda Function URL<br/>Streaming Response]
    end

    subgraph "Lambda MCP Servers"
        L1[Lambda: manga_catalog_search<br/>256 MB / 10s timeout]
        L2[Lambda: order_tracking<br/>256 MB / 5s timeout]
        L3[Lambda: price_check<br/>128 MB / 3s timeout]
        L4[Lambda: inventory_status<br/>128 MB / 3s timeout]
    end

    subgraph "Backend Services"
        OS[OpenSearch Serverless<br/>Vector + Keyword Search]
        DDB[DynamoDB<br/>Orders + Products]
        RC[ElastiCache Redis<br/>Cached Results]
    end

    CL --> APIGW
    CL --> FURL
    APIGW --> L1
    APIGW --> L2
    FURL --> L3
    FURL --> L4
    L1 --> OS
    L1 --> RC
    L2 --> DDB
    L2 --> RC
    L3 --> DDB
    L4 --> DDB

    style BA fill:#ff9900,color:#000
    style L1 fill:#d86613,color:#fff
    style L2 fill:#d86613,color:#fff
    style L3 fill:#d86613,color:#fff
    style L4 fill:#d86613,color:#fff
    style OS fill:#c7131b,color:#fff
    style DDB fill:#3b48cc,color:#fff
    style RC fill:#1a8c1a,color:#fff

2.2 Production Lambda MCP Server

"""
Lambda MCP Server for MangaAssist catalog search.
Deployed as a Lambda function behind API Gateway.
Handles MCP protocol messages for the manga_catalog_search tool.
"""

import json
import logging
import time
import os
from typing import Any

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# ---------- Module-level initialization (reused across warm invocations) ----------

REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX", "manga-catalog")
REDIS_ENDPOINT = os.environ.get("REDIS_ENDPOINT", "")
CACHE_TTL = int(os.environ.get("CACHE_TTL", "300"))

# OpenSearch client initialized once per container
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    REGION,
    "aoss",
    session_token=credentials.token,
)
opensearch_client = OpenSearch(
    hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
    http_auth=aws_auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=5,
)

# Redis client for caching (optional)
redis_client = None
if REDIS_ENDPOINT:
    import redis
    redis_client = redis.Redis(
        host=REDIS_ENDPOINT, port=6379, decode_responses=True, socket_timeout=1
    )

# ---------- MCP Protocol Constants ----------

PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-catalog-search"
SERVER_VERSION = "1.0.0"

TOOL_DEFINITIONS = [
    {
        "name": "manga_catalog_search",
        "description": (
            "Search the MangaAssist catalog by title, author, genre, or ISBN. "
            "Returns matching manga with prices, ratings, and stock availability. "
            "Supports both keyword and semantic search."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query: title, author, keywords, or ISBN"
                },
                "genre": {
                    "type": "string",
                    "enum": [
                        "shonen", "shojo", "seinen", "josei", "kodomo", "all"
                    ],
                    "default": "all",
                    "description": "Filter by manga genre category"
                },
                "max_results": {
                    "type": "integer",
                    "default": 5,
                    "minimum": 1,
                    "maximum": 20,
                    "description": "Maximum results to return"
                },
                "in_stock_only": {
                    "type": "boolean",
                    "default": False,
                    "description": "Filter to only in-stock items"
                }
            },
            "required": ["query"]
        }
    }
]


# ---------- MCP Request Handlers ----------

def handle_initialize(params: dict) -> dict:
    """Handle MCP initialize request. Validate protocol version and
    return server capabilities."""
    client_version = params.get("protocolVersion", "")
    logger.info(f"MCP initialize: client version={client_version}")

    return {
        "protocolVersion": PROTOCOL_VERSION,
        "capabilities": {
            "tools": {"listChanged": False},
        },
        "serverInfo": {
            "name": SERVER_NAME,
            "version": SERVER_VERSION
        }
    }


def handle_tools_list(params: dict) -> dict:
    """Return all tools this server exposes."""
    return {"tools": TOOL_DEFINITIONS}


def handle_tools_call(params: dict) -> dict:
    """Dispatch tool invocation to the appropriate handler."""
    tool_name = params.get("name", "")
    arguments = params.get("arguments", {})

    if tool_name == "manga_catalog_search":
        return _execute_catalog_search(arguments)
    else:
        return {
            "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
            "isError": True
        }


def _execute_catalog_search(arguments: dict) -> dict:
    """Execute manga catalog search against OpenSearch with Redis caching."""
    query = arguments.get("query", "")
    genre = arguments.get("genre", "all")
    max_results = min(arguments.get("max_results", 5), 20)
    in_stock_only = arguments.get("in_stock_only", False)

    start_time = time.time()

    # --- Check Redis cache ---
    cache_key = f"search:{query}:{genre}:{max_results}:{in_stock_only}"
    if redis_client:
        try:
            cached = redis_client.get(cache_key)
            if cached:
                logger.info(f"Cache hit for query='{query}'")
                elapsed = (time.time() - start_time) * 1000
                result = json.loads(cached)
                result["metadata"]["cache_hit"] = True
                result["metadata"]["latency_ms"] = round(elapsed, 1)
                return {
                    "content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
                    "isError": False
                }
        except Exception as e:
            logger.warning(f"Redis cache read failed: {e}")

    # --- Build OpenSearch query ---
    must_clauses = [
        {
            "multi_match": {
                "query": query,
                "fields": [
                    "title^3", "title_jp^3", "author^2",
                    "description", "tags"
                ],
                "type": "best_fields",
                "fuzziness": "AUTO"
            }
        }
    ]
    filter_clauses = []

    if genre and genre != "all":
        filter_clauses.append({"term": {"genre": genre}})
    if in_stock_only:
        filter_clauses.append({"range": {"stock_count": {"gt": 0}}})

    search_body = {
        "size": max_results,
        "query": {
            "bool": {
                "must": must_clauses,
                "filter": filter_clauses
            }
        },
        "_source": [
            "title", "title_jp", "author", "genre", "price_jpy",
            "rating", "stock_count", "isbn", "cover_url", "volumes"
        ],
        "sort": [{"_score": "desc"}, {"rating": "desc"}]
    }

    # --- Execute search ---
    try:
        response = opensearch_client.search(
            index=OPENSEARCH_INDEX, body=search_body
        )
    except Exception as e:
        logger.error(f"OpenSearch query failed: {e}")
        return {
            "content": [{"type": "text", "text": f"Search failed: {str(e)}"}],
            "isError": True
        }

    # --- Format results ---
    hits = response.get("hits", {}).get("hits", [])
    results = []
    for hit in hits:
        src = hit["_source"]
        results.append({
            "title": src.get("title", ""),
            "title_jp": src.get("title_jp", ""),
            "author": src.get("author", ""),
            "genre": src.get("genre", ""),
            "price_jpy": src.get("price_jpy", 0),
            "rating": src.get("rating", 0),
            "in_stock": src.get("stock_count", 0) > 0,
            "stock_count": src.get("stock_count", 0),
            "isbn": src.get("isbn", ""),
            "volumes": src.get("volumes", 1),
            "relevance_score": round(hit.get("_score", 0), 2)
        })

    elapsed = (time.time() - start_time) * 1000
    result_payload = {
        "results": results,
        "total_found": response.get("hits", {}).get("total", {}).get("value", 0),
        "metadata": {
            "query": query,
            "genre_filter": genre,
            "latency_ms": round(elapsed, 1),
            "cache_hit": False,
            "source": "opensearch"
        }
    }

    # --- Cache result ---
    if redis_client:
        try:
            redis_client.setex(cache_key, CACHE_TTL, json.dumps(result_payload, ensure_ascii=False))
        except Exception as e:
            logger.warning(f"Redis cache write failed: {e}")

    return {
        "content": [
            {"type": "text", "text": json.dumps(result_payload, ensure_ascii=False)}
        ],
        "isError": False
    }


# ---------- MCP Method Router ----------

MCP_HANDLERS = {
    "initialize": handle_initialize,
    "notifications/initialized": lambda p: None,  # Acknowledge, no response
    "tools/list": handle_tools_list,
    "tools/call": handle_tools_call,
}


# ---------- Lambda Entry Point ----------

def lambda_handler(event: dict, context: Any) -> dict:
    """
    AWS Lambda handler for MCP protocol messages.
    Expects JSON-RPC 2.0 messages via API Gateway proxy integration.
    """
    try:
        # Parse incoming MCP message
        if isinstance(event.get("body"), str):
            body = json.loads(event["body"])
        else:
            body = event.get("body", event)

        method = body.get("method", "")
        msg_id = body.get("id")
        params = body.get("params", {})

        logger.info(f"MCP request: method={method}, id={msg_id}")

        handler = MCP_HANDLERS.get(method)
        if handler is None:
            error_response = {
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {
                    "code": -32601,
                    "message": f"Method not found: {method}"
                }
            }
            return _api_response(400, error_response)

        result = handler(params)

        # Notifications have no id and expect no response
        if msg_id is None:
            return _api_response(204, "")

        response = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": result
        }
        return _api_response(200, response)

    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON: {e}")
        return _api_response(400, {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32700, "message": "Parse error"}
        })
    except Exception as e:
        logger.error(f"Internal error: {e}", exc_info=True)
        return _api_response(500, {
            "jsonrpc": "2.0",
            "id": body.get("id") if "body" in locals() else None,
            "error": {"code": -32603, "message": "Internal error"}
        })


def _api_response(status_code: int, body: Any) -> dict:
    """Format Lambda proxy integration response."""
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "X-MCP-Server": SERVER_NAME,
            "X-MCP-Version": SERVER_VERSION,
        },
        "body": json.dumps(body, default=str) if body else ""
    }
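
API Gateway's Lambda proxy integration wraps the JSON-RPC message in an event whose body arrives as a string, which is why lambda_handler parses it first. A sketch of the event shape delivered to the handler (most API Gateway fields omitted; values illustrative):

```python
import json

# Abbreviated proxy-integration event as delivered to lambda_handler
sample_event = {
    "resource": "/mcp/catalog-search",
    "httpMethod": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": json.dumps({
        "jsonrpc": "2.0",
        "id": 7,
        "method": "tools/call",
        "params": {
            "name": "manga_catalog_search",
            "arguments": {"query": "Fullmetal Alchemist", "max_results": 3},
        },
    }),
}

# The handler's first step: decode the stringified body back into a dict
parsed = json.loads(sample_event["body"])
print(parsed["method"], parsed["params"]["name"])
```

This is also the shape to replicate in local unit tests, so the handler's parsing path is exercised exactly as API Gateway would drive it.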

2.3 Lambda MCP Infrastructure (CDK)

"""
CDK stack for deploying Lambda MCP servers for MangaAssist.
Defines the Lambda functions, API Gateway, and IAM roles.
"""

from aws_cdk import (
    Stack, Duration, RemovalPolicy, CfnOutput,
    aws_lambda as _lambda,
    aws_apigateway as apigw,
    aws_iam as iam,
    aws_logs as logs,
)
from constructs import Construct


class LambdaMCPStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # --- Shared Lambda layer with MCP dependencies ---
        mcp_layer = _lambda.LayerVersion(
            self, "MCPDependenciesLayer",
            code=_lambda.Code.from_asset("layers/mcp-deps"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_12],
            description="opensearch-py, redis, requests-aws4auth",
        )

        # --- Catalog search Lambda MCP server ---
        catalog_search_fn = _lambda.Function(
            self, "CatalogSearchMCP",
            function_name="mangaassist-mcp-catalog-search",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="catalog_search_mcp.lambda_handler",
            code=_lambda.Code.from_asset("src/mcp_servers/catalog_search"),
            memory_size=256,
            timeout=Duration.seconds(10),
            layers=[mcp_layer],
            environment={
                "OPENSEARCH_ENDPOINT": "search-manga-xxxxx.ap-northeast-1.aoss.amazonaws.com",
                "OPENSEARCH_INDEX": "manga-catalog",
                "REDIS_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
                "CACHE_TTL": "300",
                "LOG_LEVEL": "INFO",
            },
            tracing=_lambda.Tracing.ACTIVE,
            log_retention=logs.RetentionDays.TWO_WEEKS,
        )

        # --- Order tracking Lambda MCP server ---
        order_tracking_fn = _lambda.Function(
            self, "OrderTrackingMCP",
            function_name="mangaassist-mcp-order-tracking",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="order_tracking_mcp.lambda_handler",
            code=_lambda.Code.from_asset("src/mcp_servers/order_tracking"),
            memory_size=256,
            timeout=Duration.seconds(5),
            layers=[mcp_layer],
            environment={
                "ORDERS_TABLE": "mangaassist-orders",
                "REDIS_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
                "CACHE_TTL": "60",
            },
            tracing=_lambda.Tracing.ACTIVE,
            log_retention=logs.RetentionDays.TWO_WEEKS,
        )

        # --- API Gateway for MCP endpoints ---
        api = apigw.RestApi(
            self, "MCPApi",
            rest_api_name="mangaassist-mcp-api",
            deploy_options=apigw.StageOptions(
                stage_name="prod",
                throttling_rate_limit=1000,
                throttling_burst_limit=500,
                logging_level=apigw.MethodLoggingLevel.INFO,
                metrics_enabled=True,
            ),
        )

        mcp_resource = api.root.add_resource("mcp")
        catalog_resource = mcp_resource.add_resource("catalog-search")
        order_resource = mcp_resource.add_resource("order-tracking")

        catalog_resource.add_method(
            "POST",
            apigw.LambdaIntegration(catalog_search_fn, proxy=True),
        )
        order_resource.add_method(
            "POST",
            apigw.LambdaIntegration(order_tracking_fn, proxy=True),
        )

        # --- Grant permissions ---
        catalog_search_fn.add_to_role_policy(iam.PolicyStatement(
            actions=["aoss:APIAccessAll"],
            resources=["arn:aws:aoss:ap-northeast-1:*:collection/*"],
        ))
        order_tracking_fn.add_to_role_policy(iam.PolicyStatement(
            actions=["dynamodb:GetItem", "dynamodb:Query"],
            resources=["arn:aws:dynamodb:ap-northeast-1:*:table/mangaassist-orders*"],
        ))

        CfnOutput(self, "MCPApiUrl", value=api.url)

3. ECS MCP Server Architecture

ECS MCP servers handle complex, stateful, or long-running tools. For MangaAssist, the recommendation engine (which loads ML models, maintains user preference caches, and streams results) runs on ECS Fargate.
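
With the SSE transport used by this server, each server-to-client JSON-RPC message is framed as a text/event-stream event. A minimal framing helper is sketched below (the framing follows the SSE format; the `message` event name reflects common MCP practice and is an assumption here):

```python
import json

def sse_frame(message: dict) -> str:
    """Frame one JSON-RPC message as a Server-Sent Events event.
    SSE events are 'field: value' lines terminated by a blank line."""
    return f"event: message\ndata: {json.dumps(message)}\n\n"

# One framed tool result as it would appear on the stream
frame = sse_frame({"jsonrpc": "2.0", "id": 42, "result": {"ok": True}})
print(frame, end="")
```

In the Starlette server below, frames like this would be yielded from the async generator backing a StreamingResponse with media type `text/event-stream`.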

3.1 Architecture Overview

graph TB
    subgraph "MCP Client Layer"
        BA2[Bedrock Agent<br/>Claude 3 Sonnet]
        CL2[MCP Client Library]
        BA2 --> CL2
    end

    subgraph "Load Balancing"
        ALB[Application Load Balancer<br/>WebSocket + HTTP/2]
        TG[Target Group<br/>Health: /mcp/health]
    end

    subgraph "ECS Fargate Cluster"
        subgraph "Service: Recommendation MCP"
            T1[Task 1<br/>1 vCPU / 2 GB]
            T2[Task 2<br/>1 vCPU / 2 GB]
            T3[Task 3<br/>1 vCPU / 2 GB]
        end
        subgraph "Service: Session Context MCP"
            T4[Task 4<br/>0.5 vCPU / 1 GB]
            T5[Task 5<br/>0.5 vCPU / 1 GB]
        end
    end

    subgraph "Backend Services"
        OS2[OpenSearch Serverless<br/>Vector Similarity]
        DDB2[DynamoDB<br/>User Preferences]
        RC2[ElastiCache Redis<br/>Session State + Model Cache]
        S3[S3<br/>Model Artifacts]
    end

    CL2 --> ALB
    ALB --> TG
    TG --> T1
    TG --> T2
    TG --> T3
    TG --> T4
    TG --> T5
    T1 --> OS2
    T1 --> DDB2
    T1 --> RC2
    T2 --> OS2
    T2 --> RC2
    T3 --> S3
    T4 --> DDB2
    T4 --> RC2
    T5 --> DDB2
    T5 --> RC2

    style BA2 fill:#ff9900,color:#000
    style ALB fill:#8c4fff,color:#fff
    style T1 fill:#d86613,color:#fff
    style T2 fill:#d86613,color:#fff
    style T3 fill:#d86613,color:#fff
    style T4 fill:#1a8c1a,color:#fff
    style T5 fill:#1a8c1a,color:#fff

3.2 Production ECS MCP Server

"""
ECS MCP Server for MangaAssist recommendation engine.
Runs as a persistent Fargate task with SSE (Server-Sent Events) transport.
Maintains in-memory model cache and connection state.
"""

import asyncio
import json
import logging
import os
import signal
import time
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse
from starlette.routing import Route

import boto3
import redis.asyncio as aioredis
from opensearchpy import AsyncOpenSearch, AsyncHttpConnection

logger = logging.getLogger("mcp-recommendation")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")

# ---------- Configuration ----------

REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
REDIS_ENDPOINT = os.environ["REDIS_ENDPOINT"]
PORT = int(os.environ.get("PORT", "8080"))

PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "mangaassist-recommendation-engine"
SERVER_VERSION = "2.0.0"


# ---------- Stateful Resources (initialized at startup, reused) ----------

class ServerState:
    """Holds long-lived connections and cached resources."""
    opensearch: AsyncOpenSearch | None = None
    redis: aioredis.Redis | None = None
    genre_embeddings: dict = {}  # Preloaded genre centroids
    startup_time: float = 0.0
    request_count: int = 0
    active_sessions: dict = {}  # session_id -> last_access


state = ServerState()


async def initialize_resources():
    """Initialize backend connections and preload model data."""
    logger.info("Initializing server resources...")
    state.startup_time = time.time()

    # Async OpenSearch client with SigV4 request signing (required by
    # OpenSearch Serverless; the signing service name is "aoss")
    from opensearchpy import AWSV4SignerAsyncAuth
    credentials = boto3.Session().get_credentials()
    state.opensearch = AsyncOpenSearch(
        hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
        http_auth=AWSV4SignerAsyncAuth(credentials, REGION, "aoss"),
        use_ssl=True,
        verify_certs=True,
        connection_class=AsyncHttpConnection,
        timeout=10,
    )

    # Async Redis client
    state.redis = aioredis.Redis(
        host=REDIS_ENDPOINT, port=6379,
        decode_responses=True, socket_timeout=2
    )

    # Preload genre embeddings for faster similarity search
    try:
        genres = ["shonen", "shojo", "seinen", "josei", "kodomo"]
        for genre in genres:
            cached = await state.redis.get(f"genre_embedding:{genre}")
            if cached:
                state.genre_embeddings[genre] = json.loads(cached)
        logger.info(f"Preloaded {len(state.genre_embeddings)} genre embeddings")
    except Exception as e:
        logger.warning(f"Genre embedding preload failed: {e}")

    logger.info("Server resources initialized")


async def cleanup_resources():
    """Gracefully close all connections."""
    logger.info("Cleaning up server resources...")
    if state.opensearch:
        await state.opensearch.close()
    if state.redis:
        await state.redis.close()
    logger.info("Cleanup complete")


# ---------- Tool Definitions ----------

TOOL_DEFINITIONS = [
    {
        "name": "manga_recommendations",
        "description": (
            "Generate personalized manga recommendations based on user "
            "reading history, preferences, and current trends. Uses "
            "collaborative filtering and content-based similarity."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "User ID for personalized recommendations"
                },
                "seed_titles": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Manga titles the user likes (for cold start)"
                },
                "genre_preference": {
                    "type": "string",
                    "enum": ["shonen", "shojo", "seinen", "josei", "kodomo", "any"],
                    "default": "any"
                },
                "count": {
                    "type": "integer",
                    "default": 5,
                    "minimum": 1,
                    "maximum": 20
                },
                "exclude_owned": {
                    "type": "boolean",
                    "default": True,
                    "description": "Exclude manga already purchased by user"
                }
            },
            "required": ["user_id"]
        }
    },
    {
        "name": "reading_list_analysis",
        "description": (
            "Analyze a user's reading list to identify patterns, "
            "preferred genres, favorite authors, and suggest "
            "curated collections."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "User ID to analyze"
                },
                "include_trends": {
                    "type": "boolean",
                    "default": True,
                    "description": "Include trending manga in same genres"
                }
            },
            "required": ["user_id"]
        }
    }
]


# ---------- Tool Execution ----------

async def execute_manga_recommendations(arguments: dict) -> dict:
    """Generate personalized manga recommendations."""
    user_id = arguments["user_id"]
    seed_titles = arguments.get("seed_titles", [])
    genre_pref = arguments.get("genre_preference", "any")
    count = min(arguments.get("count", 5), 20)
    exclude_owned = arguments.get("exclude_owned", True)

    start = time.time()

    # Fetch user reading history from Redis
    history_key = f"user_history:{user_id}"
    user_history = []
    try:
        raw_history = await state.redis.lrange(history_key, 0, 49)
        user_history = [json.loads(h) for h in raw_history]
    except Exception as e:
        logger.warning(f"Failed to fetch user history: {e}")

    # Build recommendation query using user history or seed titles
    if user_history:
        # Use collaborative filtering via OpenSearch kNN
        recent_isbns = [h["isbn"] for h in user_history[:10]]
        query_vector = await _get_aggregate_vector(recent_isbns)
    elif seed_titles:
        query_vector = await _get_title_vector(seed_titles)
    else:
        # Cold start: use popular items in genre
        query_vector = state.genre_embeddings.get(genre_pref)

    if query_vector is None:
        return {
            "content": [{"type": "text", "text": json.dumps({
                "recommendations": [],
                "reason": "Insufficient data for personalization. Ask the user about their preferences."
            }, ensure_ascii=False)}],
            "isError": False
        }

    # OpenSearch kNN search
    owned_isbns = set()
    if exclude_owned and user_history:
        owned_isbns = {h["isbn"] for h in user_history}

    filter_clauses = []
    if genre_pref and genre_pref != "any":
        filter_clauses.append({"term": {"genre": genre_pref}})
    if owned_isbns:
        filter_clauses.append({"bool": {"must_not": [
            {"terms": {"isbn": list(owned_isbns)}}
        ]}})

    # Build the kNN clause; attach "filter" only when one exists, since a
    # null filter value is rejected by OpenSearch
    knn_clause = {"vector": query_vector, "k": count * 2}
    if filter_clauses:
        knn_clause["filter"] = {"bool": {"filter": filter_clauses}}

    search_body = {
        "size": count,
        "query": {"knn": {"embedding": knn_clause}},
        "_source": [
            "title", "title_jp", "author", "genre", "price_jpy",
            "rating", "stock_count", "isbn", "volumes", "synopsis"
        ]
    }

    try:
        response = await state.opensearch.search(
            index="manga-catalog", body=search_body
        )
    except Exception as e:
        logger.error(f"OpenSearch kNN search failed: {e}")
        return {
            "content": [{"type": "text", "text": f"Recommendation search failed: {e}"}],
            "isError": True
        }

    hits = response.get("hits", {}).get("hits", [])
    recommendations = []
    for hit in hits:
        src = hit["_source"]
        recommendations.append({
            "title": src.get("title", ""),
            "title_jp": src.get("title_jp", ""),
            "author": src.get("author", ""),
            "genre": src.get("genre", ""),
            "price_jpy": src.get("price_jpy", 0),
            "rating": src.get("rating", 0),
            "volumes": src.get("volumes", 1),
            "synopsis": src.get("synopsis", "")[:200],
            "similarity_score": round(hit.get("_score", 0), 3),
            "in_stock": src.get("stock_count", 0) > 0
        })

    elapsed = (time.time() - start) * 1000
    return {
        "content": [{
            "type": "text",
            "text": json.dumps({
                "recommendations": recommendations,
                "personalization": "history" if user_history else ("seeds" if seed_titles else "popular"),
                "metadata": {
                    "user_id": user_id,
                    "latency_ms": round(elapsed, 1),
                    "history_depth": len(user_history),
                }
            }, ensure_ascii=False)
        }],
        "isError": False
    }


async def _get_aggregate_vector(isbns: list) -> list | None:
    """Compute the average embedding vector across ISBNs; None if no cache hits."""
    vectors = []
    for isbn in isbns:
        cached = await state.redis.get(f"embedding:{isbn}")
        if cached:
            vectors.append(json.loads(cached))
    if not vectors:
        return None
    # Element-wise average
    dim = len(vectors[0])
    avg = [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]
    return avg


async def _get_title_vector(titles: list) -> list | None:
    """Get an embedding vector for seed title text.

    Placeholder: in production, embed the titles via a Bedrock
    embedding model and return the resulting vector.
    """
    return None


# ---------- MCP Route Handlers ----------

async def _reading_list_analysis_stub(arguments: dict) -> dict:
    """Placeholder until the reading-list analysis tool is implemented."""
    return {
        "content": [{"type": "text", "text": "Analysis placeholder"}],
        "isError": False
    }


TOOL_HANDLERS = {
    "manga_recommendations": execute_manga_recommendations,
    "reading_list_analysis": _reading_list_analysis_stub,
}


async def mcp_endpoint(request: Request) -> JSONResponse:
    """Main MCP JSON-RPC endpoint."""
    state.request_count += 1

    try:
        body = await request.json()
    except Exception:
        return JSONResponse(
            {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}},
            status_code=400
        )

    method = body.get("method", "")
    msg_id = body.get("id")
    params = body.get("params", {})

    logger.info(f"MCP request: method={method} id={msg_id}")

    if method == "initialize":
        result = {
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {
                "tools": {"listChanged": False},
            },
            "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}
        }

    elif method == "notifications/initialized":
        # Notifications carry no id and expect no body. JSONResponse would
        # serialize a literal "null" payload alongside the 204, so return a
        # bare Response (from starlette.responses import Response) instead.
        return Response(status_code=204)

    elif method == "tools/list":
        result = {"tools": TOOL_DEFINITIONS}

    elif method == "tools/call":
        tool_name = params.get("name", "")
        arguments = params.get("arguments", {})
        handler = TOOL_HANDLERS.get(tool_name)
        if handler is None:
            # MCP treats an unknown tool as invalid params (-32602),
            # not a missing JSON-RPC method
            return JSONResponse({
                "jsonrpc": "2.0", "id": msg_id,
                "error": {"code": -32602, "message": f"Unknown tool: {tool_name}"}
            }, status_code=400)
        result = await handler(arguments)

    else:
        return JSONResponse({
            "jsonrpc": "2.0", "id": msg_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}
        }, status_code=400)

    return JSONResponse({"jsonrpc": "2.0", "id": msg_id, "result": result})


async def health_endpoint(request: Request) -> JSONResponse:
    """Health check for ALB target group."""
    checks = {"server": "ok", "uptime_s": round(time.time() - state.startup_time)}
    try:
        await state.redis.ping()
        checks["redis"] = "ok"
    except Exception:
        checks["redis"] = "degraded"
    try:
        await state.opensearch.info()
        checks["opensearch"] = "ok"
    except Exception:
        checks["opensearch"] = "degraded"

    all_ok = all(v == "ok" for k, v in checks.items() if k != "uptime_s")
    status = 200 if all_ok else 503
    checks["status"] = "healthy" if all_ok else "degraded"
    checks["request_count"] = state.request_count
    return JSONResponse(checks, status_code=status)


# ---------- Application Setup ----------

@asynccontextmanager
async def lifespan(app):
    """Manage startup and shutdown of server resources."""
    await initialize_resources()
    yield
    await cleanup_resources()


app = Starlette(
    routes=[
        Route("/mcp", mcp_endpoint, methods=["POST"]),
        Route("/mcp/health", health_endpoint, methods=["GET"]),
    ],
    lifespan=lifespan,
)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")

4. Lambda MCP vs ECS MCP Comparison

| Dimension | Lambda MCP Server | ECS MCP Server |
|---|---|---|
| Use Case | Lightweight, stateless tools (search, lookup) | Complex, stateful tools (recommendations, analysis) |
| Startup | Cold start 200-800 ms; warm <50 ms | Container start 30-60 s; then always warm |
| Max Duration | 15 minutes | Unlimited (persistent process) |
| Memory | 128 MB - 10 GB | 512 MB - 30 GB per task |
| Connections | New per invocation (or reused when warm) | Persistent connection pools |
| State | Ephemeral (module-level caching when warm) | In-memory caches, preloaded models |
| Transport | HTTP request/response | HTTP, WebSocket, SSE |
| Scaling | Near-instant (concurrent executions) | Slower (ECS task provisioning ~30 s) |
| Cost Model | Per-invocation (cheap at low volume) | Per-hour (cheaper at high volume) |
| MangaAssist Tool | Catalog search, order tracking, price check | Recommendations, reading analysis, session context |
| Latency Budget | <500 ms per tool call | <1000 ms per tool call |
| Cost at 1M calls/day | ~$200-400/month (256 MB, 500 ms avg) | ~$150-250/month (2 tasks, 1 vCPU each) |
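The cost rows can be sanity-checked with back-of-envelope arithmetic. The unit prices below are illustrative approximations (they vary by region and over time), and the totals cover raw compute only; API Gateway/ALB charges, data transfer, and provisioned concurrency account for the rest of the ranges in the table:

```python
# Back-of-envelope compute cost at MangaAssist's 1M tool calls/day.
# Unit prices are illustrative; verify against current AWS pricing
# before making a real deployment decision.
CALLS_PER_DAY = 1_000_000
DAYS = 30

# Lambda: 256 MB memory, 500 ms average duration
LAMBDA_PER_GB_SECOND = 0.0000166667
LAMBDA_PER_REQUEST = 0.20 / 1_000_000
gb_seconds = CALLS_PER_DAY * DAYS * 0.5 * (256 / 1024)
lambda_monthly = (gb_seconds * LAMBDA_PER_GB_SECOND
                  + CALLS_PER_DAY * DAYS * LAMBDA_PER_REQUEST)

# ECS Fargate: 2 always-on tasks, 1 vCPU / 2 GB each
FARGATE_VCPU_HOUR = 0.04048
FARGATE_GB_HOUR = 0.004445
ecs_monthly = 2 * 24 * DAYS * (1 * FARGATE_VCPU_HOUR + 2 * FARGATE_GB_HOUR)

print(f"Lambda compute: ~${lambda_monthly:.0f}/mo, ECS compute: ~${ecs_monthly:.0f}/mo")
```

On raw compute alone the two options land within a few dollars of each other at this volume; the wider gaps in the table come from the surrounding infrastructure, which is also where the calls-per-day crossover point originates.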

4.1 Decision Framework

flowchart TD
    START[New MCP Tool Needed] --> Q1{Stateful?<br/>Needs persistent<br/>connections or cache?}
    Q1 -->|No| Q2{Execution time<br/> under 10 seconds?}
    Q1 -->|Yes| ECS[Deploy as ECS MCP Server]

    Q2 -->|Yes| Q3{Memory under 512 MB?}
    Q2 -->|No| ECS

    Q3 -->|Yes| LAMBDA[Deploy as Lambda MCP Server]
    Q3 -->|No| Q4{Cold start<br/>acceptable?}

    Q4 -->|Yes| LAMBDA
    Q4 -->|No| ECS

    LAMBDA --> L_DETAIL["Lambda MCP<br/>- API Gateway or Function URL<br/>- Provisioned concurrency for latency<br/>- Module-level caching<br/>- 256-512 MB memory"]

    ECS --> E_DETAIL["ECS MCP<br/>- ALB with health checks<br/>- Auto-scaling on CPU/connections<br/>- Persistent connection pools<br/>- Preloaded models/embeddings"]

    style START fill:#232f3e,color:#fff
    style LAMBDA fill:#d86613,color:#fff
    style ECS fill:#1a8c1a,color:#fff
    style L_DETAIL fill:#d86613,color:#fff
    style E_DETAIL fill:#1a8c1a,color:#fff
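The decision flow above translates directly into a small routing helper. The function name and thresholds below are taken from the flowchart, not from any AWS API:

```python
def choose_mcp_deployment(
    stateful: bool,
    exec_seconds: float,
    memory_mb: int,
    cold_start_ok: bool = True,
) -> str:
    """Return 'lambda' or 'ecs' for a new MCP tool, mirroring the flowchart."""
    if stateful:
        return "ecs"      # persistent connections or caches need a long-lived process
    if exec_seconds >= 10:
        return "ecs"      # long-running work exceeds the Lambda comfort zone
    if memory_mb < 512:
        return "lambda"   # small, fast, stateless: Lambda's sweet spot
    return "lambda" if cold_start_ok else "ecs"


# MangaAssist examples:
# catalog search:  choose_mcp_deployment(False, 0.4, 256)  -> "lambda"
# recommendations: choose_mcp_deployment(True, 0.8, 2048)  -> "ecs"
```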

5. MCP Server Lifecycle

stateDiagram-v2
    [*] --> Initializing: Server starts
    Initializing --> Ready: Resources loaded,<br/>capabilities registered
    Ready --> Processing: tools/call received
    Processing --> Ready: Response sent
    Ready --> Draining: Shutdown signal<br/>(SIGTERM)
    Processing --> Draining: Finish current,<br/>reject new
    Draining --> ShuttingDown: Active requests = 0
    ShuttingDown --> [*]: Connections closed,<br/>cleanup complete

    note right of Initializing
        Lambda: module-level init
        ECS: lifespan startup
    end note

    note right of Draining
        Lambda: N/A (per-invocation)
        ECS: ALB deregistration delay
    end note

5.1 Lifecycle Differences

| Phase | Lambda MCP | ECS MCP |
|---|---|---|
| Initialize | Module-level code (first cold start) | `lifespan` startup coroutine |
| Capability Advertisement | On each `initialize` call | On each `initialize` call (same protocol) |
| Request Handling | Single request per invocation | Concurrent requests via async handlers |
| Connection Reuse | Warm container reuses module-level clients | Persistent connection pools across requests |
| Shutdown | Container frozen/destroyed by Lambda | SIGTERM -> drain -> close connections |
| Health Checks | N/A (managed by the Lambda service) | ALB health check endpoint `/mcp/health` |
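The ECS shutdown phase deserves a concrete sketch. The `GracefulShutdown` class below is illustrative (not part of any AWS SDK): it counts in-flight requests and, on SIGTERM, waits for them to drain before the process exits, staying inside the ECS `stopTimeout` window (30 s by default):

```python
import asyncio
import signal


class GracefulShutdown:
    """Drain in-flight MCP requests when ECS sends SIGTERM."""

    def __init__(self) -> None:
        self.active = 0                    # in-flight request counter
        self.draining = asyncio.Event()    # set when SIGTERM arrives

    def install(self) -> None:
        # ECS sends SIGTERM, then SIGKILL after stopTimeout (default 30 s)
        asyncio.get_running_loop().add_signal_handler(
            signal.SIGTERM, self.draining.set
        )

    async def wait_for_drain(self, timeout: float = 25.0) -> None:
        """Block until SIGTERM arrives, then poll until requests finish."""
        await self.draining.wait()
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while self.active > 0 and loop.time() < deadline:
            await asyncio.sleep(0.1)
```

Wired into the Starlette server from Section 3, `install()` would run during `lifespan` startup and `wait_for_drain()` just before `cleanup_resources()`, with each request handler incrementing `active` on entry and decrementing it on exit.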

6. MCP Client Library for Consistent Access

The MCP client library abstracts whether a tool runs on Lambda or ECS, giving the Bedrock orchestrator a uniform interface.

"""
MCP Client Library for MangaAssist.
Provides a consistent interface to call MCP tools regardless
of whether they run on Lambda or ECS.
"""

import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum

import httpx

logger = logging.getLogger("mcp-client")


class MCPServerType(Enum):
    LAMBDA = "lambda"
    ECS = "ecs"


@dataclass
class MCPServerConfig:
    """Configuration for an MCP server endpoint."""
    name: str
    endpoint: str
    server_type: MCPServerType
    timeout_ms: int = 5000
    retry_count: int = 2
    retry_delay_ms: int = 100


@dataclass
class MCPToolCall:
    """Result of an MCP tool invocation."""
    tool_name: str
    content: list = field(default_factory=list)
    is_error: bool = False
    latency_ms: float = 0
    server_name: str = ""
    protocol_version: str = ""


class MCPClient:
    """Unified MCP client that routes tool calls to the correct server."""

    def __init__(self, servers: list[MCPServerConfig]):
        self._servers = {s.name: s for s in servers}
        self._tool_registry: dict[str, str] = {}  # tool_name -> server_name
        self._http = httpx.AsyncClient(timeout=30)
        self._initialized_servers: set = set()
        self._request_id = 0

    async def discover_tools(self) -> dict[str, list]:
        """Initialize all servers and discover available tools."""
        all_tools = {}
        for name, config in self._servers.items():
            try:
                # Initialize
                init_result = await self._send_request(config, "initialize", {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "mangaassist-orchestrator", "version": "1.0.0"}
                })
                self._initialized_servers.add(name)
                logger.info(f"Initialized MCP server: {name}")

                # Send initialized notification
                await self._send_notification(config, "notifications/initialized", {})

                # List tools
                tools_result = await self._send_request(config, "tools/list", {})
                tools = tools_result.get("tools", [])
                all_tools[name] = tools

                # Register tool -> server mapping
                for tool in tools:
                    self._tool_registry[tool["name"]] = name
                    logger.info(f"Registered tool: {tool['name']} -> {name}")

            except Exception as e:
                logger.error(f"Failed to initialize MCP server {name}: {e}")

        return all_tools

    async def call_tool(self, tool_name: str, arguments: dict) -> MCPToolCall:
        """Call a tool by name, routing to the correct MCP server."""
        server_name = self._tool_registry.get(tool_name)
        if not server_name:
            return MCPToolCall(
                tool_name=tool_name,
                content=[{"type": "text", "text": f"Tool not found: {tool_name}"}],
                is_error=True
            )

        config = self._servers[server_name]
        start = time.time()

        for attempt in range(config.retry_count + 1):
            try:
                result = await self._send_request(config, "tools/call", {
                    "name": tool_name,
                    "arguments": arguments
                })
                elapsed = (time.time() - start) * 1000
                return MCPToolCall(
                    tool_name=tool_name,
                    content=result.get("content", []),
                    is_error=result.get("isError", False),
                    latency_ms=round(elapsed, 1),
                    server_name=server_name
                )
            except Exception as e:
                if attempt < config.retry_count:
                    logger.warning(f"Retry {attempt+1} for {tool_name}: {e}")
                    await asyncio.sleep(config.retry_delay_ms / 1000)
                else:
                    elapsed = (time.time() - start) * 1000
                    return MCPToolCall(
                        tool_name=tool_name,
                        content=[{"type": "text", "text": f"Tool call failed: {e}"}],
                        is_error=True,
                        latency_ms=round(elapsed, 1),
                        server_name=server_name
                    )

    async def _send_request(self, config: MCPServerConfig, method: str, params: dict) -> dict:
        """Send a JSON-RPC request to an MCP server."""
        self._request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params
        }
        response = await self._http.post(
            config.endpoint,
            json=payload,
            timeout=config.timeout_ms / 1000
        )
        response.raise_for_status()
        body = response.json()
        if "error" in body:
            raise Exception(f"MCP error: {body['error']}")
        return body.get("result", {})

    async def _send_notification(self, config: MCPServerConfig, method: str, params: dict):
        """Send a JSON-RPC notification (no id, no response expected)."""
        payload = {"jsonrpc": "2.0", "method": method, "params": params}
        await self._http.post(config.endpoint, json=payload, timeout=5)

    async def close(self):
        await self._http.aclose()


# --- Usage Example ---
import asyncio

async def main():
    client = MCPClient(servers=[
        MCPServerConfig(
            name="catalog-search",
            endpoint="https://api.mangaassist.example.com/mcp/catalog-search",
            server_type=MCPServerType.LAMBDA,
            timeout_ms=5000
        ),
        MCPServerConfig(
            name="recommendation-engine",
            endpoint="https://mcp-ecs.mangaassist.internal/mcp",
            server_type=MCPServerType.ECS,
            timeout_ms=10000
        ),
    ])

    await client.discover_tools()

    result = await client.call_tool("manga_catalog_search", {
        "query": "Attack on Titan",
        "genre": "shonen",
        "max_results": 5
    })
    print(f"Search result ({result.latency_ms}ms): {result.content}")

    result = await client.call_tool("manga_recommendations", {
        "user_id": "user-12345",
        "count": 3
    })
    print(f"Recommendations ({result.latency_ms}ms): {result.content}")

    await client.close()


if __name__ == "__main__":
    asyncio.run(main())

Key Takeaways

| # | Takeaway |
|---|---|
| 1 | MCP is JSON-RPC 2.0 based -- every message follows the `{jsonrpc, id, method, params}` format, with structured capability negotiation at initialization. |
| 2 | Lambda MCP servers excel at stateless tools -- catalog search and order tracking run in <500 ms with module-level caching across warm invocations. |
| 3 | ECS MCP servers handle complex stateful tools -- the recommendation engine maintains persistent OpenSearch/Redis connections and preloaded embeddings. |
| 4 | The decision is driven by state, duration, and memory -- if a tool needs persistent connections, >10 s execution, or >512 MB memory, choose ECS. |
| 5 | The MCP client library abstracts deployment topology -- the orchestrator calls `call_tool("name", args)` without knowing whether it hits Lambda or ECS. |
| 6 | Cost crossover sits at roughly 500K calls/day -- below that, Lambda per-invocation pricing wins; above it, ECS per-hour pricing is cheaper for MangaAssist's 1M/day target. |
| 7 | Health checks differ fundamentally -- Lambda relies on the Lambda service; ECS needs an explicit `/mcp/health` endpoint for ALB target group monitoring. |
| 8 | Server lifecycle management is critical -- ECS servers must handle SIGTERM gracefully (drain, close connections); Lambda handles this automatically via container freeze. |