LOCAL PREVIEW View on GitHub

Standardized Function Definitions and Error Handling Patterns

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Field Value
Certification AWS AI Practitioner (AIP-C01)
Domain 2 — Implementation and Integration of FM-Powered Applications
Task 2.1 — Select and implement appropriate FM integration strategies
Skill 2.1.6 — Implement intelligent tool integrations to extend FM capabilities and ensure reliable tool operations
Focus Areas Strands API for custom behaviors, standardized function definitions, Lambda for error handling and parameter validation

Mindmap: Function Definitions and Error Handling

mindmap
  root((2.1.6 Function Defs<br/>& Error Handling))
    Standardized Function Definitions
      JSON Schema Contracts
        Parameter types and constraints
        Required vs optional fields
        Enum values for controlled vocab
        Nested object schemas
      OpenAPI Spec Integration
        Tool as REST endpoint descriptor
        Request/response schema binding
        Version negotiation headers
      Bedrock Tool Use Format
        toolSpec name/description/inputSchema
        Auto-marshalling from Python types
        Multi-tool convoLoop pattern
    Strands API Custom Behaviors
      @tool Decorator Registration
        Type-hint driven schema gen
        Docstring to description mapping
        Async and sync tool variants
      Tool Chaining Orchestration
        Sequential pipeline A then B then C
        Parallel fan-out with asyncio.gather
        Conditional branching on tool output
        Context propagation across chain
      StrandsToolBinder
        Binds tools to agent at init
        Runtime tool enable/disable
        Per-turn tool filtering
    Error Handling Patterns
      Lambda Validation Layer
        Pre-invocation param checking
        Type coercion string to int
        Japanese encoding UTF-8 guard
        Injection detection in params
      Retry Strategies
        Exponential backoff with jitter
        Max retry ceiling 3 attempts
        Idempotency key for safe retries
      Circuit Breaker
        Failure threshold 5 in 60s window
        Half-open probe after cooldown
        Fallback tool on open circuit
      Dead Letter Queue
        SQS for unrecoverable failures
        CloudWatch alarm on DLQ depth
        Manual replay capability
    Production Patterns
      ToolRegistry singleton
        Register/discover/invoke
        Health check per tool
        Version management
      ParameterValidator
        JSON Schema validate
        Custom constraint rules
        Sanitization pipeline
      ToolErrorHandler
        Classify error severity
        User-friendly message map
        Metrics emission per error type
      Graceful Degradation
        Partial result assembly
        Cached fallback responses
        Static default when all tools fail

1. Standardized Function Definitions

1.1 Why Standardized Definitions Matter

When a foundation model decides to invoke a tool, it relies entirely on the tool definition to understand what the tool does, what parameters it expects, and what it returns. A poorly defined tool causes the FM to: - Hallucinate parameter values instead of asking the user - Pass wrong types (string instead of integer) breaking downstream services - Miss required fields causing silent failures - Misunderstand tool purpose leading to wrong tool selection

In MangaAssist, where we handle 1M messages/day with a 3-second latency budget, every malformed tool call wastes a round-trip that we cannot afford.

1.2 JSON Schema as the Contract Language

AWS Bedrock's tool-use API expects tool definitions in JSON Schema format. This is the universal contract between the FM and your tool implementations.

Anatomy of a Bedrock Tool Definition:

{
  "toolSpec": {
    "name": "product_search",
    "description": "Search the MangaAssist product catalog by title, author, genre, or ISBN. Returns matching manga products with pricing and availability. Use when the customer asks about finding, browsing, or looking for manga titles.",
    "inputSchema": {
      "json": {
        "type": "object",
        "properties": {
          "query": {
            "type": "string",
            "description": "The search query — can be a manga title, author name, or keyword. Supports Japanese characters (e.g., '進撃の巨人').",
            "minLength": 1,
            "maxLength": 200
          },
          "genre": {
            "type": "string",
            "description": "Filter results by manga genre.",
            "enum": ["shonen", "shojo", "seinen", "josei", "kodomomuke", "isekai", "mecha", "slice_of_life", "horror", "sports"]
          },
          "max_results": {
            "type": "integer",
            "description": "Maximum number of results to return. Defaults to 10 if not specified.",
            "minimum": 1,
            "maximum": 50,
            "default": 10
          },
          "in_stock_only": {
            "type": "boolean",
            "description": "If true, only return products currently in stock. Defaults to false.",
            "default": false
          },
          "price_range": {
            "type": "object",
            "description": "Optional price filter in JPY.",
            "properties": {
              "min_price": {
                "type": "number",
                "minimum": 0,
                "description": "Minimum price in JPY."
              },
              "max_price": {
                "type": "number",
                "minimum": 0,
                "description": "Maximum price in JPY."
              }
            }
          },
          "sort_by": {
            "type": "string",
            "description": "How to sort results.",
            "enum": ["relevance", "price_asc", "price_desc", "release_date", "popularity"],
            "default": "relevance"
          }
        },
        "required": ["query"]
      }
    }
  }
}

1.3 MangaAssist Tool Definition Catalog

Tool: order_lookup

{
  "toolSpec": {
    "name": "order_lookup",
    "description": "Look up an existing order by order ID or by the customer's email address. Returns order status, items, shipping details, and tracking information. Use when the customer asks about order status, delivery tracking, or order history.",
    "inputSchema": {
      "json": {
        "type": "object",
        "properties": {
          "order_id": {
            "type": "string",
            "description": "The order ID in format ORD-XXXXXXXXXX (e.g., ORD-2024031542).",
            "pattern": "^ORD-[0-9]{10}$"
          },
          "customer_email": {
            "type": "string",
            "description": "Customer email address to look up all orders. Must be a valid email format.",
            "format": "email"
          },
          "include_items": {
            "type": "boolean",
            "description": "Whether to include the full list of ordered items. Defaults to true.",
            "default": true
          },
          "include_tracking": {
            "type": "boolean",
            "description": "Whether to include shipping tracking details. Defaults to true.",
            "default": true
          }
        },
        "required": [],
        "oneOf": [
          { "required": ["order_id"] },
          { "required": ["customer_email"] }
        ]
      }
    }
  }
}

Tool: recommendation_engine

{
  "toolSpec": {
    "name": "recommendation_engine",
    "description": "Generate personalized manga recommendations based on customer purchase history, browsing behavior, or a specific manga title they enjoyed. Use when the customer asks for suggestions, 'what should I read next', or similar discovery queries.",
    "inputSchema": {
      "json": {
        "type": "object",
        "properties": {
          "customer_id": {
            "type": "string",
            "description": "The customer ID for personalized recommendations based on history."
          },
          "seed_title": {
            "type": "string",
            "description": "A manga title the customer liked — recommendations will be similar to this title."
          },
          "preferred_genres": {
            "type": "array",
            "description": "List of preferred genres to bias recommendations toward.",
            "items": {
              "type": "string",
              "enum": ["shonen", "shojo", "seinen", "josei", "kodomomuke", "isekai", "mecha", "slice_of_life", "horror", "sports"]
            },
            "maxItems": 5
          },
          "exclude_owned": {
            "type": "boolean",
            "description": "If true and customer_id is provided, exclude titles the customer already owns.",
            "default": true
          },
          "num_recommendations": {
            "type": "integer",
            "description": "Number of recommendations to return.",
            "minimum": 1,
            "maximum": 20,
            "default": 5
          },
          "diversity_factor": {
            "type": "number",
            "description": "Value between 0.0 (very similar) and 1.0 (very diverse) controlling recommendation variety.",
            "minimum": 0.0,
            "maximum": 1.0,
            "default": 0.3
          }
        },
        "required": []
      }
    }
  }
}

Tool: inventory_check

