LOCAL PREVIEW View on GitHub

Skill 2.1.6: Tool Integration Architecture for Extending FM Capabilities

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Mind Map: Intelligent Tool Integrations

Skill 2.1.6: Intelligent Tool Integrations
|
+-- Strands API Custom Behaviors
|   +-- @tool decorator for function registration
|   +-- Agent-level tool binding and context injection
|   +-- Custom callback hooks (on_tool_start, on_tool_end, on_tool_error)
|   +-- Streaming tool results for long-running operations
|   +-- Tool-level system prompts and usage hints
|
+-- Tool Definitions (Function Schemas)
|   +-- Parameter Types
|   |   +-- Primitives: str, int, float, bool
|   |   +-- Complex: dict, list, Optional, Union
|   |   +-- Constrained: Enum, Literal, Annotated
|   +-- Return Types
|   |   +-- Structured dict responses
|   |   +-- Error envelopes with status codes
|   |   +-- Streaming generators for large payloads
|   +-- JSON Schema Generation
|   |   +-- Auto-generated from type hints
|   |   +-- Manual override for complex constraints
|   |   +-- Descriptions propagated to FM context
|
+-- Tool Discovery and Registration
|   +-- Static registry (compile-time binding)
|   +-- Dynamic registry (runtime tool loading)
|   +-- MCP-based discovery (Model Context Protocol)
|   +-- Tool versioning and capability negotiation
|   +-- Health-check based availability
|
+-- Error Handling Patterns
|   +-- Retry with exponential backoff
|   +-- Fallback to alternative tools
|   +-- Graceful degradation (partial results)
|   +-- Circuit breaker for failing tools
|   +-- Dead letter queue for failed tool calls
|
+-- Parameter Validation
|   +-- JSON Schema validation at gateway
|   +-- Type coercion (string -> int, etc.)
|   +-- Constraint checking (min/max, regex, enum)
|   +-- Japanese character encoding validation (UTF-8)
|   +-- Injection prevention (prompt injection in params)
|
+-- Tool Chaining and Composition
    +-- Sequential pipelines (A -> B -> C)
    +-- Parallel fan-out (A -> [B, C] -> D)
    +-- Conditional branching (if/else tool selection)
    +-- Aggregation of multi-tool results
    +-- Context propagation across tool chain

Architecture: Tool Integration Flow in MangaAssist

                          MangaAssist Tool Integration Architecture
 ==========================================================================================

   Customer (WebSocket)
        |
        v
 +----------------+
 | API Gateway    |    WebSocket connection, message routing
 | (WebSocket)    |
 +-------+--------+
         |
         v
 +----------------+       +-------------------+
 | ECS Fargate    | <---> | ElastiCache Redis |   Session state, tool result cache,
 | Orchestrator   |       | (Cache Layer)     |   rate limit counters
 +-------+--------+       +-------------------+
         |
         |  1. Parse user message
         |  2. Load conversation history
         |  3. Invoke Strands Agent
         |
         v
 +---------------------------+
 | Strands Agent             |
 | (Claude 3 Sonnet/Haiku)  |
 |                           |
 |  System Prompt:           |
 |  "You are MangaAssist..." |
 |                           |
 |  Available Tools:         |
 |  +---------------------+  |
 |  | Tool Registry       |  |
 |  | - product_search    |  |
 |  | - order_lookup      |  |
 |  | - recommendation    |  |
 |  | - inventory_check   |  |
 |  | - price_compare     |  |
 |  | - review_summary    |  |
 |  +---------------------+  |
 +------+----+----+----+-----+
        |    |    |    |
        v    v    v    v
 +------+--+ +---+--+ +---+-------+ +---+----------+
 | Product  | |Order | |Recommend  | |Inventory     |
 | Search   | |Lookup| |Engine     | |Check         |
 | Tool     | |Tool  | |Tool       | |Tool          |
 +----+-----+ +--+---+ +---+-------+ +---+----------+
      |          |          |             |
      v          v          v             v
 +----+-----+ +--+------+ +---+-------+ +---+----------+
 |OpenSearch | |DynamoDB | |Bedrock    | |DynamoDB      |
 |Serverless | |Orders   | |Embeddings | |Inventory     |
 |(Products) | |Table    | |+ Redis    | |Table         |
 +----------+ +---------+ +----------+ +--------------+

 Tool Call Flow:
 ===============
 1. FM decides which tool(s) to call based on user intent
 2. Strands validates parameters against JSON schema
 3. Tool middleware: auth check -> rate limit -> param validation -> execute
 4. Tool executes against backend service
 5. Response validated, cached (if applicable), returned to FM
 6. FM incorporates tool result into response generation
 7. If tool fails: retry -> fallback -> graceful degradation

 Tool Chain Example (Recommendation Request):
 =============================================
 User: "Recommend something like Attack on Titan"

 Step 1: product_search("Attack on Titan")     -> Get manga metadata
 Step 2: recommendation_engine(genre, author)   -> Get similar titles (parallel)
         inventory_check(recommended_ids)        -> Filter to in-stock only (parallel)
 Step 3: price_compare(filtered_ids)            -> Add pricing info
 Step 4: FM synthesizes final recommendation with all data

Tool Definition Standards

Why Standardized Tool Definitions Matter

When an FM calls a tool, it relies entirely on the tool's schema to understand: - What the tool does (description) - What inputs it needs (parameters with types and constraints) - What output it returns (return type and structure)

Poor tool definitions cause the FM to hallucinate parameters, call tools incorrectly, or misinterpret results. In MangaAssist, where we handle Japanese text, currency (JPY), and manga-specific metadata, precision in tool definitions is critical.


Production Code: Strands Tool Definitions for MangaAssist

Core Tool Registry and Base Infrastructure

"""
MangaAssist Tool Integration Layer
===================================
Strands-based tool definitions for the MangaAssist JP Manga store chatbot.
Implements standardized function schemas, tool registry pattern, and
production-grade error handling.

Architecture:
- Strands Agent with Claude 3 Sonnet (complex queries) / Haiku (simple lookups)
- Tools backed by OpenSearch Serverless, DynamoDB, ElastiCache Redis
- Parameter validation via JSON Schema + custom validators
- Tool chaining for multi-step operations
"""

import json
import time
import hashlib
import logging
import re
from enum import Enum
from typing import Optional, Union, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from functools import wraps

import boto3
from strands import Agent, tool
from strands.models.bedrock import BedrockModel

