Tool Performance Framework Architecture
AWS AIP-C01 Task 4.3 — Skill 4.3.4: Create tool performance frameworks for FM tool operation monitoring
System: MangaAssist e-commerce chatbot (Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket)
Skill Mapping
| Certification | Task | Skill | Focus |
|---|---|---|---|
| AWS AIP-C01 | Task 4.3 — Monitor FM tool operations | Skill 4.3.4 | Create tool performance frameworks |
What this means: Foundation models don't just generate text — they use tools. When Bedrock Claude decides to call product_search or order_lookup, we need to monitor how well the FM uses its tools, not just whether the tools themselves work. This is the intersection of FM evaluation and operational monitoring.
Mind Map — Tool Performance Dimensions
mindmap
root((Tool Performance<br/>Frameworks))
Call Pattern Tracking
Invocation Frequency
Call Chains
Dependency Mapping
Temporal Patterns
Intent-to-Tool Correlation
Performance Metrics
Latency p50/p95/p99
Success Rate
Timeout Rate
Parameter Accuracy
Retry Rate
Throughput per Tool
Multi-Agent Coordination
Handoff Tracing
Context Transfer Completeness
Coordination Overhead
Agent Routing Accuracy
Cross-Agent Tool Sharing
Usage Baselines
Rolling Averages 7d/30d
Anomaly Types
Missing Calls
Excess Calls
Wrong Tool Selection
Health State Machine
Seasonal Adjustment
Trend Detection
Tool Observability
Instrumentation Decorator
Structured Logging
Trace Correlation via X-Ray
Error Categorization
Parameter Sanitization
MangaAssist Tool Inventory
These are the tools that the MangaAssist FM (Bedrock Claude 3 Sonnet) can invoke via Bedrock Agent action groups:
| Tool Name | Purpose | Expected Latency (p50) | Input Parameters | Output Format | Downstream Dependencies |
|---|---|---|---|---|---|
| product_search | Search OpenSearch for products by text query or embedding vector | 120ms | query: str, category: str?, top_k: int=5 | List[Product] with title, price, image_url, score | OpenSearch Serverless collection |
| order_lookup | Query DynamoDB for order status, tracking, and history | 45ms | order_id: str OR customer_id: str | Order with status, items, tracking_url, ETA | DynamoDB Orders table |
| inventory_check | Check real-time product availability and stock levels | 60ms | product_id: str, warehouse: str? | Inventory with in_stock: bool, quantity, restock_date | DynamoDB Inventory table |
| price_lookup | Get current pricing including active promotions and discounts | 55ms | product_id: str, customer_tier: str? | Pricing with base_price, discount, final_price, promo_code | DynamoDB Pricing table + Promotions table |
| recommendation_engine | Get personalized manga/product recommendations | 200ms | customer_id: str, context: str?, limit: int=5 | List[Recommendation] with product, score, reason | OpenSearch k-NN + DynamoDB user profiles |
| faq_retrieval | Retrieve FAQ answers from the RAG knowledge base | 150ms | question: str, category: str? | List[FAQ] with answer, source, confidence | OpenSearch Serverless (FAQ index) |
| return_initiate | Start a return/refund process for an order | 80ms | order_id: str, reason: str, items: List[str] | Return with return_id, label_url, status | DynamoDB Returns table + SES for email |
| shipping_estimate | Calculate shipping cost and delivery time | 70ms | product_ids: List[str], address_zip: str | Shipping with options, costs, ETAs | External shipping API (cached in ElastiCache) |
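An inventory like this maps naturally onto a Bedrock Agent action-group function declaration. Below is a hypothetical declaration for product_search only; the exact schema keys (name/description/parameters/required) are illustrative and should be verified against the current Bedrock Agents API.

```python
# Hypothetical Bedrock Agent action-group function schema for product_search.
# Field names mirror the parameter table above; exact keys vary by API version.
product_search_schema = {
    "name": "product_search",
    "description": "Search OpenSearch for products by text query or embedding vector",
    "parameters": {
        "query": {"type": "string", "description": "Free-text or semantic search query", "required": True},
        "category": {"type": "string", "description": "Optional category filter", "required": False},
        "top_k": {"type": "integer", "description": "Max results (default 5)", "required": False},
    },
}

# Only "query" is mandatory; the FM may omit the optional filters.
required = [name for name, spec in product_search_schema["parameters"].items() if spec["required"]]
```

Declaring `category` and `top_k` as optional lets parameter-accuracy checks distinguish a genuinely wrong call from a legitimately minimal one.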
Tool Call Frequency Distribution (Typical Day)
product_search ████████████████████████████ 42%
order_lookup ████████████████ 24%
faq_retrieval ████████████ 18%
inventory_check ██████ 9%
price_lookup ███ 4%
recommendation █ 2%
return_initiate ▏ 0.7%
shipping_estimate ▏ 0.3%
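A distribution like the one above can be approximated directly from the structured tool_call logs. A toy sketch using a hard-coded sample (real input would come from CloudWatch Logs Insights query results):

```python
from collections import Counter

# Toy sample of tool names pulled from tool_call log records; real data would
# come from a CloudWatch Logs Insights query over the structured logs.
calls = (
    ["product_search"] * 42 + ["order_lookup"] * 24 + ["faq_retrieval"] * 18
    + ["inventory_check"] * 9 + ["price_lookup"] * 4
    + ["recommendation_engine"] * 2 + ["return_initiate"] * 1
)

counts = Counter(calls)
total = sum(counts.values())
for tool, n in counts.most_common():
    share = 100 * n / total
    bar = "█" * int(share / 1.5)  # one block ≈ 1.5 percentage points
    print(f"{tool:<22}{bar} {share:.1f}%")
```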
Architecture — Tool Performance Monitoring Framework
graph LR
subgraph UserLayer["User Layer"]
U[Customer via WebSocket]
end
subgraph FMLayer["FM Decision Layer"]
B[Bedrock Claude 3 Sonnet]
TR[Tool Router<br/>Action Group Dispatcher]
end
subgraph ToolExecution["Tool Execution Layer"]
direction TB
PS[product_search<br/>OpenSearch]:::healthy
OL[order_lookup<br/>DynamoDB]:::healthy
IC[inventory_check<br/>DynamoDB]:::healthy
PL[price_lookup<br/>DynamoDB]:::healthy
RE[recommendation_engine<br/>OpenSearch k-NN]:::healthy
FAQ[faq_retrieval<br/>OpenSearch]:::healthy
end
subgraph Instrumentation["Instrumentation Layer"]
DEC[Decorator Wrapper<br/>@monitor.instrument]
SL[Structured Logger<br/>JSON to CloudWatch Logs]
XR[X-Ray Trace<br/>Segment per Tool Call]
end
subgraph Monitoring["Monitoring & Aggregation"]
CW[CloudWatch Metrics<br/>Namespace: MangaAssist/Tools]
CWL[CloudWatch Logs<br/>Insights Queries]
XRD[X-Ray Service Map<br/>Tool Dependencies]
end
subgraph Analysis["Analysis Layer"]
BL[Baseline Calculator<br/>7-day Rolling Window]
AD[Anomaly Detector<br/>Statistical + ML]
PT[Pattern Tracker<br/>Call Chain Analysis]
HS[Health State Machine<br/>per-Tool Health]
end
subgraph Actions["Action Layer"]
AL[CloudWatch Alarms<br/>per-Tool Thresholds]:::alert
FB[Auto-Fallback<br/>Degrade Gracefully]:::alert
SN[SNS Notifications<br/>PagerDuty/Slack]:::alert
DB[Dashboard<br/>Tool Performance View]
end
U -->|query| B
B -->|select tools| TR
TR --> PS & OL & IC & PL & RE & FAQ
PS & OL & IC & PL & RE & FAQ --> DEC
DEC --> SL & XR
SL --> CWL
XR --> XRD
DEC --> CW
CW --> BL & AD & PT & HS
CWL --> PT
BL --> AD
AD --> AL
HS --> AL & FB
AL --> SN
PT --> DB
HS --> DB
classDef healthy fill:#d4edda,stroke:#28a745,color:#155724
classDef alert fill:#f8d7da,stroke:#dc3545,color:#721c24
Tool Call Flow — Sequence Diagram
A complete multi-tool request, showing where the monitoring instrumentation hooks in at each step:
sequenceDiagram
participant User
participant WS as API Gateway<br/>WebSocket
participant ECS as ECS Fargate<br/>Orchestrator
participant FM as Bedrock Claude 3<br/>Sonnet
participant MON as ToolPerformance<br/>Monitor
participant PS as product_search<br/>OpenSearch
participant IC as inventory_check<br/>DynamoDB
participant PL as price_lookup<br/>DynamoDB
participant RE as recommendation<br/>OpenSearch k-NN
participant CW as CloudWatch
User->>WS: "Is One Piece Vol 107 in stock?<br/>What's the price and similar options?"
WS->>ECS: Route message
ECS->>FM: InvokeModel (with tool definitions)
Note over FM: FM plans: product_search →<br/>inventory_check + price_lookup (parallel) →<br/>recommendation_engine
rect rgb(220, 240, 220)
Note over FM,PS: Step 1: product_search
FM->>ECS: tool_use: product_search(query="One Piece Volume 107")
ECS->>MON: start_call("product_search", params)
MON->>MON: Record start timestamp
ECS->>PS: Search query + embedding
PS-->>ECS: [{product_id: "OP-107", title: "One Piece Vol 107", ...}]
ECS->>MON: end_call(duration=115ms, status=success)
MON->>CW: PutMetricData(Latency=115, ToolName=product_search)
MON->>MON: Check baseline: 115ms < p95(180ms) ✓
end
ECS->>FM: tool_result: product found (product_id=OP-107)
rect rgb(255, 230, 230)
Note over FM,IC: Step 2a: inventory_check (TIMEOUT)
FM->>ECS: tool_use: inventory_check(product_id="OP-107")
ECS->>MON: start_call("inventory_check", params)
ECS->>IC: GetItem(PK=OP-107)
Note over IC: DynamoDB throttled!<br/>3000ms timeout
IC--xECS: Timeout after 3000ms
ECS->>MON: end_call(duration=3000ms, status=timeout)
MON->>CW: PutMetricData(Timeout=1, ToolName=inventory_check)
MON->>MON: Check baseline: TIMEOUT exceeds p95(90ms) ⚠️
MON->>CW: PutMetricData(AnomalyDetected=1, AnomalyType=latency_spike)
MON->>MON: Update health: Healthy → Degraded
ECS->>ECS: Fallback: return cached inventory<br/>(stale_ok=true, cache_age=5min)
end
rect rgb(220, 240, 220)
Note over FM,PL: Step 2b: price_lookup (parallel with 2a)
FM->>ECS: tool_use: price_lookup(product_id="OP-107")
ECS->>MON: start_call("price_lookup", params)
ECS->>PL: GetItem(PK=OP-107#pricing)
PL-->>ECS: {base: $9.99, discount: 10%, final: $8.99}
ECS->>MON: end_call(duration=48ms, status=success)
MON->>CW: PutMetricData(Latency=48, ToolName=price_lookup)
end
ECS->>FM: tool_results: inventory(fallback), pricing(success)
rect rgb(220, 240, 220)
Note over FM,RE: Step 3: recommendation_engine
FM->>ECS: tool_use: recommendation_engine(customer_id="C-123", context="One Piece")
ECS->>MON: start_call("recommendation_engine", params)
ECS->>RE: k-NN search + collaborative filtering
RE-->>ECS: [{Jujutsu Kaisen Vol 25, ...}, {Chainsaw Man Vol 17, ...}]
ECS->>MON: end_call(duration=195ms, status=success)
MON->>CW: PutMetricData(Latency=195, ToolName=recommendation_engine)
end
ECS->>FM: tool_result: recommendations
FM->>ECS: Final response with all data
ECS->>MON: record_chain(pattern=conditional, tools=4, total=3358ms, bottleneck=inventory_check)
MON->>CW: PutMetricData(ChainDuration=3358, ChainPattern=conditional)
ECS->>WS: Stream response
WS->>User: "One Piece Vol 107 is likely in stock<br/>(last checked 5min ago) at $8.99 (10% off!).<br/>You might also like Jujutsu Kaisen Vol 25..."
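The stale_ok fallback in Step 2a can be sketched as a thin wrapper around the live lookup. The in-process dict cache below is purely for illustration (the architecture uses ElastiCache), and the key format and max_stale_sec default are assumptions:

```python
import time

# Illustrative in-process cache; production would use ElastiCache or DAX.
_inventory_cache = {}  # product_id -> (payload, cached_at_epoch)

def cached_inventory_check(product_id, live_lookup, max_stale_sec=600):
    """Return live inventory, caching it; on timeout, serve stale data if fresh enough."""
    try:
        payload = live_lookup(product_id)
        _inventory_cache[product_id] = (payload, time.time())
        return {**payload, "stale_ok": False}
    except TimeoutError:
        cached = _inventory_cache.get(product_id)
        if cached and time.time() - cached[1] <= max_stale_sec:
            payload, cached_at = cached
            return {**payload, "stale_ok": True,
                    "cache_age_sec": round(time.time() - cached_at)}
        raise  # no usable cache: surface the timeout to the orchestrator

# Usage: first call populates the cache; a later timeout serves the stale copy.
cached_inventory_check("OP-107", lambda pid: {"in_stock": True, "quantity": 12})
def _always_times_out(pid):
    raise TimeoutError
result = cached_inventory_check("OP-107", _always_times_out)
```

Returning `stale_ok` in the payload lets the FM phrase its answer honestly ("likely in stock, last checked 5min ago") instead of presenting stale data as live.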
Tool Call Pattern Categories
stateDiagram-v2
[*] --> IntentClassification: User query arrives
IntentClassification --> SingleTool: Simple factual query
IntentClassification --> SequentialChain: Multi-step lookup
IntentClassification --> ParallelCalls: Independent data needs
IntentClassification --> ConditionalChain: Depends on first result
IntentClassification --> Iterative: Refinement needed
state SingleTool {
[*] --> CallToolA
CallToolA --> ReturnResult
ReturnResult --> [*]
note right of CallToolA
Example: "What's my order status?"
→ order_lookup(order_id)
Expected: 1 tool, ~50ms
end note
}
state SequentialChain {
[*] --> ToolA_Seq
ToolA_Seq --> ToolB_Seq: A output → B input
ToolB_Seq --> ToolC_Seq: B output → C input
ToolC_Seq --> MergeResults_Seq
MergeResults_Seq --> [*]
note right of ToolA_Seq
Example: "Can I return my last order?"
→ order_lookup → return_initiate
Expected: 2-3 tools, sequential
end note
}
state ParallelCalls {
[*] --> ForkPoint
ForkPoint --> ToolA_Par
ForkPoint --> ToolB_Par
ToolA_Par --> JoinPoint
ToolB_Par --> JoinPoint
JoinPoint --> [*]
note right of ForkPoint
Example: "Compare price and stock of OP vol 107"
→ inventory_check ‖ price_lookup
Expected: 2 tools, parallel, ~max(60,55)ms
end note
}
state ConditionalChain {
[*] --> ToolA_Cond
ToolA_Cond --> CheckCondition
CheckCondition --> ToolB_Cond: Condition TRUE
CheckCondition --> ToolC_Cond: Condition FALSE
ToolB_Cond --> [*]
ToolC_Cond --> [*]
note right of CheckCondition
Example: "Is OP 107 in stock? If yes, price?"
→ product_search → if found → price_lookup
→ if not → faq_retrieval
end note
}
state Iterative {
[*] --> FirstCall
FirstCall --> EvaluateResult
EvaluateResult --> RefinedCall: Not sufficient
EvaluateResult --> [*]: Sufficient
RefinedCall --> EvaluateResult
note right of RefinedCall
Example: "Find manga like Naruto under $10"
→ product_search("Naruto similar under 10")
→ product_search("shonen action manga budget")
Expected: 2-4 calls, same tool
end note
}
Pattern-to-Intent Mapping
| User Intent Category | Expected Pattern | Typical Tools | Expected Total Latency |
|---|---|---|---|
| Order status check | Single | order_lookup | 45-80ms |
| Product search | Single | product_search | 120-200ms |
| FAQ / general question | Single | faq_retrieval | 150-250ms |
| "Is X in stock at what price?" | Parallel | inventory_check ‖ price_lookup | 60-100ms |
| Product + recommendations | Sequential Chain | product_search → recommendation_engine | 320-450ms |
| Complex purchase query | Conditional Chain | product_search → inventory_check → price_lookup → recommendation_engine | 400-600ms |
| Refine search results | Iterative | product_search × 2-3 | 240-600ms |
| Return request | Sequential Chain | order_lookup → return_initiate | 125-200ms |
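The expected latencies above can double as per-intent budgets checked after a chain completes. A minimal sketch, where the category names and upper-bound budgets are assumptions derived from the table:

```python
# Hypothetical per-intent latency budgets (ms), taken as the upper bounds of
# the "Expected Total Latency" column above. Category keys are illustrative.
LATENCY_BUDGET_MS = {
    "order_status": 80,
    "product_search": 200,
    "faq": 250,
    "stock_and_price": 100,
    "product_plus_recs": 450,
    "complex_purchase": 600,
    "refine_search": 600,
    "return_request": 200,
}

def chain_within_budget(intent_category: str, total_duration_ms: float) -> bool:
    """True if the observed chain duration fits the budget; unknown intents pass."""
    budget = LATENCY_BUDGET_MS.get(intent_category)
    return budget is None or total_duration_ms <= budget
```

A chain that blows its budget is a candidate for an EXCESS_CALL or LATENCY_SPIKE anomaly even when every individual tool looked healthy.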
HLD: Tool Performance Data Model
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class ToolCallStatus(Enum):
"""Outcome of a single tool invocation."""
SUCCESS = "success"
TIMEOUT = "timeout"
ERROR = "error"
FALLBACK = "fallback"
RETRIED = "retried"
class CallPattern(Enum):
"""How tools were orchestrated within a single user request."""
SINGLE = "single"
SEQUENTIAL_CHAIN = "sequential_chain"
PARALLEL = "parallel"
CONDITIONAL = "conditional"
ITERATIVE = "iterative"
class AnomalyType(Enum):
"""Types of tool usage anomalies the FM can exhibit."""
MISSING_CALL = "missing_call" # FM should have called a tool but didn't
EXCESS_CALL = "excess_call" # FM called a tool unnecessarily
WRONG_TOOL = "wrong_tool" # FM picked the wrong tool for the intent
WRONG_PARAMETERS = "wrong_parameters" # FM sent incorrect/incomplete params
LATENCY_SPIKE = "latency_spike" # Tool took much longer than baseline
REPEATED_FAILURE = "repeated_failure" # Same tool failing across requests
class HealthState(Enum):
"""Health state of a tool (state machine)."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
FAILED = "failed"
RECOVERING = "recovering"
@dataclass
class ToolCallRecord:
"""Record of a single tool invocation."""
call_id: str
request_id: str # Ties to user request
session_id: str # Ties to WebSocket session
tool_name: str
timestamp: datetime
duration_ms: float
status: ToolCallStatus
input_parameters: Dict[str, str] # Sanitized — no PII or secrets
output_summary: str # Truncated output for debugging
parameter_accuracy: float # 0.0–1.0: did FM provide correct params?
retry_count: int = 0
parent_call_id: Optional[str] = None # For chained calls
error_message: Optional[str] = None
fallback_used: bool = False
trace_id: Optional[str] = None # X-Ray trace ID
@dataclass
class ToolCallChain:
"""A sequence of related tool calls for one user request."""
chain_id: str
request_id: str
pattern: CallPattern
calls: List[ToolCallRecord] = field(default_factory=list)
total_duration_ms: float = 0.0
chain_success: bool = True
bottleneck_tool: Optional[str] = None # The slowest tool in the chain
intent_category: Optional[str] = None # "order_status", "product_inquiry", etc.
@dataclass
class ToolBaseline:
"""Rolling baseline for a specific tool — used for anomaly detection."""
tool_name: str
window_days: int = 7
avg_latency_ms: float = 0.0
p50_latency_ms: float = 0.0
p95_latency_ms: float = 0.0
p99_latency_ms: float = 0.0
avg_calls_per_hour: float = 0.0
success_rate: float = 1.0
timeout_rate: float = 0.0
avg_parameter_accuracy: float = 1.0
avg_retry_rate: float = 0.0
last_updated: datetime = field(default_factory=datetime.utcnow)
@dataclass
class ToolHealthSnapshot:
"""Current health state of a tool."""
tool_name: str
state: HealthState
since: datetime
consecutive_failures: int = 0
last_success: Optional[datetime] = None
degradation_reason: Optional[str] = None
current_latency_ms: float = 0.0
current_success_rate: float = 1.0
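A quick sketch of how the ToolCallChain aggregates derive from individual call records, using plain dicts so it runs standalone; the numbers reuse the sequence-diagram example above:

```python
# Derives the aggregate fields ToolCallChain stores for one user request.
def summarize_chain(calls):
    bottleneck = max(calls, key=lambda c: c["duration_ms"])
    return {
        "total_duration_ms": sum(c["duration_ms"] for c in calls),  # cumulative tool time
        "chain_success": all(c["status"] == "success" for c in calls),
        "bottleneck_tool": bottleneck["tool_name"],
    }

summary = summarize_chain([
    {"tool_name": "product_search", "duration_ms": 115, "status": "success"},
    {"tool_name": "inventory_check", "duration_ms": 3000, "status": "timeout"},
    {"tool_name": "price_lookup", "duration_ms": 48, "status": "success"},
    {"tool_name": "recommendation_engine", "duration_ms": 195, "status": "success"},
])
# summary: total 3358ms, bottleneck inventory_check, chain_success False —
# matching the record_chain call in the sequence diagram.
```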
LLD: Tool Performance Monitor
Reference implementation of the decorator-based monitoring framework (tool bodies are stubbed in the usage example):
import boto3
import time
import functools
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Any, Optional
from dataclasses import asdict
from enum import Enum
logger = logging.getLogger("manga_assist.tool_monitor")
logger.setLevel(logging.INFO)
class ToolPerformanceMonitor:
"""
Monitors and tracks performance of all FM tool calls in MangaAssist.
Provides:
- Decorator-based instrumentation for zero-code-change monitoring
- CloudWatch metric emission per tool call
- Baseline tracking with rolling 7-day windows
- Anomaly detection against baselines
- Health state machine per tool
- Call chain tracking for multi-tool requests
"""
# Sensitive parameter keys to redact
SENSITIVE_KEYS = frozenset({
"password", "token", "key", "secret", "auth",
"credit_card", "ssn", "card_number", "cvv"
})
def __init__(self, config: dict):
self.cloudwatch = boto3.client("cloudwatch")
self.namespace = config.get("namespace", "MangaAssist/Tools")
self.environment = config.get("environment", "production")
self.anomaly_threshold = config.get("anomaly_threshold", 1.5) # multiplier on p95
self.health_check_window = config.get("health_check_window_sec", 300)
self._baselines: Dict[str, dict] = {}
self._health_states: Dict[str, dict] = {}
self._active_chains: Dict[str, list] = {} # request_id → list of call records
# ------------------------------------------------------------------ #
# Decorator — primary instrumentation entry point #
# ------------------------------------------------------------------ #
def instrument(self, tool_name: str, timeout_ms: float = 5000):
"""Decorator to instrument a tool function with performance monitoring. timeout_ms documents the tool's latency budget; enforcement is delegated to the underlying client, which raises TimeoutError on expiry."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
call_id = f"{tool_name}-{uuid.uuid4().hex[:12]}"
request_id = kwargs.pop("_request_id", None)
start = time.monotonic()
status = "success"
error_msg = None
result = None
try:
result = func(*args, **kwargs)
except TimeoutError:
status = "timeout"
error_msg = "Tool execution timed out"
raise
except Exception as e:
status = "error"
error_msg = str(e)[:200] # Truncate to avoid log bloat
raise
finally:
duration_ms = (time.monotonic() - start) * 1000
self._record_call(
call_id=call_id,
request_id=request_id,
tool_name=tool_name,
duration_ms=duration_ms,
status=status,
error_message=error_msg,
parameters=self._sanitize_params(kwargs),
)
return result
return wrapper
return decorator
# ------------------------------------------------------------------ #
# Core metric recording #
# ------------------------------------------------------------------ #
def _record_call(
self,
call_id: str,
request_id: Optional[str],
tool_name: str,
duration_ms: float,
status: str,
error_message: Optional[str],
parameters: dict,
):
"""Record a tool call to CloudWatch metrics and structured logs."""
dimensions = [
{"Name": "ToolName", "Value": tool_name},
{"Name": "Environment", "Value": self.environment},
]
metric_data = [
{
"MetricName": "CallCount",
"Value": 1,
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "Latency",
"Value": duration_ms,
"Unit": "Milliseconds",
"Dimensions": dimensions,
"StorageResolution": 1, # High-resolution (1-second)
},
{
"MetricName": "Success",
"Value": 1 if status == "success" else 0,
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "Error",
"Value": 1 if status == "error" else 0,
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "Timeout",
"Value": 1 if status == "timeout" else 0,
"Unit": "Count",
"Dimensions": dimensions,
},
]
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=metric_data,
)
# Structured log for CloudWatch Logs Insights queries
log_record = {
"event": "tool_call",
"call_id": call_id,
"request_id": request_id,
"tool_name": tool_name,
"duration_ms": round(duration_ms, 2),
"status": status,
"error_message": error_message,
"parameters": parameters,
"timestamp": datetime.utcnow().isoformat(),
}
logger.info(json.dumps(log_record))
# Track chain if request_id present
if request_id:
self._active_chains.setdefault(request_id, []).append(log_record)
# Baseline comparison
self._check_baseline(tool_name, duration_ms, status)
# Health state update
self._update_health(tool_name, duration_ms, status)
# ------------------------------------------------------------------ #
# Baseline management #
# ------------------------------------------------------------------ #
def update_baseline(self, tool_name: str, baseline: dict):
"""Set or update the rolling baseline for a tool."""
self._baselines[tool_name] = baseline
logger.info(json.dumps({
"event": "baseline_updated",
"tool_name": tool_name,
"baseline": baseline,
}))
def _check_baseline(self, tool_name: str, duration_ms: float, status: str):
"""Compare current call against baseline and emit anomaly if needed."""
baseline = self._baselines.get(tool_name)
if not baseline:
return
p95 = baseline.get("p95_latency_ms", float("inf"))
threshold = p95 * self.anomaly_threshold
if duration_ms > threshold:
self._emit_anomaly(
tool_name,
"latency_spike",
f"Latency {duration_ms:.0f}ms exceeds {self.anomaly_threshold}x "
f"p95 baseline {p95:.0f}ms (threshold: {threshold:.0f}ms)",
)
expected_success_rate = baseline.get("success_rate", 1.0)
if status != "success" and expected_success_rate > 0.95:
self._emit_anomaly(
tool_name,
"unexpected_failure",
f"Tool failed with status={status} but baseline success rate "
f"is {expected_success_rate:.1%}",
)
# ------------------------------------------------------------------ #
# Anomaly emission #
# ------------------------------------------------------------------ #
def _emit_anomaly(self, tool_name: str, anomaly_type: str, description: str):
"""Emit an anomaly metric and log."""
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
"MetricName": "AnomalyDetected",
"Value": 1,
"Unit": "Count",
"Dimensions": [
{"Name": "ToolName", "Value": tool_name},
{"Name": "AnomalyType", "Value": anomaly_type},
],
}
],
)
logger.warning(json.dumps({
"event": "tool_anomaly",
"tool_name": tool_name,
"anomaly_type": anomaly_type,
"description": description,
"timestamp": datetime.utcnow().isoformat(),
}))
# ------------------------------------------------------------------ #
# Health state machine #
# ------------------------------------------------------------------ #
def _update_health(self, tool_name: str, duration_ms: float, status: str):
"""Update the health state machine for a tool."""
state = self._health_states.setdefault(tool_name, {
"state": "healthy",
"consecutive_failures": 0,
"recent_latencies": [],
"recent_statuses": [],
})
state["recent_latencies"].append(duration_ms)
state["recent_statuses"].append(status)
# Keep only last 20 calls for rolling calculation
state["recent_latencies"] = state["recent_latencies"][-20:]
state["recent_statuses"] = state["recent_statuses"][-20:]
recent_success_rate = (
state["recent_statuses"].count("success")
/ len(state["recent_statuses"])
)
avg_recent_latency = (
sum(state["recent_latencies"]) / len(state["recent_latencies"])
)
baseline = self._baselines.get(tool_name, {})
p95 = baseline.get("p95_latency_ms", 1000)
prev_state = state["state"]
if status != "success":
state["consecutive_failures"] += 1
else:
state["consecutive_failures"] = 0
# State transitions
if recent_success_rate < 0.50 or state["consecutive_failures"] >= 5:
state["state"] = "failed"
elif recent_success_rate < 0.90 or avg_recent_latency > p95 * 2:
state["state"] = "unhealthy"
elif recent_success_rate < 0.97 or avg_recent_latency > p95 * 1.3:
if prev_state in ("unhealthy", "failed"):
state["state"] = "recovering"
else:
state["state"] = "degraded"
else:
state["state"] = "healthy"  # Also marks recovery complete if prev_state was "recovering"
# Emit health state metric
if state["state"] != prev_state:
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[{
"MetricName": "HealthStateChange",
"Value": 1,
"Unit": "Count",
"Dimensions": [
{"Name": "ToolName", "Value": tool_name},
{"Name": "FromState", "Value": prev_state},
{"Name": "ToState", "Value": state["state"]},
],
}],
)
logger.warning(json.dumps({
"event": "health_state_change",
"tool_name": tool_name,
"from": prev_state,
"to": state["state"],
"success_rate": round(recent_success_rate, 3),
"avg_latency_ms": round(avg_recent_latency, 1),
}))
def get_health(self, tool_name: str) -> str:
"""Get current health state of a tool."""
return self._health_states.get(tool_name, {}).get("state", "unknown")
# ------------------------------------------------------------------ #
# Call chain tracking #
# ------------------------------------------------------------------ #
def finalize_chain(self, request_id: str, intent_category: str = "unknown") -> dict:
"""Finalize and record a call chain after all tools complete."""
calls = self._active_chains.pop(request_id, [])
if not calls:
return {}
total_duration = sum(c["duration_ms"] for c in calls)  # Cumulative tool time; wall-clock differs for parallel patterns
all_success = all(c["status"] == "success" for c in calls)
bottleneck = max(calls, key=lambda c: c["duration_ms"])
pattern = self._classify_pattern(calls)
chain_record = {
"chain_id": f"chain-{uuid.uuid4().hex[:12]}",
"request_id": request_id,
"pattern": pattern,
"tool_count": len(calls),
"total_duration_ms": round(total_duration, 2),
"chain_success": all_success,
"bottleneck_tool": bottleneck["tool_name"],
"bottleneck_duration_ms": round(bottleneck["duration_ms"], 2),
"intent_category": intent_category,
"tools": [c["tool_name"] for c in calls],
}
# Emit chain-level metrics
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
"MetricName": "ChainDuration",
"Value": total_duration,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "ChainPattern", "Value": pattern},
{"Name": "IntentCategory", "Value": intent_category},
],
},
{
"MetricName": "ChainToolCount",
"Value": len(calls),
"Unit": "Count",
"Dimensions": [
{"Name": "ChainPattern", "Value": pattern},
],
},
],
)
logger.info(json.dumps({"event": "chain_complete", **chain_record}))
return chain_record
def _classify_pattern(self, calls: list) -> str:
"""Classify the call pattern based on observed calls."""
if len(calls) == 1:
return "single"
tool_names = [c["tool_name"] for c in calls]
if len(set(tool_names)) < len(tool_names):
return "iterative" # Same tool called multiple times
return "sequential_chain"  # Default for multi-tool; parallel/conditional detection would need per-call timestamps
# ------------------------------------------------------------------ #
# Parameter sanitization #
# ------------------------------------------------------------------ #
def _sanitize_params(self, params: dict) -> dict:
"""Remove sensitive data from parameters before logging."""
sanitized = {}
for key, value in params.items():
if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
sanitized[key] = "***REDACTED***"
else:
sanitized[key] = str(value)[:100] # Truncate long values
return sanitized
# ====================================================================== #
# Usage example — how tools are instrumented in MangaAssist #
# ====================================================================== #
monitor = ToolPerformanceMonitor({
"namespace": "MangaAssist/Tools",
"environment": "production",
"anomaly_threshold": 1.5,
})
# Set baselines (normally loaded from DynamoDB or CloudWatch statistics)
monitor.update_baseline("product_search", {
"p50_latency_ms": 120,
"p95_latency_ms": 180,
"p99_latency_ms": 350,
"success_rate": 0.995,
})
monitor.update_baseline("inventory_check", {
"p50_latency_ms": 60,
"p95_latency_ms": 90,
"p99_latency_ms": 150,
"success_rate": 0.998,
})
@monitor.instrument("product_search")
def product_search(query: str, category: str = None, top_k: int = 5) -> list:
"""Search OpenSearch for manga products."""
# ... actual OpenSearch query implementation ...
pass
@monitor.instrument("order_lookup")
def order_lookup(order_id: str = None, customer_id: str = None) -> dict:
"""Look up order from DynamoDB."""
# ... actual DynamoDB GetItem implementation ...
pass
@monitor.instrument("inventory_check", timeout_ms=3000)
def inventory_check(product_id: str, warehouse: str = None) -> dict:
"""Check product inventory levels."""
# ... actual DynamoDB query implementation ...
pass
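The comment above notes that baselines are "normally loaded from DynamoDB or CloudWatch statistics". A hedged sketch of the CloudWatch path, using get_metric_statistics with extended (percentile) statistics over the Latency metric the monitor emits; deriving success_rate from the Success/CallCount metrics is omitted for brevity:

```python
from datetime import datetime, timedelta, timezone

def aggregate_percentiles(datapoints):
    """Average each percentile across the daily Datapoints CloudWatch returns."""
    return {
        f"{pct}_latency_ms": sum(p["ExtendedStatistics"][pct] for p in datapoints) / len(datapoints)
        for pct in ("p50", "p95", "p99")
    }

def load_baseline_from_cloudwatch(tool_name, namespace="MangaAssist/Tools",
                                  environment="production", window_days=7):
    """Sketch: build a rolling latency baseline from emitted tool metrics."""
    import boto3  # imported lazily so the pure helper above has no AWS dependency
    cw = boto3.client("cloudwatch")
    now = datetime.now(timezone.utc)
    resp = cw.get_metric_statistics(
        Namespace=namespace,
        MetricName="Latency",
        Dimensions=[
            {"Name": "ToolName", "Value": tool_name},
            {"Name": "Environment", "Value": environment},
        ],
        StartTime=now - timedelta(days=window_days),
        EndTime=now,
        Period=86400,  # one datapoint per day
        ExtendedStatistics=["p50", "p95", "p99"],
        Unit="Milliseconds",
    )
    points = resp.get("Datapoints", [])
    return aggregate_percentiles(points) if points else None

# Usage (would run on a schedule, e.g. hourly):
# monitor.update_baseline("product_search", load_baseline_from_cloudwatch("product_search"))
```

Averaging daily percentiles is a simplification; a stricter version would query a single window or use metric math, since percentiles don't compose exactly across periods.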
Multi-Agent Coordination Overview
sequenceDiagram
participant User
participant Sup as Supervisor Agent<br/>(Bedrock Claude 3 Sonnet)
participant MON as Tool Performance<br/>Monitor
participant PA as Product Agent<br/>(Bedrock Claude 3 Haiku)
participant OA as Order Agent<br/>(Bedrock Claude 3 Haiku)
participant SA as Support Agent<br/>(Bedrock Claude 3 Haiku)
participant PS as product_search
participant RE as recommendation
participant OL as order_lookup
participant FAQ as faq_retrieval
User->>Sup: "I ordered One Piece Vol 107 last week,<br/>hasn't arrived. Also any similar manga deals?"
Note over Sup: Intent: order_issue + product_recommendation<br/>Route to Order Agent AND Product Agent
rect rgb(230, 240, 255)
Note over Sup,OA: Handoff 1: Order Agent
Sup->>MON: record_handoff(from=supervisor, to=order_agent, context_size=245 tokens)
Sup->>OA: Delegated: "Order issue — One Piece Vol 107, ordered last week"
OA->>OL: order_lookup(customer_id="C-123")
MON-->>MON: instrument(order_lookup, 42ms, success)
OL-->>OA: {order_id: "ORD-789", status: "in_transit", ETA: "2 days"}
OA-->>Sup: "Order ORD-789 is in transit, arriving in 2 days"
MON-->>MON: record_handoff_return(duration=180ms, context_preserved=true)
end
rect rgb(255, 245, 230)
Note over Sup,PA: Handoff 2: Product Agent (parallel)
Sup->>MON: record_handoff(from=supervisor, to=product_agent, context_size=180 tokens)
Sup->>PA: Delegated: "Find similar manga deals to One Piece"
PA->>PS: product_search(query="manga similar to One Piece deals")
MON-->>MON: instrument(product_search, 125ms, success)
PS-->>PA: [Jujutsu Kaisen, Chainsaw Man, ...]
PA->>RE: recommendation_engine(customer_id="C-123", context="One Piece similar")
MON-->>MON: instrument(recommendation, 190ms, success)
RE-->>PA: [Dragon Ball Super Vol 22 (20% off), My Hero Academia Vol 40]
PA-->>Sup: "Similar: JJK ($9.99), Dragon Ball Super ($7.99, 20% off)"
MON-->>MON: record_handoff_return(duration=420ms, context_preserved=true)
end
Sup->>User: "Your One Piece Vol 107 (ORD-789) is in transit — arriving in ~2 days!<br/>Meanwhile, Dragon Ball Super Vol 22 is 20% off at $7.99..."
MON->>MON: finalize_multi_agent_chain<br/>agents=3, tools=3, total=480ms
Multi-Agent Monitoring Points
| Monitoring Point | What We Measure | Why It Matters |
|---|---|---|
| Supervisor routing decision | Which agent(s) chosen, was it correct? | FM routing accuracy |
| Handoff context size | Tokens transferred to specialist | Context transfer completeness |
| Specialist tool selection | Did specialist pick the right tools? | Per-agent tool accuracy |
| Handoff return | Was specialist's answer incorporated correctly? | Result integration quality |
| Cross-agent latency | Overhead of multi-agent vs single-agent | Coordination cost |
| Context loss detection | Did specialist miss context from user query? | Handoff fidelity |
Tool Health State Machine
stateDiagram-v2
[*] --> Healthy
Healthy --> Degraded: Latency > 1.3x p95 baseline<br/>OR success rate 90-97%
Healthy --> Unhealthy: Latency > 2x p95 baseline<br/>OR success rate < 90%
Degraded --> Healthy: Metrics return to baseline<br/>for 5+ consecutive calls
Degraded --> Unhealthy: Latency > 2x p95 baseline<br/>OR success rate drops < 90%
Unhealthy --> Failed: Success rate < 50%<br/>OR 5 consecutive failures
Unhealthy --> Recovering: Success rate improving<br/>AND latency trending down
Failed --> Recovering: At least 1 successful call<br/>AND latency < 2x p95
Recovering --> Healthy: All metrics within baseline<br/>for 10+ consecutive calls
Recovering --> Unhealthy: Metrics deteriorate again
Recovering --> Failed: 3 consecutive failures during recovery
state Healthy {
[*] --> Normal
Normal: ✅ All metrics within baseline
Normal: Latency ≤ p95
Normal: Success rate ≥ 97%
Normal: No alerts active
}
state Degraded {
[*] --> Warning
Warning: ⚠️ Elevated but functional
Warning: Alert: P3 (informational)
Warning: Action: Log + dashboard update
}
state Unhealthy {
[*] --> Critical
Critical: 🔶 Significant degradation
Critical: Alert: P2 (requires attention)
Critical: Action: Increase fallback rate
Critical: Action: Notify on-call team
}
state Failed {
[*] --> Down
Down: 🔴 Tool effectively unavailable
Down: Alert: P1 (immediate action)
Down: Action: Full fallback mode
Down: Action: Page on-call engineer
}
state Recovering {
[*] --> Improving
Improving: 🔄 Trending toward healthy
Improving: Alert: P3 (monitoring)
Improving: Action: Gradually reduce fallback
}
State Transition Thresholds
| From → To | Trigger Condition | Evaluation Window |
|---|---|---|
| Healthy → Degraded | Latency > 1.3× p95 OR success rate 90–97% | Last 20 calls |
| Healthy → Unhealthy | Latency > 2× p95 OR success rate < 90% | Last 20 calls |
| Degraded → Healthy | All metrics within baseline, 5+ consecutive good calls | Last 5 calls |
| Degraded → Unhealthy | Latency > 2× p95 OR success rate < 90% | Last 20 calls |
| Unhealthy → Failed | Success rate < 50% OR 5 consecutive failures | Last 20 calls OR consecutive counter |
| Unhealthy → Recovering | Success rate trending up + latency trending down | Last 10 calls |
| Failed → Recovering | ≥ 1 success AND latency < 2× p95 | Last 5 calls |
| Recovering → Healthy | All metrics within baseline for 10+ calls | Last 10 calls |
| Recovering → Unhealthy | Metrics deteriorate again | Last 10 calls |
| Recovering → Failed | 3 consecutive failures during recovery | Consecutive counter |
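The transition table above can be sketched as a small state machine. This is a minimal illustration, not a production implementation: the class name `ToolHealth`, the sliding 20-call window, and the simplistic "one good call" proxy for "trending up" are assumptions; a real version would compute rolling p95 from the 7-day baseline store and evaluate trends over the windows listed in the table.

```python
from collections import deque
from dataclasses import dataclass, field

HEALTHY, DEGRADED, UNHEALTHY, FAILED, RECOVERING = (
    "healthy", "degraded", "unhealthy", "failed", "recovering")

@dataclass
class ToolHealth:
    """Per-tool health state with hysteresis, mirroring the threshold table.

    p95_baseline_ms stands in for the 7-day rolling p95 baseline.
    The 1.3x / 2x latency multipliers and the 97% / 90% / 50% success-rate
    cut-offs come directly from the transition table above.
    """
    p95_baseline_ms: float
    state: str = HEALTHY
    window: deque = field(default_factory=lambda: deque(maxlen=20))
    consecutive_good: int = 0
    consecutive_bad: int = 0

    def record(self, latency_ms: float, success: bool) -> str:
        self.window.append((latency_ms, success))
        if success:
            self.consecutive_good += 1
            self.consecutive_bad = 0
        else:
            self.consecutive_bad += 1
            self.consecutive_good = 0

        rate = sum(1 for _, ok in self.window if ok) / len(self.window)
        high_lat = latency_ms > 2.0 * self.p95_baseline_ms   # > 2x p95
        elevated = latency_ms > 1.3 * self.p95_baseline_ms   # > 1.3x p95

        if self.state == HEALTHY:
            if high_lat or rate < 0.90:
                self.state = UNHEALTHY
            elif elevated or rate < 0.97:
                self.state = DEGRADED
        elif self.state == DEGRADED:
            if high_lat or rate < 0.90:
                self.state = UNHEALTHY
            elif self.consecutive_good >= 5 and not elevated:
                self.state = HEALTHY
        elif self.state == UNHEALTHY:
            if rate < 0.50 or self.consecutive_bad >= 5:
                self.state = FAILED
            elif success and not high_lat:   # crude proxy for "improving"
                self.state = RECOVERING
        elif self.state == FAILED:
            if success and not high_lat:
                self.state = RECOVERING
        elif self.state == RECOVERING:
            if self.consecutive_bad >= 3:
                self.state = FAILED
            elif high_lat or rate < 0.90:
                self.state = UNHEALTHY
            elif self.consecutive_good >= 10 and not elevated:
                self.state = HEALTHY
        return self.state
```

The hysteresis lives in the consecutive-call counters: a single good call never promotes a tool straight back to Healthy, which is what prevents the flapping described in the Key Takeaways.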
Key Design Decisions
| # | Decision | Choice | Rationale | Trade-off |
|---|---|---|---|---|
| 1 | Instrumentation approach | Python decorator (`@monitor.instrument`) | Zero code changes for tool authors; consistent across all tools | Slight overhead per call (~1ms); requires Python for tools |
| 2 | Metric granularity | Per-tool dimensions + per-chain aggregate | Enables drill-down to individual tool AND holistic request view | Higher CloudWatch cost (custom metrics × tools × environments) |
| 3 | Baseline window | 7-day rolling average | Balances stability with responsiveness to real changes | May miss gradual degradation; holiday patterns need seasonal adj |
| 4 | Anomaly threshold | 1.5× p95 for latency; hard thresholds for success rate | Simple, interpretable, low false-positive rate | May miss subtle degradation; not adaptive to traffic spikes |
| 5 | Fallback strategy | Cached stale data with `stale_ok` flag in response | Maintains user experience during tool degradation | User may see outdated data; requires cache infrastructure |
| 6 | Multi-agent tracing | X-Ray segments per agent + custom annotations for handoffs | End-to-end visibility across supervisor → specialist calls | X-Ray sampling may miss traces; annotation limit per segment |
| 7 | Parameter sanitization | Keyword-based blocklist redaction | Prevents PII/secrets from reaching CloudWatch Logs | May over-redact (false positives) or miss novel sensitive fields |
| 8 | Health state thresholds | 5-state machine with hysteresis (separate recovering state) | Prevents flapping between healthy/unhealthy on transient issues | More complex than simple threshold; needs tuning per tool |
| 9 | Metric storage resolution | High-resolution (1-second) for latency, standard for counts | Enables sub-minute anomaly detection during incidents | 3× cost vs standard resolution metrics |
| 10 | Chain pattern classification | Heuristic-based (same-tool = iterative, multi-tool = chain) | Simple to implement; covers 90%+ of real patterns | Cannot distinguish parallel vs sequential without timing info |
Cross-References
| Related Document | Relationship |
|---|---|
| 07-agent-performance-framework.md | Agent-level evaluation metrics that complement tool-level monitoring |
| 02-operational-metrics-dashboards.md | CloudWatch dashboard design patterns used for tool metrics visualization |
| 01-performance-degradation-identification.md | Degradation detection methods applied to tool-level baselines |
| 01-alert-systems-design.md | Alert configuration for tool health state transitions |
| 02-baseline-deviation-analysis.md | Deep dive on baseline calculation and deviation scoring for tools |
| 03-multi-tool-chain-analysis.md | Detailed analysis of call chain patterns and bottleneck identification |
Key Takeaways
- **Decorator-based instrumentation is the foundation** — wrapping every tool function with `@monitor.instrument` ensures consistent telemetry with zero code changes to tool implementations. The decorator captures latency, status, parameters (sanitized), and emits CloudWatch metrics automatically.
- **Baselines make anomalies meaningful** — raw latency numbers mean nothing without context. A 200ms `recommendation_engine` call is normal; a 200ms `order_lookup` call is a 4× p50 spike. Rolling 7-day baselines per tool establish "what normal looks like" and make alerts actionable.
- **Multi-agent coordination adds a monitoring layer** — when a Supervisor Agent delegates to the Product Agent and Order Agent, we need to trace not just the tools each specialist calls, but the handoff quality: was enough context transferred? Did the specialist choose the right tools? This is unique to agentic systems.
- **Tool call chains reveal FM decision quality** — tracking whether the FM chose `product_search → inventory_check → price_lookup` (a correct three-tool chain) vs `product_search → product_search → product_search` (an incorrect iterative retry) is a direct signal of FM tool-use reasoning quality.
- **Parameter accuracy is an underrated FM quality signal** — if the FM consistently calls `product_search(query="")` with empty queries or `order_lookup(order_id="unknown")`, the tools will technically succeed but return useless results. Monitoring parameter quality catches silent failures that success-rate metrics miss.
- **The health state machine prevents alert flapping** — simple threshold-based alerts create noise during transient issues. The 5-state machine (Healthy → Degraded → Unhealthy → Failed → Recovering → Healthy) with hysteresis ensures alerts fire only for sustained issues, and the Recovering state prevents premature all-clear signals.
- **Tool performance monitoring bridges FM evaluation and ops** — this framework sits at the intersection of "Is the FM making good tool-use decisions?" (evaluation) and "Are the tools operating correctly?" (ops). Both perspectives are required for a complete picture of agentic system health.
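The chain-pattern heuristic from design decision #10 is simple enough to show in a few lines. This is an illustrative sketch — the function name `classify_chain` and the label strings are assumptions, and as the decision's trade-off column notes, it cannot distinguish parallel from sequential calls without timing data.

```python
def classify_chain(tool_calls: list[str]) -> str:
    """Heuristic chain classification (design decision #10).

    Repeated calls to the same tool look like an iterative retry — often
    a sign the FM is stuck — while a sequence of distinct tools looks
    like a deliberate multi-tool chain.
    """
    if len(tool_calls) <= 1:
        return "single"            # zero or one call: nothing to classify
    if len(set(tool_calls)) == 1:
        return "iterative_retry"   # e.g. product_search x3 — a quality smell
    return "multi_tool_chain"      # e.g. search -> inventory -> price
```

Feeding each completed request's ordered tool list through this classifier and emitting the label as a CloudWatch dimension is one cheap way to trend FM tool-use reasoning quality over time.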