{
  "toolSpec": {
    "name": "inventory_check",
    "description": "Check real-time inventory and availability for a specific manga product. Returns stock count, warehouse location, and estimated restock date if out of stock. Use when the customer asks if something is available or in stock.",
    "inputSchema": {
      "json": {
        "type": "object",
        "properties": {
          "product_id": {
            "type": "string",
            "description": "The product ID (SKU) to check inventory for.",
            "pattern": "^MNG-[A-Z0-9]{8}$"
          },
          "isbn": {
            "type": "string",
            "description": "ISBN-13 of the manga volume.",
            "pattern": "^978[0-9]{10}$"
          },
          "warehouse_region": {
            "type": "string",
            "description": "Check inventory at a specific warehouse region.",
            "enum": ["tokyo", "osaka", "fukuoka", "sapporo", "all"],
            "default": "all"
          }
        },
        "required": [],
        "anyOf": [
          { "required": ["product_id"] },
          { "required": ["isbn"] }
        ]
      }
    }
  }
}

1.4 OpenAPI Spec Integration

For tools backed by REST APIs (common in microservice architectures), you can generate Bedrock tool definitions from OpenAPI specs:

import yaml
import json
from typing import Dict, Any


def openapi_to_bedrock_tool(openapi_path: str, operation_id: str) -> Dict[str, Any]:
    """
    Convert an OpenAPI operation to a Bedrock tool definition.

    This allows teams to maintain a single source of truth (the OpenAPI spec)
    and auto-generate tool definitions for the FM.
    """
    with open(openapi_path, "r") as f:
        spec = yaml.safe_load(f)

    # Find the operation by operationId
    for path, methods in spec.get("paths", {}).items():
        for method, operation in methods.items():
            if operation.get("operationId") == operation_id:
                # Extract request body schema or query parameters
                input_schema = _extract_input_schema(operation, spec)
                return {
                    "toolSpec": {
                        "name": operation_id,
                        "description": operation.get("summary", "")
                        + " "
                        + operation.get("description", ""),
                        "inputSchema": {"json": input_schema},
                    }
                }

    raise ValueError(f"Operation '{operation_id}' not found in OpenAPI spec")


def _extract_input_schema(operation: Dict, spec: Dict) -> Dict[str, Any]:
    """Build a JSON Schema from OpenAPI parameters and request body."""
    properties = {}
    required = []

    # Query/path parameters
    for param in operation.get("parameters", []):
        param_schema = _resolve_ref(param.get("schema", {}), spec)
        properties[param["name"]] = {
            "type": param_schema.get("type", "string"),
            "description": param.get("description", ""),
        }
        if param.get("required", False):
            required.append(param["name"])

    # Request body
    request_body = operation.get("requestBody", {})
    if request_body:
        content = request_body.get("content", {})
        json_schema = content.get("application/json", {}).get("schema", {})
        resolved = _resolve_ref(json_schema, spec)
        if resolved.get("properties"):
            properties.update(resolved["properties"])
            required.extend(resolved.get("required", []))

    return {
        "type": "object",
        "properties": properties,
        "required": required,
    }


def _resolve_ref(schema: Dict, spec: Dict) -> Dict:
    """Resolve $ref pointers in OpenAPI schema."""
    if "$ref" in schema:
        ref_path = schema["$ref"].lstrip("#/").split("/")
        resolved = spec
        for part in ref_path:
            resolved = resolved[part]
        return resolved
    return schema

2. Strands API Custom Behaviors and Tool Chaining

2.1 Strands Agent SDK Overview

The Strands Agents SDK provides a Python-native way to define tools, bind them to agents, and orchestrate tool chaining. It is the recommended approach for building agentic workflows on AWS Bedrock.

2.2 Tool Registration with @tool Decorator

"""
strands_tools.py — MangaAssist tool definitions using Strands Agents SDK.

Each tool is registered via the @tool decorator which:
1. Auto-generates JSON Schema from type hints
2. Extracts description from the docstring
3. Registers the tool in the global tool registry
4. Wraps execution with error handling hooks
"""

import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

import boto3
from strands import Agent, tool
from strands.types.tools import ToolResult, ToolUse

logger = logging.getLogger("mangaassist.tools")


# ---------------------------------------------------------------------------
# Tool 1: Product Search
# ---------------------------------------------------------------------------
@tool
def product_search(
    query: str,
    genre: Optional[str] = None,
    max_results: int = 10,
    in_stock_only: bool = False,
    sort_by: str = "relevance",
) -> Dict[str, Any]:
    """
    Search the MangaAssist product catalog for manga titles.

    Searches across title, author, and description fields using
    OpenSearch Serverless vector and keyword hybrid search.
    Supports Japanese and English queries.

    Args:
        query: Search query — title, author, keyword, or ISBN.
        genre: Filter by genre (shonen, shojo, seinen, josei, etc.).
        max_results: Max results to return (1-50, default 10).
        in_stock_only: Only return in-stock items.
        sort_by: Sort order — relevance, price_asc, price_desc, release_date, popularity.

    Returns:
        Dict with 'results' list and 'total_count' integer.
    """
    from mangaassist.services.opensearch import OpenSearchClient

    client = OpenSearchClient()
    filters = {}
    if genre:
        filters["genre"] = genre
    if in_stock_only:
        filters["in_stock"] = True

    results = client.hybrid_search(
        query=query,
        filters=filters,
        max_results=max_results,
        sort_by=sort_by,
    )
    return {
        "results": results,
        "total_count": len(results),
        "query": query,
        "filters_applied": filters,
    }


# ---------------------------------------------------------------------------
# Tool 2: Order Lookup
# ---------------------------------------------------------------------------
@tool
def order_lookup(
    order_id: Optional[str] = None,
    customer_email: Optional[str] = None,
    include_items: bool = True,
    include_tracking: bool = True,
) -> Dict[str, Any]:
    """
    Look up an existing manga order by order ID or customer email.

    Returns order status, items purchased, shipping info, and
    tracking details. Requires either order_id or customer_email.

    Args:
        order_id: Order ID in format ORD-XXXXXXXXXX.
        customer_email: Customer email to look up all their orders.
        include_items: Include ordered item details (default True).
        include_tracking: Include shipping tracking info (default True).

    Returns:
        Dict with order details or list of orders.
    """
    if not order_id and not customer_email:
        return {
            "error": "Either order_id or customer_email is required.",
            "error_code": "MISSING_IDENTIFIER",
        }

    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("MangaAssist-Orders")

    if order_id:
        response = table.get_item(Key={"order_id": order_id})
        order = response.get("Item")
        if not order:
            return {
                "error": f"Order {order_id} not found.",
                "error_code": "ORDER_NOT_FOUND",
            }
        return _format_order(order, include_items, include_tracking)
    else:
        response = table.query(
            IndexName="email-index",
            KeyConditionExpression="customer_email = :email",
            ExpressionAttributeValues={":email": customer_email},
            Limit=20,
        )
        orders = response.get("Items", [])
        return {
            "orders": [
                _format_order(o, include_items, include_tracking) for o in orders
            ],
            "total_orders": len(orders),
        }


def _format_order(
    order: Dict, include_items: bool, include_tracking: bool
) -> Dict[str, Any]:
    """Format a raw DynamoDB order record for FM consumption."""
    result = {
        "order_id": order["order_id"],
        "status": order["status"],
        "order_date": order["order_date"],
        "total_amount": f{order['total_jpy']:,}",
    }
    if include_items:
        result["items"] = order.get("items", [])
    if include_tracking:
        result["tracking"] = {
            "carrier": order.get("carrier", "N/A"),
            "tracking_number": order.get("tracking_number", "N/A"),
            "estimated_delivery": order.get("estimated_delivery", "N/A"),
        }
    return result