logger = logging.getLogger("mangaassist.tools")
logger.setLevel(logging.INFO)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
class ToolConfig:
    """Centralized configuration for all MangaAssist tools."""

    # AWS Resources
    OPENSEARCH_ENDPOINT = "https://mangaassist-products.us-east-1.aoss.amazonaws.com"
    DYNAMODB_ORDERS_TABLE = "MangaAssist-Orders"
    DYNAMODB_INVENTORY_TABLE = "MangaAssist-Inventory"
    DYNAMODB_PRODUCTS_TABLE = "MangaAssist-Products"
    DYNAMODB_SESSIONS_TABLE = "MangaAssist-Sessions"
    REDIS_ENDPOINT = "mangaassist-cache.xxxxx.use1.cache.amazonaws.com"
    BEDROCK_EMBED_MODEL = "amazon.titan-embed-text-v2:0"

    # Tool behavior
    DEFAULT_SEARCH_LIMIT = 10
    MAX_SEARCH_LIMIT = 50
    CACHE_TTL_SECONDS = 300          # 5 minutes for product searches
    ORDER_CACHE_TTL_SECONDS = 60     # 1 minute for order lookups
    RECOMMENDATION_CACHE_TTL = 600   # 10 minutes for recommendations
    TOOL_TIMEOUT_SECONDS = 5         # Per-tool timeout (budget: 3s total)
    MAX_RETRIES = 2
    RETRY_BASE_DELAY = 0.1          # 100ms base for exponential backoff

    # Validation
    MAX_QUERY_LENGTH = 500
    MAX_JAPANESE_QUERY_LENGTH = 200  # Japanese chars take more tokens
    VALID_GENRES = [
        "shonen", "shojo", "seinen", "josei", "kodomomuke",
        "isekai", "mecha", "slice_of_life", "horror", "romance",
        "action", "comedy", "drama", "fantasy", "sci_fi",
        "sports", "mystery", "psychological", "historical",
    ]
    VALID_SORT_FIELDS = ["relevance", "price_asc", "price_desc", "rating", "release_date", "popularity"]
    VALID_ORDER_STATUSES = ["pending", "processing", "shipped", "delivered", "cancelled", "returned"]
    PRICE_MIN_JPY = 0
    PRICE_MAX_JPY = 100_000


# ---------------------------------------------------------------------------
# Tool Response Envelope
# ---------------------------------------------------------------------------
@dataclass
class ToolResponse:
    """
    Standardized response envelope for all MangaAssist tools.

    Every tool returns this structure so the FM always knows:
    - Whether the call succeeded
    - What data is available
    - What went wrong (if anything)
    - How fresh the data is
    """

    success: bool
    data: Any = None
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        result = {
            "success": self.success,
            "data": self.data,
            "metadata": {
                "timestamp": datetime.utcnow().isoformat(),
                "cached": self.metadata.get("cached", False),
                "execution_time_ms": self.metadata.get("execution_time_ms", 0),
                **self.metadata,
            },
        }
        if not self.success:
            result["error"] = {
                "code": self.error_code or "UNKNOWN_ERROR",
                "message": self.error_message or "An unexpected error occurred",
            }
        return result


# ---------------------------------------------------------------------------
# Tool Registry Pattern
# ---------------------------------------------------------------------------
class ToolRegistry:
    """
    Central registry for all MangaAssist tools.

    Provides:
    - Tool registration with metadata (version, category, health status)
    - Tool discovery by capability
    - Health checking and automatic deregistration of failing tools
    - Usage metrics collection
    """

    def __init__(self):
        self._tools: dict[str, dict] = {}
        self._health_status: dict[str, bool] = {}
        self._usage_counts: dict[str, int] = {}
        self._error_counts: dict[str, int] = {}
        self._circuit_breakers: dict[str, dict] = {}

    def register(
        self,
        name: str,
        tool_fn: callable,
        category: str,
        version: str = "1.0.0",
        description: str = "",
        fallback: Optional[str] = None,
        timeout_ms: int = 5000,
    ):
        """Register a tool with metadata for discovery and management."""
        self._tools[name] = {
            "function": tool_fn,
            "category": category,
            "version": version,
            "description": description,
            "fallback": fallback,
            "timeout_ms": timeout_ms,
            "registered_at": datetime.utcnow().isoformat(),
        }
        self._health_status[name] = True
        self._usage_counts[name] = 0
        self._error_counts[name] = 0
        self._circuit_breakers[name] = {
            "state": "closed",       # closed = healthy, open = failing, half_open = testing
            "failure_count": 0,
            "failure_threshold": 5,
            "last_failure_time": None,
            "recovery_timeout_s": 30,
        }
        logger.info(f"Registered tool: {name} v{version} [{category}]")

    def get_tool(self, name: str) -> Optional[callable]:
        """Get a tool function by name, respecting circuit breaker state."""
        if name not in self._tools:
            return None

        cb = self._circuit_breakers[name]
        if cb["state"] == "open":
            if cb["last_failure_time"]:
                elapsed = (datetime.utcnow() - cb["last_failure_time"]).total_seconds()
                if elapsed > cb["recovery_timeout_s"]:
                    cb["state"] = "half_open"
                    logger.info(f"Circuit breaker half-open for tool: {name}")
                else:
                    fallback_name = self._tools[name].get("fallback")
                    if fallback_name and fallback_name in self._tools:
                        logger.warning(f"Tool {name} circuit open, using fallback: {fallback_name}")
                        return self._tools[fallback_name]["function"]
                    return None

        self._usage_counts[name] += 1
        return self._tools[name]["function"]

    def record_success(self, name: str):
        """Record a successful tool execution."""
        cb = self._circuit_breakers[name]
        if cb["state"] == "half_open":
            cb["state"] = "closed"
            cb["failure_count"] = 0
            logger.info(f"Circuit breaker closed for tool: {name}")
        self._error_counts[name] = max(0, self._error_counts[name] - 1)

    def record_failure(self, name: str):
        """Record a tool execution failure, potentially opening circuit breaker."""
        self._error_counts[name] += 1
        cb = self._circuit_breakers[name]
        cb["failure_count"] += 1
        cb["last_failure_time"] = datetime.utcnow()

        if cb["failure_count"] >= cb["failure_threshold"]:
            cb["state"] = "open"
            logger.error(f"Circuit breaker OPEN for tool: {name} after {cb['failure_count']} failures")

    def discover_by_category(self, category: str) -> list[str]:
        """Find all healthy tools in a category."""
        return [
            name for name, meta in self._tools.items()
            if meta["category"] == category and self._health_status.get(name, False)
        ]

    def get_metrics(self) -> dict:
        """Return usage and health metrics for all tools."""
        return {
            name: {
                "usage_count": self._usage_counts.get(name, 0),
                "error_count": self._error_counts.get(name, 0),
                "circuit_breaker": self._circuit_breakers[name]["state"],
                "healthy": self._health_status.get(name, False),
                "version": meta["version"],
            }
            for name, meta in self._tools.items()
        }


# Singleton registry
tool_registry = ToolRegistry()


