Standardized Function Definitions and Error Handling Patterns
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS AI Practitioner (AIP-C01) |
| Domain | 2 — Implementation and Integration of FM-Powered Applications |
| Task | 2.1 — Select and implement appropriate FM integration strategies |
| Skill | 2.1.6 — Implement intelligent tool integrations to extend FM capabilities and ensure reliable tool operations |
| Focus Areas | Strands API for custom behaviors, standardized function definitions, Lambda for error handling and parameter validation |
Mindmap: Function Definitions and Error Handling
mindmap
root((2.1.6 Function Defs<br/>& Error Handling))
Standardized Function Definitions
JSON Schema Contracts
Parameter types and constraints
Required vs optional fields
Enum values for controlled vocab
Nested object schemas
OpenAPI Spec Integration
Tool as REST endpoint descriptor
Request/response schema binding
Version negotiation headers
Bedrock Tool Use Format
toolSpec name/description/inputSchema
Auto-marshalling from Python types
Multi-tool convoLoop pattern
Strands API Custom Behaviors
@tool Decorator Registration
Type-hint driven schema gen
Docstring to description mapping
Async and sync tool variants
Tool Chaining Orchestration
Sequential pipeline A then B then C
Parallel fan-out with asyncio.gather
Conditional branching on tool output
Context propagation across chain
StrandsToolBinder
Binds tools to agent at init
Runtime tool enable/disable
Per-turn tool filtering
Error Handling Patterns
Lambda Validation Layer
Pre-invocation param checking
Type coercion string to int
Japanese encoding UTF-8 guard
Injection detection in params
Retry Strategies
Exponential backoff with jitter
Max retry ceiling 3 attempts
Idempotency key for safe retries
Circuit Breaker
Failure threshold 5 in 60s window
Half-open probe after cooldown
Fallback tool on open circuit
Dead Letter Queue
SQS for unrecoverable failures
CloudWatch alarm on DLQ depth
Manual replay capability
Production Patterns
ToolRegistry singleton
Register/discover/invoke
Health check per tool
Version management
ParameterValidator
JSON Schema validate
Custom constraint rules
Sanitization pipeline
ToolErrorHandler
Classify error severity
User-friendly message map
Metrics emission per error type
Graceful Degradation
Partial result assembly
Cached fallback responses
Static default when all tools fail
1. Standardized Function Definitions
1.1 Why Standardized Definitions Matter
When a foundation model decides to invoke a tool, it relies entirely on the tool definition to understand what the tool does, what parameters it expects, and what it returns. A poorly defined tool causes the FM to:
- Hallucinate parameter values instead of asking the user
- Pass wrong types (a string instead of an integer), breaking downstream services
- Miss required fields, causing silent failures
- Misunderstand the tool's purpose, leading to wrong tool selection
In MangaAssist, where we handle 1M messages/day with a 3-second latency budget, every malformed tool call wastes a round-trip that we cannot afford.
1.2 JSON Schema as the Contract Language
AWS Bedrock's tool-use API expects tool definitions in JSON Schema format. This is the universal contract between the FM and your tool implementations.
Anatomy of a Bedrock Tool Definition:
{
"toolSpec": {
"name": "product_search",
"description": "Search the MangaAssist product catalog by title, author, genre, or ISBN. Returns matching manga products with pricing and availability. Use when the customer asks about finding, browsing, or looking for manga titles.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query — can be a manga title, author name, or keyword. Supports Japanese characters (e.g., '進撃の巨人').",
"minLength": 1,
"maxLength": 200
},
"genre": {
"type": "string",
"description": "Filter results by manga genre.",
"enum": ["shonen", "shojo", "seinen", "josei", "kodomomuke", "isekai", "mecha", "slice_of_life", "horror", "sports"]
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return. Defaults to 10 if not specified.",
"minimum": 1,
"maximum": 50,
"default": 10
},
"in_stock_only": {
"type": "boolean",
"description": "If true, only return products currently in stock. Defaults to false.",
"default": false
},
"price_range": {
"type": "object",
"description": "Optional price filter in JPY.",
"properties": {
"min_price": {
"type": "number",
"minimum": 0,
"description": "Minimum price in JPY."
},
"max_price": {
"type": "number",
"minimum": 0,
"description": "Maximum price in JPY."
}
}
},
"sort_by": {
"type": "string",
"description": "How to sort results.",
"enum": ["relevance", "price_asc", "price_desc", "release_date", "popularity"],
"default": "relevance"
}
},
"required": ["query"]
}
}
}
}
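When this definition is sent through the Bedrock Converse API's toolConfig, the model signals a call with stopReason "tool_use" and emits toolUse content blocks. A minimal parser for those blocks (the response dict below is hand-built for illustration, not captured live output):

```python
# Extract toolUse requests from a Bedrock Converse response.
# Field names follow the Converse API response shape; the sample
# response is hand-constructed for illustration.

def extract_tool_uses(response):
    """Return (tool_name, tool_use_id, input) tuples for each toolUse block."""
    calls = []
    if response.get("stopReason") != "tool_use":
        return calls
    content = response.get("output", {}).get("message", {}).get("content", [])
    for block in content:
        if "toolUse" in block:
            tu = block["toolUse"]
            calls.append((tu["name"], tu["toolUseId"], tu["input"]))
    return calls

sample_response = {
    "stopReason": "tool_use",
    "output": {"message": {"role": "assistant", "content": [
        {"text": "Let me search the catalog."},
        {"toolUse": {
            "toolUseId": "tooluse_abc123",
            "name": "product_search",
            "input": {"query": "進撃の巨人", "max_results": 5},
        }},
    ]}},
}

for name, tool_use_id, tool_input in extract_tool_uses(sample_response):
    print(name, tool_input["query"])  # product_search 進撃の巨人
```

Each extracted call is then executed and returned to the model as a toolResult block, completing the tool-use round trip.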
1.3 MangaAssist Tool Definition Catalog
Tool: order_lookup
{
"toolSpec": {
"name": "order_lookup",
"description": "Look up an existing order by order ID or by the customer's email address. Returns order status, items, shipping details, and tracking information. Use when the customer asks about order status, delivery tracking, or order history.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID in format ORD-XXXXXXXXXX (e.g., ORD-2024031542).",
"pattern": "^ORD-[0-9]{10}$"
},
"customer_email": {
"type": "string",
"description": "Customer email address to look up all orders. Must be a valid email format.",
"format": "email"
},
"include_items": {
"type": "boolean",
"description": "Whether to include the full list of ordered items. Defaults to true.",
"default": true
},
"include_tracking": {
"type": "boolean",
"description": "Whether to include shipping tracking details. Defaults to true.",
"default": true
}
},
"required": [],
"oneOf": [
{ "required": ["order_id"] },
{ "required": ["customer_email"] }
]
}
}
}
}
Tool: recommendation_engine
{
"toolSpec": {
"name": "recommendation_engine",
"description": "Generate personalized manga recommendations based on customer purchase history, browsing behavior, or a specific manga title they enjoyed. Use when the customer asks for suggestions, 'what should I read next', or similar discovery queries.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "The customer ID for personalized recommendations based on history."
},
"seed_title": {
"type": "string",
"description": "A manga title the customer liked — recommendations will be similar to this title."
},
"preferred_genres": {
"type": "array",
"description": "List of preferred genres to bias recommendations toward.",
"items": {
"type": "string",
"enum": ["shonen", "shojo", "seinen", "josei", "kodomomuke", "isekai", "mecha", "slice_of_life", "horror", "sports"]
},
"maxItems": 5
},
"exclude_owned": {
"type": "boolean",
"description": "If true and customer_id is provided, exclude titles the customer already owns.",
"default": true
},
"num_recommendations": {
"type": "integer",
"description": "Number of recommendations to return.",
"minimum": 1,
"maximum": 20,
"default": 5
},
"diversity_factor": {
"type": "number",
"description": "Value between 0.0 (very similar) and 1.0 (very diverse) controlling recommendation variety.",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.3
}
},
"required": []
}
}
}
}
Tool: inventory_check
{
"toolSpec": {
"name": "inventory_check",
"description": "Check real-time inventory and availability for a specific manga product. Returns stock count, warehouse location, and estimated restock date if out of stock. Use when the customer asks if something is available or in stock.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "The product ID (SKU) to check inventory for.",
"pattern": "^MNG-[A-Z0-9]{8}$"
},
"isbn": {
"type": "string",
"description": "ISBN-13 of the manga volume (13 digits with a 978 or 979 prefix).",
"pattern": "^97[89][0-9]{10}$"
},
"warehouse_region": {
"type": "string",
"description": "Check inventory at a specific warehouse region.",
"enum": ["tokyo", "osaka", "fukuoka", "sapporo", "all"],
"default": "all"
}
},
"required": [],
"anyOf": [
{ "required": ["product_id"] },
{ "required": ["isbn"] }
]
}
}
}
}
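Note the deliberate contrast between the two catalogs: order_lookup uses oneOf (exactly one of order_id/customer_email may be supplied), while inventory_check uses anyOf (at least one of product_id/isbn, so supplying both is fine). A hand-rolled sketch of the two combinators — not a full JSON Schema engine — makes the difference concrete:

```python
# Mirror the oneOf / anyOf identifier rules from the schemas above.
# This is a hand-rolled sketch, not a JSON Schema validator.

def satisfies_required(params, required_keys):
    return all(k in params and params[k] is not None for k in required_keys)

def check_one_of(params, alternatives):
    """oneOf: exactly one alternative's required keys must be present."""
    return sum(satisfies_required(params, alt) for alt in alternatives) == 1

def check_any_of(params, alternatives):
    """anyOf: at least one alternative's required keys must be present."""
    return any(satisfies_required(params, alt) for alt in alternatives)

order_alts = [["order_id"], ["customer_email"]]
inv_alts = [["product_id"], ["isbn"]]

# order_lookup: supplying BOTH identifiers violates oneOf...
print(check_one_of({"order_id": "ORD-2024031542",
                    "customer_email": "a@b.jp"}, order_alts))   # False
# ...while inventory_check's anyOf accepts both at once.
print(check_any_of({"product_id": "MNG-ABCD1234",
                    "isbn": "9784088701234"}, inv_alts))        # True
```

Choosing oneOf here prevents ambiguous lookups (which identifier wins?), while anyOf lets the inventory service cross-check the SKU against the ISBN when both arrive.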
1.4 OpenAPI Spec Integration
For tools backed by REST APIs (common in microservice architectures), you can generate Bedrock tool definitions from OpenAPI specs:
import yaml
import json
from typing import Dict, Any
def openapi_to_bedrock_tool(openapi_path: str, operation_id: str) -> Dict[str, Any]:
"""
Convert an OpenAPI operation to a Bedrock tool definition.
This allows teams to maintain a single source of truth (the OpenAPI spec)
and auto-generate tool definitions for the FM.
"""
with open(openapi_path, "r") as f:
spec = yaml.safe_load(f)
# Find the operation by operationId
for path, methods in spec.get("paths", {}).items():
for method, operation in methods.items():
if operation.get("operationId") == operation_id:
# Extract request body schema or query parameters
input_schema = _extract_input_schema(operation, spec)
return {
"toolSpec": {
"name": operation_id,
"description": operation.get("summary", "")
+ " "
+ operation.get("description", ""),
"inputSchema": {"json": input_schema},
}
}
raise ValueError(f"Operation '{operation_id}' not found in OpenAPI spec")
def _extract_input_schema(operation: Dict, spec: Dict) -> Dict[str, Any]:
"""Build a JSON Schema from OpenAPI parameters and request body."""
properties = {}
required = []
# Query/path parameters
for param in operation.get("parameters", []):
param_schema = _resolve_ref(param.get("schema", {}), spec)
properties[param["name"]] = {
"type": param_schema.get("type", "string"),
"description": param.get("description", ""),
}
if param.get("required", False):
required.append(param["name"])
# Request body
request_body = operation.get("requestBody", {})
if request_body:
content = request_body.get("content", {})
json_schema = content.get("application/json", {}).get("schema", {})
resolved = _resolve_ref(json_schema, spec)
if resolved.get("properties"):
properties.update(resolved["properties"])
required.extend(resolved.get("required", []))
return {
"type": "object",
"properties": properties,
"required": required,
}
def _resolve_ref(schema: Dict, spec: Dict) -> Dict:
"""Resolve $ref pointers in OpenAPI schema."""
if "$ref" in schema:
# removeprefix strips the literal "#/" prefix; lstrip("#/") would strip a character set
ref_path = schema["$ref"].removeprefix("#/").split("/")
resolved = spec
for part in ref_path:
resolved = resolved[part]
return resolved
return schema
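To see the conversion shape end-to-end without a YAML file, here is a condensed, self-contained rerun of the same logic against an inline spec dict (the path, operationId, and schema names are hypothetical):

```python
# Condensed rerun of the OpenAPI-to-toolSpec conversion against an
# inline spec dict (hypothetical operationId "getProduct"), so the
# generated definition can be inspected without reading a YAML file.

def resolve_ref(schema, spec):
    """Follow a local #/... $ref pointer, or return the schema unchanged."""
    if "$ref" in schema:
        node = spec
        for part in schema["$ref"].removeprefix("#/").split("/"):
            node = node[part]
        return node
    return schema

spec = {
    "paths": {"/products/{id}": {"get": {
        "operationId": "getProduct",
        "summary": "Fetch one product by ID.",
        "parameters": [{"name": "id", "in": "path", "required": True,
                        "description": "Product SKU.",
                        "schema": {"$ref": "#/components/schemas/Sku"}}],
    }}},
    "components": {"schemas": {"Sku": {"type": "string"}}},
}

op = spec["paths"]["/products/{id}"]["get"]
properties, required = {}, []
for param in op["parameters"]:
    p_schema = resolve_ref(param["schema"], spec)
    properties[param["name"]] = {"type": p_schema.get("type", "string"),
                                 "description": param.get("description", "")}
    if param.get("required"):
        required.append(param["name"])

tool_spec = {"toolSpec": {"name": op["operationId"],
                          "description": op["summary"],
                          "inputSchema": {"json": {"type": "object",
                                                   "properties": properties,
                                                   "required": required}}}}
print(tool_spec["toolSpec"]["name"])  # getProduct
```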
2. Strands API Custom Behaviors and Tool Chaining
2.1 Strands Agent SDK Overview
The Strands Agents SDK provides a Python-native way to define tools, bind them to agents, and orchestrate tool chaining. It is the recommended approach for building agentic workflows on AWS Bedrock.
2.2 Tool Registration with @tool Decorator
"""
strands_tools.py — MangaAssist tool definitions using Strands Agents SDK.
Each tool is registered via the @tool decorator which:
1. Auto-generates JSON Schema from type hints
2. Extracts description from the docstring
3. Registers the tool in the global tool registry
4. Wraps execution with error handling hooks
"""
import logging
from typing import Any, Dict, List, Optional

import boto3
from strands import tool
logger = logging.getLogger("mangaassist.tools")
# ---------------------------------------------------------------------------
# Tool 1: Product Search
# ---------------------------------------------------------------------------
@tool
def product_search(
query: str,
genre: Optional[str] = None,
max_results: int = 10,
in_stock_only: bool = False,
sort_by: str = "relevance",
) -> Dict[str, Any]:
"""
Search the MangaAssist product catalog for manga titles.
Searches across title, author, and description fields using
OpenSearch Serverless vector and keyword hybrid search.
Supports Japanese and English queries.
Args:
query: Search query — title, author, keyword, or ISBN.
genre: Filter by genre (shonen, shojo, seinen, josei, etc.).
max_results: Max results to return (1-50, default 10).
in_stock_only: Only return in-stock items.
sort_by: Sort order — relevance, price_asc, price_desc, release_date, popularity.
Returns:
Dict with 'results' list and 'total_count' integer.
"""
from mangaassist.services.opensearch import OpenSearchClient
client = OpenSearchClient()
filters = {}
if genre:
filters["genre"] = genre
if in_stock_only:
filters["in_stock"] = True
results = client.hybrid_search(
query=query,
filters=filters,
max_results=max_results,
sort_by=sort_by,
)
return {
"results": results,
"total_count": len(results),
"query": query,
"filters_applied": filters,
}
# ---------------------------------------------------------------------------
# Tool 2: Order Lookup
# ---------------------------------------------------------------------------
@tool
def order_lookup(
order_id: Optional[str] = None,
customer_email: Optional[str] = None,
include_items: bool = True,
include_tracking: bool = True,
) -> Dict[str, Any]:
"""
Look up an existing manga order by order ID or customer email.
Returns order status, items purchased, shipping info, and
tracking details. Requires either order_id or customer_email.
Args:
order_id: Order ID in format ORD-XXXXXXXXXX.
customer_email: Customer email to look up all their orders.
include_items: Include ordered item details (default True).
include_tracking: Include shipping tracking info (default True).
Returns:
Dict with order details or list of orders.
"""
if not order_id and not customer_email:
return {
"error": "Either order_id or customer_email is required.",
"error_code": "MISSING_IDENTIFIER",
}
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("MangaAssist-Orders")
if order_id:
response = table.get_item(Key={"order_id": order_id})
order = response.get("Item")
if not order:
return {
"error": f"Order {order_id} not found.",
"error_code": "ORDER_NOT_FOUND",
}
return _format_order(order, include_items, include_tracking)
else:
response = table.query(
IndexName="email-index",
KeyConditionExpression="customer_email = :email",
ExpressionAttributeValues={":email": customer_email},
Limit=20,
)
orders = response.get("Items", [])
return {
"orders": [
_format_order(o, include_items, include_tracking) for o in orders
],
"total_orders": len(orders),
}
def _format_order(
order: Dict, include_items: bool, include_tracking: bool
) -> Dict[str, Any]:
"""Format a raw DynamoDB order record for FM consumption."""
result = {
"order_id": order["order_id"],
"status": order["status"],
"order_date": order["order_date"],
"total_amount": f"¥{order['total_jpy']:,}",
}
if include_items:
result["items"] = order.get("items", [])
if include_tracking:
result["tracking"] = {
"carrier": order.get("carrier", "N/A"),
"tracking_number": order.get("tracking_number", "N/A"),
"estimated_delivery": order.get("estimated_delivery", "N/A"),
}
return result
# ---------------------------------------------------------------------------
# Tool 3: Recommendation Engine
# ---------------------------------------------------------------------------
@tool
def recommendation_engine(
customer_id: Optional[str] = None,
seed_title: Optional[str] = None,
preferred_genres: Optional[List[str]] = None,
exclude_owned: bool = True,
num_recommendations: int = 5,
diversity_factor: float = 0.3,
) -> Dict[str, Any]:
"""
Generate personalized manga recommendations.
Uses collaborative filtering and content-based signals from
OpenSearch kNN vectors to recommend manga titles.
Args:
customer_id: Customer ID for history-based recommendations.
seed_title: A manga title to find similar titles for.
preferred_genres: Genres to bias toward (max 5).
exclude_owned: Exclude titles the customer already owns.
num_recommendations: Number of recs to return (1-20, default 5).
diversity_factor: 0.0 = very similar, 1.0 = very diverse (default 0.3).
Returns:
Dict with 'recommendations' list.
"""
from mangaassist.services.recommendation import RecommendationService
svc = RecommendationService()
recs = svc.generate(
customer_id=customer_id,
seed_title=seed_title,
genres=preferred_genres or [],
exclude_owned=exclude_owned,
count=num_recommendations,
diversity=diversity_factor,
)
return {
"recommendations": recs,
"count": len(recs),
"strategy": "hybrid" if customer_id and seed_title else (
"collaborative" if customer_id else "content_based"
),
}
# ---------------------------------------------------------------------------
# Tool 4: Inventory Check
# ---------------------------------------------------------------------------
@tool
def inventory_check(
product_id: Optional[str] = None,
isbn: Optional[str] = None,
warehouse_region: str = "all",
) -> Dict[str, Any]:
"""
Check real-time inventory for a manga product.
Queries the inventory service for current stock levels,
warehouse location, and restock estimates.
Args:
product_id: Product SKU in format MNG-XXXXXXXX.
isbn: ISBN-13 of the manga volume.
warehouse_region: Region to check — tokyo, osaka, fukuoka, sapporo, or all.
Returns:
Dict with stock info per warehouse.
"""
if not product_id and not isbn:
return {
"error": "Either product_id or isbn is required.",
"error_code": "MISSING_IDENTIFIER",
}
from mangaassist.services.inventory import InventoryService
svc = InventoryService()
stock = svc.check(
product_id=product_id,
isbn=isbn,
region=warehouse_region,
)
return stock
2.3 Tool Chaining with Strands
Tool chaining allows the agent to compose multiple tool calls into a single workflow. For example, a customer asking "Do you have the latest volume of Attack on Titan in stock?" requires:
1. product_search — find the latest volume
2. inventory_check — check whether it is in stock
"""
tool_chains.py — Predefined tool chain patterns for MangaAssist.
Tool chains define multi-step workflows that the agent can execute.
Each chain specifies the sequence of tools, how data flows between
them, and how to handle failures at each step.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger("mangaassist.chains")
@dataclass
class ChainStep:
"""One step in a tool chain."""
tool_name: str
param_mapper: Callable[[Dict[str, Any]], Dict[str, Any]]
required: bool = True # If False, chain continues on failure
timeout_ms: int = 2000
fallback_value: Optional[Dict[str, Any]] = None
@dataclass
class ToolChain:
"""
A sequence of tool calls with data flow between them.
Each step receives the accumulated context from all prior steps
and produces output that is merged into the context for subsequent steps.
"""
name: str
steps: List[ChainStep]
description: str = ""
max_total_ms: int = 5000
async def execute(
self, initial_context: Dict[str, Any], tool_registry: "ToolRegistry"
) -> Dict[str, Any]:
"""Execute the chain, flowing data through each step."""
context = dict(initial_context)
results = {}
elapsed_ms = 0
for i, step in enumerate(self.steps):
remaining_ms = self.max_total_ms - elapsed_ms
if remaining_ms <= 0:
logger.warning(
f"Chain '{self.name}' exceeded total timeout at step {i}"
)
break
step_timeout = min(step.timeout_ms, remaining_ms)
try:
params = step.param_mapper(context)
start = asyncio.get_running_loop().time()
result = await asyncio.wait_for(
tool_registry.invoke_async(step.tool_name, params),
timeout=step_timeout / 1000,
)
elapsed_ms += int(
    (asyncio.get_running_loop().time() - start) * 1000
)
results[step.tool_name] = result
context[f"step_{i}_result"] = result
context.update(result if isinstance(result, dict) else {})
except asyncio.TimeoutError:
logger.error(
f"Chain '{self.name}' step {i} ({step.tool_name}) timed out"
)
if step.required:
return {
"error": f"Tool '{step.tool_name}' timed out",
"chain": self.name,
"failed_step": i,
"partial_results": results,
}
elif step.fallback_value is not None:
results[step.tool_name] = step.fallback_value
context[f"step_{i}_result"] = step.fallback_value
except Exception as e:
logger.error(
f"Chain '{self.name}' step {i} ({step.tool_name}) failed: {e}"
)
if step.required:
return {
"error": str(e),
"chain": self.name,
"failed_step": i,
"partial_results": results,
}
elif step.fallback_value is not None:
results[step.tool_name] = step.fallback_value
return {"chain": self.name, "results": results, "success": True}
# ---------------------------------------------------------------------------
# Predefined Chains for MangaAssist
# ---------------------------------------------------------------------------
search_and_check_stock = ToolChain(
name="search_and_check_stock",
description=(
"Search for a manga title then check its inventory. "
"Used when customer asks if a specific title is available."
),
steps=[
ChainStep(
tool_name="product_search",
param_mapper=lambda ctx: {
"query": ctx["query"],
"max_results": 1,
"in_stock_only": False,
},
timeout_ms=1500,
),
ChainStep(
tool_name="inventory_check",
param_mapper=lambda ctx: {
"product_id": (ctx.get("results") or [{}])[0].get("product_id", ""),
},
timeout_ms=1000,
),
],
max_total_ms=3000,
)
search_and_recommend = ToolChain(
name="search_and_recommend",
description=(
"Search for a title then generate similar recommendations. "
"Used when customer asks 'I liked X, what else should I read?'"
),
steps=[
ChainStep(
tool_name="product_search",
param_mapper=lambda ctx: {
"query": ctx["query"],
"max_results": 1,
},
timeout_ms=1500,
),
ChainStep(
tool_name="recommendation_engine",
param_mapper=lambda ctx: {
"seed_title": (ctx.get("results") or [{}])[0].get("title", ctx["query"]),
"num_recommendations": ctx.get("num_recs", 5),
},
timeout_ms=1500,
),
],
max_total_ms=3500,
)
order_with_item_details = ToolChain(
name="order_with_item_details",
description=(
"Look up an order then enrich with current product details. "
"Used when customer asks about a past order and wants current pricing."
),
steps=[
ChainStep(
tool_name="order_lookup",
param_mapper=lambda ctx: {
"order_id": ctx["order_id"],
"include_items": True,
},
timeout_ms=1500,
),
ChainStep(
tool_name="product_search",
param_mapper=lambda ctx: {
"query": (ctx.get("items") or [{}])[0].get("title", ""),
"max_results": 1,
},
required=False,
fallback_value={"results": [], "note": "Product details unavailable"},
timeout_ms=1000,
),
],
max_total_ms=3000,
)
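These chains call tool_registry.invoke_async, but the registry itself (listed under Production Patterns in the mindmap) is not defined in this section. A minimal in-memory sketch of the interface the chains assume — the class name and methods here are illustrative, not the production implementation:

```python
import asyncio
import inspect
from typing import Any, Callable, Dict

class ToolRegistry:
    """Minimal registry sketch: register callables by name, invoke them async.

    Sync tools run in a worker thread so they never block the event loop;
    async tools are awaited directly.
    """

    def __init__(self) -> None:
        self._tools: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str, fn: Callable[..., Any]) -> None:
        self._tools[name] = fn

    async def invoke_async(self, name: str, params: Dict[str, Any]) -> Any:
        fn = self._tools[name]  # KeyError here surfaces an unregistered tool
        if inspect.iscoroutinefunction(fn):
            return await fn(**params)
        return await asyncio.to_thread(fn, **params)

# Usage: a stub tool stands in for product_search.
registry = ToolRegistry()
registry.register("product_search",
                  lambda query, max_results=10: {"results": [{"title": query}]})

out = asyncio.run(registry.invoke_async("product_search", {"query": "One Piece"}))
print(out["results"][0]["title"])  # One Piece
```

Because invoke_async returns an awaitable for both sync and async tools, ToolChain.execute can wrap every step uniformly in asyncio.wait_for to enforce its per-step timeout.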
2.4 StrandsToolBinder — Binding Tools to Agents
"""
strands_binder.py — Binds tools to Strands agents with runtime control.
The StrandsToolBinder manages which tools are available to the agent
at any given point, supporting per-turn filtering, health-based
disabling, and dynamic tool loading.
"""
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
from strands import Agent
logger = logging.getLogger("mangaassist.binder")
@dataclass
class ToolHealth:
"""Tracks the health of a registered tool."""
name: str
is_healthy: bool = True
consecutive_failures: int = 0
last_failure_time: float = 0.0
failure_threshold: int = 5
cooldown_seconds: float = 60.0
def record_success(self) -> None:
self.consecutive_failures = 0
self.is_healthy = True
def record_failure(self) -> None:
self.consecutive_failures += 1
self.last_failure_time = time.time()
if self.consecutive_failures >= self.failure_threshold:
self.is_healthy = False
logger.warning(
f"Tool '{self.name}' marked unhealthy after "
f"{self.consecutive_failures} consecutive failures"
)
def check_recovery(self) -> bool:
"""Check if enough time has passed to try the tool again."""
if self.is_healthy:
return True
if time.time() - self.last_failure_time > self.cooldown_seconds:
logger.info(f"Tool '{self.name}' entering half-open recovery")
return True
return False
class StrandsToolBinder:
"""
Manages tool availability for Strands agents.
Responsibilities:
- Binds tools to agent at initialization
- Tracks tool health via circuit breaker pattern
- Supports per-turn tool filtering (e.g., disable order_lookup for unauthenticated users)
- Enables dynamic tool loading at runtime
"""
def __init__(self) -> None:
self._tools: Dict[str, Callable] = {}
self._health: Dict[str, ToolHealth] = {}
self._disabled: Set[str] = set()
self._tool_metadata: Dict[str, Dict[str, Any]] = {}
def register(
self,
tool_fn: Callable,
name: Optional[str] = None,
failure_threshold: int = 5,
cooldown_seconds: float = 60.0,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Register a tool function with health tracking."""
tool_name = name or getattr(tool_fn, "__name__", str(tool_fn))
self._tools[tool_name] = tool_fn
self._health[tool_name] = ToolHealth(
name=tool_name,
failure_threshold=failure_threshold,
cooldown_seconds=cooldown_seconds,
)
self._tool_metadata[tool_name] = metadata or {}
logger.info(f"Registered tool: {tool_name}")
def disable(self, tool_name: str) -> None:
"""Temporarily disable a tool (e.g., for unauthenticated sessions)."""
self._disabled.add(tool_name)
logger.info(f"Disabled tool: {tool_name}")
def enable(self, tool_name: str) -> None:
"""Re-enable a previously disabled tool."""
self._disabled.discard(tool_name)
logger.info(f"Enabled tool: {tool_name}")
def get_available_tools(self) -> List[Callable]:
"""Return list of tools that are healthy and not disabled."""
available = []
for name, fn in self._tools.items():
if name in self._disabled:
continue
health = self._health[name]
if health.is_healthy or health.check_recovery():
available.append(fn)
else:
logger.debug(
f"Tool '{name}' unavailable — circuit open, "
f"cooldown remaining: "
f"{health.cooldown_seconds - (time.time() - health.last_failure_time):.1f}s"
)
return available
def bind_to_agent(
self,
system_prompt: str,
model_id: str = "us.anthropic.claude-3-5-sonnet-20241022-v2:0",
**agent_kwargs: Any,
) -> Agent:
"""Create a Strands Agent with the currently available tools."""
tools = self.get_available_tools()
logger.info(
f"Binding {len(tools)} tools to agent (model={model_id})"
)
return Agent(
system_prompt=system_prompt,
model=model_id,
tools=tools,
**agent_kwargs,
)
def record_result(self, tool_name: str, success: bool) -> None:
"""Record a tool invocation result for health tracking."""
if tool_name in self._health:
if success:
self._health[tool_name].record_success()
else:
self._health[tool_name].record_failure()
def get_health_report(self) -> Dict[str, Dict[str, Any]]:
"""Return health status for all registered tools."""
report = {}
for name, health in self._health.items():
report[name] = {
"is_healthy": health.is_healthy,
"consecutive_failures": health.consecutive_failures,
"is_disabled": name in self._disabled,
"available": (
name not in self._disabled
and (health.is_healthy or health.check_recovery())
),
}
return report
3. Lambda-Based Error Handling and Parameter Validation
3.1 Architecture: Lambda as Validation and Error Handling Layer
In MangaAssist, a Lambda function sits between the FM's tool-use output and the actual tool execution. This "tool gateway" Lambda:
1. Validates parameters before forwarding to the tool implementation
2. Handles errors from tool execution and translates them for the FM
3. Enforces timeouts to keep within the 3-second latency budget
4. Logs metrics for observability
FM produces tool_use block
|
v
Lambda: Tool Gateway
+-- Parameter Validation (JSON Schema)
+-- Encoding Validation (UTF-8 for Japanese)
+-- Injection Detection
+-- Rate Limiting Check
|
v
Tool Execution (OpenSearch / DynamoDB / etc.)
|
v
Error Handling
+-- Retry if transient
+-- Circuit break if persistent
+-- Fallback if all retries fail
+-- Format user-friendly error
|
v
tool_result block returned to FM
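A skeletal version of that gateway Lambda, with a stub validator and a stub dispatch table standing in for ParameterValidator and the real tool backends (every name inside is illustrative):

```python
import time

# Illustrative dispatch table: tool name -> callable. In production these
# would call OpenSearch/DynamoDB-backed services, not in-process stubs.
TOOLS = {
    "product_search": lambda p: {"results": [], "total_count": 0},
}

def validate(tool_name, params):
    """Stand-in for the full ParameterValidator: checks only that the
    tool exists and the parameters are a JSON object."""
    errors = []
    if tool_name not in TOOLS:
        errors.append(f"Unknown tool '{tool_name}'")
    if not isinstance(params, dict):
        errors.append("Parameters must be a JSON object")
    return errors

def lambda_handler(event, context):
    """Tool gateway: validate, execute, and format a toolResult block."""
    tool_name = event.get("tool_name", "")
    params = event.get("parameters", {})
    tool_use_id = event.get("tool_use_id", "")

    errors = validate(tool_name, params)
    if errors:
        # Returning errors inside a toolResult lets the FM self-correct
        # on the next turn instead of failing the whole conversation.
        return {"toolResult": {"toolUseId": tool_use_id,
                               "content": [{"json": {"errors": errors}}],
                               "status": "error"}}
    start = time.monotonic()
    try:
        output = TOOLS[tool_name](params)
    except Exception as exc:
        return {"toolResult": {"toolUseId": tool_use_id,
                               "content": [{"json": {"error": str(exc)}}],
                               "status": "error"}}
    output["latency_ms"] = int((time.monotonic() - start) * 1000)
    return {"toolResult": {"toolUseId": tool_use_id,
                           "content": [{"json": output}],
                           "status": "success"}}
```

The returned dict mirrors the Converse API's toolResult content-block shape, so the orchestrator can append it to the conversation without reshaping.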
3.2 Production Code: ParameterValidator
"""
parameter_validator.py — Validates tool call parameters before execution.
This module implements JSON Schema validation, custom constraint rules,
type coercion, and security checks for tool parameters.
"""
import logging
import re
from typing import Any, Dict, List, Tuple

from jsonschema import Draft7Validator
logger = logging.getLogger("mangaassist.validator")
class ParameterValidator:
"""
Validates parameters for MangaAssist tool calls.
Validation pipeline:
1. JSON Schema structural validation
2. Type coercion (FM sometimes sends "10" instead of 10)
3. Custom constraint checks (business rules)
4. Encoding validation (Japanese UTF-8)
5. Injection detection (prompt injection in params)
"""
# Known schemas for each tool
SCHEMAS: Dict[str, Dict[str, Any]] = {}
# Patterns that indicate prompt injection attempts
INJECTION_PATTERNS = [
r"ignore\s+(previous|above|all)\s+instructions",
r"you\s+are\s+now\s+a",
r"system\s*:\s*",
r"<\s*/?system\s*>",
r"ADMIN\s*MODE",
r"\bsudo\b",
r"override\s+safety",
]
def __init__(self) -> None:
self._compiled_injections = [
re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
]
@classmethod
def register_schema(cls, tool_name: str, schema: Dict[str, Any]) -> None:
"""Register a JSON Schema for a tool's parameters."""
# Pre-compile the validator for fast repeated use
Draft7Validator.check_schema(schema)
cls.SCHEMAS[tool_name] = schema
logger.info(f"Registered schema for tool: {tool_name}")
def validate(
self, tool_name: str, params: Dict[str, Any]
) -> Tuple[bool, Dict[str, Any], List[str]]:
"""
Validate and coerce parameters for a tool call.
Returns:
(is_valid, coerced_params, error_messages)
"""
errors: List[str] = []
# Step 1: Schema validation
schema = self.SCHEMAS.get(tool_name)
if schema:
schema_errors = self._validate_schema(schema, params)
errors.extend(schema_errors)
# Step 2: Type coercion (attempt fixes for common FM mistakes)
coerced = self._coerce_types(tool_name, params)
# Re-validate after coercion if there were type errors
if errors and schema:
recheck_errors = self._validate_schema(schema, coerced)
if len(recheck_errors) < len(errors):
errors = recheck_errors
# Step 3: Custom business constraints
constraint_errors = self._check_constraints(tool_name, coerced)
errors.extend(constraint_errors)
# Step 4: Encoding validation
encoding_errors = self._validate_encoding(coerced)
errors.extend(encoding_errors)
# Step 5: Injection detection
injection_errors = self._detect_injection(coerced)
errors.extend(injection_errors)
return (len(errors) == 0, coerced, errors)
def _validate_schema(
self, schema: Dict[str, Any], params: Dict[str, Any]
) -> List[str]:
"""Run JSON Schema validation."""
validator = Draft7Validator(schema)
errors = []
for error in sorted(validator.iter_errors(params), key=lambda e: list(e.path)):
path = ".".join(str(p) for p in error.path) or "(root)"
errors.append(f"Schema error at '{path}': {error.message}")
return errors
def _coerce_types(
self, tool_name: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Attempt to coerce parameters to expected types.
FMs frequently make these mistakes:
- Sending "10" (string) instead of 10 (integer)
- Sending "true" (string) instead of true (boolean)
- Sending 5.0 (float) instead of 5 (integer)
"""
schema = self.SCHEMAS.get(tool_name, {})
properties = schema.get("properties", {})
coerced = dict(params)
for key, value in coerced.items():
if key not in properties:
continue
expected_type = properties[key].get("type")
if expected_type == "integer" and isinstance(value, str):
try:
coerced[key] = int(value)
logger.debug(f"Coerced '{key}': str -> int ({value} -> {coerced[key]})")
except ValueError:
pass
elif expected_type == "integer" and isinstance(value, float):
coerced[key] = int(value)
logger.debug(f"Coerced '{key}': float -> int ({value} -> {coerced[key]})")
elif expected_type == "number" and isinstance(value, str):
try:
coerced[key] = float(value)
logger.debug(f"Coerced '{key}': str -> float ({value} -> {coerced[key]})")
except ValueError:
pass
            elif expected_type == "boolean" and isinstance(value, str):
                if value.lower() in ("true", "1", "yes"):
                    coerced[key] = True
                elif value.lower() in ("false", "0", "no"):
                    coerced[key] = False
                if isinstance(coerced[key], bool):  # log only when a coercion actually happened
                    logger.debug(f"Coerced '{key}': str -> bool ({value} -> {coerced[key]})")
return coerced
def _check_constraints(
self, tool_name: str, params: Dict[str, Any]
) -> List[str]:
"""Check business-level constraints beyond JSON Schema."""
errors = []
if tool_name == "order_lookup":
if not params.get("order_id") and not params.get("customer_email"):
errors.append(
"order_lookup requires either 'order_id' or 'customer_email'"
)
if tool_name == "inventory_check":
if not params.get("product_id") and not params.get("isbn"):
errors.append(
"inventory_check requires either 'product_id' or 'isbn'"
)
if tool_name == "recommendation_engine":
genres = params.get("preferred_genres", [])
if len(genres) > 5:
errors.append(
f"preferred_genres has {len(genres)} items, maximum is 5"
)
if tool_name == "product_search":
query = params.get("query", "")
if len(query) > 200:
errors.append(
f"query length {len(query)} exceeds maximum of 200 characters"
)
return errors
    def _validate_encoding(self, params: Dict[str, Any]) -> List[str]:
        """Validate that string parameters encode cleanly as UTF-8.

        Python strings are already Unicode, so the realistic failure here
        is a lone surrogate (e.g. from a bad decode upstream), which raises
        UnicodeEncodeError on encode.
        """
        errors = []
        for key, value in params.items():
            if isinstance(value, str):
                try:
                    value.encode("utf-8")
                except UnicodeError:
                    errors.append(
                        f"Parameter '{key}' contains invalid UTF-8 characters"
                    )
        return errors
def _detect_injection(self, params: Dict[str, Any]) -> List[str]:
"""Detect potential prompt injection in parameter values."""
errors = []
for key, value in params.items():
if isinstance(value, str):
for pattern in self._compiled_injections:
if pattern.search(value):
errors.append(
f"Potential injection detected in parameter '{key}'"
)
logger.warning(
f"Injection attempt in '{key}': {value[:100]}..."
)
break
return errors
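The coerce-then-revalidate flow in `_coerce_types` can be illustrated with a standalone sketch. The `SCHEMA` fragment and parameter names below are illustrative, not the real tool schemas:

```python
from typing import Any, Dict

# Illustrative schema fragment in the same shape the validator consumes.
SCHEMA = {
    "properties": {
        "limit": {"type": "integer"},
        "in_stock": {"type": "boolean"},
        "max_price": {"type": "number"},
    }
}

def coerce(params: Dict[str, Any]) -> Dict[str, Any]:
    """Best-effort coercion of FM-supplied strings to the schema's types."""
    out = dict(params)
    for key, value in out.items():
        expected = SCHEMA["properties"].get(key, {}).get("type")
        if (expected == "integer" and isinstance(value, str)
                and value.lstrip("-").isdigit()):
            out[key] = int(value)
        elif expected == "number" and isinstance(value, str):
            try:
                out[key] = float(value)
            except ValueError:
                pass  # leave unchanged; schema validation will report it
        elif expected == "boolean" and isinstance(value, str):
            lowered = value.lower()
            if lowered in ("true", "1", "yes"):
                out[key] = True
            elif lowered in ("false", "0", "no"):
                out[key] = False
    return out

# The FM sent every value as a string; coercion repairs all three.
coerced = coerce({"limit": "10", "in_stock": "true", "max_price": "29.99"})
```

After coercion, re-running schema validation (as in Step 2 above) confirms whether the repaired parameters now conform.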
3.3 Production Code: ToolErrorHandler
"""
tool_error_handler.py — Classifies, handles, and formats tool errors.
Provides retry logic, circuit breaker, and user-friendly error messages
for the MangaAssist chatbot.
"""
import asyncio
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
import boto3
logger = logging.getLogger("mangaassist.error_handler")
class ErrorSeverity(Enum):
"""Classification of error severity."""
TRANSIENT = "transient" # Network blip, throttle — retry
VALIDATION = "validation" # Bad params — do not retry, fix params
RESOURCE = "resource" # Not found — do not retry
PERMISSION = "permission" # Auth failure — do not retry
CAPACITY = "capacity" # Rate limit / quota — retry with backoff
INTERNAL = "internal" # Bug in tool code — do not retry
TIMEOUT = "timeout" # Tool took too long — maybe retry
class ErrorAction(Enum):
"""What to do about an error."""
RETRY = "retry"
FALLBACK = "fallback"
FAIL_GRACEFULLY = "fail_gracefully"
FAIL_HARD = "fail_hard"
@dataclass
class ClassifiedError:
"""An error that has been classified with severity and action."""
original_error: Exception
severity: ErrorSeverity
action: ErrorAction
user_message: str
technical_detail: str
tool_name: str
retry_after_ms: int = 0
# User-friendly error messages (English shown; Japanese variants would reuse these keys)
USER_MESSAGES = {
"ORDER_NOT_FOUND": (
"I couldn't find that order. Could you double-check the order ID? "
"It should look like ORD-XXXXXXXXXX."
),
"PRODUCT_NOT_FOUND": (
"I wasn't able to find that manga title in our catalog. "
"Could you try a different search term or check the spelling?"
),
"SEARCH_TIMEOUT": (
"The search is taking longer than usual. Let me try again with "
"a simpler query."
),
"SERVICE_UNAVAILABLE": (
"I'm having trouble connecting to one of our services right now. "
"Please try again in a moment."
),
"RATE_LIMITED": (
"We're experiencing high traffic right now. I'll retry your "
"request in just a moment."
),
"INVALID_PARAMETERS": (
"I need a bit more information to help you. Could you provide "
"more details about what you're looking for?"
),
"INTERNAL_ERROR": (
"Something went wrong on our end. I've logged the issue and "
"our team will look into it. Can I help you with something else?"
),
}
class ToolErrorHandler:
"""
Handles errors from tool invocations in MangaAssist.
Responsibilities:
1. Classify errors by severity
2. Determine action (retry, fallback, fail)
3. Execute retries with exponential backoff + jitter
4. Generate user-friendly error messages
5. Emit CloudWatch metrics for monitoring
"""
def __init__(
self,
max_retries: int = 3,
base_delay_ms: int = 200,
max_delay_ms: int = 2000,
cloudwatch_namespace: str = "MangaAssist/Tools",
) -> None:
self.max_retries = max_retries
self.base_delay_ms = base_delay_ms
self.max_delay_ms = max_delay_ms
self.cw_namespace = cloudwatch_namespace
self._cw_client = boto3.client("cloudwatch")
def classify_error(
self, error: Exception, tool_name: str
) -> ClassifiedError:
"""Classify an error and determine the appropriate action."""
error_str = str(error).lower()
error_type = type(error).__name__
# Timeout errors
if isinstance(error, (asyncio.TimeoutError, TimeoutError)):
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.TIMEOUT,
action=ErrorAction.RETRY,
user_message=USER_MESSAGES["SEARCH_TIMEOUT"],
technical_detail=f"Tool '{tool_name}' timed out",
tool_name=tool_name,
retry_after_ms=500,
)
# DynamoDB / service throttling
if "throttl" in error_str or "rate" in error_str or "429" in error_str:
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.CAPACITY,
action=ErrorAction.RETRY,
user_message=USER_MESSAGES["RATE_LIMITED"],
technical_detail=f"Rate limited on '{tool_name}': {error}",
tool_name=tool_name,
retry_after_ms=1000,
)
# Not found
if "not found" in error_str or "404" in error_str:
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.RESOURCE,
action=ErrorAction.FAIL_GRACEFULLY,
                # order_lookup gets its specific message; other tools fall
                # back to the generic catalog message.
                user_message=(
                    USER_MESSAGES["ORDER_NOT_FOUND"]
                    if tool_name == "order_lookup"
                    else USER_MESSAGES["PRODUCT_NOT_FOUND"]
                ),
technical_detail=f"Resource not found in '{tool_name}': {error}",
tool_name=tool_name,
)
# Validation errors
if "validat" in error_str or isinstance(error, (ValueError, TypeError)):
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.VALIDATION,
action=ErrorAction.FAIL_GRACEFULLY,
user_message=USER_MESSAGES["INVALID_PARAMETERS"],
technical_detail=f"Validation error in '{tool_name}': {error}",
tool_name=tool_name,
)
# Connection / transient
if any(
kw in error_str
for kw in ["connection", "network", "temporary", "503", "502"]
):
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.TRANSIENT,
action=ErrorAction.RETRY,
user_message=USER_MESSAGES["SERVICE_UNAVAILABLE"],
technical_detail=f"Transient error in '{tool_name}': {error}",
tool_name=tool_name,
retry_after_ms=300,
)
# Auth errors
if "auth" in error_str or "permission" in error_str or "403" in error_str:
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.PERMISSION,
action=ErrorAction.FAIL_HARD,
user_message=USER_MESSAGES["INTERNAL_ERROR"],
technical_detail=f"Permission error in '{tool_name}': {error}",
tool_name=tool_name,
)
# Default: internal error
return ClassifiedError(
original_error=error,
severity=ErrorSeverity.INTERNAL,
action=ErrorAction.FAIL_HARD,
user_message=USER_MESSAGES["INTERNAL_ERROR"],
technical_detail=f"Internal error in '{tool_name}': {error_type}: {error}",
tool_name=tool_name,
)
async def execute_with_retry(
self,
tool_name: str,
tool_fn: Callable,
params: Dict[str, Any],
fallback_fn: Optional[Callable] = None,
) -> Dict[str, Any]:
"""
Execute a tool with retry logic and error handling.
Retry strategy: exponential backoff with jitter
delay = min(base_delay * 2^attempt + random_jitter, max_delay)
"""
last_error: Optional[ClassifiedError] = None
for attempt in range(self.max_retries + 1):
try:
result = await tool_fn(**params)
self._emit_metric(tool_name, "Success", 1)
return result
except Exception as e:
classified = self.classify_error(e, tool_name)
last_error = classified
self._emit_metric(
tool_name, f"Error_{classified.severity.value}", 1
)
logger.warning(
f"Tool '{tool_name}' attempt {attempt + 1}/{self.max_retries + 1} "
f"failed: {classified.technical_detail}"
)
# Should we retry?
if classified.action != ErrorAction.RETRY:
break
if attempt >= self.max_retries:
break
# Calculate backoff delay
delay_ms = min(
self.base_delay_ms * (2**attempt)
+ random.randint(0, 100),
self.max_delay_ms,
)
delay_ms = max(delay_ms, classified.retry_after_ms)
logger.info(f"Retrying '{tool_name}' in {delay_ms}ms...")
await asyncio.sleep(delay_ms / 1000)
# All retries exhausted — try fallback
if fallback_fn and last_error and last_error.action in (
ErrorAction.RETRY,
ErrorAction.FALLBACK,
):
logger.info(f"Using fallback for tool '{tool_name}'")
try:
result = await fallback_fn(**params)
self._emit_metric(tool_name, "FallbackSuccess", 1)
return result
except Exception as fb_err:
logger.error(f"Fallback for '{tool_name}' also failed: {fb_err}")
self._emit_metric(tool_name, "FallbackFailure", 1)
# Return user-friendly error
self._emit_metric(tool_name, "FinalFailure", 1)
return {
"error": True,
"error_code": last_error.severity.value if last_error else "unknown",
"message": last_error.user_message if last_error else USER_MESSAGES["INTERNAL_ERROR"],
"tool": tool_name,
}
def _emit_metric(self, tool_name: str, metric_name: str, value: float) -> None:
"""Emit a CloudWatch metric for tool observability."""
try:
self._cw_client.put_metric_data(
Namespace=self.cw_namespace,
MetricData=[
{
"MetricName": metric_name,
"Dimensions": [
{"Name": "ToolName", "Value": tool_name},
],
"Value": value,
"Unit": "Count",
}
],
)
except Exception as e:
logger.debug(f"Failed to emit metric: {e}")
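The backoff arithmetic in `execute_with_retry` can be exercised without any AWS dependencies. The sketch below mirrors the retry loop against a hypothetical flaky tool (`retry_with_backoff` and `flaky_search` are our own illustrative names):

```python
import asyncio
import random

async def retry_with_backoff(fn, max_retries: int = 3,
                             base_ms: int = 200, max_ms: int = 2000):
    """Mirror of execute_with_retry's loop: exponential backoff + jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except ConnectionError:
            if attempt >= max_retries:
                raise  # retries exhausted; surface the error
            delay_ms = min(base_ms * (2 ** attempt) + random.randint(0, 100),
                           max_ms)
            await asyncio.sleep(delay_ms / 1000)

# Hypothetical flaky tool: fails twice with a transient error, then succeeds.
calls = {"n": 0}

async def flaky_search():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network blip")
    return {"results": ["One Piece vol. 1"]}

# Small base delay so the demo finishes quickly.
result = asyncio.run(retry_with_backoff(flaky_search, base_ms=10))
```

Two transient failures are absorbed by backoff; the third attempt succeeds, which is exactly the behavior the classifier enables for `TRANSIENT` and `CAPACITY` errors.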
3.4 Production Code: ToolRegistry
"""
tool_registry.py — Central registry for all MangaAssist tools.
The ToolRegistry is a singleton that manages tool registration,
discovery, invocation, validation, and error handling. It is the
single entry point for all tool operations in the system.
"""
import asyncio
import inspect
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set
from mangaassist.tools.parameter_validator import ParameterValidator
from mangaassist.tools.tool_error_handler import ToolErrorHandler
logger = logging.getLogger("mangaassist.registry")
@dataclass
class RegisteredTool:
"""Metadata for a registered tool."""
name: str
function: Callable
schema: Dict[str, Any]
description: str
version: str = "1.0.0"
tags: List[str] = field(default_factory=list)
fallback_fn: Optional[Callable] = None
timeout_ms: int = 2000
is_async: bool = False
requires_auth: bool = False
class ToolRegistry:
"""
Singleton registry for all MangaAssist tools.
Usage:
registry = ToolRegistry()
registry.register(product_search, schema={...})
result = await registry.invoke("product_search", {"query": "Naruto"})
"""
_instance: Optional["ToolRegistry"] = None
def __new__(cls) -> "ToolRegistry":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self) -> None:
if self._initialized:
return
self._tools: Dict[str, RegisteredTool] = {}
self._validator = ParameterValidator()
self._error_handler = ToolErrorHandler()
self._invocation_count: Dict[str, int] = {}
self._total_latency_ms: Dict[str, float] = {}
self._initialized = True
logger.info("ToolRegistry initialized")
def register(
self,
fn: Callable,
schema: Dict[str, Any],
name: Optional[str] = None,
description: Optional[str] = None,
version: str = "1.0.0",
tags: Optional[List[str]] = None,
fallback_fn: Optional[Callable] = None,
timeout_ms: int = 2000,
requires_auth: bool = False,
) -> None:
"""Register a tool with its schema and metadata."""
tool_name = name or fn.__name__
tool_desc = description or (fn.__doc__ or "").strip().split("\n")[0]
is_async = inspect.iscoroutinefunction(fn)
tool = RegisteredTool(
name=tool_name,
function=fn,
schema=schema,
description=tool_desc,
version=version,
tags=tags or [],
fallback_fn=fallback_fn,
timeout_ms=timeout_ms,
is_async=is_async,
requires_auth=requires_auth,
)
self._tools[tool_name] = tool
self._invocation_count[tool_name] = 0
self._total_latency_ms[tool_name] = 0.0
# Register schema with the validator
ParameterValidator.register_schema(tool_name, schema)
logger.info(
f"Registered tool '{tool_name}' v{version} "
f"(async={is_async}, auth={requires_auth})"
)
def discover(
self,
tags: Optional[List[str]] = None,
requires_auth: Optional[bool] = None,
) -> List[Dict[str, Any]]:
"""Discover registered tools, optionally filtered by tags or auth."""
results = []
for tool in self._tools.values():
if tags and not any(t in tool.tags for t in tags):
continue
if requires_auth is not None and tool.requires_auth != requires_auth:
continue
results.append(
{
"name": tool.name,
"description": tool.description,
"version": tool.version,
"tags": tool.tags,
"requires_auth": tool.requires_auth,
"schema": tool.schema,
}
)
return results
async def invoke(
self, tool_name: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Invoke a tool by name with parameter validation and error handling.
Pipeline:
1. Look up tool
2. Validate parameters
3. Execute with retry/error handling
4. Record metrics
5. Return result
"""
# Step 1: Look up
tool = self._tools.get(tool_name)
if not tool:
return {
"error": True,
"error_code": "TOOL_NOT_FOUND",
"message": f"Tool '{tool_name}' is not registered.",
}
# Step 2: Validate
is_valid, coerced_params, errors = self._validator.validate(
tool_name, params
)
if not is_valid:
logger.warning(
f"Validation failed for '{tool_name}': {errors}"
)
return {
"error": True,
"error_code": "VALIDATION_FAILED",
"message": f"Invalid parameters: {'; '.join(errors)}",
"validation_errors": errors,
}
# Step 3: Execute with error handling
start_ms = time.time() * 1000
        # Wrap sync functions so they don't block the event loop
        if tool.is_async:
            tool_fn = tool.function
        else:
            async def tool_fn(**kwargs: Any) -> Any:
                return await asyncio.to_thread(tool.function, **kwargs)
result = await self._error_handler.execute_with_retry(
tool_name=tool_name,
tool_fn=tool_fn,
params=coerced_params,
fallback_fn=tool.fallback_fn,
)
# Step 4: Record metrics
elapsed_ms = time.time() * 1000 - start_ms
self._invocation_count[tool_name] += 1
self._total_latency_ms[tool_name] += elapsed_ms
logger.info(
f"Tool '{tool_name}' completed in {elapsed_ms:.1f}ms "
f"(success={not result.get('error', False)})"
)
return result
async def invoke_async(
self, tool_name: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Alias for invoke — both are async."""
return await self.invoke(tool_name, params)
def get_metrics(self) -> Dict[str, Dict[str, Any]]:
"""Return invocation metrics for all tools."""
metrics = {}
for name in self._tools:
count = self._invocation_count.get(name, 0)
total_lat = self._total_latency_ms.get(name, 0.0)
metrics[name] = {
"invocation_count": count,
"avg_latency_ms": (
round(total_lat / count, 2) if count > 0 else 0
),
"total_latency_ms": round(total_lat, 2),
}
return metrics
def get_bedrock_tool_config(self) -> Dict[str, Any]:
"""Generate Bedrock-compatible toolConfig from all registered tools."""
tools = []
for tool in self._tools.values():
tools.append(
{
"toolSpec": {
"name": tool.name,
"description": tool.description,
"inputSchema": {"json": tool.schema},
}
}
)
return {"tools": tools}
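The dict returned by `get_bedrock_tool_config` is what ultimately reaches Bedrock's Converse API. A sketch of its shape for a single tool follows; the schema values are illustrative, and the commented `converse` call assumes a configured `bedrock_runtime` client:

```python
# Shape produced by get_bedrock_tool_config() for one registered tool.
tool_config = {
    "tools": [
        {
            "toolSpec": {
                "name": "product_search",
                "description": "Search the manga catalog by title, author, or genre.",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "maxLength": 200},
                            "genre": {
                                "type": "string",
                                "enum": ["shonen", "shojo", "seinen", "josei"],
                            },
                        },
                        "required": ["query"],
                    }
                },
            }
        }
    ]
}

# Passed to the Converse API roughly like this (not executed here):
# response = bedrock_runtime.converse(
#     modelId="anthropic.claude-3-haiku-20240307-v1:0",
#     messages=messages,
#     toolConfig=tool_config,
# )
```

Note the `inputSchema.json` nesting: Bedrock expects the JSON Schema wrapped under a `json` key, which is why the registry wraps `tool.schema` rather than passing it directly.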
4. Tool Retry Strategies and Fallback Patterns
4.1 Retry Strategy Matrix
| Error Type | Retry? | Max Attempts | Backoff | Fallback |
|---|---|---|---|---|
| Timeout | Yes | 2 | 500ms, 1000ms | Cached result or simplified query |
| Throttle (429) | Yes | 3 | 1s, 2s, 4s | Queue for async processing |
| Connection Error | Yes | 2 | 300ms, 600ms | Cached result |
| Not Found (404) | No | 0 | N/A | User-friendly "not found" |
| Validation Error | No | 0 | N/A | Ask FM to reformulate |
| Auth Error (403) | No | 0 | N/A | Escalate to human |
| Internal Error (500) | Yes (once) | 1 | 1000ms | Static fallback |
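The matrix can be expressed directly as data the error handler consults; a minimal sketch (the `RetryPolicy` name and dictionary keys are our own):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RetryPolicy:
    retry: bool
    max_attempts: int
    base_delay_ms: int

# The retry strategy matrix above, as a lookup table.
RETRY_POLICIES = {
    "timeout":    RetryPolicy(retry=True,  max_attempts=2, base_delay_ms=500),
    "throttle":   RetryPolicy(retry=True,  max_attempts=3, base_delay_ms=1000),
    "connection": RetryPolicy(retry=True,  max_attempts=2, base_delay_ms=300),
    "not_found":  RetryPolicy(retry=False, max_attempts=0, base_delay_ms=0),
    "validation": RetryPolicy(retry=False, max_attempts=0, base_delay_ms=0),
    "auth":       RetryPolicy(retry=False, max_attempts=0, base_delay_ms=0),
    "internal":   RetryPolicy(retry=True,  max_attempts=1, base_delay_ms=1000),
}

policy = RETRY_POLICIES["throttle"]
```

Keeping the policy as data rather than branching logic makes the matrix auditable and lets it be tuned per environment without code changes.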
4.2 Exponential Backoff with Jitter
Delay = min(base_delay_ms * 2^attempt + random(0, 100), max_delay_ms)
Example (base=200ms, max=2000ms):
Attempt 0: 200 + rand(0,100) = ~250ms
Attempt 1: 400 + rand(0,100) = ~450ms
Attempt 2: 800 + rand(0,100) = ~850ms
Attempt 3: 1600 + rand(0,100) = ~1650ms (attempt 4 would be capped at 2000ms)
The jitter prevents the "thundering herd" problem where all retries from concurrent requests land at the same time.
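A direct transcription of the formula, assuming the same base/max defaults as the handler above:

```python
import random

def backoff_delay_ms(attempt: int, base_ms: int = 200,
                     max_ms: int = 2000) -> int:
    """delay = min(base_delay * 2^attempt + jitter, max_delay)."""
    return min(base_ms * (2 ** attempt) + random.randint(0, 100), max_ms)

delays = [backoff_delay_ms(a) for a in range(5)]
# Attempts 0-3 stay under the cap; attempt 4 (3200ms + jitter) is clamped.
```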
4.3 Circuit Breaker Pattern
"""
circuit_breaker.py — Circuit breaker for tool invocations.
States:
CLOSED — Normal operation, requests flow through
OPEN — Too many failures, requests are rejected immediately
HALF_OPEN — After cooldown, probe requests allowed through to test recovery
"""
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
logger = logging.getLogger("mangaassist.circuit_breaker")
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
"""
Circuit breaker for a single tool.
Configuration:
- failure_threshold: Number of failures to trip the breaker (default 5)
- recovery_timeout_s: Seconds before trying again (default 60)
- success_threshold: Successes in HALF_OPEN to close (default 2)
"""
tool_name: str
failure_threshold: int = 5
recovery_timeout_s: float = 60.0
success_threshold: int = 2
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
success_count: int = 0
last_failure_time: float = 0.0
last_state_change: float = 0.0
def can_execute(self) -> bool:
"""Check if a request should be allowed through."""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time >= self.recovery_timeout_s:
self._transition(CircuitState.HALF_OPEN)
return True
return False
if self.state == CircuitState.HALF_OPEN:
return True # Allow probe requests
return False
def record_success(self) -> None:
"""Record a successful invocation."""
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self._transition(CircuitState.CLOSED)
self.failure_count = 0
self.success_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def record_failure(self) -> None:
"""Record a failed invocation."""
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
self._transition(CircuitState.OPEN)
self.success_count = 0
elif self.state == CircuitState.CLOSED:
if self.failure_count >= self.failure_threshold:
self._transition(CircuitState.OPEN)
def _transition(self, new_state: CircuitState) -> None:
"""Transition to a new state."""
old_state = self.state
self.state = new_state
self.last_state_change = time.time()
logger.info(
f"Circuit breaker '{self.tool_name}': "
f"{old_state.value} -> {new_state.value}"
)
def get_status(self) -> Dict[str, Any]:
"""Return current circuit breaker status."""
return {
"tool": self.tool_name,
"state": self.state.value,
"failure_count": self.failure_count,
"time_in_state_s": round(
time.time() - self.last_state_change, 1
)
if self.last_state_change
else 0,
}
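A compressed re-implementation of the three-state machine makes the trip/cooldown/probe cycle easy to see end to end (`MiniBreaker` is our own illustrative name, with timeouts shortened for demonstration):

```python
import time

class MiniBreaker:
    """Compressed version of the CircuitBreaker above, for demonstration."""
    def __init__(self, failure_threshold: int = 3,
                 recovery_timeout_s: float = 0.05):
        self.state = "closed"
        self.failures = 0
        self.threshold = failure_threshold
        self.recovery_s = recovery_timeout_s
        self.last_failure = 0.0

    def can_execute(self) -> bool:
        if self.state == "open" and time.time() - self.last_failure >= self.recovery_s:
            self.state = "half_open"  # cooldown elapsed: allow a probe
        return self.state != "open"

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure = time.time()
        if self.state == "half_open" or self.failures >= self.threshold:
            self.state = "open"

    def record_success(self) -> None:
        self.state = "closed"
        self.failures = 0

breaker = MiniBreaker()
for _ in range(3):                     # three failures trip the breaker
    breaker.record_failure()
tripped = not breaker.can_execute()    # requests now rejected immediately
time.sleep(0.06)                       # wait out the (shortened) cooldown
probe_allowed = breaker.can_execute()  # transitions to half_open
breaker.record_success()               # probe succeeded: breaker closes
```

The key property is that during the OPEN window a failing downstream service receives no traffic at all, which is what keeps one slow dependency from consuming the whole 3-second latency budget.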
4.4 Fallback Hierarchy
When a tool fails beyond recovery, the fallback hierarchy is:
Level 1: Retry with exponential backoff (transient errors only)
|
v [all retries exhausted]
Level 2: Invoke fallback tool (e.g., cached OpenSearch -> DynamoDB scan)
|
v [fallback also fails]
Level 3: Return cached response from ElastiCache Redis
|
v [no cache hit]
Level 4: Return static default response
|
v [no static default configured]
Level 5: Return user-friendly error with suggested next action
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger("mangaassist.fallback")

class FallbackChain:
    """Implements the multi-level fallback hierarchy."""
def __init__(self, tool_name: str) -> None:
self.tool_name = tool_name
self._redis_client = None # Lazy init
self._static_defaults: Dict[str, Any] = {
"product_search": {
"results": [],
"total_count": 0,
"message": "Search is temporarily unavailable. Please try again shortly.",
},
"order_lookup": {
"message": "Order lookup is temporarily unavailable. "
"Please check your email for order confirmation details.",
},
"recommendation_engine": {
"recommendations": [],
"message": "Recommendations are being updated. "
"Please browse our popular titles section.",
},
"inventory_check": {
"message": "Inventory status is being updated. "
"Please check the product page for current availability.",
},
}
async def get_cached_response(
self, cache_key: str
) -> Optional[Dict[str, Any]]:
"""Try to fetch a cached response from Redis."""
try:
import redis.asyncio as redis
if not self._redis_client:
self._redis_client = redis.Redis(
host="mangaassist-cache.xxxxx.ng.0001.apne1.cache.amazonaws.com",
port=6379,
decode_responses=True,
)
cached = await self._redis_client.get(f"tool_cache:{cache_key}")
if cached:
import json
return json.loads(cached)
except Exception as e:
logger.debug(f"Cache lookup failed: {e}")
return None
def get_static_default(self) -> Dict[str, Any]:
"""Return a static default response for the tool."""
return self._static_defaults.get(
self.tool_name,
{
"message": "This feature is temporarily unavailable. "
"Please try again later.",
},
)
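A generic driver can walk the levels in order. This sketch is independent of the `FallbackChain` class above; `run_fallback_chain` and the level functions are illustrative:

```python
import asyncio
from typing import Any, Callable, Dict, List

async def run_fallback_chain(
    levels: List[Callable[[], Any]], static_default: Dict[str, Any]
) -> Dict[str, Any]:
    """Walk the levels in order; the first non-None, non-raising result wins."""
    for level in levels:
        try:
            result = level()
            if asyncio.iscoroutine(result):
                result = await result
            if result is not None:
                return result
        except Exception:
            continue  # this level failed; fall through to the next
    return static_default  # last resort: static default response

def primary():        # Levels 1-2: tool (and fallback tool) both failing
    raise ConnectionError("OpenSearch unreachable")

def cache_lookup():   # Level 3: no cache hit
    return None

result = asyncio.run(run_fallback_chain(
    [primary, cache_lookup],
    {"results": [], "message": "Search is temporarily unavailable."},
))
```

Because each level either returns a usable response or passes control downward, the customer always receives a well-formed answer, never a stack trace.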
5. Lambda Tool Gateway — Complete Implementation
5.1 Lambda Handler
"""
lambda_tool_gateway.py — AWS Lambda function that serves as the
validation and error-handling gateway for all MangaAssist tool calls.
This Lambda is invoked by the ECS Fargate orchestrator whenever
the FM produces a tool_use content block. It:
1. Validates the tool name and parameters
2. Invokes the tool with retry and circuit breaker
3. Returns a formatted tool_result block
Environment variables:
OPENSEARCH_ENDPOINT: OpenSearch Serverless collection endpoint
DYNAMODB_TABLE: Orders table name
REDIS_ENDPOINT: ElastiCache Redis endpoint
TOOL_TIMEOUT_MS: Max tool execution time (default 2000)
"""
import asyncio
import json
import logging
import os
import time
from typing import Any, Dict
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Import our modules (packaged as Lambda layer); the registry wraps the
# validator and error handler internally.
from mangaassist.tools.tool_registry import ToolRegistry
# Initialize outside handler for connection reuse across invocations
registry = ToolRegistry()
TOOL_TIMEOUT_MS = int(os.environ.get("TOOL_TIMEOUT_MS", "2000"))
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Handle a tool invocation request.
Expected event format:
{
"tool_use_id": "toolu_abc123",
"tool_name": "product_search",
"parameters": {
"query": "Attack on Titan",
"genre": "shonen"
},
"session_id": "sess_xyz789",
"customer_id": "cust_456" // optional
}
"""
start_time = time.time()
tool_use_id = event.get("tool_use_id", "unknown")
tool_name = event.get("tool_name", "")
params = event.get("parameters", {})
logger.info(
f"Tool gateway invoked: tool={tool_name}, "
f"tool_use_id={tool_use_id}, "
f"remaining_ms={context.get_remaining_time_in_millis()}"
)
    # Run the async invocation; asyncio.run creates and tears down a
    # fresh event loop for each Lambda invocation.
    result = asyncio.run(_process_tool_call(tool_name, params, tool_use_id))
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
f"Tool gateway completed: tool={tool_name}, "
f"elapsed_ms={elapsed_ms:.1f}, "
f"success={not result.get('error', False)}"
)
# Return in Bedrock tool_result format
return {
"tool_use_id": tool_use_id,
"content": json.dumps(result, ensure_ascii=False, default=str),
"status": "error" if result.get("error") else "success",
}
async def _process_tool_call(
tool_name: str, params: Dict[str, Any], tool_use_id: str
) -> Dict[str, Any]:
"""Process a single tool call through the validation and execution pipeline."""
# Invoke through the registry (which handles validation + retry + fallback)
try:
result = await asyncio.wait_for(
registry.invoke(tool_name, params),
timeout=TOOL_TIMEOUT_MS / 1000,
)
return result
except asyncio.TimeoutError:
logger.error(f"Tool '{tool_name}' exceeded gateway timeout of {TOOL_TIMEOUT_MS}ms")
return {
"error": True,
"error_code": "GATEWAY_TIMEOUT",
"message": (
"I'm sorry, that request took too long. "
"Let me try a simpler approach."
),
"tool": tool_name,
}
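The handler can be smoke-tested locally with a stub context object. `FakeContext` below is our own minimal stand-in for the Lambda context, and the commented call assumes the layer's modules are importable:

```python
import time

class FakeContext:
    """Stub of the Lambda context: just enough for get_remaining_time_in_millis."""
    def __init__(self, timeout_s: float = 10.0):
        self._deadline = time.time() + timeout_s

    def get_remaining_time_in_millis(self) -> int:
        return max(0, int((self._deadline - time.time()) * 1000))

# Event in the shape the gateway expects (see the handler docstring).
event = {
    "tool_use_id": "toolu_test001",
    "tool_name": "product_search",
    "parameters": {"query": "Attack on Titan", "genre": "shonen"},
    "session_id": "sess_local",
}
ctx = FakeContext(timeout_s=5)
# With the layer installed: result = lambda_handler(event, ctx)
remaining = ctx.get_remaining_time_in_millis()
```

Stubbing only the context method the handler actually calls keeps the harness honest: if the handler grows a dependency on another context attribute, the local test fails loudly.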
6. Putting It All Together — Agent Initialization
"""
agent_init.py — Initialize the MangaAssist agent with all tools bound.
This is the entry point called by the ECS Fargate orchestrator
when a new customer session begins.
"""
from mangaassist.tools.strands_binder import StrandsToolBinder
from mangaassist.tools.strands_tools import (
inventory_check,
order_lookup,
product_search,
recommendation_engine,
)
SYSTEM_PROMPT = """\
You are MangaAssist, a helpful assistant for a Japanese manga store.
You help customers search for manga, check orders, get recommendations,
and check inventory. Always be polite and helpful.
When searching, prefer using the customer's exact query terms.
When a tool returns an error, explain the issue clearly and suggest alternatives.
Respond in the same language the customer uses.
"""
def create_agent(
session_context: dict,
) -> "Agent":
"""Create a fully-configured MangaAssist agent for a customer session."""
binder = StrandsToolBinder()
# Register all tools with health tracking
binder.register(product_search, failure_threshold=5, cooldown_seconds=60)
binder.register(order_lookup, failure_threshold=3, cooldown_seconds=30)
binder.register(recommendation_engine, failure_threshold=5, cooldown_seconds=60)
binder.register(inventory_check, failure_threshold=5, cooldown_seconds=60)
# Disable auth-required tools for unauthenticated sessions
if not session_context.get("is_authenticated"):
binder.disable("order_lookup")
# Use Haiku for simple queries, Sonnet for complex ones
model_id = (
"us.anthropic.claude-3-5-sonnet-20241022-v2:0"
if session_context.get("complexity", "simple") == "complex"
else "us.anthropic.claude-3-5-haiku-20241022-v1:0"
)
agent = binder.bind_to_agent(
system_prompt=SYSTEM_PROMPT,
model_id=model_id,
)
return agent
7. Key Takeaways
| Principle | MangaAssist Implementation |
|---|---|
| Define tools precisely | JSON Schema with types, constraints, enums, and descriptions — the FM only knows what you tell it |
| Validate before executing | ParameterValidator catches bad params before they hit DynamoDB/OpenSearch |
| Retry intelligently | Exponential backoff + jitter for transient errors only; never retry validation failures |
| Fail gracefully | User-friendly messages in both Japanese and English; never expose stack traces |
| Circuit break | 5 failures in 60s trips the breaker; prevents cascading failures across 1M daily messages |
| Chain with timeouts | Each chain step has its own timeout; total chain respects the 3-second budget |
| Observe everything | CloudWatch metrics per tool, per error type, per latency bucket |
| Bind dynamically | StrandsToolBinder enables/disables tools per session (auth state, feature flags) |
8. Cost Impact Analysis
| Decision | Cost Effect |
|---|---|
| Precise tool descriptions | Reduces incorrect tool selection, saving ~15% of wasted Sonnet invocations ($0.45/1K calls saved) |
| Parameter coercion | Prevents round-trips where FM retries with corrected params, saving ~$0.03 per coerced call |
| Haiku for simple queries | 12x cheaper than Sonnet ($0.25 vs $3.00 per 1M input tokens) for routine lookups |
| Circuit breaker | Prevents thundering herd retries; saves Lambda compute cost during downstream outages |
| Redis cached fallbacks | Avoids repeated tool retries at $0.02 per Lambda invocation + downstream API cost |
| Tool chaining with timeout budget | Prevents runaway multi-tool workflows from consuming excessive FM tokens |
At 1M messages/day:
- Without optimization: ~$4,500/day (all Sonnet, frequent retries, no caching)
- With optimization: ~$1,800/day (Haiku routing, validation, caching, circuit breaking)
- Savings: ~$2,700/day (~60% reduction)
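A quick back-of-envelope check of those figures (all numbers come from the summary above and are estimates, not measured costs):

```python
# Daily-cost sanity check for the 1M messages/day scenario.
MESSAGES_PER_DAY = 1_000_000
baseline_daily = 4500.0    # all-Sonnet, frequent retries, no caching
optimized_daily = 1800.0   # Haiku routing, validation, caching, circuit breaking

savings_daily = baseline_daily - optimized_daily
reduction = savings_daily / baseline_daily
per_message_optimized = optimized_daily / MESSAGES_PER_DAY  # ~$0.0018/message
```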