# ---------------------------------------------------------------------------
# Tool 3: Recommendation Engine
# ---------------------------------------------------------------------------
@tool
def recommendation_engine(
    customer_id: Optional[str] = None,
    seed_title: Optional[str] = None,
    preferred_genres: Optional[List[str]] = None,
    exclude_owned: bool = True,
    num_recommendations: int = 5,
    diversity_factor: float = 0.3,
) -> Dict[str, Any]:
    """
    Generate personalized manga recommendations.

    Uses collaborative filtering and content-based signals from
    OpenSearch kNN vectors to recommend manga titles.

    Args:
        customer_id: Customer ID for history-based recommendations.
        seed_title: A manga title to find similar titles for.
        preferred_genres: Genres to bias toward (max 5).
        exclude_owned: Exclude titles the customer already owns.
        num_recommendations: Number of recs to return (1-20, default 5).
        diversity_factor: 0.0 = very similar, 1.0 = very diverse (default 0.3).

    Returns:
        Dict with 'recommendations' list.
    """
    from mangaassist.services.recommendation import RecommendationService

    svc = RecommendationService()
    recs = svc.generate(
        customer_id=customer_id,
        seed_title=seed_title,
        genres=preferred_genres or [],
        exclude_owned=exclude_owned,
        count=num_recommendations,
        diversity=diversity_factor,
    )
    return {
        "recommendations": recs,
        "count": len(recs),
        "strategy": "hybrid" if customer_id and seed_title else (
            "collaborative" if customer_id else "content_based"
        ),
    }


# ---------------------------------------------------------------------------
# Tool 4: Inventory Check
# ---------------------------------------------------------------------------
@tool
def inventory_check(
    product_id: Optional[str] = None,
    isbn: Optional[str] = None,
    warehouse_region: str = "all",
) -> Dict[str, Any]:
    """
    Check real-time inventory for a manga product.

    Queries the inventory service for current stock levels,
    warehouse location, and restock estimates.

    Args:
        product_id: Product SKU in format MNG-XXXXXXXX.
        isbn: ISBN-13 of the manga volume.
        warehouse_region: Region to check — tokyo, osaka, fukuoka, sapporo, or all.

    Returns:
        Dict with stock info per warehouse.
    """
    if not product_id and not isbn:
        return {
            "error": "Either product_id or isbn is required.",
            "error_code": "MISSING_IDENTIFIER",
        }

    from mangaassist.services.inventory import InventoryService

    svc = InventoryService()
    stock = svc.check(
        product_id=product_id,
        isbn=isbn,
        region=warehouse_region,
    )
    return stock

2.3 Tool Chaining with Strands

Tool chaining allows the agent to compose multiple tool calls into a single workflow. For example, a customer asking "Do you have the latest volume of Attack on Titan in stock?" requires:

  1. product_search — find the latest volume
  2. inventory_check — check if it is in stock
"""
tool_chains.py — Predefined tool chain patterns for MangaAssist.

Tool chains define multi-step workflows that the agent can execute.
Each chain specifies the sequence of tools, how data flows between
them, and how to handle failures at each step.
"""

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger("mangaassist.chains")


@dataclass
class ChainStep:
    """One step in a tool chain."""

    tool_name: str
    param_mapper: Callable[[Dict[str, Any]], Dict[str, Any]]
    required: bool = True  # If False, chain continues on failure
    timeout_ms: int = 2000
    fallback_value: Optional[Dict[str, Any]] = None


@dataclass
class ToolChain:
    """
    A sequence of tool calls with data flow between them.

    Each step receives the accumulated context from all prior steps
    and produces output that is merged into the context for subsequent steps.
    """

    name: str
    steps: List[ChainStep]
    description: str = ""
    max_total_ms: int = 5000

    async def execute(
        self, initial_context: Dict[str, Any], tool_registry: "ToolRegistry"
    ) -> Dict[str, Any]:
        """Execute the chain, flowing data through each step."""
        context = dict(initial_context)
        results = {}
        elapsed_ms = 0

        for i, step in enumerate(self.steps):
            remaining_ms = self.max_total_ms - elapsed_ms
            if remaining_ms <= 0:
                logger.warning(
                    f"Chain '{self.name}' exceeded total timeout at step {i}"
                )
                break

            step_timeout = min(step.timeout_ms, remaining_ms)

            try:
                params = step.param_mapper(context)
                start = asyncio.get_event_loop().time()

                result = await asyncio.wait_for(
                    tool_registry.invoke_async(step.tool_name, params),
                    timeout=step_timeout / 1000,
                )

                elapsed_ms += int(
                    (asyncio.get_event_loop().time() - start) * 1000
                )

                results[step.tool_name] = result
                context[f"step_{i}_result"] = result
                context.update(result if isinstance(result, dict) else {})

            except asyncio.TimeoutError:
                logger.error(
                    f"Chain '{self.name}' step {i} ({step.tool_name}) timed out"
                )
                if step.required:
                    return {
                        "error": f"Tool '{step.tool_name}' timed out",
                        "chain": self.name,
                        "failed_step": i,
                        "partial_results": results,
                    }
                elif step.fallback_value:
                    results[step.tool_name] = step.fallback_value
                    context[f"step_{i}_result"] = step.fallback_value

            except Exception as e:
                logger.error(
                    f"Chain '{self.name}' step {i} ({step.tool_name}) failed: {e}"
                )
                if step.required:
                    return {
                        "error": str(e),
                        "chain": self.name,
                        "failed_step": i,
                        "partial_results": results,
                    }
                elif step.fallback_value:
                    results[step.tool_name] = step.fallback_value

        return {"chain": self.name, "results": results, "success": True}


# ---------------------------------------------------------------------------
# Predefined Chains for MangaAssist
# ---------------------------------------------------------------------------

search_and_check_stock = ToolChain(
    name="search_and_check_stock",
    description=(
        "Search for a manga title then check its inventory. "
        "Used when customer asks if a specific title is available."
    ),
    steps=[
        ChainStep(
            tool_name="product_search",
            param_mapper=lambda ctx: {
                "query": ctx["query"],
                "max_results": 1,
                "in_stock_only": False,
            },
            timeout_ms=1500,
        ),
        ChainStep(
            tool_name="inventory_check",
            param_mapper=lambda ctx: {
                "product_id": ctx.get("results", [{}])[0].get("product_id", ""),
            },
            timeout_ms=1000,
        ),
    ],
    max_total_ms=3000,
)

search_and_recommend = ToolChain(
    name="search_and_recommend",
    description=(
        "Search for a title then generate similar recommendations. "
        "Used when customer asks 'I liked X, what else should I read?'"
    ),
    steps=[
        ChainStep(
            tool_name="product_search",
            param_mapper=lambda ctx: {
                "query": ctx["query"],
                "max_results": 1,
            },
            timeout_ms=1500,
        ),
        ChainStep(
            tool_name="recommendation_engine",
            param_mapper=lambda ctx: {
                "seed_title": ctx.get("results", [{}])[0].get("title", ctx["query"]),
                "num_recommendations": ctx.get("num_recs", 5),
            },
            timeout_ms=1500,
        ),
    ],
    max_total_ms=3500,
)

order_with_item_details = ToolChain(
    name="order_with_item_details",
    description=(
        "Look up an order then enrich with current product details. "
        "Used when customer asks about a past order and wants current pricing."
    ),
    steps=[
        ChainStep(
            tool_name="order_lookup",
            param_mapper=lambda ctx: {
                "order_id": ctx["order_id"],
                "include_items": True,
            },
            timeout_ms=1500,
        ),
        ChainStep(
            tool_name="product_search",
            param_mapper=lambda ctx: {
                "query": ctx.get("items", [{}])[0].get("title", ""),
                "max_results": 1,
            },
            required=False,
            fallback_value={"results": [], "note": "Product details unavailable"},
            timeout_ms=1000,
        ),
    ],
    max_total_ms=3000,
)

2.4 StrandsToolBinder — Binding Tools to Agents

"""
strands_binder.py — Binds tools to Strands agents with runtime control.

The StrandsToolBinder manages which tools are available to the agent
at any given point, supporting per-turn filtering, health-based
disabling, and dynamic tool loading.
"""