# ---------------------------------------------------------------------------
# Caching Decorator
# ---------------------------------------------------------------------------
def cached_tool(ttl_seconds: int = 300, key_prefix: str = "tool"):
    """
    Decorator that caches tool results in ElastiCache Redis.

    Cache key = prefix:hash(arguments)
    Skips cache on cache miss or Redis failure (fail-open).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a deterministic cache key from arguments
            cache_input = json.dumps(kwargs, sort_keys=True, default=str)
            cache_key = f"{key_prefix}:{hashlib.sha256(cache_input.encode()).hexdigest()[:16]}"

            # Try cache first
            try:
                cached = redis_client.get(cache_key)
                if cached:
                    result = json.loads(cached)
                    result["metadata"]["cached"] = True
                    logger.debug(f"Cache HIT for {func.__name__}: {cache_key}")
                    return result
            except Exception as e:
                logger.warning(f"Redis read failed for {func.__name__}: {e}")

            # Execute tool
            result = func(*args, **kwargs)

            # Cache successful results only
            if isinstance(result, dict) and result.get("success"):
                try:
                    redis_client.setex(cache_key, ttl_seconds, json.dumps(result, default=str))
                    logger.debug(f"Cache SET for {func.__name__}: {cache_key} TTL={ttl_seconds}s")
                except Exception as e:
                    logger.warning(f"Redis write failed for {func.__name__}: {e}")

            return result
        return wrapper
    return decorator


# ---------------------------------------------------------------------------
# Parameter Validation Helpers
# ---------------------------------------------------------------------------
class ParamValidator:
    """
    Validation utilities for tool parameters.

    Handles:
    - Type coercion (FM sometimes sends strings for numeric params)
    - Japanese text validation (encoding, length)
    - Enum validation with fuzzy matching
    - Range constraints
    """

    @staticmethod
    def validate_string(
        value: Any,
        param_name: str,
        max_length: int = 500,
        allow_empty: bool = False,
        pattern: Optional[str] = None,
    ) -> tuple[bool, str, Optional[str]]:
        """Validate and sanitize a string parameter. Returns (valid, cleaned_value, error)."""
        if value is None:
            if allow_empty:
                return True, "", None
            return False, "", f"{param_name} is required"

        value = str(value).strip()

        if not allow_empty and len(value) == 0:
            return False, "", f"{param_name} cannot be empty"

        if len(value) > max_length:
            return False, "", f"{param_name} exceeds maximum length of {max_length}"

        # Check for valid UTF-8 (important for Japanese text)
        try:
            value.encode("utf-8").decode("utf-8")
        except UnicodeError:
            return False, "", f"{param_name} contains invalid UTF-8 characters"

        if pattern and not re.match(pattern, value):
            return False, "", f"{param_name} does not match expected pattern"

        return True, value, None

    @staticmethod
    def validate_japanese_text(value: str, param_name: str = "query") -> tuple[bool, str, Optional[str]]:
        """
        Validate text that may contain Japanese characters.

        Accepts: Hiragana, Katakana, Kanji, Latin, digits, common punctuation.
        Rejects: Control characters, unusual Unicode blocks that indicate injection.
        """
        if not value:
            return False, "", f"{param_name} cannot be empty"

        # Allow: Hiragana (3040-309F), Katakana (30A0-30FF), Kanji (4E00-9FFF),
        # CJK Unified Ext A (3400-4DBF), Half-width Katakana (FF65-FF9F),
        # Latin, digits, common punctuation, spaces
        allowed_pattern = re.compile(
            r'^[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF\u3400-\u4DBF'
            r'\uFF65-\uFF9F\uFF01-\uFF5E'
            r'a-zA-Z0-9\s'
            r'\.\,\!\?\-\_\(\)\[\]\{\}\:\;\'\"\/'
            r'\u3000-\u303F'  # CJK punctuation
            r']+$'
        )

        if not allowed_pattern.match(value):
            # Find the offending character for a helpful error message
            for i, char in enumerate(value):
                if not re.match(
                    r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF\u3400-\u4DBF'
                    r'\uFF65-\uFF9F\uFF01-\uFF5Ea-zA-Z0-9\s'
                    r'\.\,\!\?\-\_\(\)\[\]\{\}\:\;\'\"\/'
                    r'\u3000-\u303F]', char
                ):
                    return (
                        False, "",
                        f"{param_name} contains unsupported character U+{ord(char):04X} at position {i}"
                    )

        # Length check (Japanese chars are ~2-3 tokens each)
        if len(value) > ToolConfig.MAX_JAPANESE_QUERY_LENGTH:
            return False, "", f"{param_name} exceeds {ToolConfig.MAX_JAPANESE_QUERY_LENGTH} chars for Japanese text"

        return True, value, None

    @staticmethod
    def validate_enum(
        value: Any, param_name: str, valid_values: list[str], fuzzy: bool = True
    ) -> tuple[bool, str, Optional[str]]:
        """Validate an enum parameter with optional fuzzy matching."""
        if value is None:
            return False, "", f"{param_name} is required"

        value_str = str(value).strip().lower().replace(" ", "_").replace("-", "_")

        if value_str in valid_values:
            return True, value_str, None

        if fuzzy:
            # Try partial match
            matches = [v for v in valid_values if value_str in v or v in value_str]
            if len(matches) == 1:
                return True, matches[0], None

        return False, "", f"{param_name} must be one of: {', '.join(valid_values)}"

    @staticmethod
    def validate_number(
        value: Any,
        param_name: str,
        min_val: Optional[float] = None,
        max_val: Optional[float] = None,
        coerce_from_string: bool = True,
    ) -> tuple[bool, float, Optional[str]]:
        """Validate a numeric parameter with optional type coercion."""
        if value is None:
            return False, 0, f"{param_name} is required"

        if coerce_from_string and isinstance(value, str):
            try:
                value = float(value.replace(",", "").replace(" ", ""))
            except ValueError:
                return False, 0, f"{param_name} is not a valid number: {value}"

        if not isinstance(value, (int, float)):
            return False, 0, f"{param_name} must be a number, got {type(value).__name__}"

        if min_val is not None and value < min_val:
            return False, 0, f"{param_name} must be >= {min_val}"

        if max_val is not None and value > max_val:
            return False, 0, f"{param_name} must be <= {max_val}"

        return True, value, None

    @staticmethod
    def validate_order_id(value: Any) -> tuple[bool, str, Optional[str]]:
        """Validate a MangaAssist order ID format: MA-YYYYMMDD-XXXXX."""
        if not value:
            return False, "", "order_id is required"

        value = str(value).strip().upper()
        pattern = r'^MA-\d{8}-[A-Z0-9]{5}$'

        if not re.match(pattern, value):
            return False, "", f"Invalid order ID format. Expected: MA-YYYYMMDD-XXXXX, got: {value}"

        return True, value, None


# ---------------------------------------------------------------------------
# AWS Client Initialization (lazy singletons)
# ---------------------------------------------------------------------------
_clients = {}

def get_dynamodb():
    if "dynamodb" not in _clients:
        _clients["dynamodb"] = boto3.resource("dynamodb", region_name="us-east-1")
    return _clients["dynamodb"]

def get_opensearch():
    if "opensearch" not in _clients:
        from opensearchpy import OpenSearch, RequestsHttpConnection
        from requests_aws4auth import AWS4Auth
        credentials = boto3.Session().get_credentials()
        auth = AWS4Auth(
            credentials.access_key, credentials.secret_key,
            "us-east-1", "aoss",
            session_token=credentials.token,
        )
        _clients["opensearch"] = OpenSearch(
            hosts=[{"host": ToolConfig.OPENSEARCH_ENDPOINT.replace("https://", ""), "port": 443}],
            http_auth=auth,
            use_ssl=True,
            connection_class=RequestsHttpConnection,
        )
    return _clients["opensearch"]

def get_bedrock_runtime():
    if "bedrock_runtime" not in _clients:
        _clients["bedrock_runtime"] = boto3.client("bedrock-runtime", region_name="us-east-1")
    return _clients["bedrock_runtime"]

def get_redis():
    if "redis" not in _clients:
        import redis as redis_lib
        _clients["redis"] = redis_lib.Redis(
            host=ToolConfig.REDIS_ENDPOINT, port=6379, decode_responses=True,
            socket_timeout=1, socket_connect_timeout=1,  # Aggressive timeouts
        )
    return _clients["redis"]

redis_client = None  # Lazy init; replaced at startup


# ---------------------------------------------------------------------------
# Tool 1: Product Search
# ---------------------------------------------------------------------------
@tool
def product_search(
    query: str,
    genre: Optional[str] = None,
    price_min: Optional[float] = None,
    price_max: Optional[float] = None,
    sort_by: str = "relevance",
    limit: int = 10,
    in_stock_only: bool = True,
) -> dict:
    """
    Search the MangaAssist product catalog for manga titles, authors, or genres.

    Use this tool when a customer asks about finding manga, browsing titles,
    or searching for specific series. Supports Japanese and English queries.

    Args:
        query: Search text in Japanese or English (e.g., "進撃の巨人" or "Attack on Titan")
        genre: Optional genre filter (shonen, shojo, seinen, josei, isekai, etc.)
        price_min: Minimum price in JPY (e.g., 400)
        price_max: Maximum price in JPY (e.g., 2000)
        sort_by: Sort order - relevance, price_asc, price_desc, rating, release_date, popularity
        limit: Number of results to return (1-50, default 10)
        in_stock_only: If True, only return items currently in stock

    Returns:
        dict with 'success', 'data' (list of products), and 'metadata'
    """
    start_time = time.time()
    errors = []

    # --- Parameter Validation ---
    valid, query, err = ParamValidator.validate_string(query, "query", max_length=500)
    if not valid:
        errors.append(err)

    # Check if query contains Japanese; apply stricter length limit
    has_japanese = bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]', query or ""))
    if has_japanese and query:
        valid, query, err = ParamValidator.validate_japanese_text(query, "query")
        if not valid:
            errors.append(err)

    if genre:
        valid, genre, err = ParamValidator.validate_enum(genre, "genre", ToolConfig.VALID_GENRES)
        if not valid:
            errors.append(err)

    valid, sort_by, err = ParamValidator.validate_enum(sort_by, "sort_by", ToolConfig.VALID_SORT_FIELDS)
    if not valid:
        errors.append(err)

    if price_min is not None:
        valid, price_min, err = ParamValidator.validate_number(
            price_min, "price_min", min_val=ToolConfig.PRICE_MIN_JPY, max_val=ToolConfig.PRICE_MAX_JPY
        )
        if not valid:
            errors.append(err)

    if price_max is not None:
        valid, price_max, err = ParamValidator.validate_number(
            price_max, "price_max", min_val=ToolConfig.PRICE_MIN_JPY, max_val=ToolConfig.PRICE_MAX_JPY
        )
        if not valid:
            errors.append(err)

    if price_min is not None and price_max is not None and price_min > price_max:
        errors.append("price_min cannot be greater than price_max")

    valid, limit, err = ParamValidator.validate_number(
        limit, "limit", min_val=1, max_val=ToolConfig.MAX_SEARCH_LIMIT
    )
    if not valid:
        errors.append(err)
    limit = int(limit)

    if errors:
        return ToolResponse(
            success=False,
            error_code="VALIDATION_ERROR",
            error_message="; ".join(errors),
        ).to_dict()

    # --- Build OpenSearch Query ---
    try:
        # Generate embedding for semantic search
        bedrock = get_bedrock_runtime()
        embed_response = bedrock.invoke_model(
            modelId=ToolConfig.BEDROCK_EMBED_MODEL,
            body=json.dumps({"inputText": query}),
        )
        embedding = json.loads(embed_response["body"].read())["embedding"]

        # Hybrid search: keyword + vector
        opensearch_query = {
            "size": limit,
            "query": {
                "bool": {
                    "should": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": [
                                    "title_ja^3", "title_en^2",
                                    "author_ja^2", "author_en",
                                    "description_ja", "description_en",
                                    "genre", "tags",
                                ],
                                "type": "best_fields",
                                "fuzziness": "AUTO",
                            }
                        },
                        {
                            "knn": {
                                "embedding": {
                                    "vector": embedding,
                                    "k": limit,
                                }
                            }
                        },
                    ],
                    "filter": [],
                }
            },
        }

        # Apply filters
        if genre:
            opensearch_query["query"]["bool"]["filter"].append(
                {"term": {"genre": genre}}
            )
        if in_stock_only:
            opensearch_query["query"]["bool"]["filter"].append(
                {"term": {"in_stock": True}}
            )
        if price_min is not None or price_max is not None:
            price_range = {}
            if price_min is not None:
                price_range["gte"] = price_min
            if price_max is not None:
                price_range["lte"] = price_max
            opensearch_query["query"]["bool"]["filter"].append(
                {"range": {"price_jpy": price_range}}
            )

        # Apply sorting
        sort_mapping = {
            "relevance": "_score",
            "price_asc": {"price_jpy": {"order": "asc"}},
            "price_desc": {"price_jpy": {"order": "desc"}},
            "rating": {"average_rating": {"order": "desc"}},
            "release_date": {"release_date": {"order": "desc"}},
            "popularity": {"sales_rank": {"order": "asc"}},
        }
        if sort_by != "relevance":
            opensearch_query["sort"] = [sort_mapping[sort_by]]

        # Execute search
        os_client = get_opensearch()
        response = os_client.search(index="manga-products", body=opensearch_query)

        # Format results
        products = []
        for hit in response["hits"]["hits"]:
            src = hit["_source"]
            products.append({
                "product_id": src["product_id"],
                "title_ja": src.get("title_ja", ""),
                "title_en": src.get("title_en", ""),
                "author": src.get("author_ja", src.get("author_en", "")),
                "genre": src.get("genre", ""),
                "price_jpy": src.get("price_jpy", 0),
                "average_rating": src.get("average_rating", 0),
                "in_stock": src.get("in_stock", False),
                "volume_count": src.get("volume_count", 1),
                "cover_image_url": src.get("cover_image_url", ""),
                "relevance_score": round(hit["_score"], 3),
            })

        elapsed_ms = round((time.time() - start_time) * 1000, 1)

        return ToolResponse(
            success=True,
            data={
                "products": products,
                "total_results": response["hits"]["total"]["value"],
                "returned": len(products),
                "query_interpreted": query,
            },
            metadata={
                "execution_time_ms": elapsed_ms,
                "search_type": "hybrid_knn_keyword",
                "has_japanese_query": has_japanese,
            },
        ).to_dict()

    except Exception as e:
        elapsed_ms = round((time.time() - start_time) * 1000, 1)
        logger.error(f"product_search failed: {e}", exc_info=True)
        return ToolResponse(
            success=False,
            error_code="SEARCH_ERROR",
            error_message=f"Product search failed: {str(e)}",
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()


# ---------------------------------------------------------------------------
# Tool 2: Order Lookup
# ---------------------------------------------------------------------------
@tool
def order_lookup(
    order_id: Optional[str] = None,
    customer_email: Optional[str] = None,
    status_filter: Optional[str] = None,
    days_back: int = 30,
) -> dict:
    """
    Look up order details for a MangaAssist customer.

    Use this tool when a customer asks about their order status, delivery tracking,
    or order history. Requires either an order_id or customer_email.

    Args:
        order_id: Specific order ID (format: MA-YYYYMMDD-XXXXX)
        customer_email: Customer email to look up all recent orders
        status_filter: Filter by status (pending, processing, shipped, delivered, cancelled, returned)
        days_back: How many days of history to search (1-365, default 30)

    Returns:
        dict with order details including items, status, tracking, and delivery estimates
    """
    start_time = time.time()
    errors = []

    if not order_id and not customer_email:
        return ToolResponse(
            success=False,
            error_code="VALIDATION_ERROR",
            error_message="Either order_id or customer_email is required",
        ).to_dict()

    if order_id:
        valid, order_id, err = ParamValidator.validate_order_id(order_id)
        if not valid:
            errors.append(err)

    if customer_email:
        valid, customer_email, err = ParamValidator.validate_string(
            customer_email, "customer_email", max_length=254,
            pattern=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        )
        if not valid:
            errors.append(err)

    if status_filter:
        valid, status_filter, err = ParamValidator.validate_enum(
            status_filter, "status_filter", ToolConfig.VALID_ORDER_STATUSES
        )
        if not valid:
            errors.append(err)

    valid, days_back, err = ParamValidator.validate_number(days_back, "days_back", min_val=1, max_val=365)
    if not valid:
        errors.append(err)
    days_back = int(days_back)

    if errors:
        return ToolResponse(
            success=False, error_code="VALIDATION_ERROR", error_message="; ".join(errors),
        ).to_dict()

    try:
        dynamodb = get_dynamodb()
        orders_table = dynamodb.Table(ToolConfig.DYNAMODB_ORDERS_TABLE)

        if order_id:
            # Direct lookup by order_id
            response = orders_table.get_item(Key={"order_id": order_id})
            if "Item" not in response:
                return ToolResponse(
                    success=True,
                    data={"orders": [], "message": f"No order found with ID {order_id}"},
                ).to_dict()
            orders = [response["Item"]]
        else:
            # Query by email with date range
            from boto3.dynamodb.conditions import Key as DDBKey, Attr
            cutoff_date = (datetime.utcnow() - timedelta(days=days_back)).isoformat()

            response = orders_table.query(
                IndexName="email-date-index",
                KeyConditionExpression=(
                    DDBKey("customer_email").eq(customer_email)
                    & DDBKey("order_date").gte(cutoff_date)
                ),
            )
            orders = response.get("Items", [])

        if status_filter:
            orders = [o for o in orders if o.get("status", "").lower() == status_filter]

        formatted_orders = []
        for order in orders:
            formatted_orders.append({
                "order_id": order["order_id"],
                "status": order.get("status", "unknown"),
                "order_date": order.get("order_date", ""),
                "items": order.get("items", []),
                "total_jpy": order.get("total_jpy", 0),
                "shipping_method": order.get("shipping_method", "standard"),
                "tracking_number": order.get("tracking_number"),
                "estimated_delivery": order.get("estimated_delivery"),
                "delivery_address_prefecture": order.get("delivery_prefecture", ""),
            })

        elapsed_ms = round((time.time() - start_time) * 1000, 1)

        return ToolResponse(
            success=True,
            data={"orders": formatted_orders, "total_found": len(formatted_orders)},
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()

    except Exception as e:
        elapsed_ms = round((time.time() - start_time) * 1000, 1)
        logger.error(f"order_lookup failed: {e}", exc_info=True)
        return ToolResponse(
            success=False,
            error_code="ORDER_LOOKUP_ERROR",
            error_message=f"Order lookup failed: {str(e)}",
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()


# ---------------------------------------------------------------------------
# Tool 3: Recommendation Engine
# ---------------------------------------------------------------------------
@tool
def recommendation_engine(
    based_on: str,
    input_type: str = "title",
    max_recommendations: int = 5,
    exclude_owned: bool = True,
    customer_id: Optional[str] = None,
) -> dict:
    """
    Get personalized manga recommendations for a customer.

    Use this tool when a customer asks for suggestions, similar titles, or
    "what should I read next" type questions.

    Args:
        based_on: The title, genre, or author to base recommendations on
        input_type: What 'based_on' represents - "title", "genre", "author", or "description"
        max_recommendations: Number of recommendations (1-20, default 5)
        exclude_owned: Exclude titles the customer already purchased (requires customer_id)
        customer_id: Customer ID for personalization and purchase-history exclusion

    Returns:
        dict with recommended manga titles, similarity scores, and reasons
    """
    start_time = time.time()
    errors = []

    valid, based_on, err = ParamValidator.validate_string(based_on, "based_on", max_length=300)
    if not valid:
        errors.append(err)

    valid_input_types = ["title", "genre", "author", "description"]
    valid, input_type, err = ParamValidator.validate_enum(input_type, "input_type", valid_input_types)
    if not valid:
        errors.append(err)

    valid, max_recommendations, err = ParamValidator.validate_number(
        max_recommendations, "max_recommendations", min_val=1, max_val=20
    )
    if not valid:
        errors.append(err)
    max_recommendations = int(max_recommendations)

    if errors:
        return ToolResponse(
            success=False, error_code="VALIDATION_ERROR", error_message="; ".join(errors),
        ).to_dict()

    try:
        # Step 1: Generate embedding for the input
        bedrock = get_bedrock_runtime()
        prompt_text = f"Manga {input_type}: {based_on}"
        embed_response = bedrock.invoke_model(
            modelId=ToolConfig.BEDROCK_EMBED_MODEL,
            body=json.dumps({"inputText": prompt_text}),
        )
        embedding = json.loads(embed_response["body"].read())["embedding"]

        # Step 2: Find similar manga via OpenSearch KNN
        os_client = get_opensearch()
        knn_query = {
            "size": max_recommendations + 10,  # Fetch extra for filtering
            "query": {
                "knn": {
                    "embedding": {
                        "vector": embedding,
                        "k": max_recommendations + 10,
                    }
                }
            },
        }

        response = os_client.search(index="manga-products", body=knn_query)

        # Step 3: Filter and rank
        candidates = []
        owned_ids = set()

        if exclude_owned and customer_id:
            dynamodb = get_dynamodb()
            orders_table = dynamodb.Table(ToolConfig.DYNAMODB_ORDERS_TABLE)
            purchase_response = orders_table.query(
                IndexName="customer-id-index",
                KeyConditionExpression=boto3.dynamodb.conditions.Key("customer_id").eq(customer_id),
                ProjectionExpression="items",
            )
            for order in purchase_response.get("Items", []):
                for item in order.get("items", []):
                    owned_ids.add(item.get("product_id"))

        for hit in response["hits"]["hits"]:
            src = hit["_source"]
            product_id = src["product_id"]

            if product_id in owned_ids:
                continue

            # Skip the exact input title if searching by title
            if input_type == "title" and (
                src.get("title_ja", "").lower() == based_on.lower()
                or src.get("title_en", "").lower() == based_on.lower()
            ):
                continue

            candidates.append({
                "product_id": product_id,
                "title_ja": src.get("title_ja", ""),
                "title_en": src.get("title_en", ""),
                "author": src.get("author_ja", src.get("author_en", "")),
                "genre": src.get("genre", ""),
                "average_rating": src.get("average_rating", 0),
                "price_jpy": src.get("price_jpy", 0),
                "similarity_score": round(hit["_score"], 4),
                "reason": _generate_recommendation_reason(src, input_type, based_on),
            })

            if len(candidates) >= max_recommendations:
                break

        elapsed_ms = round((time.time() - start_time) * 1000, 1)

        return ToolResponse(
            success=True,
            data={
                "recommendations": candidates,
                "based_on": based_on,
                "input_type": input_type,
                "total_returned": len(candidates),
            },
            metadata={
                "execution_time_ms": elapsed_ms,
                "owned_excluded": len(owned_ids),
            },
        ).to_dict()

    except Exception as e:
        elapsed_ms = round((time.time() - start_time) * 1000, 1)
        logger.error(f"recommendation_engine failed: {e}", exc_info=True)
        return ToolResponse(
            success=False,
            error_code="RECOMMENDATION_ERROR",
            error_message=f"Recommendation engine failed: {str(e)}",
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()


def _generate_recommendation_reason(product: dict, input_type: str, based_on: str) -> str:
    """Generate a human-readable reason for a recommendation."""
    genre = product.get("genre", "")
    author = product.get("author_ja", product.get("author_en", ""))
    rating = product.get("average_rating", 0)

    if input_type == "title":
        return f"Similar to {based_on} in the {genre} genre, rated {rating}/5"
    elif input_type == "author":
        return f"By {author or 'the same author'}, rated {rating}/5"
    elif input_type == "genre":
        return f"Top-rated {genre} manga, rated {rating}/5"
    else:
        return f"Matches your description, rated {rating}/5"


# ---------------------------------------------------------------------------
# Tool 4: Inventory Check
# ---------------------------------------------------------------------------
@tool
def inventory_check(
    product_ids: list[str],
    include_restock_date: bool = True,
    include_nearby_stores: bool = False,
    prefecture: Optional[str] = None,
) -> dict:
    """
    Check real-time inventory availability for manga products.

    Use this tool to verify stock levels before recommending products or when
    customers ask about availability. Can check multiple products at once.

    Args:
        product_ids: List of product IDs to check (max 25 per call)
        include_restock_date: Include expected restock date for out-of-stock items
        include_nearby_stores: Check physical store inventory (requires prefecture)
        prefecture: Customer's prefecture for nearby store lookup (e.g., "Tokyo", "Osaka")

    Returns:
        dict with inventory status for each product_id
    """
    start_time = time.time()
    errors = []

    if not product_ids:
        return ToolResponse(
            success=False,
            error_code="VALIDATION_ERROR",
            error_message="product_ids list is required and cannot be empty",
        ).to_dict()

    if not isinstance(product_ids, list):
        try:
            product_ids = [str(product_ids)]
        except Exception:
            return ToolResponse(
                success=False,
                error_code="VALIDATION_ERROR",
                error_message="product_ids must be a list of product ID strings",
            ).to_dict()

    if len(product_ids) > 25:
        return ToolResponse(
            success=False,
            error_code="VALIDATION_ERROR",
            error_message="Maximum 25 product IDs per inventory check",
        ).to_dict()

    if include_nearby_stores and not prefecture:
        errors.append("prefecture is required when include_nearby_stores is True")

    if errors:
        return ToolResponse(
            success=False, error_code="VALIDATION_ERROR", error_message="; ".join(errors),
        ).to_dict()

    try:
        dynamodb = get_dynamodb()
        inventory_table = dynamodb.Table(ToolConfig.DYNAMODB_INVENTORY_TABLE)

        # Batch get for efficiency
        response = dynamodb.batch_get_item(
            RequestItems={
                ToolConfig.DYNAMODB_INVENTORY_TABLE: {
                    "Keys": [{"product_id": pid} for pid in product_ids],
                }
            }
        )

        items = {
            item["product_id"]: item
            for item in response.get("Responses", {}).get(ToolConfig.DYNAMODB_INVENTORY_TABLE, [])
        }

        inventory_results = []
        for pid in product_ids:
            if pid in items:
                item = items[pid]
                result = {
                    "product_id": pid,
                    "in_stock": item.get("quantity", 0) > 0,
                    "quantity_available": item.get("quantity", 0),
                    "warehouse_location": item.get("warehouse", ""),
                    "shipping_estimate_days": _estimate_shipping(item.get("warehouse", "")),
                }
                if include_restock_date and item.get("quantity", 0) == 0:
                    result["expected_restock_date"] = item.get("restock_date", "unknown")
                inventory_results.append(result)
            else:
                inventory_results.append({
                    "product_id": pid,
                    "in_stock": False,
                    "quantity_available": 0,
                    "error": "Product not found in inventory system",
                })

        elapsed_ms = round((time.time() - start_time) * 1000, 1)

        return ToolResponse(
            success=True,
            data={
                "inventory": inventory_results,
                "checked_count": len(product_ids),
                "in_stock_count": sum(1 for r in inventory_results if r.get("in_stock")),
            },
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()

    except Exception as e:
        elapsed_ms = round((time.time() - start_time) * 1000, 1)
        logger.error(f"inventory_check failed: {e}", exc_info=True)
        return ToolResponse(
            success=False,
            error_code="INVENTORY_ERROR",
            error_message=f"Inventory check failed: {str(e)}",
            metadata={"execution_time_ms": elapsed_ms},
        ).to_dict()


def _estimate_shipping(warehouse: str) -> int:
    """Estimate shipping days based on warehouse location."""
    estimates = {
        "tokyo": 1, "osaka": 1, "nagoya": 2, "fukuoka": 2,
        "sapporo": 3, "sendai": 2, "hiroshima": 2,
    }
    return estimates.get(warehouse.lower(), 3)


# ---------------------------------------------------------------------------
# Agent Assembly: Wiring Tools into the Strands Agent
# ---------------------------------------------------------------------------
def create_mangaassist_agent(
    model_tier: str = "sonnet",
    customer_context: Optional[dict] = None,
) -> Agent:
    """
    Factory function to create a fully configured MangaAssist Strands agent.

    Args:
        model_tier: "sonnet" for complex queries, "haiku" for simple lookups
        customer_context: Optional dict with customer_id, name, preferences

    Returns:
        Configured Strands Agent with all tools registered
    """
    model_ids = {
        "sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
        "haiku": "anthropic.claude-3-haiku-20240307-v1:0",
    }

    model = BedrockModel(
        model_id=model_ids.get(model_tier, model_ids["sonnet"]),
        max_tokens=2048,
        temperature=0.1,  # Low temperature for factual tool-based responses
    )

    # Build customer-aware system prompt
    system_prompt = """You are MangaAssist, a helpful manga store assistant for a Japanese manga e-commerce platform.

