
Tool Performance Framework Architecture

AWS AIP-C01 Task 4.3 — Skill 4.3.4: Create tool performance frameworks for FM tool operation monitoring
System: MangaAssist e-commerce chatbot (Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket)


Skill Mapping

| Certification | Task | Skill Focus |
|---|---|---|
| AWS AIP-C01 | Task 4.3 — Monitor FM tool operations | Skill 4.3.4 — Create tool performance frameworks |

What this means: Foundation models don't just generate text — they use tools. When Bedrock Claude decides to call product_search or order_lookup, we need to monitor how well the FM uses its tools, not just whether the tools themselves work. This is the intersection of FM evaluation and operational monitoring.
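In Bedrock's Messages-style APIs, tool use surfaces as `tool_use` content blocks that the orchestrator executes and answers with `tool_result` blocks — these dispatch points are exactly where the monitoring hooks described below attach. A minimal sketch of the orchestrator side (the registry and the stubbed model turn are illustrative, not a Bedrock API):

```python
# Sketch of the orchestrator-side tool dispatch loop. `fake_model_turn`
# stands in for one Bedrock model response; TOOL_REGISTRY is a local
# name used here for illustration.

TOOL_REGISTRY = {
    "order_lookup": lambda order_id: {"order_id": order_id, "status": "shipped"},
}

def dispatch_tool_calls(model_turn: dict) -> list:
    """Execute every tool_use block the FM emitted; collect tool_result blocks."""
    results = []
    for block in model_turn.get("content", []):
        if block.get("type") != "tool_use":
            continue
        func = TOOL_REGISTRY[block["name"]]   # tool selected by the FM
        output = func(**block["input"])       # parameters provided by the FM
        results.append({
            "type": "tool_result",
            "tool_use_id": block["id"],
            "content": output,
        })
    return results

fake_model_turn = {
    "content": [
        {"type": "tool_use", "id": "tu-1", "name": "order_lookup",
         "input": {"order_id": "ORD-42"}},
    ]
}
print(dispatch_tool_calls(fake_model_turn))
```

Monitoring "how well the FM uses its tools" means instrumenting this loop: which tool was selected, with what parameters, and how the call turned out.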


Mind Map — Tool Performance Dimensions

mindmap
  root((Tool Performance<br/>Frameworks))
    Call Pattern Tracking
      Invocation Frequency
      Call Chains
      Dependency Mapping
      Temporal Patterns
      Intent-to-Tool Correlation
    Performance Metrics
      Latency p50/p95/p99
      Success Rate
      Timeout Rate
      Parameter Accuracy
      Retry Rate
      Throughput per Tool
    Multi-Agent Coordination
      Handoff Tracing
      Context Transfer Completeness
      Coordination Overhead
      Agent Routing Accuracy
      Cross-Agent Tool Sharing
    Usage Baselines
      Rolling Averages 7d/30d
      Anomaly Types
        Missing Calls
        Excess Calls
        Wrong Tool Selection
      Health State Machine
      Seasonal Adjustment
      Trend Detection
    Tool Observability
      Instrumentation Decorator
      Structured Logging
      Trace Correlation via X-Ray
      Error Categorization
      Parameter Sanitization

MangaAssist Tool Inventory

These are the tools that the MangaAssist FM (Bedrock Claude 3 Sonnet) can invoke via Bedrock Agent action groups:

| Tool Name | Purpose | Expected Latency (p50) | Input Parameters | Output Format | Downstream Dependencies |
|---|---|---|---|---|---|
| product_search | Search OpenSearch for products by text query or embedding vector | 120ms | query: str, category: str?, top_k: int=5 | List[Product] with title, price, image_url, score | OpenSearch Serverless collection |
| order_lookup | Query DynamoDB for order status, tracking, and history | 45ms | order_id: str OR customer_id: str | Order with status, items, tracking_url, ETA | DynamoDB Orders table |
| inventory_check | Check real-time product availability and stock levels | 60ms | product_id: str, warehouse: str? | Inventory with in_stock: bool, quantity, restock_date | DynamoDB Inventory table |
| price_lookup | Get current pricing including active promotions and discounts | 55ms | product_id: str, customer_tier: str? | Pricing with base_price, discount, final_price, promo_code | DynamoDB Pricing table + Promotions table |
| recommendation_engine | Get personalized manga/product recommendations | 200ms | customer_id: str, context: str?, limit: int=5 | List[Recommendation] with product, score, reason | OpenSearch k-NN + DynamoDB user profiles |
| faq_retrieval | Retrieve FAQ answers from the RAG knowledge base | 150ms | question: str, category: str? | List[FAQ] with answer, source, confidence | OpenSearch Serverless (FAQ index) |
| return_initiate | Start a return/refund process for an order | 80ms | order_id: str, reason: str, items: List[str] | Return with return_id, label_url, status | DynamoDB Returns table + SES for email |
| shipping_estimate | Calculate shipping cost and delivery time | 70ms | product_ids: List[str], address_zip: str | Shipping with options, costs, ETAs | External shipping API (cached in ElastiCache) |
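Each row of this inventory corresponds to a function definition the Bedrock Agent action group exposes to the FM. A sketch of what the product_search entry might look like in the name/description/parameters shape (field values come from the table; treat the exact payload structure as illustrative rather than a verbatim API schema):

```python
# Sketch: product_search as an action-group function definition.
# Values mirror the tool inventory table; the precise schema depends on
# how the action group is configured, so this is illustrative only.

product_search_function = {
    "name": "product_search",
    "description": "Search OpenSearch for products by text query or embedding vector",
    "parameters": {
        "query":    {"type": "string",  "required": True,
                     "description": "Free-text product query"},
        "category": {"type": "string",  "required": False,
                     "description": "Optional category filter"},
        "top_k":    {"type": "integer", "required": False,
                     "description": "Max results to return (default 5)"},
    },
}

print(sorted(product_search_function["parameters"]))
```

Keeping these definitions in one place matters for monitoring: parameter-accuracy checks compare what the FM actually sent against this declared schema.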

Tool Call Frequency Distribution (Typical Day)

product_search      ████████████████████████████  42%
order_lookup        ████████████████              24%
faq_retrieval       ████████████                  18%
inventory_check     ██████                         9%
price_lookup        ███                            4%
recommendation      █                              2%
return_initiate     ▏                              0.7%
shipping_estimate   ▏                              0.3%
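A chart like the one above can be regenerated from raw per-tool call counts. A small sketch, where one block represents roughly 1.5% of traffic and the daily totals are illustrative numbers implied by the percentages:

```python
def frequency_chart(counts: dict) -> list:
    """Render per-tool call share as text bars (1 block ≈ 1.5% of calls)."""
    total = sum(counts.values())
    lines = []
    for tool, n in sorted(counts.items(), key=lambda kv: -kv[1]):
        pct = 100.0 * n / total
        # Sub-1% tools get a sliver so they remain visible
        bar = "█" * round(pct / 1.5) if pct >= 1 else "▏"
        lines.append(f"{tool:<20}{bar}  {pct:.1f}%")
    return lines

# Illustrative daily call counts matching the distribution above
counts = {"product_search": 4200, "order_lookup": 2400, "faq_retrieval": 1800,
          "inventory_check": 900, "price_lookup": 400, "recommendation": 200,
          "return_initiate": 70, "shipping_estimate": 30}
print("\n".join(frequency_chart(counts)))
```

This kind of distribution is the raw input for the usage baselines discussed later: a sudden shift in tool share (e.g. faq_retrieval overtaking product_search) is itself an anomaly signal.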

Architecture — Tool Performance Monitoring Framework

graph LR
    subgraph UserLayer["User Layer"]
        U[Customer via WebSocket]
    end

    subgraph FMLayer["FM Decision Layer"]
        B[Bedrock Claude 3 Sonnet]
        TR[Tool Router<br/>Action Group Dispatcher]
    end

    subgraph ToolExecution["Tool Execution Layer"]
        direction TB
        PS[product_search<br/>OpenSearch]:::healthy
        OL[order_lookup<br/>DynamoDB]:::healthy
        IC[inventory_check<br/>DynamoDB]:::healthy
        PL[price_lookup<br/>DynamoDB]:::healthy
        RE[recommendation_engine<br/>OpenSearch k-NN]:::healthy
        FAQ[faq_retrieval<br/>OpenSearch]:::healthy
    end

    subgraph Instrumentation["Instrumentation Layer"]
        DEC[Decorator Wrapper<br/>@monitor.instrument]
        SL[Structured Logger<br/>JSON to CloudWatch Logs]
        XR[X-Ray Trace<br/>Segment per Tool Call]
    end

    subgraph Monitoring["Monitoring & Aggregation"]
        CW[CloudWatch Metrics<br/>Namespace: MangaAssist/Tools]
        CWL[CloudWatch Logs<br/>Insights Queries]
        XRD[X-Ray Service Map<br/>Tool Dependencies]
    end

    subgraph Analysis["Analysis Layer"]
        BL[Baseline Calculator<br/>7-day Rolling Window]
        AD[Anomaly Detector<br/>Statistical + ML]
        PT[Pattern Tracker<br/>Call Chain Analysis]
        HS[Health State Machine<br/>per-Tool Health]
    end

    subgraph Actions["Action Layer"]
        AL[CloudWatch Alarms<br/>per-Tool Thresholds]:::alert
        FB[Auto-Fallback<br/>Degrade Gracefully]:::alert
        SN[SNS Notifications<br/>PagerDuty/Slack]:::alert
        DB[Dashboard<br/>Tool Performance View]
    end

    U -->|query| B
    B -->|select tools| TR
    TR --> PS & OL & IC & PL & RE & FAQ

    PS & OL & IC & PL & RE & FAQ --> DEC
    DEC --> SL & XR
    SL --> CWL
    XR --> XRD
    DEC --> CW

    CW --> BL & AD & PT & HS
    CWL --> PT

    BL --> AD
    AD --> AL
    HS --> AL & FB
    AL --> SN
    PT --> DB
    HS --> DB

    classDef healthy fill:#d4edda,stroke:#28a745,color:#155724
    classDef alert fill:#f8d7da,stroke:#dc3545,color:#721c24

Tool Call Flow — Sequence Diagram

A complete multi-tool request, showing where the monitoring instrumentation hooks in at each step:

sequenceDiagram
    participant User
    participant WS as API Gateway<br/>WebSocket
    participant ECS as ECS Fargate<br/>Orchestrator
    participant FM as Bedrock Claude 3<br/>Sonnet
    participant MON as ToolPerformance<br/>Monitor
    participant PS as product_search<br/>OpenSearch
    participant IC as inventory_check<br/>DynamoDB
    participant PL as price_lookup<br/>DynamoDB
    participant RE as recommendation<br/>OpenSearch k-NN
    participant CW as CloudWatch

    User->>WS: "Is One Piece Vol 107 in stock?<br/>What's the price and similar options?"
    WS->>ECS: Route message
    ECS->>FM: InvokeModel (with tool definitions)

    Note over FM: FM plans: product_search →<br/>inventory_check + price_lookup (parallel) →<br/>recommendation_engine

    rect rgb(220, 240, 220)
        Note over FM,PS: Step 1: product_search
        FM->>ECS: tool_use: product_search(query="One Piece Volume 107")
        ECS->>MON: start_call("product_search", params)
        MON->>MON: Record start timestamp
        ECS->>PS: Search query + embedding
        PS-->>ECS: [{product_id: "OP-107", title: "One Piece Vol 107", ...}]
        ECS->>MON: end_call(duration=115ms, status=success)
        MON->>CW: PutMetricData(Latency=115, ToolName=product_search)
        MON->>MON: Check baseline: 115ms < p95(180ms) ✓
    end

    ECS->>FM: tool_result: product found (product_id=OP-107)

    rect rgb(255, 230, 230)
        Note over FM,IC: Step 2a: inventory_check (TIMEOUT)
        FM->>ECS: tool_use: inventory_check(product_id="OP-107")
        ECS->>MON: start_call("inventory_check", params)
        ECS->>IC: GetItem(PK=OP-107)
        Note over IC: DynamoDB throttled!<br/>3000ms timeout
        IC--xECS: Timeout after 3000ms
        ECS->>MON: end_call(duration=3000ms, status=timeout)
        MON->>CW: PutMetricData(Timeout=1, ToolName=inventory_check)
        MON->>MON: Check baseline: TIMEOUT exceeds p95(90ms) ⚠️
        MON->>CW: PutMetricData(AnomalyDetected=1, AnomalyType=latency_spike)
        MON->>MON: Update health: Healthy → Degraded
        ECS->>ECS: Fallback: return cached inventory<br/>(stale_ok=true, cache_age=5min)
    end

    rect rgb(220, 240, 220)
        Note over FM,PL: Step 2b: price_lookup (parallel with 2a)
        FM->>ECS: tool_use: price_lookup(product_id="OP-107")
        ECS->>MON: start_call("price_lookup", params)
        ECS->>PL: GetItem(PK=OP-107#pricing)
        PL-->>ECS: {base: $9.99, discount: 10%, final: $8.99}
        ECS->>MON: end_call(duration=48ms, status=success)
        MON->>CW: PutMetricData(Latency=48, ToolName=price_lookup)
    end

    ECS->>FM: tool_results: inventory(fallback), pricing(success)

    rect rgb(220, 240, 220)
        Note over FM,RE: Step 3: recommendation_engine
        FM->>ECS: tool_use: recommendation_engine(customer_id="C-123", context="One Piece")
        ECS->>MON: start_call("recommendation_engine", params)
        ECS->>RE: k-NN search + collaborative filtering
        RE-->>ECS: [{Jujutsu Kaisen Vol 25, ...}, {Chainsaw Man Vol 17, ...}]
        ECS->>MON: end_call(duration=195ms, status=success)
        MON->>CW: PutMetricData(Latency=195, ToolName=recommendation_engine)
    end

    ECS->>FM: tool_result: recommendations
    FM->>ECS: Final response with all data
    ECS->>MON: finalize_chain(request_id): pattern=conditional, tools=4, total=3358ms, bottleneck=inventory_check
    MON->>CW: PutMetricData(ChainDuration=3358, ChainPattern=conditional)
    ECS->>WS: Stream response
    WS->>User: "One Piece Vol 107 is likely in stock<br/>(last checked 5min ago) at $8.99 (10% off!).<br/>You might also like Jujutsu Kaisen Vol 25..."

Tool Call Pattern Categories

stateDiagram-v2
    [*] --> IntentClassification: User query arrives

    IntentClassification --> SingleTool: Simple factual query
    IntentClassification --> SequentialChain: Multi-step lookup
    IntentClassification --> ParallelCalls: Independent data needs
    IntentClassification --> ConditionalChain: Depends on first result
    IntentClassification --> Iterative: Refinement needed

    state SingleTool {
        [*] --> CallToolA
        CallToolA --> ReturnResult
        ReturnResult --> [*]
        note right of CallToolA
            Example: "What's my order status?"
            → order_lookup(order_id)
            Expected: 1 tool, ~50ms
        end note
    }

    state SequentialChain {
        [*] --> ToolA_Seq
        ToolA_Seq --> ToolB_Seq: A output → B input
        ToolB_Seq --> ToolC_Seq: B output → C input
        ToolC_Seq --> MergeResults_Seq
        MergeResults_Seq --> [*]
        note right of ToolA_Seq
            Example: "Can I return my last order?"
            → order_lookup → return_initiate
            Expected: 2-3 tools, sequential
        end note
    }

    state ParallelCalls {
        [*] --> ForkPoint
        ForkPoint --> ToolA_Par
        ForkPoint --> ToolB_Par
        ToolA_Par --> JoinPoint
        ToolB_Par --> JoinPoint
        JoinPoint --> [*]
        note right of ForkPoint
            Example: "Compare price and stock of OP vol 107"
            → inventory_check ‖ price_lookup
            Expected: 2 tools, parallel, ~max(60,55)ms
        end note
    }

    state ConditionalChain {
        [*] --> ToolA_Cond
        ToolA_Cond --> CheckCondition
        CheckCondition --> ToolB_Cond: Condition TRUE
        CheckCondition --> ToolC_Cond: Condition FALSE
        ToolB_Cond --> [*]
        ToolC_Cond --> [*]
        note right of CheckCondition
            Example: "Is OP 107 in stock? If yes, price?"
            → product_search → if found → price_lookup
                              → if not → faq_retrieval
        end note
    }

    state Iterative {
        [*] --> FirstCall
        FirstCall --> EvaluateResult
        EvaluateResult --> RefinedCall: Not sufficient
        EvaluateResult --> [*]: Sufficient
        RefinedCall --> EvaluateResult
        note right of RefinedCall
            Example: "Find manga like Naruto under $10"
            → product_search("Naruto similar under 10")
            → product_search("shonen action manga budget")
            Expected: 2-4 calls, same tool
        end note
    }

Pattern-to-Intent Mapping

| User Intent Category | Expected Pattern | Typical Tools | Expected Total Latency |
|---|---|---|---|
| Order status check | Single | order_lookup | 45-80ms |
| Product search | Single | product_search | 120-200ms |
| FAQ / general question | Single | faq_retrieval | 150-250ms |
| "Is X in stock at what price?" | Parallel | inventory_check ‖ price_lookup | 60-100ms |
| Product + recommendations | Sequential Chain | product_search → recommendation_engine | 320-450ms |
| Complex purchase query | Conditional Chain | product_search → inventory_check → price_lookup → recommendation_engine | 400-600ms |
| Refine search results | Iterative | product_search × 2-3 | 240-600ms |
| Return request | Sequential Chain | order_lookup → return_initiate | 125-200ms |
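This mapping doubles as an anomaly oracle: comparing the tools the FM actually called against the expected set for the classified intent surfaces missing_call and excess_call anomalies directly. A minimal sketch, with an expected-tools table mirroring a few rows above (intent keys are illustrative labels):

```python
# Expected tool sets per intent, derived from the pattern-to-intent table.
EXPECTED_TOOLS = {
    "order_status":    {"order_lookup"},
    "stock_and_price": {"inventory_check", "price_lookup"},
    "return_request":  {"order_lookup", "return_initiate"},
}

def detect_tool_anomalies(intent: str, observed: list) -> list:
    """Flag missing and excess tool calls relative to the intent's expected set."""
    expected = EXPECTED_TOOLS.get(intent)
    if expected is None:
        return []  # Unknown intent: nothing to compare against
    observed_set = set(observed)
    anomalies = []
    for tool in sorted(expected - observed_set):
        anomalies.append({"type": "missing_call", "tool": tool})
    for tool in sorted(observed_set - expected):
        anomalies.append({"type": "excess_call", "tool": tool})
    return anomalies

# FM answered a stock-and-price question but never checked the price
print(detect_tool_anomalies("stock_and_price", ["inventory_check", "faq_retrieval"]))
```

In practice the intent label would come from the same classifier that routes the request, so this check costs one set comparison per finalized chain.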

HLD: Tool Performance Data Model

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum


class ToolCallStatus(Enum):
    """Outcome of a single tool invocation."""
    SUCCESS = "success"
    TIMEOUT = "timeout"
    ERROR = "error"
    FALLBACK = "fallback"
    RETRIED = "retried"


class CallPattern(Enum):
    """How tools were orchestrated within a single user request."""
    SINGLE = "single"
    SEQUENTIAL_CHAIN = "sequential_chain"
    PARALLEL = "parallel"
    CONDITIONAL = "conditional"
    ITERATIVE = "iterative"


class AnomalyType(Enum):
    """Types of tool usage anomalies the FM can exhibit."""
    MISSING_CALL = "missing_call"          # FM should have called a tool but didn't
    EXCESS_CALL = "excess_call"            # FM called a tool unnecessarily
    WRONG_TOOL = "wrong_tool"              # FM picked the wrong tool for the intent
    WRONG_PARAMETERS = "wrong_parameters"  # FM sent incorrect/incomplete params
    LATENCY_SPIKE = "latency_spike"        # Tool took much longer than baseline
    REPEATED_FAILURE = "repeated_failure"  # Same tool failing across requests


class HealthState(Enum):
    """Health state of a tool (state machine)."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    FAILED = "failed"
    RECOVERING = "recovering"


@dataclass
class ToolCallRecord:
    """Record of a single tool invocation."""
    call_id: str
    request_id: str                        # Ties to user request
    session_id: str                        # Ties to WebSocket session
    tool_name: str
    timestamp: datetime
    duration_ms: float
    status: ToolCallStatus
    input_parameters: Dict[str, str]       # Sanitized — no PII or secrets
    output_summary: str                    # Truncated output for debugging
    parameter_accuracy: float              # 0.0–1.0: did FM provide correct params?
    retry_count: int = 0
    parent_call_id: Optional[str] = None   # For chained calls
    error_message: Optional[str] = None
    fallback_used: bool = False
    trace_id: Optional[str] = None         # X-Ray trace ID


@dataclass
class ToolCallChain:
    """A sequence of related tool calls for one user request."""
    chain_id: str
    request_id: str
    pattern: CallPattern
    calls: List[ToolCallRecord] = field(default_factory=list)
    total_duration_ms: float = 0.0
    chain_success: bool = True
    bottleneck_tool: Optional[str] = None  # The slowest tool in the chain
    intent_category: Optional[str] = None  # "order_status", "product_inquiry", etc.


@dataclass
class ToolBaseline:
    """Rolling baseline for a specific tool — used for anomaly detection."""
    tool_name: str
    window_days: int = 7
    avg_latency_ms: float = 0.0
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    avg_calls_per_hour: float = 0.0
    success_rate: float = 1.0
    timeout_rate: float = 0.0
    avg_parameter_accuracy: float = 1.0
    avg_retry_rate: float = 0.0
    last_updated: datetime = field(default_factory=datetime.utcnow)


@dataclass
class ToolHealthSnapshot:
    """Current health state of a tool."""
    tool_name: str
    state: HealthState
    since: datetime
    consecutive_failures: int = 0
    last_success: Optional[datetime] = None
    degradation_reason: Optional[str] = None
    current_latency_ms: float = 0.0
    current_success_rate: float = 1.0

LLD: Tool Performance Monitor

A reference implementation of the decorator-based monitoring framework:

import boto3
import time
import functools
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Any, Optional
from dataclasses import asdict
from enum import Enum

logger = logging.getLogger("manga_assist.tool_monitor")
logger.setLevel(logging.INFO)


class ToolPerformanceMonitor:
    """
    Monitors and tracks performance of all FM tool calls in MangaAssist.

    Provides:
    - Decorator-based instrumentation for zero-code-change monitoring
    - CloudWatch metric emission per tool call
    - Baseline tracking with rolling 7-day windows
    - Anomaly detection against baselines
    - Health state machine per tool
    - Call chain tracking for multi-tool requests
    """

    # Sensitive parameter keys to redact
    SENSITIVE_KEYS = frozenset({
        "password", "token", "key", "secret", "auth",
        "credit_card", "ssn", "card_number", "cvv"
    })

    def __init__(self, config: dict):
        self.cloudwatch = boto3.client("cloudwatch")
        self.namespace = config.get("namespace", "MangaAssist/Tools")
        self.environment = config.get("environment", "production")
        self.anomaly_threshold = config.get("anomaly_threshold", 1.5)  # multiplier on p95
        self.health_check_window = config.get("health_check_window_sec", 300)

        self._baselines: Dict[str, dict] = {}
        self._health_states: Dict[str, dict] = {}
        self._active_chains: Dict[str, list] = {}  # request_id → list of call records

    # ------------------------------------------------------------------ #
    #  Decorator — primary instrumentation entry point                    #
    # ------------------------------------------------------------------ #

    def instrument(self, tool_name: str, timeout_ms: float = 5000):
        """Decorator to instrument a tool function with performance monitoring."""
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args, **kwargs) -> Any:
                call_id = f"{tool_name}-{uuid.uuid4().hex[:12]}"
                request_id = kwargs.pop("_request_id", None)
                start = time.monotonic()
                status = "success"
                error_msg = None
                result = None

                try:
                    result = func(*args, **kwargs)
                except TimeoutError:
                    status = "timeout"
                    error_msg = "Tool execution timed out"
                    raise
                except Exception as e:
                    status = "error"
                    error_msg = str(e)[:200]  # Truncate to avoid log bloat
                    raise
                finally:
                    duration_ms = (time.monotonic() - start) * 1000
                    self._record_call(
                        call_id=call_id,
                        request_id=request_id,
                        tool_name=tool_name,
                        duration_ms=duration_ms,
                        status=status,
                        error_message=error_msg,
                        parameters=self._sanitize_params(kwargs),  # kwargs only; positional args are not captured
                    )
                return result
            return wrapper
        return decorator

    # ------------------------------------------------------------------ #
    #  Core metric recording                                              #
    # ------------------------------------------------------------------ #

    def _record_call(
        self,
        call_id: str,
        request_id: Optional[str],
        tool_name: str,
        duration_ms: float,
        status: str,
        error_message: Optional[str],
        parameters: dict,
    ):
        """Record a tool call to CloudWatch metrics and structured logs."""
        dimensions = [
            {"Name": "ToolName", "Value": tool_name},
            {"Name": "Environment", "Value": self.environment},
        ]

        metric_data = [
            {
                "MetricName": "CallCount",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "Latency",
                "Value": duration_ms,
                "Unit": "Milliseconds",
                "Dimensions": dimensions,
                "StorageResolution": 1,  # High-resolution (1-second)
            },
            {
                "MetricName": "Success",
                "Value": 1 if status == "success" else 0,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "Error",
                "Value": 1 if status == "error" else 0,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "Timeout",
                "Value": 1 if status == "timeout" else 0,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
        ]

        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=metric_data,
        )

        # Structured log for CloudWatch Logs Insights queries
        log_record = {
            "event": "tool_call",
            "call_id": call_id,
            "request_id": request_id,
            "tool_name": tool_name,
            "duration_ms": round(duration_ms, 2),
            "status": status,
            "error_message": error_message,
            "parameters": parameters,
            "timestamp": datetime.utcnow().isoformat(),
        }
        logger.info(json.dumps(log_record))

        # Track chain if request_id present
        if request_id:
            self._active_chains.setdefault(request_id, []).append(log_record)

        # Baseline comparison
        self._check_baseline(tool_name, duration_ms, status)

        # Health state update
        self._update_health(tool_name, duration_ms, status)

    # ------------------------------------------------------------------ #
    #  Baseline management                                                #
    # ------------------------------------------------------------------ #

    def update_baseline(self, tool_name: str, baseline: dict):
        """Set or update the rolling baseline for a tool."""
        self._baselines[tool_name] = baseline
        logger.info(json.dumps({
            "event": "baseline_updated",
            "tool_name": tool_name,
            "baseline": baseline,
        }))

    def _check_baseline(self, tool_name: str, duration_ms: float, status: str):
        """Compare current call against baseline and emit anomaly if needed."""
        baseline = self._baselines.get(tool_name)
        if not baseline:
            return

        p95 = baseline.get("p95_latency_ms", float("inf"))
        threshold = p95 * self.anomaly_threshold

        if duration_ms > threshold:
            self._emit_anomaly(
                tool_name,
                "latency_spike",
                f"Latency {duration_ms:.0f}ms exceeds {self.anomaly_threshold}x "
                f"p95 baseline {p95:.0f}ms (threshold: {threshold:.0f}ms)",
            )

        expected_success_rate = baseline.get("success_rate", 1.0)
        if status != "success" and expected_success_rate > 0.95:
            self._emit_anomaly(
                tool_name,
                "unexpected_failure",
                f"Tool failed with status={status} but baseline success rate "
                f"is {expected_success_rate:.1%}",
            )

    # ------------------------------------------------------------------ #
    #  Anomaly emission                                                   #
    # ------------------------------------------------------------------ #

    def _emit_anomaly(self, tool_name: str, anomaly_type: str, description: str):
        """Emit an anomaly metric and log."""
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "AnomalyDetected",
                    "Value": 1,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "ToolName", "Value": tool_name},
                        {"Name": "AnomalyType", "Value": anomaly_type},
                    ],
                }
            ],
        )
        logger.warning(json.dumps({
            "event": "tool_anomaly",
            "tool_name": tool_name,
            "anomaly_type": anomaly_type,
            "description": description,
            "timestamp": datetime.utcnow().isoformat(),
        }))

    # ------------------------------------------------------------------ #
    #  Health state machine                                               #
    # ------------------------------------------------------------------ #

    def _update_health(self, tool_name: str, duration_ms: float, status: str):
        """Update the health state machine for a tool."""
        state = self._health_states.setdefault(tool_name, {
            "state": "healthy",
            "consecutive_failures": 0,
            "recent_latencies": [],
            "recent_statuses": [],
        })

        state["recent_latencies"].append(duration_ms)
        state["recent_statuses"].append(status)

        # Keep only last 20 calls for rolling calculation
        state["recent_latencies"] = state["recent_latencies"][-20:]
        state["recent_statuses"] = state["recent_statuses"][-20:]

        recent_success_rate = (
            state["recent_statuses"].count("success")
            / len(state["recent_statuses"])
        )
        avg_recent_latency = (
            sum(state["recent_latencies"]) / len(state["recent_latencies"])
        )

        baseline = self._baselines.get(tool_name, {})
        p95 = baseline.get("p95_latency_ms", 1000)
        prev_state = state["state"]

        if status != "success":
            state["consecutive_failures"] += 1
        else:
            state["consecutive_failures"] = 0

        # State transitions
        if recent_success_rate < 0.50 or state["consecutive_failures"] >= 5:
            state["state"] = "failed"
        elif recent_success_rate < 0.90 or avg_recent_latency > p95 * 2:
            state["state"] = "unhealthy"
        elif recent_success_rate < 0.97 or avg_recent_latency > p95 * 1.3:
            if prev_state in ("unhealthy", "failed"):
                state["state"] = "recovering"
            else:
                state["state"] = "degraded"
        else:
            state["state"] = "healthy"  # Steady state, or recovery complete

        # Emit health state metric
        if state["state"] != prev_state:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=[{
                    "MetricName": "HealthStateChange",
                    "Value": 1,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "ToolName", "Value": tool_name},
                        {"Name": "FromState", "Value": prev_state},
                        {"Name": "ToState", "Value": state["state"]},
                    ],
                }],
            )
            logger.warning(json.dumps({
                "event": "health_state_change",
                "tool_name": tool_name,
                "from": prev_state,
                "to": state["state"],
                "success_rate": round(recent_success_rate, 3),
                "avg_latency_ms": round(avg_recent_latency, 1),
            }))

    def get_health(self, tool_name: str) -> str:
        """Get current health state of a tool."""
        return self._health_states.get(tool_name, {}).get("state", "unknown")

    # ------------------------------------------------------------------ #
    #  Call chain tracking                                                #
    # ------------------------------------------------------------------ #

    def finalize_chain(self, request_id: str, intent_category: str = "unknown") -> dict:
        """Finalize and record a call chain after all tools complete."""
        calls = self._active_chains.pop(request_id, [])
        if not calls:
            return {}

        total_duration = sum(c["duration_ms"] for c in calls)
        all_success = all(c["status"] == "success" for c in calls)
        bottleneck = max(calls, key=lambda c: c["duration_ms"])
        pattern = self._classify_pattern(calls)

        chain_record = {
            "chain_id": f"chain-{uuid.uuid4().hex[:12]}",
            "request_id": request_id,
            "pattern": pattern,
            "tool_count": len(calls),
            "total_duration_ms": round(total_duration, 2),
            "chain_success": all_success,
            "bottleneck_tool": bottleneck["tool_name"],
            "bottleneck_duration_ms": round(bottleneck["duration_ms"], 2),
            "intent_category": intent_category,
            "tools": [c["tool_name"] for c in calls],
        }

        # Emit chain-level metrics
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "ChainDuration",
                    "Value": total_duration,
                    "Unit": "Milliseconds",
                    "Dimensions": [
                        {"Name": "ChainPattern", "Value": pattern},
                        {"Name": "IntentCategory", "Value": intent_category},
                    ],
                },
                {
                    "MetricName": "ChainToolCount",
                    "Value": len(calls),
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "ChainPattern", "Value": pattern},
                    ],
                },
            ],
        )

        logger.info(json.dumps({"event": "chain_complete", **chain_record}))
        return chain_record

    def _classify_pattern(self, calls: list) -> str:
        """Classify the call pattern based on observed calls.

        Limitation: without comparing per-call start/end timestamps this
        heuristic cannot tell parallel or conditional chains apart from
        sequential ones; it reliably detects only single and iterative.
        """
        if len(calls) == 1:
            return "single"
        tool_names = [c["tool_name"] for c in calls]
        if len(set(tool_names)) < len(tool_names):
            return "iterative"   # Same tool called multiple times
        return "sequential_chain"  # Default for multi-tool

    # ------------------------------------------------------------------ #
    #  Parameter sanitization                                             #
    # ------------------------------------------------------------------ #

    def _sanitize_params(self, params: dict) -> dict:
        """Remove sensitive data from parameters before logging."""
        sanitized = {}
        for key, value in params.items():
            if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
                sanitized[key] = "***REDACTED***"
            else:
                sanitized[key] = str(value)[:100]  # Truncate long values
        return sanitized


# ====================================================================== #
#  Usage example — how tools are instrumented in MangaAssist              #
# ====================================================================== #

monitor = ToolPerformanceMonitor({
    "namespace": "MangaAssist/Tools",
    "environment": "production",
    "anomaly_threshold": 1.5,
})

# Set baselines (normally loaded from DynamoDB or CloudWatch statistics)
monitor.update_baseline("product_search", {
    "p50_latency_ms": 120,
    "p95_latency_ms": 180,
    "p99_latency_ms": 350,
    "success_rate": 0.995,
})
monitor.update_baseline("inventory_check", {
    "p50_latency_ms": 60,
    "p95_latency_ms": 90,
    "p99_latency_ms": 150,
    "success_rate": 0.998,
})


@monitor.instrument("product_search")
def product_search(query: str, category: str = None, top_k: int = 5) -> list:
    """Search OpenSearch for manga products."""
    # ... actual OpenSearch query implementation ...
    pass


@monitor.instrument("order_lookup")
def order_lookup(order_id: str = None, customer_id: str = None) -> dict:
    """Look up order from DynamoDB."""
    # ... actual DynamoDB GetItem implementation ...
    pass


@monitor.instrument("inventory_check", timeout_ms=3000)
def inventory_check(product_id: str, warehouse: str = None) -> dict:
    """Check product inventory levels."""
    # ... actual DynamoDB query implementation ...
    pass
```

## Multi-Agent Coordination Overview

```mermaid
sequenceDiagram
    participant User
    participant Sup as Supervisor Agent<br/>(Bedrock Claude 3 Sonnet)
    participant MON as Tool Performance<br/>Monitor
    participant PA as Product Agent<br/>(Bedrock Claude 3 Haiku)
    participant OA as Order Agent<br/>(Bedrock Claude 3 Haiku)
    participant SA as Support Agent<br/>(Bedrock Claude 3 Haiku)
    participant PS as product_search
    participant RE as recommendation_engine
    participant OL as order_lookup
    participant FAQ as faq_retrieval

    User->>Sup: "I ordered One Piece Vol 107 last week,<br/>hasn't arrived. Also any similar manga deals?"

    Note over Sup: Intent: order_issue + product_recommendation<br/>Route to Order Agent AND Product Agent

    rect rgb(230, 240, 255)
        Note over Sup,OA: Handoff 1: Order Agent
        Sup->>MON: record_handoff(from=supervisor, to=order_agent, context_size=245 tokens)
        Sup->>OA: Delegated: "Order issue — One Piece Vol 107, ordered last week"
        OA->>OL: order_lookup(customer_id="C-123")
        MON-->>MON: instrument(order_lookup, 42ms, success)
        OL-->>OA: {order_id: "ORD-789", status: "in_transit", ETA: "2 days"}
        OA-->>Sup: "Order ORD-789 is in transit, arriving in 2 days"
        MON-->>MON: record_handoff_return(duration=180ms, context_preserved=true)
    end

    rect rgb(255, 245, 230)
        Note over Sup,PA: Handoff 2: Product Agent (parallel)
        Sup->>MON: record_handoff(from=supervisor, to=product_agent, context_size=180 tokens)
        Sup->>PA: Delegated: "Find similar manga deals to One Piece"
        PA->>PS: product_search(query="manga similar to One Piece deals")
        MON-->>MON: instrument(product_search, 125ms, success)
        PS-->>PA: [Jujutsu Kaisen, Chainsaw Man, ...]
        PA->>RE: recommendation_engine(customer_id="C-123", context="One Piece similar")
        MON-->>MON: instrument(recommendation_engine, 190ms, success)
        RE-->>PA: [Dragon Ball Super Vol 22 (20% off), My Hero Academia Vol 40]
        PA-->>Sup: "Similar: JJK ($9.99), Dragon Ball Super ($7.99, 20% off)"
        MON-->>MON: record_handoff_return(duration=420ms, context_preserved=true)
    end

    Sup->>User: "Your One Piece Vol 107 (ORD-789) is in transit — arriving in ~2 days!<br/>Meanwhile, Dragon Ball Super Vol 22 is 20% off at $7.99..."

    MON->>MON: finalize_multi_agent_chain<br/>agents=3, tools=3, total=480ms
```

### Multi-Agent Monitoring Points

| Monitoring Point | What We Measure | Why It Matters |
|---|---|---|
| Supervisor routing decision | Which agent(s) were chosen, and was the choice correct? | FM routing accuracy |
| Handoff context size | Tokens transferred to the specialist | Context transfer completeness |
| Specialist tool selection | Did the specialist pick the right tools? | Per-agent tool accuracy |
| Handoff return | Was the specialist's answer incorporated correctly? | Result integration quality |
| Cross-agent latency | Overhead of multi-agent vs. single-agent handling | Coordination cost |
| Context loss detection | Did the specialist miss context from the user query? | Handoff fidelity |
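
The `record_handoff` / `record_handoff_return` calls shown in the sequence diagram can be backed by a small tracker. This is a hedged sketch, not the actual MangaAssist implementation: the class name, event fields, and metric names are illustrative assumptions, and the CloudWatch client is optional so the tracker degrades to structured logging only.

```python
import json
import logging
import time

logger = logging.getLogger("mangaassist.handoffs")


class HandoffTracker:
    """Hypothetical sketch of the record_handoff calls in the diagram above."""

    def __init__(self, cloudwatch=None, namespace="MangaAssist/Agents"):
        self.cloudwatch = cloudwatch  # boto3 CloudWatch client, or None
        self.namespace = namespace
        self._open_handoffs = {}

    def record_handoff(self, from_agent: str, to_agent: str, context_tokens: int) -> str:
        """Mark the start of a supervisor -> specialist delegation."""
        handoff_id = f"{from_agent}->{to_agent}:{time.time_ns()}"
        self._open_handoffs[handoff_id] = {
            "from": from_agent,
            "to": to_agent,
            "context_tokens": context_tokens,
            "started_at": time.monotonic(),
        }
        return handoff_id

    def record_handoff_return(self, handoff_id: str, context_preserved: bool) -> dict:
        """Close the handoff, log a structured event, and emit a duration metric."""
        record = self._open_handoffs.pop(handoff_id)
        duration_ms = (time.monotonic() - record["started_at"]) * 1000
        event = {
            "event": "handoff_return",
            "from": record["from"],
            "to": record["to"],
            "context_tokens": record["context_tokens"],
            "duration_ms": round(duration_ms, 1),
            "context_preserved": context_preserved,
        }
        logger.info(json.dumps(event))
        if self.cloudwatch is not None:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=[{
                    "MetricName": "HandoffDuration",
                    "Value": duration_ms,
                    "Unit": "Milliseconds",
                    "Dimensions": [{"Name": "ToAgent", "Value": record["to"]}],
                }],
            )
        return event
```

Keying open handoffs by a generated ID (rather than by agent pair) lets parallel delegations to the same specialist coexist, which matters for the parallel handoff pattern shown above.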

## Tool Health State Machine

```mermaid
stateDiagram-v2
    [*] --> Healthy

    Healthy --> Degraded: Latency > 1.3x p95 baseline<br/>OR success rate 90-97%
    Healthy --> Unhealthy: Latency > 2x p95 baseline<br/>OR success rate < 90%

    Degraded --> Healthy: Metrics return to baseline<br/>for 5+ consecutive calls
    Degraded --> Unhealthy: Latency > 2x p95 baseline<br/>OR success rate drops < 90%

    Unhealthy --> Failed: Success rate < 50%<br/>OR 5 consecutive failures
    Unhealthy --> Recovering: Success rate improving<br/>AND latency trending down

    Failed --> Recovering: At least 1 successful call<br/>AND latency < 2x p95

    Recovering --> Healthy: All metrics within baseline<br/>for 10+ consecutive calls
    Recovering --> Unhealthy: Metrics deteriorate again
    Recovering --> Failed: 3 consecutive failures during recovery

    state Healthy {
        [*] --> Normal
        Normal: ✅ All metrics within baseline
        Normal: Latency ≤ p95
        Normal: Success rate ≥ 97%
        Normal: No alerts active
    }

    state Degraded {
        [*] --> Warning
        Warning: ⚠️ Elevated but functional
        Warning: Alert: P3 (informational)
        Warning: Action: Log + dashboard update
    }

    state Unhealthy {
        [*] --> Critical
        Critical: 🔶 Significant degradation
        Critical: Alert: P2 (requires attention)
        Critical: Action: Increase fallback rate
        Critical: Action: Notify on-call team
    }

    state Failed {
        [*] --> Down
        Down: 🔴 Tool effectively unavailable
        Down: Alert: P1 (immediate action)
        Down: Action: Full fallback mode
        Down: Action: Page on-call engineer
    }

    state Recovering {
        [*] --> Improving
        Improving: 🔄 Trending toward healthy
        Improving: Alert: P3 (monitoring)
        Improving: Action: Gradually reduce fallback
    }
```
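
The fallback actions listed under Unhealthy and Failed (increase fallback rate, full fallback mode) pair naturally with design decision 5's stale-cache strategy. Below is a hedged sketch of such a wrapper: the `stale_ok` response flag comes from the design decisions table, while the class name, cache keying, and TTL are illustrative assumptions.

```python
import time


class CachedFallback:
    """Illustrative sketch: serve last-known-good results while a tool is down."""

    def __init__(self, max_stale_s: float = 3600):
        self.max_stale_s = max_stale_s  # how old a cached result may be
        self._cache = {}  # cache_key -> (result, stored_at)

    def call(self, tool_fn, cache_key: str, health_state: str, *args, **kwargs):
        """Invoke tool_fn normally, or serve cached data when the tool is degraded.

        health_state is the current tool health ("healthy", "degraded",
        "unhealthy", "failed", "recovering") from the state machine above.
        """
        if health_state in ("unhealthy", "failed") and cache_key in self._cache:
            result, stored_at = self._cache[cache_key]
            if time.monotonic() - stored_at <= self.max_stale_s:
                # Skip the real call entirely; flag the data as stale.
                return {"data": result, "stale_ok": True}
        result = tool_fn(*args, **kwargs)
        self._cache[cache_key] = (result, time.monotonic())
        return {"data": result, "stale_ok": False}
```

A caller that sees `stale_ok: True` can surface a "prices/stock may be slightly out of date" note to the user instead of failing the conversation outright.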

### State Transition Thresholds

| From → To | Trigger Condition | Evaluation Window |
|---|---|---|
| Healthy → Degraded | Latency > 1.3× p95 OR success rate 90–97% | Last 20 calls |
| Healthy → Unhealthy | Latency > 2× p95 OR success rate < 90% | Last 20 calls |
| Degraded → Healthy | All metrics within baseline, 5+ consecutive good calls | Last 5 calls |
| Degraded → Unhealthy | Latency > 2× p95 OR success rate < 90% | Last 20 calls |
| Unhealthy → Failed | Success rate < 50% OR 5 consecutive failures | Last 20 calls OR consecutive counter |
| Unhealthy → Recovering | Success rate trending up + latency trending down | Last 10 calls |
| Failed → Recovering | ≥ 1 success AND latency < 2× p95 | Last 5 calls |
| Recovering → Healthy | All metrics within baseline for 10+ calls | Last 10 calls |
| Recovering → Unhealthy | Metrics deteriorate again | Last 10 calls |
| Recovering → Failed | 3 consecutive failures during recovery | Consecutive counter |
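
The transition table can be evaluated over a sliding window of recent calls. The sketch below is a minimal, hedged illustration: the thresholds come from the table, but the class name, the use of a moving average rather than a true p95 of the window, and the simplified "trending up" check (a few consecutive good calls) are assumptions.

```python
from collections import deque


class ToolHealthStateMachine:
    """Minimal sketch of the 5-state health machine with hysteresis."""

    def __init__(self, p95_baseline_ms: float, window: int = 20):
        self.p95 = p95_baseline_ms
        self.state = "healthy"
        self.calls = deque(maxlen=window)  # (latency_ms, success) tuples
        self.consecutive_good = 0
        self.consecutive_failures = 0

    def observe(self, latency_ms: float, success: bool) -> str:
        """Record one call and apply the transition rules; return the new state."""
        self.calls.append((latency_ms, success))
        self.consecutive_failures = 0 if success else self.consecutive_failures + 1
        within = success and latency_ms <= self.p95
        self.consecutive_good = self.consecutive_good + 1 if within else 0

        rate = sum(1 for _, ok in self.calls if ok) / len(self.calls)
        avg_latency = sum(lat for lat, _ in self.calls) / len(self.calls)

        if self.state == "healthy":
            if avg_latency > 2 * self.p95 or rate < 0.90:
                self.state = "unhealthy"
            elif avg_latency > 1.3 * self.p95 or rate < 0.97:
                self.state = "degraded"
        elif self.state == "degraded":
            if avg_latency > 2 * self.p95 or rate < 0.90:
                self.state = "unhealthy"
            elif self.consecutive_good >= 5:
                self.state = "healthy"
        elif self.state == "unhealthy":
            if rate < 0.50 or self.consecutive_failures >= 5:
                self.state = "failed"
            elif self.consecutive_good >= 3:  # simplified "trending up" signal
                self.state = "recovering"
        elif self.state == "failed":
            if success and latency_ms < 2 * self.p95:
                self.state = "recovering"
        elif self.state == "recovering":
            if self.consecutive_failures >= 3:
                self.state = "failed"
            elif self.consecutive_good >= 10:
                self.state = "healthy"
        return self.state
```

Note the hysteresis: recovery requires 10 consecutive good calls, while degradation needs only a window-average breach, which is what prevents flapping on transient spikes.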

## Key Design Decisions

| # | Decision | Choice | Rationale | Trade-off |
|---|---|---|---|---|
| 1 | Instrumentation approach | Python decorator (`@monitor.instrument`) | Zero code changes for tool authors; consistent across all tools | Slight overhead per call (~1 ms); requires Python for tools |
| 2 | Metric granularity | Per-tool dimensions + per-chain aggregate | Enables drill-down to individual tools AND a holistic request view | Higher CloudWatch cost (custom metrics × tools × environments) |
| 3 | Baseline window | 7-day rolling average | Balances stability with responsiveness to real changes | May miss gradual degradation; holiday patterns need seasonal adjustment |
| 4 | Anomaly threshold | 1.5× p95 for latency; hard thresholds for success rate | Simple, interpretable, low false-positive rate | May miss subtle degradation; not adaptive to traffic spikes |
| 5 | Fallback strategy | Cached stale data with a `stale_ok` flag in the response | Maintains user experience during tool degradation | Users may see outdated data; requires cache infrastructure |
| 6 | Multi-agent tracing | X-Ray segments per agent + custom annotations for handoffs | End-to-end visibility across supervisor → specialist calls | X-Ray sampling may miss traces; annotation limit per segment |
| 7 | Parameter sanitization | Keyword-based blocklist redaction | Prevents PII/secrets from reaching CloudWatch Logs | May over-redact (false positives) or miss novel sensitive fields |
| 8 | Health state thresholds | 5-state machine with hysteresis (separate Recovering state) | Prevents flapping between healthy/unhealthy on transient issues | More complex than a simple threshold; needs tuning per tool |
| 9 | Metric storage resolution | High-resolution (1-second) for latency, standard for counts | Enables sub-minute anomaly detection during incidents | 3× cost vs. standard-resolution metrics |
| 10 | Chain pattern classification | Heuristic-based (same tool repeated = iterative, distinct tools = chain) | Simple to implement; covers 90%+ of real patterns | Cannot distinguish parallel vs. sequential without timing info |
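
Decision 10's trade-off (the heuristic cannot tell parallel from sequential chains) disappears once each call record carries start/end timestamps. The sketch below is a hedged extension of `_classify_pattern`, assuming hypothetical `start_ms` and `end_ms` fields that the monitor above does not currently record.

```python
def classify_chain(calls: list[dict]) -> str:
    """Classify a tool-call chain using per-call timing.

    Each call dict is assumed to carry 'tool_name', 'start_ms', and
    'end_ms' (illustrative fields, not part of the framework above).
    """
    if len(calls) == 1:
        return "single"
    names = [c["tool_name"] for c in calls]
    if len(set(names)) < len(names):
        return "iterative"  # same tool called multiple times (likely a retry)
    ordered = sorted(calls, key=lambda c: c["start_ms"])
    # If any call starts before the previous one ends, the calls overlap in time.
    overlapping = any(
        later["start_ms"] < earlier["end_ms"]
        for earlier, later in zip(ordered, ordered[1:])
    )
    return "parallel_chain" if overlapping else "sequential_chain"
```

With this in place, a `product_search` running concurrently with an `order_lookup` (the multi-agent scenario above) is labeled `parallel_chain` instead of being lumped into `sequential_chain`.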

## Cross-References

| Related Document | Relationship |
|---|---|
| 07-agent-performance-framework.md | Agent-level evaluation metrics that complement tool-level monitoring |
| 02-operational-metrics-dashboards.md | CloudWatch dashboard design patterns used for tool metrics visualization |
| 01-performance-degradation-identification.md | Degradation detection methods applied to tool-level baselines |
| 01-alert-systems-design.md | Alert configuration for tool health state transitions |
| 02-baseline-deviation-analysis.md | Deep dive on baseline calculation and deviation scoring for tools |
| 03-multi-tool-chain-analysis.md | Detailed analysis of call chain patterns and bottleneck identification |

## Key Takeaways

  1. Decorator-based instrumentation is the foundation — wrapping every tool function with @monitor.instrument ensures consistent telemetry with zero code changes to tool implementations. The decorator captures latency, status, parameters (sanitized), and emits CloudWatch metrics automatically.

  2. Baselines make anomalies meaningful — raw latency numbers mean nothing without context. A 200ms recommendation_engine call is normal; a 200ms order_lookup call is a 4× p50 spike. Rolling 7-day baselines per tool establish "what normal looks like" and make alerts actionable.

  3. Multi-agent coordination adds a monitoring layer — when a Supervisor Agent delegates to Product Agent and Order Agent, we need to trace not just the tools each specialist calls, but the handoff quality: was enough context transferred? Did the specialist choose the right tools? This is unique to agentic systems.

  4. Tool call chains reveal FM decision quality — tracking whether the FM chose product_search → inventory_check → price_lookup (correct 3-tool chain) vs product_search → product_search → product_search (incorrect iterative retry) is a direct signal of FM tool-use reasoning quality.

  5. Parameter accuracy is an underrated FM quality signal — if the FM consistently calls product_search(query="") with empty queries or order_lookup(order_id="unknown"), the tools will technically succeed but return useless results. Monitoring parameter quality catches silent failures that success-rate metrics miss.

  6. The health state machine prevents alert flapping — simple threshold-based alerts create noise during transient issues. The 5-state machine (Healthy → Degraded → Unhealthy → Failed → Recovering → Healthy) with hysteresis ensures alerts fire only for sustained issues, and the "Recovering" state prevents premature all-clear signals.

  7. Tool performance monitoring bridges FM evaluation and ops — this framework sits at the intersection of "Is the FM making good tool-use decisions?" (evaluation) and "Are the tools operating correctly?" (ops). Both perspectives are required for a complete picture of agentic system health.
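
Takeaway 5 can be made concrete with a small parameter-quality check run alongside the sanitizer. This is an illustrative sketch only: the placeholder list and the per-tool rule are assumptions, not part of the monitor above.

```python
def check_param_quality(tool_name: str, params: dict) -> list[str]:
    """Flag degenerate parameters that would 'succeed' but return nothing useful.

    Returns a list of human-readable issues; an empty list means the
    parameters look plausible.
    """
    issues = []
    placeholders = {"", "unknown", "n/a", "none", "null"}
    for key, value in params.items():
        if isinstance(value, str) and value.strip().lower() in placeholders:
            issues.append(f"{tool_name}.{key} looks degenerate: {value!r}")
    # Hypothetical per-tool rule: a real search query needs some substance.
    if tool_name == "product_search" and len(params.get("query", "")) < 3:
        issues.append("product_search.query too short to be a real search")
    return issues
```

Emitting the issue count as a `ParameterQualityIssues` metric per tool would surface the "silent failure" pattern described in takeaway 5 without waiting for success rates to move.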