import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set

from strands import Agent

logger = logging.getLogger("mangaassist.binder")


@dataclass
class ToolHealth:
    """Tracks the health of a registered tool."""

    name: str
    is_healthy: bool = True
    consecutive_failures: int = 0
    last_failure_time: float = 0.0
    failure_threshold: int = 5
    cooldown_seconds: float = 60.0

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.is_healthy = True

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        self.last_failure_time = time.time()
        if self.consecutive_failures >= self.failure_threshold:
            self.is_healthy = False
            logger.warning(
                f"Tool '{self.name}' marked unhealthy after "
                f"{self.consecutive_failures} consecutive failures"
            )

    def check_recovery(self) -> bool:
        """Check if enough time has passed to try the tool again."""
        if self.is_healthy:
            return True
        if time.time() - self.last_failure_time > self.cooldown_seconds:
            logger.info(f"Tool '{self.name}' entering half-open recovery")
            return True
        return False


class StrandsToolBinder:
    """
    Manages tool availability for Strands agents.

    Responsibilities:
    - Binds tools to agent at initialization
    - Tracks tool health via circuit breaker pattern
    - Supports per-turn tool filtering (e.g., disable order_lookup for unauthenticated users)
    - Enables dynamic tool loading at runtime
    """

    def __init__(self) -> None:
        self._tools: Dict[str, Callable] = {}
        self._health: Dict[str, ToolHealth] = {}
        self._disabled: Set[str] = set()
        self._tool_metadata: Dict[str, Dict[str, Any]] = {}

    def register(
        self,
        tool_fn: Callable,
        name: Optional[str] = None,
        failure_threshold: int = 5,
        cooldown_seconds: float = 60.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Register a tool function with health tracking."""
        tool_name = name or getattr(tool_fn, "__name__", str(tool_fn))
        self._tools[tool_name] = tool_fn
        self._health[tool_name] = ToolHealth(
            name=tool_name,
            failure_threshold=failure_threshold,
            cooldown_seconds=cooldown_seconds,
        )
        self._tool_metadata[tool_name] = metadata or {}
        logger.info(f"Registered tool: {tool_name}")

    def disable(self, tool_name: str) -> None:
        """Temporarily disable a tool (e.g., for unauthenticated sessions)."""
        self._disabled.add(tool_name)
        logger.info(f"Disabled tool: {tool_name}")

    def enable(self, tool_name: str) -> None:
        """Re-enable a previously disabled tool."""
        self._disabled.discard(tool_name)
        logger.info(f"Enabled tool: {tool_name}")

    def get_available_tools(self) -> List[Callable]:
        """Return list of tools that are healthy and not disabled."""
        available = []
        for name, fn in self._tools.items():
            if name in self._disabled:
                continue
            health = self._health[name]
            if health.is_healthy or health.check_recovery():
                available.append(fn)
            else:
                logger.debug(
                    f"Tool '{name}' unavailable — circuit open, "
                    f"cooldown remaining: "
                    f"{health.cooldown_seconds - (time.time() - health.last_failure_time):.1f}s"
                )
        return available

    def bind_to_agent(
        self,
        system_prompt: str,
        model_id: str = "us.anthropic.claude-3-5-sonnet-20241022-v2:0",
        **agent_kwargs: Any,
    ) -> Agent:
        """Create a Strands Agent with the currently available tools."""
        tools = self.get_available_tools()
        logger.info(
            f"Binding {len(tools)} tools to agent (model={model_id})"
        )
        return Agent(
            system_prompt=system_prompt,
            model=model_id,
            tools=tools,
            **agent_kwargs,
        )

    def record_result(self, tool_name: str, success: bool) -> None:
        """Record a tool invocation result for health tracking."""
        if tool_name in self._health:
            if success:
                self._health[tool_name].record_success()
            else:
                self._health[tool_name].record_failure()

    def get_health_report(self) -> Dict[str, Dict[str, Any]]:
        """Return health status for all registered tools."""
        report = {}
        for name, health in self._health.items():
            report[name] = {
                "is_healthy": health.is_healthy,
                "consecutive_failures": health.consecutive_failures,
                "is_disabled": name in self._disabled,
                "available": (
                    name not in self._disabled
                    and (health.is_healthy or health.check_recovery())
                ),
            }
        return report

3. Lambda-Based Error Handling and Parameter Validation

3.1 Architecture: Lambda as Validation and Error Handling Layer

In MangaAssist, a Lambda function sits between the FM's tool-use output and the actual tool execution. This "tool gateway" Lambda: 1. Validates parameters before forwarding to the tool implementation 2. Handles errors from tool execution and translates them for the FM 3. Enforces timeouts to keep within the 3-second latency budget 4. Logs metrics for observability

FM produces tool_use block
        |
        v
 Lambda: Tool Gateway
   +-- Parameter Validation (JSON Schema)
   +-- Encoding Validation (UTF-8 for Japanese)
   +-- Injection Detection
   +-- Rate Limiting Check
        |
        v
   Tool Execution (OpenSearch / DynamoDB / etc.)
        |
        v
   Error Handling
   +-- Retry if transient
   +-- Circuit break if persistent
   +-- Fallback if all retries fail
   +-- Format user-friendly error
        |
        v
 tool_result block returned to FM

3.2 Production Code: ParameterValidator

"""
parameter_validator.py — Validates tool call parameters before execution.

This module implements JSON Schema validation, custom constraint rules,
type coercion, and security checks for tool parameters.
"""

import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

import jsonschema
from jsonschema import Draft7Validator, ValidationError

logger = logging.getLogger("mangaassist.validator")


class ParameterValidator:
    """
    Validates parameters for MangaAssist tool calls.

    Validation pipeline:
    1. JSON Schema structural validation
    2. Type coercion (FM sometimes sends "10" instead of 10)
    3. Custom constraint checks (business rules)
    4. Encoding validation (Japanese UTF-8)
    5. Injection detection (prompt injection in params)
    """

    # Known schemas for each tool
    SCHEMAS: Dict[str, Dict[str, Any]] = {}

    # Patterns that indicate prompt injection attempts
    INJECTION_PATTERNS = [
        r"ignore\s+(previous|above|all)\s+instructions",
        r"you\s+are\s+now\s+a",
        r"system\s*:\s*",
        r"<\s*/?system\s*>",
        r"ADMIN\s*MODE",
        r"\bsudo\b",
        r"override\s+safety",
    ]

    def __init__(self) -> None:
        self._compiled_injections = [
            re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
        ]

    @classmethod
    def register_schema(cls, tool_name: str, schema: Dict[str, Any]) -> None:
        """Register a JSON Schema for a tool's parameters."""
        # Pre-compile the validator for fast repeated use
        Draft7Validator.check_schema(schema)
        cls.SCHEMAS[tool_name] = schema
        logger.info(f"Registered schema for tool: {tool_name}")

    def validate(
        self, tool_name: str, params: Dict[str, Any]
    ) -> Tuple[bool, Dict[str, Any], List[str]]:
        """
        Validate and coerce parameters for a tool call.

        Returns:
            (is_valid, coerced_params, error_messages)
        """
        errors: List[str] = []

        # Step 1: Schema validation
        schema = self.SCHEMAS.get(tool_name)
        if schema:
            schema_errors = self._validate_schema(schema, params)
            errors.extend(schema_errors)

        # Step 2: Type coercion (attempt fixes for common FM mistakes)
        coerced = self._coerce_types(tool_name, params)

        # Re-validate after coercion if there were type errors
        if errors and schema:
            recheck_errors = self._validate_schema(schema, coerced)
            if len(recheck_errors) < len(errors):
                errors = recheck_errors

        # Step 3: Custom business constraints
        constraint_errors = self._check_constraints(tool_name, coerced)
        errors.extend(constraint_errors)

        # Step 4: Encoding validation
        encoding_errors = self._validate_encoding(coerced)
        errors.extend(encoding_errors)

        # Step 5: Injection detection
        injection_errors = self._detect_injection(coerced)
        errors.extend(injection_errors)

        return (len(errors) == 0, coerced, errors)

    def _validate_schema(
        self, schema: Dict[str, Any], params: Dict[str, Any]
    ) -> List[str]:
        """Run JSON Schema validation."""
        validator = Draft7Validator(schema)
        errors = []
        for error in sorted(validator.iter_errors(params), key=lambda e: list(e.path)):
            path = ".".join(str(p) for p in error.path) or "(root)"
            errors.append(f"Schema error at '{path}': {error.message}")
        return errors

    def _coerce_types(
        self, tool_name: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Attempt to coerce parameters to expected types.

        FMs frequently make these mistakes:
        - Sending "10" (string) instead of 10 (integer)
        - Sending "true" (string) instead of true (boolean)
        - Sending 5.0 (float) instead of 5 (integer)
        """
        schema = self.SCHEMAS.get(tool_name, {})
        properties = schema.get("properties", {})
        coerced = dict(params)

        for key, value in coerced.items():
            if key not in properties:
                continue

            expected_type = properties[key].get("type")

            if expected_type == "integer" and isinstance(value, str):
                try:
                    coerced[key] = int(value)
                    logger.debug(f"Coerced '{key}': str -> int ({value} -> {coerced[key]})")
                except ValueError:
                    pass

            elif expected_type == "integer" and isinstance(value, float):
                coerced[key] = int(value)
                logger.debug(f"Coerced '{key}': float -> int ({value} -> {coerced[key]})")

            elif expected_type == "number" and isinstance(value, str):
                try:
                    coerced[key] = float(value)
                    logger.debug(f"Coerced '{key}': str -> float ({value} -> {coerced[key]})")
                except ValueError:
                    pass

            elif expected_type == "boolean" and isinstance(value, str):
                if value.lower() in ("true", "1", "yes"):
                    coerced[key] = True
                elif value.lower() in ("false", "0", "no"):
                    coerced[key] = False
                logger.debug(f"Coerced '{key}': str -> bool ({value} -> {coerced[key]})")

        return coerced

    def _check_constraints(
        self, tool_name: str, params: Dict[str, Any]
    ) -> List[str]:
        """Check business-level constraints beyond JSON Schema."""
        errors = []

        if tool_name == "order_lookup":
            if not params.get("order_id") and not params.get("customer_email"):
                errors.append(
                    "order_lookup requires either 'order_id' or 'customer_email'"
                )

        if tool_name == "inventory_check":
            if not params.get("product_id") and not params.get("isbn"):
                errors.append(
                    "inventory_check requires either 'product_id' or 'isbn'"
                )

        if tool_name == "recommendation_engine":
            genres = params.get("preferred_genres", [])
            if len(genres) > 5:
                errors.append(
                    f"preferred_genres has {len(genres)} items, maximum is 5"
                )

        if tool_name == "product_search":
            query = params.get("query", "")
            if len(query) > 200:
                errors.append(
                    f"query length {len(query)} exceeds maximum of 200 characters"
                )

        return errors

    def _validate_encoding(self, params: Dict[str, Any]) -> List[str]:
        """Validate that string parameters are valid UTF-8."""
        errors = []
        for key, value in params.items():
            if isinstance(value, str):
                try:
                    value.encode("utf-8").decode("utf-8")
                except UnicodeError:
                    errors.append(
                        f"Parameter '{key}' contains invalid UTF-8 characters"
                    )
        return errors

    def _detect_injection(self, params: Dict[str, Any]) -> List[str]:
        """Detect potential prompt injection in parameter values."""
        errors = []
        for key, value in params.items():
            if isinstance(value, str):
                for pattern in self._compiled_injections:
                    if pattern.search(value):
                        errors.append(
                            f"Potential injection detected in parameter '{key}'"
                        )
                        logger.warning(
                            f"Injection attempt in '{key}': {value[:100]}..."
                        )
                        break
        return errors

3.3 Production Code: ToolErrorHandler

"""
tool_error_handler.py — Classifies, handles, and formats tool errors.

Provides retry logic, circuit breaker, and user-friendly error messages
for the MangaAssist chatbot.
"""

import asyncio
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import boto3

logger = logging.getLogger("mangaassist.error_handler")


class ErrorSeverity(Enum):
    """Classification of error severity."""

    TRANSIENT = "transient"        # Network blip, throttle — retry
    VALIDATION = "validation"      # Bad params — do not retry, fix params
    RESOURCE = "resource"          # Not found — do not retry
    PERMISSION = "permission"      # Auth failure — do not retry
    CAPACITY = "capacity"          # Rate limit / quota — retry with backoff
    INTERNAL = "internal"          # Bug in tool code — do not retry
    TIMEOUT = "timeout"            # Tool took too long — maybe retry


class ErrorAction(Enum):
    """What to do about an error."""

    RETRY = "retry"
    FALLBACK = "fallback"
    FAIL_GRACEFULLY = "fail_gracefully"
    FAIL_HARD = "fail_hard"


@dataclass
class ClassifiedError:
    """An error that has been classified with severity and action."""

    original_error: Exception
    severity: ErrorSeverity
    action: ErrorAction
    user_message: str
    technical_detail: str
    tool_name: str
    retry_after_ms: int = 0


# User-friendly error messages (Japanese and English)
USER_MESSAGES = {
    "ORDER_NOT_FOUND": (
        "I couldn't find that order. Could you double-check the order ID? "
        "It should look like ORD-XXXXXXXXXX."
    ),
    "PRODUCT_NOT_FOUND": (
        "I wasn't able to find that manga title in our catalog. "
        "Could you try a different search term or check the spelling?"
    ),
    "SEARCH_TIMEOUT": (
        "The search is taking longer than usual. Let me try again with "
        "a simpler query."
    ),
    "SERVICE_UNAVAILABLE": (
        "I'm having trouble connecting to one of our services right now. "
        "Please try again in a moment."
    ),
    "RATE_LIMITED": (
        "We're experiencing high traffic right now. I'll retry your "
        "request in just a moment."
    ),
    "INVALID_PARAMETERS": (
        "I need a bit more information to help you. Could you provide "
        "more details about what you're looking for?"
    ),
    "INTERNAL_ERROR": (
        "Something went wrong on our end. I've logged the issue and "
        "our team will look into it. Can I help you with something else?"
    ),
}


class ToolErrorHandler:
    """
    Handles errors from tool invocations in MangaAssist.

    Responsibilities:
    1. Classify errors by severity
    2. Determine action (retry, fallback, fail)
    3. Execute retries with exponential backoff + jitter
    4. Generate user-friendly error messages
    5. Emit CloudWatch metrics for monitoring
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay_ms: int = 200,
        max_delay_ms: int = 2000,
        cloudwatch_namespace: str = "MangaAssist/Tools",
    ) -> None:
        self.max_retries = max_retries
        self.base_delay_ms = base_delay_ms
        self.max_delay_ms = max_delay_ms
        self.cw_namespace = cloudwatch_namespace
        self._cw_client = boto3.client("cloudwatch")

    def classify_error(
        self, error: Exception, tool_name: str
    ) -> ClassifiedError:
        """Classify an error and determine the appropriate action."""
        error_str = str(error).lower()
        error_type = type(error).__name__

        # Timeout errors
        if isinstance(error, (asyncio.TimeoutError, TimeoutError)):
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.TIMEOUT,
                action=ErrorAction.RETRY,
                user_message=USER_MESSAGES["SEARCH_TIMEOUT"],
                technical_detail=f"Tool '{tool_name}' timed out",
                tool_name=tool_name,
                retry_after_ms=500,
            )

        # DynamoDB / service throttling
        if "throttl" in error_str or "rate" in error_str or "429" in error_str:
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.CAPACITY,
                action=ErrorAction.RETRY,
                user_message=USER_MESSAGES["RATE_LIMITED"],
                technical_detail=f"Rate limited on '{tool_name}': {error}",
                tool_name=tool_name,
                retry_after_ms=1000,
            )

        # Not found
        if "not found" in error_str or "404" in error_str:
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.RESOURCE,
                action=ErrorAction.FAIL_GRACEFULLY,
                user_message=USER_MESSAGES.get(
                    f"{tool_name.upper()}_NOT_FOUND",
                    USER_MESSAGES["PRODUCT_NOT_FOUND"],
                ),
                technical_detail=f"Resource not found in '{tool_name}': {error}",
                tool_name=tool_name,
            )

        # Validation errors
        if "validat" in error_str or isinstance(error, (ValueError, TypeError)):
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.VALIDATION,
                action=ErrorAction.FAIL_GRACEFULLY,
                user_message=USER_MESSAGES["INVALID_PARAMETERS"],
                technical_detail=f"Validation error in '{tool_name}': {error}",
                tool_name=tool_name,
            )

        # Connection / transient
        if any(
            kw in error_str
            for kw in ["connection", "network", "temporary", "503", "502"]
        ):
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.TRANSIENT,
                action=ErrorAction.RETRY,
                user_message=USER_MESSAGES["SERVICE_UNAVAILABLE"],
                technical_detail=f"Transient error in '{tool_name}': {error}",
                tool_name=tool_name,
                retry_after_ms=300,
            )

        # Auth errors
        if "auth" in error_str or "permission" in error_str or "403" in error_str:
            return ClassifiedError(
                original_error=error,
                severity=ErrorSeverity.PERMISSION,
                action=ErrorAction.FAIL_HARD,
                user_message=USER_MESSAGES["INTERNAL_ERROR"],
                technical_detail=f"Permission error in '{tool_name}': {error}",
                tool_name=tool_name,
            )

        # Default: internal error
        return ClassifiedError(
            original_error=error,
            severity=ErrorSeverity.INTERNAL,
            action=ErrorAction.FAIL_HARD,
            user_message=USER_MESSAGES["INTERNAL_ERROR"],
            technical_detail=f"Internal error in '{tool_name}': {error_type}: {error}",
            tool_name=tool_name,
        )

    async def execute_with_retry(
        self,
        tool_name: str,
        tool_fn: Callable,
        params: Dict[str, Any],
        fallback_fn: Optional[Callable] = None,
    ) -> Dict[str, Any]:
        """
        Execute a tool with retry logic and error handling.

        Retry strategy: exponential backoff with jitter
        delay = min(base_delay * 2^attempt + random_jitter, max_delay)
        """
        last_error: Optional[ClassifiedError] = None

        for attempt in range(self.max_retries + 1):
            try:
                result = await tool_fn(**params)
                self._emit_metric(tool_name, "Success", 1)
                return result

            except Exception as e:
                classified = self.classify_error(e, tool_name)
                last_error = classified
                self._emit_metric(
                    tool_name, f"Error_{classified.severity.value}", 1
                )

                logger.warning(
                    f"Tool '{tool_name}' attempt {attempt + 1}/{self.max_retries + 1} "
                    f"failed: {classified.technical_detail}"
                )

                # Should we retry?
                if classified.action != ErrorAction.RETRY:
                    break
                if attempt >= self.max_retries:
                    break

                # Calculate backoff delay
                delay_ms = min(
                    self.base_delay_ms * (2**attempt)
                    + random.randint(0, 100),
                    self.max_delay_ms,
                )
                delay_ms = max(delay_ms, classified.retry_after_ms)

                logger.info(f"Retrying '{tool_name}' in {delay_ms}ms...")
                await asyncio.sleep(delay_ms / 1000)

        # All retries exhausted — try fallback
        if fallback_fn and last_error and last_error.action in (
            ErrorAction.RETRY,
            ErrorAction.FALLBACK,
        ):
            logger.info(f"Using fallback for tool '{tool_name}'")
            try:
                result = await fallback_fn(**params)
                self._emit_metric(tool_name, "FallbackSuccess", 1)
                return result
            except Exception as fb_err:
                logger.error(f"Fallback for '{tool_name}' also failed: {fb_err}")
                self._emit_metric(tool_name, "FallbackFailure", 1)

        # Return user-friendly error
        self._emit_metric(tool_name, "FinalFailure", 1)
        return {
            "error": True,
            "error_code": last_error.severity.value if last_error else "unknown",
            "message": last_error.user_message if last_error else USER_MESSAGES["INTERNAL_ERROR"],
            "tool": tool_name,
        }

    def _emit_metric(self, tool_name: str, metric_name: str, value: float) -> None:
        """Emit a CloudWatch metric for tool observability."""
        try:
            self._cw_client.put_metric_data(
                Namespace=self.cw_namespace,
                MetricData=[
                    {
                        "MetricName": metric_name,
                        "Dimensions": [
                            {"Name": "ToolName", "Value": tool_name},
                        ],
                        "Value": value,
                        "Unit": "Count",
                    }
                ],
            )
        except Exception as e:
            logger.debug(f"Failed to emit metric: {e}")