CORE BEHAVIORS:
- Always respond in the same language the customer uses (Japanese or English)
- Use tools to look up real data; NEVER fabricate product details, prices, or availability
- If a tool returns an error, explain the situation honestly and suggest alternatives
- For product recommendations, always verify inventory before suggesting

TOOL USAGE GUIDELINES:
- product_search: Use for finding manga by title, author, genre, or description
- order_lookup: Use when customer asks about orders, shipping, or delivery
- recommendation_engine: Use for "suggest", "recommend", "similar to" requests
- inventory_check: Always verify stock before confirming availability

TOOL CHAINING:
- For recommendations: recommendation_engine -> inventory_check -> present only in-stock items
- For orders with items: order_lookup -> product_search (for item details) if needed
- For availability: product_search -> inventory_check

RESPONSE FORMAT:
- Keep responses concise but informative
- Include prices in JPY with the yen symbol
- Include both Japanese and English titles when available"""

    if customer_context:
        system_prompt += f"""

CUSTOMER CONTEXT:
- Customer ID: {customer_context.get('customer_id', 'unknown')}
- Name: {customer_context.get('name', 'Customer')}
- Preferred Language: {customer_context.get('language', 'ja')}
- Membership Tier: {customer_context.get('tier', 'standard')}"""

    agent = Agent(
        model=model,
        tools=[product_search, order_lookup, recommendation_engine, inventory_check],
        system_prompt=system_prompt,
    )

    # Register tools in the registry for monitoring
    tool_registry.register("product_search", product_search, category="catalog", version="2.1.0",
                           description="Search manga catalog", timeout_ms=3000)
    tool_registry.register("order_lookup", order_lookup, category="orders", version="1.3.0",
                           description="Look up order details", timeout_ms=2000)
    tool_registry.register("recommendation_engine", recommendation_engine, category="catalog", version="1.5.0",
                           description="Generate manga recommendations", timeout_ms=4000,
                           fallback="product_search")
    tool_registry.register("inventory_check", inventory_check, category="inventory", version="1.1.0",
                           description="Check product availability", timeout_ms=2000)

    return agent


# ---------------------------------------------------------------------------
# Lambda Handler: Entry Point for Tool Invocations
# ---------------------------------------------------------------------------
def lambda_handler(event: dict, context) -> dict:
    """
    AWS Lambda handler for MangaAssist tool invocations.

    Supports both direct tool calls and agent-mediated conversations.
    Implements error handling, parameter validation, and response formatting.
    """
    request_id = context.aws_request_id if context else "local"
    logger.info(f"[{request_id}] Received event: {json.dumps(event)[:500]}")

    try:
        action = event.get("action", "chat")

        if action == "chat":
            # Agent-mediated conversation
            customer_context = event.get("customer_context")
            message = event.get("message", "")
            model_tier = event.get("model_tier", "sonnet")

            if not message:
                return {"statusCode": 400, "body": {"error": "message is required"}}

            agent = create_mangaassist_agent(model_tier=model_tier, customer_context=customer_context)
            response = agent(message)

            return {
                "statusCode": 200,
                "body": {
                    "response": str(response),
                    "tool_metrics": tool_registry.get_metrics(),
                },
            }

        elif action == "direct_tool":
            # Direct tool invocation (for testing or orchestrator bypass)
            tool_name = event.get("tool_name")
            tool_params = event.get("tool_params", {})

            tool_fn = tool_registry.get_tool(tool_name)
            if not tool_fn:
                return {
                    "statusCode": 404,
                    "body": {"error": f"Tool not found or unavailable: {tool_name}"},
                }

            result = tool_fn(**tool_params)
            tool_registry.record_success(tool_name)

            return {"statusCode": 200, "body": result}

        elif action == "health":
            return {
                "statusCode": 200,
                "body": {
                    "status": "healthy",
                    "tools": tool_registry.get_metrics(),
                },
            }

        else:
            return {"statusCode": 400, "body": {"error": f"Unknown action: {action}"}}

    except Exception as e:
        logger.error(f"[{request_id}] Handler error: {e}", exc_info=True)
        return {
            "statusCode": 500,
            "body": {"error": "Internal server error", "request_id": request_id},
        }

Standardized Function Schema Specification

MangaAssist enforces a schema contract for every tool. Below is the schema for product_search as the FM sees it:

{
  "name": "product_search",
  "description": "Search the MangaAssist product catalog for manga titles, authors, or genres. Supports Japanese and English queries.",
  "input_schema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search text in Japanese or English (e.g., '進撃の巨人' or 'Attack on Titan')",
        "maxLength": 500
      },
      "genre": {
        "type": "string",
        "enum": ["shonen", "shojo", "seinen", "josei", "kodomomuke", "isekai", "mecha", "slice_of_life", "horror", "romance", "action", "comedy", "drama", "fantasy", "sci_fi", "sports", "mystery", "psychological", "historical"],
        "description": "Genre filter"
      },
      "price_min": {
        "type": "number",
        "minimum": 0,
        "maximum": 100000,
        "description": "Minimum price in JPY"
      },
      "price_max": {
        "type": "number",
        "minimum": 0,
        "maximum": 100000,
        "description": "Maximum price in JPY"
      },
      "sort_by": {
        "type": "string",
        "enum": ["relevance", "price_asc", "price_desc", "rating", "release_date", "popularity"],
        "default": "relevance",
        "description": "Sort order for results"
      },
      "limit": {
        "type": "integer",
        "minimum": 1,
        "maximum": 50,
        "default": 10,
        "description": "Number of results"
      },
      "in_stock_only": {
        "type": "boolean",
        "default": true,
        "description": "Only return in-stock items"
      }
    },
    "required": ["query"]
  }
}

Tool Discovery and Registration Lifecycle

Tool Deployment and Discovery Flow
====================================

  Build Time                    Deploy Time                   Runtime
  ----------                    -----------                   -------

 [Tool Code]              [CloudFormation/CDK]         [Strands Agent Startup]
      |                          |                            |
      v                          v                            v
 Type-hint based         Deploy Lambda + API          Load tool manifests
 schema generation       endpoints for tools           from DynamoDB or S3
      |                          |                            |
      v                          v                            v
 Generate JSON           Register in Service           Validate schemas
 Schema from             Discovery (Cloud Map          against expected
 @tool decorators        or DynamoDB registry)         contract
      |                          |                            |
      v                          v                            v
 Store schemas           Health check endpoints         Bind tools to
 in tool manifest        registered in Route 53         agent context
 (tool_manifest.json)    for availability               |
                                                        v
                                                   [Agent Ready]
                                                        |
                                                   On each request:
                                                        |
                                                   1. Check circuit breakers
                                                   2. Verify tool health
                                                   3. Route to tool or fallback
                                                   4. Record metrics

Tool Chaining Patterns in MangaAssist

Pattern 1: Sequential Pipeline

async def handle_recommendation_request(user_message: str, customer_id: str):
    """
    Chain: search -> recommend -> inventory -> format

    Used when customer says: "Recommend something like Naruto that's in stock"
    """
    # Step 1: Find the source manga
    search_result = product_search(query=user_message, limit=1)
    if not search_result["success"] or not search_result["data"]["products"]:
        return "I couldn't find that manga title. Could you double-check the name?"

    source_manga = search_result["data"]["products"][0]

    # Step 2: Get recommendations
    rec_result = recommendation_engine(
        based_on=source_manga["title_en"] or source_manga["title_ja"],
        input_type="title",
        max_recommendations=10,
        customer_id=customer_id,
    )
    if not rec_result["success"]:
        return "I'm having trouble generating recommendations right now. Please try again."

    # Step 3: Check inventory for all recommendations
    rec_ids = [r["product_id"] for r in rec_result["data"]["recommendations"]]
    inv_result = inventory_check(product_ids=rec_ids)

    # Step 4: Merge and filter
    in_stock_ids = set()
    if inv_result["success"]:
        in_stock_ids = {
            item["product_id"]
            for item in inv_result["data"]["inventory"]
            if item.get("in_stock")
        }

    # Return only in-stock recommendations
    final_recs = [
        r for r in rec_result["data"]["recommendations"]
        if r["product_id"] in in_stock_ids
    ][:5]

    return final_recs

Pattern 2: Parallel Fan-Out

import asyncio

async def handle_product_detail_request(product_id: str, customer_id: str):
    """
    Fan-out: [inventory, reviews, related] executed in parallel.

    Used when customer says: "Tell me about One Piece volume 100"
    """
    # Execute all lookups in parallel to stay under 3s budget
    inventory_task = asyncio.create_task(
        asyncio.to_thread(inventory_check, product_ids=[product_id])
    )
    search_task = asyncio.create_task(
        asyncio.to_thread(product_search, query=product_id, limit=1)
    )
    rec_task = asyncio.create_task(
        asyncio.to_thread(
            recommendation_engine,
            based_on=product_id,
            input_type="title",
            max_recommendations=3,
            customer_id=customer_id,
        )
    )

    # Gather results (timeout after 2.5s to leave 0.5s for response generation)
    results = await asyncio.gather(
        inventory_task, search_task, rec_task,
        return_exceptions=True,
    )

    inventory_result, product_result, related_result = results

    # Compose response even if some tools failed
    response_data = {"product_id": product_id}

    if not isinstance(product_result, Exception) and product_result.get("success"):
        response_data["product"] = product_result["data"]["products"][0] if product_result["data"]["products"] else None

    if not isinstance(inventory_result, Exception) and inventory_result.get("success"):
        response_data["availability"] = inventory_result["data"]["inventory"][0] if inventory_result["data"]["inventory"] else None

    if not isinstance(related_result, Exception) and related_result.get("success"):
        response_data["related"] = related_result["data"]["recommendations"]
    else:
        response_data["related"] = []  # Graceful degradation

    return response_data

Pattern 3: Conditional Branching

def route_customer_intent(intent: str, params: dict) -> dict:
    """
    Route to different tool chains based on detected intent.

    The FM classifies intent first, then the orchestrator picks the right chain.
    """
    chains = {
        "product_search": lambda p: product_search(**p),
        "order_status": lambda p: order_lookup(**p),
        "recommendation": lambda p: _recommendation_chain(p),
        "availability": lambda p: _availability_chain(p),
        "order_and_track": lambda p: order_lookup(order_id=p.get("order_id")),
    }

    chain_fn = chains.get(intent)
    if not chain_fn:
        return ToolResponse(
            success=False,
            error_code="UNKNOWN_INTENT",
            error_message=f"No tool chain for intent: {intent}",
        ).to_dict()

    return chain_fn(params)


def _recommendation_chain(params: dict) -> dict:
    """Recommendation with automatic inventory filtering."""
    recs = recommendation_engine(**params)
    if not recs["success"]:
        return recs

    product_ids = [r["product_id"] for r in recs["data"]["recommendations"]]
    if product_ids:
        inv = inventory_check(product_ids=product_ids)
        if inv["success"]:
            in_stock = {i["product_id"] for i in inv["data"]["inventory"] if i["in_stock"]}
            recs["data"]["recommendations"] = [
                {**r, "in_stock": r["product_id"] in in_stock}
                for r in recs["data"]["recommendations"]
            ]
    return recs


def _availability_chain(params: dict) -> dict:
    """Search first, then check inventory."""
    search = product_search(query=params.get("query", ""), limit=5)
    if not search["success"]:
        return search

    product_ids = [p["product_id"] for p in search["data"]["products"]]
    if product_ids:
        inv = inventory_check(product_ids=product_ids)
        if inv["success"]:
            inv_map = {i["product_id"]: i for i in inv["data"]["inventory"]}
            for product in search["data"]["products"]:
                inv_info = inv_map.get(product["product_id"], {})
                product["availability"] = {
                    "in_stock": inv_info.get("in_stock", False),
                    "quantity": inv_info.get("quantity_available", 0),
                }
    return search

Key Takeaways

1. Every Tool Needs a Standardized Response Envelope

The FM must reliably parse tool outputs. MangaAssist wraps every tool response in a ToolResponse with success, data, error, and metadata fields. This eliminates ambiguity -- the FM never has to guess whether a tool call succeeded.

2. Parameter Validation Is the First Line of Defense

FMs generate tool parameters from natural language, which means type mismatches, out-of-range values, and encoding issues are common. MangaAssist validates every parameter before execution using the ParamValidator class with type coercion (string-to-number), Japanese text validation, and enum fuzzy matching.

3. Tool Definitions Drive FM Behavior

The quality of your @tool docstrings and type hints directly determines how well the FM uses your tools. Vague descriptions lead to wrong tool selection; missing constraints lead to invalid parameters. Invest heavily in tool descriptions, parameter docs, and examples.

4. Tool Registry Enables Operational Control

The ToolRegistry pattern provides circuit breakers, fallback routing, health checking, and usage metrics. When recommendation_engine fails, MangaAssist automatically falls back to product_search. This keeps the chatbot functional even during partial outages.

5. Tool Chaining Must Be Fault-Tolerant

Sequential chains fail completely if any step fails. MangaAssist uses three patterns -- sequential (for dependent data), parallel fan-out (for independent lookups), and conditional branching (for intent routing) -- with graceful degradation at every step. A failed inventory check still returns recommendations; it just cannot confirm stock.

6. Cache Aggressively, Fail Open

Product searches and recommendations are cached in Redis (5-10 minute TTL). If Redis is down, the tool executes normally without cache -- fail-open. This keeps the 3-second latency target achievable at 1M messages/day scale.

7. Strands Simplifies the Wiring

The Strands @tool decorator, Agent class, and BedrockModel integration handle schema generation, tool binding, and conversation management. The developer focuses on tool logic and validation, not on plumbing the FM-to-tool interface.

8. Cost Awareness in Tool Design

MangaAssist uses Haiku ($0.25/$1.25 per 1M tokens) for simple lookups and Sonnet ($3/$15) for complex recommendations. Tool definitions are kept concise to minimize context window consumption. At 1M messages/day, every unnecessary token in a tool schema costs real money.


Exam-Relevant Concepts (AWS AIP-C01)

Concept MangaAssist Implementation Why It Matters
Strands API custom behaviors @tool decorator, Agent class, tool registry Core framework for building agentic tool use
Standardized function definitions JSON Schema, type hints, docstrings FM relies on schemas to call tools correctly
Lambda for error handling lambda_handler with try/except, circuit breakers Serverless tool execution with built-in resilience
Parameter validation ParamValidator class, type coercion, Japanese text Prevents malformed FM-generated params from breaking tools
Tool chaining Sequential, parallel, conditional patterns Complex queries require multi-tool orchestration
Graceful degradation Fallback tools, partial results, fail-open cache System stays useful even during partial failures
Tool discovery ToolRegistry.discover_by_category(), health checks Dynamic tool availability management