3.4 Production Code: ToolRegistry

"""
tool_registry.py — Central registry for all MangaAssist tools.

The ToolRegistry is a singleton that manages tool registration,
discovery, invocation, validation, and error handling. It is the
single entry point for all tool operations in the system.
"""

import asyncio
import inspect
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set

from mangaassist.tools.parameter_validator import ParameterValidator
from mangaassist.tools.tool_error_handler import ToolErrorHandler

logger = logging.getLogger("mangaassist.registry")


@dataclass
class RegisteredTool:
    """Metadata for a registered tool."""

    name: str
    function: Callable
    schema: Dict[str, Any]
    description: str
    version: str = "1.0.0"
    tags: List[str] = field(default_factory=list)
    fallback_fn: Optional[Callable] = None
    timeout_ms: int = 2000
    is_async: bool = False
    requires_auth: bool = False


class ToolRegistry:
    """
    Singleton registry for all MangaAssist tools.

    Usage:
        registry = ToolRegistry()
        registry.register(product_search, schema={...})
        result = await registry.invoke("product_search", {"query": "Naruto"})
    """

    _instance: Optional["ToolRegistry"] = None

    def __new__(cls) -> "ToolRegistry":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if self._initialized:
            return
        self._tools: Dict[str, RegisteredTool] = {}
        self._validator = ParameterValidator()
        self._error_handler = ToolErrorHandler()
        self._invocation_count: Dict[str, int] = {}
        self._total_latency_ms: Dict[str, float] = {}
        self._initialized = True
        logger.info("ToolRegistry initialized")

    def register(
        self,
        fn: Callable,
        schema: Dict[str, Any],
        name: Optional[str] = None,
        description: Optional[str] = None,
        version: str = "1.0.0",
        tags: Optional[List[str]] = None,
        fallback_fn: Optional[Callable] = None,
        timeout_ms: int = 2000,
        requires_auth: bool = False,
    ) -> None:
        """Register a tool with its schema and metadata."""
        tool_name = name or fn.__name__
        tool_desc = description or (fn.__doc__ or "").strip().split("\n")[0]
        is_async = inspect.iscoroutinefunction(fn)

        tool = RegisteredTool(
            name=tool_name,
            function=fn,
            schema=schema,
            description=tool_desc,
            version=version,
            tags=tags or [],
            fallback_fn=fallback_fn,
            timeout_ms=timeout_ms,
            is_async=is_async,
            requires_auth=requires_auth,
        )

        self._tools[tool_name] = tool
        self._invocation_count[tool_name] = 0
        self._total_latency_ms[tool_name] = 0.0

        # Register schema with the validator
        ParameterValidator.register_schema(tool_name, schema)

        logger.info(
            f"Registered tool '{tool_name}' v{version} "
            f"(async={is_async}, auth={requires_auth})"
        )

    def discover(
        self,
        tags: Optional[List[str]] = None,
        requires_auth: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Discover registered tools, optionally filtered by tags or auth."""
        results = []
        for tool in self._tools.values():
            if tags and not any(t in tool.tags for t in tags):
                continue
            if requires_auth is not None and tool.requires_auth != requires_auth:
                continue
            results.append(
                {
                    "name": tool.name,
                    "description": tool.description,
                    "version": tool.version,
                    "tags": tool.tags,
                    "requires_auth": tool.requires_auth,
                    "schema": tool.schema,
                }
            )
        return results

    async def invoke(
        self, tool_name: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Invoke a tool by name with parameter validation and error handling.

        Pipeline:
        1. Look up tool
        2. Validate parameters
        3. Execute with retry/error handling
        4. Record metrics
        5. Return result
        """
        # Step 1: Look up
        tool = self._tools.get(tool_name)
        if not tool:
            return {
                "error": True,
                "error_code": "TOOL_NOT_FOUND",
                "message": f"Tool '{tool_name}' is not registered.",
            }

        # Step 2: Validate
        is_valid, coerced_params, errors = self._validator.validate(
            tool_name, params
        )
        if not is_valid:
            logger.warning(
                f"Validation failed for '{tool_name}': {errors}"
            )
            return {
                "error": True,
                "error_code": "VALIDATION_FAILED",
                "message": f"Invalid parameters: {'; '.join(errors)}",
                "validation_errors": errors,
            }

        # Step 3: Execute with error handling
        start_ms = time.time() * 1000

        # Wrap sync functions as async
        if tool.is_async:
            tool_fn = tool.function
        else:
            async def tool_fn(**kwargs: Any) -> Any:
                return tool.function(**kwargs)

        result = await self._error_handler.execute_with_retry(
            tool_name=tool_name,
            tool_fn=tool_fn,
            params=coerced_params,
            fallback_fn=tool.fallback_fn,
        )

        # Step 4: Record metrics
        elapsed_ms = time.time() * 1000 - start_ms
        self._invocation_count[tool_name] += 1
        self._total_latency_ms[tool_name] += elapsed_ms

        logger.info(
            f"Tool '{tool_name}' completed in {elapsed_ms:.1f}ms "
            f"(success={not result.get('error', False)})"
        )

        return result

    async def invoke_async(
        self, tool_name: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Alias for invoke — both are async."""
        return await self.invoke(tool_name, params)

    def get_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Return invocation metrics for all tools."""
        metrics = {}
        for name in self._tools:
            count = self._invocation_count.get(name, 0)
            total_lat = self._total_latency_ms.get(name, 0.0)
            metrics[name] = {
                "invocation_count": count,
                "avg_latency_ms": (
                    round(total_lat / count, 2) if count > 0 else 0
                ),
                "total_latency_ms": round(total_lat, 2),
            }
        return metrics

    def get_bedrock_tool_config(self) -> Dict[str, Any]:
        """Generate Bedrock-compatible toolConfig from all registered tools."""
        tools = []
        for tool in self._tools.values():
            tools.append(
                {
                    "toolSpec": {
                        "name": tool.name,
                        "description": tool.description,
                        "inputSchema": {"json": tool.schema},
                    }
                }
            )
        return {"tools": tools}

4. Tool Retry Strategies and Fallback Patterns

4.1 Retry Strategy Matrix

Error Type Retry? Max Attempts Backoff Fallback
Timeout Yes 2 500ms, 1000ms Cached result or simplified query
Throttle (429) Yes 3 1s, 2s, 4s Queue for async processing
Connection Error Yes 2 300ms, 600ms Cached result
Not Found (404) No 0 N/A User-friendly "not found"
Validation Error No 0 N/A Ask FM to reformulate
Auth Error (403) No 0 N/A Escalate to human
Internal Error (500) Yes (once) 1 1000ms Static fallback

4.2 Exponential Backoff with Jitter

Delay = min(base_delay_ms * 2^attempt + random(0, 100), max_delay_ms)

Example (base=200ms, max=2000ms):
  Attempt 0: 200 + rand(0,100) = ~230ms
  Attempt 1: 400 + rand(0,100) = ~460ms
  Attempt 2: 800 + rand(0,100) = ~850ms
  Attempt 3: 1600 + rand(0,100) = ~1650ms  (capped at 2000ms)

The jitter prevents the "thundering herd" problem where all retries from concurrent requests land at the same time.

4.3 Circuit Breaker Pattern

"""
circuit_breaker.py — Circuit breaker for tool invocations.

States:
  CLOSED   — Normal operation, requests flow through
  OPEN     — Too many failures, requests are rejected immediately
  HALF_OPEN — After cooldown, one probe request allowed through
"""

import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional

logger = logging.getLogger("mangaassist.circuit_breaker")


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for a single tool.

    Configuration:
    - failure_threshold: Number of failures to trip the breaker (default 5)
    - recovery_timeout_s: Seconds before trying again (default 60)
    - success_threshold: Successes in HALF_OPEN to close (default 2)
    """

    tool_name: str
    failure_threshold: int = 5
    recovery_timeout_s: float = 60.0
    success_threshold: int = 2

    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: float = 0.0
    last_state_change: float = 0.0

    def can_execute(self) -> bool:
        """Check if a request should be allowed through."""
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout_s:
                self._transition(CircuitState.HALF_OPEN)
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            return True  # Allow probe requests

        return False

    def record_success(self) -> None:
        """Record a successful invocation."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self._transition(CircuitState.CLOSED)
                self.failure_count = 0
                self.success_count = 0
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def record_failure(self) -> None:
        """Record a failed invocation."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self._transition(CircuitState.OPEN)
            self.success_count = 0
        elif self.state == CircuitState.CLOSED:
            if self.failure_count >= self.failure_threshold:
                self._transition(CircuitState.OPEN)

    def _transition(self, new_state: CircuitState) -> None:
        """Transition to a new state."""
        old_state = self.state
        self.state = new_state
        self.last_state_change = time.time()
        logger.info(
            f"Circuit breaker '{self.tool_name}': "
            f"{old_state.value} -> {new_state.value}"
        )

    def get_status(self) -> Dict[str, Any]:
        """Return current circuit breaker status."""
        return {
            "tool": self.tool_name,
            "state": self.state.value,
            "failure_count": self.failure_count,
            "time_in_state_s": round(
                time.time() - self.last_state_change, 1
            )
            if self.last_state_change
            else 0,
        }

4.4 Fallback Hierarchy

When a tool fails beyond recovery, the fallback hierarchy is:

Level 1: Retry with exponential backoff (transient errors only)
    |
    v  [all retries exhausted]
Level 2: Invoke fallback tool (e.g., cached OpenSearch -> DynamoDB scan)
    |
    v  [fallback also fails]
Level 3: Return cached response from ElastiCache Redis
    |
    v  [no cache hit]
Level 4: Return static default response
    |
    v  [static response configured]
Level 5: Return user-friendly error with suggested next action
class FallbackChain:
    """Implements the multi-level fallback hierarchy."""

    def __init__(self, tool_name: str) -> None:
        self.tool_name = tool_name
        self._redis_client = None  # Lazy init
        self._static_defaults: Dict[str, Any] = {
            "product_search": {
                "results": [],
                "total_count": 0,
                "message": "Search is temporarily unavailable. Please try again shortly.",
            },
            "order_lookup": {
                "message": "Order lookup is temporarily unavailable. "
                "Please check your email for order confirmation details.",
            },
            "recommendation_engine": {
                "recommendations": [],
                "message": "Recommendations are being updated. "
                "Please browse our popular titles section.",
            },
            "inventory_check": {
                "message": "Inventory status is being updated. "
                "Please check the product page for current availability.",
            },
        }

    async def get_cached_response(
        self, cache_key: str
    ) -> Optional[Dict[str, Any]]:
        """Try to fetch a cached response from Redis."""
        try:
            import redis.asyncio as redis

            if not self._redis_client:
                self._redis_client = redis.Redis(
                    host="mangaassist-cache.xxxxx.ng.0001.apne1.cache.amazonaws.com",
                    port=6379,
                    decode_responses=True,
                )
            cached = await self._redis_client.get(f"tool_cache:{cache_key}")
            if cached:
                import json
                return json.loads(cached)
        except Exception as e:
            logger.debug(f"Cache lookup failed: {e}")
        return None

    def get_static_default(self) -> Dict[str, Any]:
        """Return a static default response for the tool."""
        return self._static_defaults.get(
            self.tool_name,
            {
                "message": "This feature is temporarily unavailable. "
                "Please try again later.",
            },
        )

5. Lambda Tool Gateway — Complete Implementation

5.1 Lambda Handler

"""
lambda_tool_gateway.py — AWS Lambda function that serves as the
validation and error-handling gateway for all MangaAssist tool calls.

This Lambda is invoked by the ECS Fargate orchestrator whenever
the FM produces a tool_use content block. It:
1. Validates the tool name and parameters
2. Invokes the tool with retry and circuit breaker
3. Returns a formatted tool_result block

Environment variables:
    OPENSEARCH_ENDPOINT: OpenSearch Serverless collection endpoint
    DYNAMODB_TABLE: Orders table name
    REDIS_ENDPOINT: ElastiCache Redis endpoint
    TOOL_TIMEOUT_MS: Max tool execution time (default 2000)
"""

import asyncio
import json
import logging
import os
import time
from typing import Any, Dict

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Import our modules (packaged as Lambda layer)
from mangaassist.tools.tool_registry import ToolRegistry
from mangaassist.tools.parameter_validator import ParameterValidator
from mangaassist.tools.tool_error_handler import ToolErrorHandler

# Initialize outside handler for connection reuse across invocations
registry = ToolRegistry()
TOOL_TIMEOUT_MS = int(os.environ.get("TOOL_TIMEOUT_MS", "2000"))


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Handle a tool invocation request.

    Expected event format:
    {
        "tool_use_id": "toolu_abc123",
        "tool_name": "product_search",
        "parameters": {
            "query": "Attack on Titan",
            "genre": "shonen"
        },
        "session_id": "sess_xyz789",
        "customer_id": "cust_456"  // optional
    }
    """
    start_time = time.time()
    tool_use_id = event.get("tool_use_id", "unknown")
    tool_name = event.get("tool_name", "")
    params = event.get("parameters", {})

    logger.info(
        f"Tool gateway invoked: tool={tool_name}, "
        f"tool_use_id={tool_use_id}, "
        f"remaining_ms={context.get_remaining_time_in_millis()}"
    )

    # Run the async invocation in the event loop
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(
            _process_tool_call(tool_name, params, tool_use_id)
        )
    finally:
        loop.close()

    elapsed_ms = (time.time() - start_time) * 1000
    logger.info(
        f"Tool gateway completed: tool={tool_name}, "
        f"elapsed_ms={elapsed_ms:.1f}, "
        f"success={not result.get('error', False)}"
    )

    # Return in Bedrock tool_result format
    return {
        "tool_use_id": tool_use_id,
        "content": json.dumps(result, ensure_ascii=False, default=str),
        "status": "error" if result.get("error") else "success",
    }


async def _process_tool_call(
    tool_name: str, params: Dict[str, Any], tool_use_id: str
) -> Dict[str, Any]:
    """Process a single tool call through the validation and execution pipeline."""

    # Invoke through the registry (which handles validation + retry + fallback)
    try:
        result = await asyncio.wait_for(
            registry.invoke(tool_name, params),
            timeout=TOOL_TIMEOUT_MS / 1000,
        )
        return result
    except asyncio.TimeoutError:
        logger.error(f"Tool '{tool_name}' exceeded gateway timeout of {TOOL_TIMEOUT_MS}ms")
        return {
            "error": True,
            "error_code": "GATEWAY_TIMEOUT",
            "message": (
                "I'm sorry, that request took too long. "
                "Let me try a simpler approach."
            ),
            "tool": tool_name,
        }

6. Putting It All Together — Agent Initialization

"""
agent_init.py — Initialize the MangaAssist agent with all tools bound.

This is the entry point called by the ECS Fargate orchestrator
when a new customer session begins.
"""

from mangaassist.tools.strands_binder import StrandsToolBinder
from mangaassist.tools.strands_tools import (
    inventory_check,
    order_lookup,
    product_search,
    recommendation_engine,
)

SYSTEM_PROMPT = """\
You are MangaAssist, a helpful assistant for a Japanese manga store.
You help customers search for manga, check orders, get recommendations,
and check inventory. Always be polite and helpful.
When searching, prefer using the customer's exact query terms.
When a tool returns an error, explain the issue clearly and suggest alternatives.
Respond in the same language the customer uses.
"""


def create_agent(
    session_context: dict,
) -> "Agent":
    """Create a fully-configured MangaAssist agent for a customer session."""
    binder = StrandsToolBinder()

    # Register all tools with health tracking
    binder.register(product_search, failure_threshold=5, cooldown_seconds=60)
    binder.register(order_lookup, failure_threshold=3, cooldown_seconds=30)
    binder.register(recommendation_engine, failure_threshold=5, cooldown_seconds=60)
    binder.register(inventory_check, failure_threshold=5, cooldown_seconds=60)

    # Disable auth-required tools for unauthenticated sessions
    if not session_context.get("is_authenticated"):
        binder.disable("order_lookup")

    # Use Haiku for simple queries, Sonnet for complex ones
    model_id = (
        "us.anthropic.claude-3-5-sonnet-20241022-v2:0"
        if session_context.get("complexity", "simple") == "complex"
        else "us.anthropic.claude-3-5-haiku-20241022-v1:0"
    )

    agent = binder.bind_to_agent(
        system_prompt=SYSTEM_PROMPT,
        model_id=model_id,
    )

    return agent

7. Key Takeaways

Principle MangaAssist Implementation
Define tools precisely JSON Schema with types, constraints, enums, and descriptions — the FM only knows what you tell it
Validate before executing ParameterValidator catches bad params before they hit DynamoDB/OpenSearch
Retry intelligently Exponential backoff + jitter for transient errors only; never retry validation failures
Fail gracefully User-friendly messages in both Japanese and English; never expose stack traces
Circuit break 5 failures in 60s trips the breaker; prevents cascading failures across 1M daily messages
Chain with timeouts Each chain step has its own timeout; total chain respects the 3-second budget
Observe everything CloudWatch metrics per tool, per error type, per latency bucket
Bind dynamically StrandsToolBinder enables/disables tools per session (auth state, feature flags)

8. Cost Impact Analysis

Decision Cost Effect
Precise tool descriptions Reduces incorrect tool selection, saving ~15% of wasted Sonnet invocations ($0.45/1K calls saved)
Parameter coercion Prevents round-trips where FM retries with corrected params, saving ~$0.03 per coerced call
Haiku for simple queries 12x cheaper than Sonnet ($0.25 vs $3.00 per 1M input tokens) for routine lookups
Circuit breaker Prevents thundering herd retries; saves Lambda compute cost during downstream outages
Redis cached fallbacks Avoids repeated tool retries at $0.02 per Lambda invocation + downstream API cost
Tool chaining with timeout budget Prevents runaway multi-tool workflows from consuming excessive FM tokens

At 1M messages/day: - Without optimization: ~$4,500/day (all Sonnet, frequent retries, no caching) - With optimization: ~$1,800/day (Haiku routing, validation, caching, circuit breaking) - Savings: ~$2,700/day (~60% reduction)


References