Log Analysis, Trace Debugging, and Error Pattern Recognition
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Development and Implementation of FM Applications |
| Task | 2.5 — Describe methods to integrate FM applications into existing systems |
| Skill | 2.5.6 — Improve troubleshooting efficiency for FM applications |
| Focus Areas | Log correlation across FM calls, trace analysis for latency debugging, common FM error patterns, Q Developer integration |
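Before diving in, a back-of-envelope check on the Bedrock economics from the context above. The per-message token counts here are illustrative assumptions, not measured MangaAssist figures; only the per-1M-token prices come from the context.

```python
# Hypothetical per-message token counts -- illustrative assumptions only.
INPUT_TOKENS = 1_000
OUTPUT_TOKENS = 300
MESSAGES_PER_DAY = 1_000_000

# Bedrock Claude 3 pricing per 1M tokens (input, output), from the context above.
PRICING = {"sonnet": (3.00, 15.00), "haiku": (0.25, 1.25)}

def daily_cost(model: str) -> float:
    """Daily Bedrock cost in USD for the assumed traffic profile."""
    in_price, out_price = PRICING[model]
    per_msg = (INPUT_TOKENS / 1e6) * in_price + (OUTPUT_TOKENS / 1e6) * out_price
    return per_msg * MESSAGES_PER_DAY
```

Under these assumptions Sonnet runs roughly $7,500/day versus Haiku's $625/day, which is why several remediation and optimization strategies below fall back to Haiku.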
1. Log Correlation Across FM Calls
1.1 The Correlation Challenge
A single user message in MangaAssist triggers a chain of 5-8 service calls. Without correlation, a log entry from OpenSearch saying "query took 2100ms" is meaningless unless you can tie it back to the specific user request, the Bedrock call it fed into, and the end-to-end latency the user experienced.
Correlation dimensions in MangaAssist:
| Dimension | Scope | Example Value |
|---|---|---|
| `request_id` | Single user message | `req-a1b2c3d4-e5f6-7890` |
| `session_id` | Conversation (multi-turn) | `sess-manga-user-9876` |
| `user_id` | All sessions for a user | `user-jp-tokyo-001` |
| `trace_id` | X-Ray distributed trace | `1-65a8f7e2-abc123def456` |
| `conversation_turn` | Position in conversation | `3` (third message) |
| `model_invocation_id` | Bedrock-generated ID | `inv-bedrock-xyz789` |
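Concretely, each log line should carry all of these dimensions in a single JSON object so CloudWatch Logs Insights can filter on any of them. A minimal sketch (field names follow the table above; helper names are illustrative):

```python
import json
import time
import uuid
from datetime import datetime, timezone

def new_xray_trace_id() -> str:
    """Build an ID in X-Ray's wire format: 1-<8 hex epoch>-<24 hex random>."""
    return f"1-{int(time.time()):08x}-{uuid.uuid4().hex[:24]}"

def correlated_log(message: str, **dims) -> str:
    """Emit one structured log line carrying every correlation dimension."""
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "message": message,
        **dims,
    }
    return json.dumps(entry)

line = correlated_log(
    "opensearch query took 2100ms",
    request_id=f"req-{uuid.uuid4().hex[:12]}",
    session_id="sess-manga-user-9876",  # example values from the table
    user_id="user-jp-tokyo-001",
    trace_id=new_xray_trace_id(),
    conversation_turn=3,
)
```

With every service emitting lines in this shape, a single `filter correlation.request_id = "req-..."` query reconstructs the whole chain.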
1.2 LogCorrelator Implementation
import json
import uuid
import time
import threading
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Callable, Set
from dataclasses import dataclass, field
from collections import defaultdict
from contextlib import contextmanager
from enum import Enum
class CorrelationLevel(Enum):
"""Levels at which logs can be correlated."""
REQUEST = "request"
SESSION = "session"
USER = "user"
TRACE = "trace"
class ServiceName(Enum):
"""Downstream services in the MangaAssist call chain."""
API_GATEWAY = "api-gateway"
ORCHESTRATOR = "orchestrator"
REDIS_CACHE = "redis-cache"
DYNAMODB_SESSION = "dynamodb-session"
DYNAMODB_PRODUCT = "dynamodb-product"
OPENSEARCH_VECTOR = "opensearch-vector"
BEDROCK_INVOKE = "bedrock-invoke"
WEBSOCKET_SEND = "websocket-send"
@dataclass
class CorrelatedLogEntry:
"""A single log entry enriched with correlation identifiers."""
timestamp: str
request_id: str
session_id: str
user_id: str
trace_id: str
service: str
operation: str
duration_ms: float
status: str
log_level: str
message: str
metadata: Dict[str, Any] = field(default_factory=dict)
parent_span_id: Optional[str] = None
span_id: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"timestamp": self.timestamp,
"correlation": {
"request_id": self.request_id,
"session_id": self.session_id,
"user_id": self.user_id,
"trace_id": self.trace_id,
"span_id": self.span_id,
"parent_span_id": self.parent_span_id,
},
"service": self.service,
"operation": self.operation,
"duration_ms": self.duration_ms,
"status": self.status,
"log_level": self.log_level,
"message": self.message,
"metadata": self.metadata,
}
@dataclass
class CorrelationContext:
"""
Thread-safe correlation context passed through the call chain.
Created at the API Gateway entry point and propagated through
every downstream service call within the orchestrator.
"""
request_id: str
session_id: str
user_id: str
trace_id: str
conversation_turn: int
start_time: float
_entries: List[CorrelatedLogEntry] = field(default_factory=list)
_lock: threading.Lock = field(default_factory=threading.Lock)
_span_stack: List[str] = field(default_factory=list)
def generate_span_id(self) -> str:
"""Generate a unique span ID for a service call."""
return uuid.uuid4().hex[:16]
def current_span_id(self) -> Optional[str]:
"""Get the current (innermost) span ID."""
return self._span_stack[-1] if self._span_stack else None
def push_span(self, span_id: str) -> None:
"""Push a new span onto the stack."""
self._span_stack.append(span_id)
def pop_span(self) -> Optional[str]:
"""Pop the current span from the stack."""
return self._span_stack.pop() if self._span_stack else None
def add_entry(self, entry: CorrelatedLogEntry) -> None:
"""Thread-safe addition of a log entry."""
with self._lock:
self._entries.append(entry)
def get_entries(self) -> List[CorrelatedLogEntry]:
"""Return all correlated entries for this request."""
with self._lock:
return list(self._entries)
def get_call_chain(self) -> List[Dict[str, Any]]:
"""
Return the ordered call chain with timing for this request.
This is what you would see in an X-Ray trace view — the
waterfall of service calls with their durations.
"""
with self._lock:
sorted_entries = sorted(self._entries, key=lambda e: e.timestamp)
return [
{
"service": e.service,
"operation": e.operation,
"duration_ms": e.duration_ms,
"status": e.status,
"span_id": e.span_id,
"parent_span_id": e.parent_span_id,
}
for e in sorted_entries
]
def elapsed_ms(self) -> float:
"""Milliseconds elapsed since request start."""
return (time.time() - self.start_time) * 1000
class LogCorrelator:
"""
Correlates logs across multiple services for a single FM request chain.
The correlator provides:
1. Context propagation: Ensures request_id, session_id, and trace_id
flow through every downstream call.
2. Span management: Creates parent-child relationships between service
calls for X-Ray-style waterfall visualization.
3. Call chain reconstruction: Assembles the full call chain from
individual log entries for troubleshooting.
4. Cross-request correlation: Links multiple requests in a session
to track conversation-level patterns.
Usage:
correlator = LogCorrelator(service_name="manga-assist-orchestrator")
# At request entry point (API Gateway handler)
ctx = correlator.create_context(
session_id="sess-123",
user_id="user-456",
conversation_turn=3,
)
# In each service call
with correlator.trace_service_call(ctx, ServiceName.REDIS_CACHE, "get") as span:
result = redis_client.get(key)
with correlator.trace_service_call(ctx, ServiceName.BEDROCK_INVOKE, "invoke_model") as span:
response = bedrock.invoke_model(body=body)
# After request completes
chain = ctx.get_call_chain()
correlator.emit_correlation_log(ctx)
"""
def __init__(self, service_name: str):
self.service_name = service_name
self._session_history: Dict[str, List[str]] = defaultdict(list)
self._active_contexts: Dict[str, CorrelationContext] = {}
self._lock = threading.Lock()
def create_context(
self,
session_id: str,
user_id: str,
conversation_turn: int,
trace_id: Optional[str] = None,
request_id: Optional[str] = None,
) -> CorrelationContext:
"""
Create a new correlation context for an incoming request.
Called once at the API Gateway WebSocket handler when a
new user message arrives.
"""
req_id = request_id or f"req-{uuid.uuid4().hex[:12]}"
t_id = trace_id or f"1-{uuid.uuid4().hex[:8]}-{uuid.uuid4().hex[:24]}"
ctx = CorrelationContext(
request_id=req_id,
session_id=session_id,
user_id=user_id,
trace_id=t_id,
conversation_turn=conversation_turn,
start_time=time.time(),
)
with self._lock:
self._active_contexts[req_id] = ctx
self._session_history[session_id].append(req_id)
return ctx
@contextmanager
def trace_service_call(
self,
ctx: CorrelationContext,
service: ServiceName,
operation: str,
metadata: Optional[Dict[str, Any]] = None,
):
"""
Context manager that traces a single downstream service call.
Creates a span with parent-child relationships and records
timing, status, and error information.
"""
span_id = ctx.generate_span_id()
parent_span_id = ctx.current_span_id()
ctx.push_span(span_id)
start = time.time()
status = "SUCCESS"
error_msg = ""
try:
yield span_id
except Exception as e:
status = "ERROR"
error_msg = str(e)
raise
finally:
duration = (time.time() - start) * 1000
ctx.pop_span()
entry = CorrelatedLogEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
request_id=ctx.request_id,
session_id=ctx.session_id,
user_id=ctx.user_id,
trace_id=ctx.trace_id,
service=service.value,
operation=operation,
duration_ms=round(duration, 2),
status=status,
log_level="ERROR" if status == "ERROR" else "INFO",
message=error_msg if error_msg else f"{service.value}:{operation} completed",
metadata=metadata or {},
span_id=span_id,
parent_span_id=parent_span_id,
)
ctx.add_entry(entry)
def emit_correlation_log(self, ctx: CorrelationContext) -> Dict[str, Any]:
"""
Emit a summary correlation log at the end of a request.
This single log entry contains the full call chain and is
optimized for CloudWatch Logs Insights correlation queries.
"""
chain = ctx.get_call_chain()
entries = ctx.get_entries()
total_ms = ctx.elapsed_ms()
has_errors = any(e.status == "ERROR" for e in entries)
error_services = [e.service for e in entries if e.status == "ERROR"]
summary = {
"log_type": "CORRELATION_SUMMARY",
"request_id": ctx.request_id,
"session_id": ctx.session_id,
"user_id": ctx.user_id,
"trace_id": ctx.trace_id,
"conversation_turn": ctx.conversation_turn,
"total_duration_ms": round(total_ms, 2),
"service_call_count": len(entries),
"has_errors": has_errors,
"error_services": error_services,
"call_chain": chain,
"timing_breakdown": {
e.service: e.duration_ms for e in entries
},
"sla_met": total_ms <= 3000,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# In production, this would be logger.info(json.dumps(summary))
return summary
def get_session_history(self, session_id: str) -> List[str]:
"""Return all request IDs for a session, in order."""
with self._lock:
return list(self._session_history.get(session_id, []))
def find_correlated_errors(
self,
session_id: str,
) -> List[Dict[str, Any]]:
"""
Find error patterns across all requests in a session.
Useful for detecting recurring failures that affect the
user's conversation experience.
"""
request_ids = self.get_session_history(session_id)
errors = []
with self._lock:
for req_id in request_ids:
ctx = self._active_contexts.get(req_id)
if not ctx:
continue
for entry in ctx.get_entries():
if entry.status == "ERROR":
errors.append({
"request_id": req_id,
"conversation_turn": ctx.conversation_turn,
"service": entry.service,
"operation": entry.operation,
"error_message": entry.message,
"timestamp": entry.timestamp,
})
return errors
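An alternative to threading an explicit `ctx` argument through every call is Python's `contextvars`, which propagates automatically across function calls and async task switches. This is a minimal sketch of that variation, not part of the `LogCorrelator` implementation above:

```python
import uuid
from contextvars import ContextVar
from typing import Dict, Optional

# Correlation IDs live in a context variable instead of an explicit argument.
_correlation: ContextVar[Optional[Dict[str, str]]] = ContextVar(
    "correlation", default=None
)

def start_request(session_id: str, user_id: str) -> Dict[str, str]:
    """Set correlation IDs once, at the request entry point."""
    ids = {
        "request_id": f"req-{uuid.uuid4().hex[:12]}",
        "session_id": session_id,
        "user_id": user_id,
    }
    _correlation.set(ids)
    return ids

def log(message: str) -> Dict[str, str]:
    """Any function in the call chain can enrich its logs without plumbing."""
    ids = _correlation.get() or {}
    return {**ids, "message": message}
```

The trade-off: `contextvars` removes the argument plumbing, but the explicit-context style above makes span parent-child relationships easier to manage.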
2. Trace Analysis for Latency Debugging
2.1 The Latency Budget
MangaAssist has a 3-second end-to-end SLA. Here is how the latency budget is typically allocated:
| Component | Budget (ms) | Typical (ms) | Alarm Threshold (ms) |
|---|---|---|---|
| API Gateway overhead | 50 | 15-30 | 100 |
| Orchestrator processing | 100 | 20-50 | 200 |
| Redis cache check | 20 | 2-5 | 50 |
| DynamoDB session lookup | 50 | 10-25 | 100 |
| OpenSearch vector search | 300 | 100-200 | 500 |
| Bedrock InvokeModel | 2300 | 800-1800 | 2500 |
| Response parsing | 50 | 5-15 | 100 |
| WebSocket delivery | 30 | 5-10 | 50 |
| Total | 3000 | 957-2135 | 3000 |
Bedrock alone is allocated 2300 ms, about 77% of the 3-second SLA; the component budgets sum to 2900 ms, leaving only 100 ms of unallocated headroom. With so little slack, even modest degradation in any other component quickly breaks the SLA.
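The budget arithmetic is worth checking mechanically, since alarm tuning depends on it:

```python
# Per-component budgets from the table above, in milliseconds.
BUDGET_MS = {
    "api_gateway": 50, "orchestrator": 100, "redis_cache": 20,
    "dynamodb_session": 50, "opensearch_vector": 300, "bedrock_invoke": 2300,
    "response_parsing": 50, "websocket_delivery": 30,
}
SLA_MS = 3000

allocated = sum(BUDGET_MS.values())                    # 2900 ms allocated
headroom = SLA_MS - allocated                          # 100 ms of slack
bedrock_share = BUDGET_MS["bedrock_invoke"] / SLA_MS   # ~0.77 of the SLA
```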
2.2 LatencyAnalyzer Implementation
import time
import json
import statistics
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from collections import defaultdict
from enum import Enum
class LatencyStatus(Enum):
"""Status of a latency measurement relative to its budget."""
WITHIN_BUDGET = "WITHIN_BUDGET"
WARNING = "WARNING" # > 80% of budget
BUDGET_EXCEEDED = "BUDGET_EXCEEDED"
CRITICAL = "CRITICAL" # > 150% of budget
@dataclass
class ComponentLatencyConfig:
"""Latency budget and thresholds for a single component."""
component_name: str
budget_ms: float
warning_threshold_ms: float
alarm_threshold_ms: float
critical_threshold_ms: float
@dataclass
class LatencyMeasurement:
"""A single latency measurement for a component in a request."""
request_id: str
component: str
duration_ms: float
timestamp: str
status: LatencyStatus
budget_utilization_pct: float
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LatencyReport:
"""Aggregated latency analysis for a time window."""
window_start: str
window_end: str
total_requests: int
sla_met_count: int
sla_met_pct: float
component_stats: Dict[str, Dict[str, float]]
bottleneck_component: str
bottleneck_avg_ms: float
bottleneck_pct_of_total: float
recommendations: List[str]
class LatencyAnalyzer:
"""
Analyzes latency traces to identify bottlenecks and SLA violations
in the MangaAssist FM call chain.
Uses X-Ray trace data and structured logs to:
1. Track latency per component against defined budgets
2. Identify the bottleneck component in slow requests
3. Detect latency trends (degradation over time)
4. Generate optimization recommendations
The analyzer maintains a sliding window of measurements for
statistical analysis (percentiles, trends, correlations).
Usage:
analyzer = LatencyAnalyzer(sla_target_ms=3000)
# Feed it trace data from X-Ray or structured logs
analyzer.record_trace(trace_data)
# Get analysis
report = analyzer.generate_report(window_minutes=60)
bottleneck = analyzer.identify_bottleneck(request_id)
anomalies = analyzer.detect_anomalies(window_minutes=30)
"""
# Default latency budgets for MangaAssist components
DEFAULT_BUDGETS: Dict[str, ComponentLatencyConfig] = {
"api_gateway": ComponentLatencyConfig(
component_name="api_gateway",
budget_ms=50,
warning_threshold_ms=40,
alarm_threshold_ms=100,
critical_threshold_ms=150,
),
"orchestrator": ComponentLatencyConfig(
component_name="orchestrator",
budget_ms=100,
warning_threshold_ms=80,
alarm_threshold_ms=200,
critical_threshold_ms=300,
),
"redis_cache": ComponentLatencyConfig(
component_name="redis_cache",
budget_ms=20,
warning_threshold_ms=15,
alarm_threshold_ms=50,
critical_threshold_ms=100,
),
"dynamodb_session": ComponentLatencyConfig(
component_name="dynamodb_session",
budget_ms=50,
warning_threshold_ms=40,
alarm_threshold_ms=100,
critical_threshold_ms=200,
),
"opensearch_vector": ComponentLatencyConfig(
component_name="opensearch_vector",
budget_ms=300,
warning_threshold_ms=240,
alarm_threshold_ms=500,
critical_threshold_ms=800,
),
"bedrock_invoke": ComponentLatencyConfig(
component_name="bedrock_invoke",
budget_ms=2300,
warning_threshold_ms=1800,
alarm_threshold_ms=2500,
critical_threshold_ms=3000,
),
"response_parsing": ComponentLatencyConfig(
component_name="response_parsing",
budget_ms=50,
warning_threshold_ms=40,
alarm_threshold_ms=100,
critical_threshold_ms=150,
),
"websocket_delivery": ComponentLatencyConfig(
component_name="websocket_delivery",
budget_ms=30,
warning_threshold_ms=25,
alarm_threshold_ms=50,
critical_threshold_ms=100,
),
}
def __init__(
self,
sla_target_ms: float = 3000,
max_history_size: int = 100_000,
budgets: Optional[Dict[str, ComponentLatencyConfig]] = None,
):
self.sla_target_ms = sla_target_ms
self.max_history_size = max_history_size
self.budgets = budgets or self.DEFAULT_BUDGETS
self._measurements: Dict[str, List[LatencyMeasurement]] = defaultdict(list)
self._request_totals: Dict[str, float] = {}
self._request_timestamps: Dict[str, str] = {}
def _classify_latency(
self, component: str, duration_ms: float
) -> Tuple[LatencyStatus, float]:
"""Classify a latency measurement against its budget."""
config = self.budgets.get(component)
if not config:
return LatencyStatus.WITHIN_BUDGET, 0.0
utilization = (duration_ms / config.budget_ms) * 100 if config.budget_ms > 0 else 0
if duration_ms >= config.critical_threshold_ms:
status = LatencyStatus.CRITICAL
elif duration_ms >= config.alarm_threshold_ms:
status = LatencyStatus.BUDGET_EXCEEDED
elif duration_ms >= config.warning_threshold_ms:
status = LatencyStatus.WARNING
else:
status = LatencyStatus.WITHIN_BUDGET
return status, round(utilization, 1)
def record_trace(self, trace_data: Dict[str, Any]) -> List[LatencyMeasurement]:
"""
Record latency measurements from an X-Ray trace or structured log.
Expected trace_data format:
{
"request_id": "req-abc123",
"total_e2e_ms": 2450,
"components": {
"api_gateway": 25,
"orchestrator": 35,
"redis_cache": 3,
"dynamodb_session": 18,
"opensearch_vector": 180,
"bedrock_invoke": 2100,
"response_parsing": 12,
"websocket_delivery": 7,
}
}
"""
request_id = trace_data["request_id"]
total_ms = trace_data.get("total_e2e_ms", 0)
components = trace_data.get("components", {})
now_iso = datetime.now(timezone.utc).isoformat()
self._request_totals[request_id] = total_ms
self._request_timestamps[request_id] = now_iso
measurements = []
for component_name, duration_ms in components.items():
status, utilization = self._classify_latency(component_name, duration_ms)
measurement = LatencyMeasurement(
request_id=request_id,
component=component_name,
duration_ms=duration_ms,
timestamp=now_iso,
status=status,
budget_utilization_pct=utilization,
)
measurements.append(measurement)
self._measurements[component_name].append(measurement)
# Trim history if needed
if len(self._measurements[component_name]) > self.max_history_size:
self._measurements[component_name] = self._measurements[component_name][
-self.max_history_size:
]
return measurements
def identify_bottleneck(self, request_id: str) -> Dict[str, Any]:
"""
Identify the bottleneck component for a specific request.
Returns the component consuming the highest percentage of
the total latency, along with optimization suggestions.
"""
total_ms = self._request_totals.get(request_id, 0)
if total_ms == 0:
return {"error": f"No trace data for request {request_id}"}
component_durations = {}
for comp_name, measurements in self._measurements.items():
for m in measurements:
if m.request_id == request_id:
component_durations[comp_name] = m.duration_ms
break
if not component_durations:
return {"error": f"No component data for request {request_id}"}
bottleneck = max(component_durations, key=component_durations.get)
bottleneck_ms = component_durations[bottleneck]
bottleneck_pct = (bottleneck_ms / total_ms) * 100 if total_ms > 0 else 0
suggestions = self._generate_optimization_suggestions(bottleneck, bottleneck_ms)
return {
"request_id": request_id,
"total_ms": total_ms,
"sla_met": total_ms <= self.sla_target_ms,
"bottleneck": {
"component": bottleneck,
"duration_ms": bottleneck_ms,
"pct_of_total": round(bottleneck_pct, 1),
"status": self._classify_latency(bottleneck, bottleneck_ms)[0].value,
},
"all_components": {
k: {
"duration_ms": v,
"pct_of_total": round((v / total_ms) * 100, 1) if total_ms > 0 else 0,
}
for k, v in sorted(
component_durations.items(), key=lambda x: x[1], reverse=True
)
},
"suggestions": suggestions,
}
def _generate_optimization_suggestions(
self, component: str, duration_ms: float
) -> List[str]:
"""Generate component-specific optimization suggestions."""
suggestions_map = {
"bedrock_invoke": [
"Consider switching to Haiku ($0.25/$1.25) for simple queries to reduce latency by 40-60%",
"Implement InvokeModelWithResponseStream for perceived speed improvement",
"Reduce input tokens by limiting RAG chunks from 5 to 3",
"Add prompt caching to eliminate redundant system prompt processing",
"Check if model-specific provisioned throughput would help",
],
"opensearch_vector": [
"Verify HNSW index parameters (ef_search, m) are tuned for latency vs recall",
"Reduce k from 5 to 3 for faster approximate nearest neighbor search",
"Check if OpenSearch Serverless OCU count needs scaling",
"Add a relevance score threshold to skip low-quality results early",
"Consider pre-computing embeddings for popular manga queries",
],
"redis_cache": [
"Check Redis node CPU utilization — may need larger instance type",
"Verify connection pooling is configured (max_connections=50)",
"Check for hot key patterns causing single-node bottleneck",
"Consider pipeline mode for batch cache operations",
"Ensure VPC endpoint is in same AZ as ECS tasks",
],
"dynamodb_session": [
"Verify DynamoDB table is using on-demand capacity mode for burst handling",
"Check item size — large conversation histories may need compression",
"Consider DAX for microsecond-latency session reads",
"Implement read-through cache pattern with Redis",
"Use projection expressions to fetch only needed attributes",
],
"api_gateway": [
"Check WebSocket connection idle timeout settings",
"Verify integration timeout is not causing premature closure",
"Monitor concurrent connection count against account limits",
"Consider regional endpoint vs edge-optimized for JP traffic",
],
"orchestrator": [
"Profile Python code for CPU-bound bottlenecks",
"Check ECS task CPU/memory allocation — may need scaling",
"Verify async I/O is used for parallel service calls where possible",
"Check for lock contention in shared state",
],
"response_parsing": [
"Optimize JSON parsing — use orjson instead of json for 3-10x speedup",
"Pre-compile regex patterns used in response extraction",
"Check for unnecessary response transformations",
],
"websocket_delivery": [
"Verify API Gateway WebSocket connection management API latency",
"Check message size — compress large responses",
"Ensure connection ID is still valid before sending",
],
}
return suggestions_map.get(component, [
f"Component '{component}' took {duration_ms}ms — review configuration and scaling"
])
def generate_report(self, window_minutes: int = 60) -> LatencyReport:
"""
Generate a comprehensive latency report for a time window.
Aggregates all measurements, calculates percentiles per
component, identifies overall bottleneck, and provides
actionable recommendations.
"""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
cutoff_iso = cutoff.isoformat()
# Filter measurements within window
component_stats = {}
for comp_name, measurements in self._measurements.items():
recent = [m for m in measurements if m.timestamp >= cutoff_iso]
if not recent:
continue
durations = [m.duration_ms for m in recent]
component_stats[comp_name] = {
"avg_ms": round(statistics.mean(durations), 2),
"p50_ms": round(statistics.median(durations), 2),
"p90_ms": round(
sorted(durations)[int(len(durations) * 0.9)] if durations else 0, 2
),
"p99_ms": round(
sorted(durations)[int(len(durations) * 0.99)] if durations else 0, 2
),
"min_ms": round(min(durations), 2),
"max_ms": round(max(durations), 2),
"count": len(durations),
"budget_exceeded_pct": round(
sum(1 for m in recent if m.status in (LatencyStatus.BUDGET_EXCEEDED, LatencyStatus.CRITICAL))
/ len(recent)
* 100,
1,
),
}
# Calculate SLA compliance
recent_totals = {
rid: total
for rid, total in self._request_totals.items()
if self._request_timestamps.get(rid, "") >= cutoff_iso
}
total_requests = len(recent_totals)
sla_met = sum(1 for t in recent_totals.values() if t <= self.sla_target_ms)
# Identify overall bottleneck
bottleneck_comp = ""
bottleneck_avg = 0.0
for comp, stats in component_stats.items():
if stats["avg_ms"] > bottleneck_avg:
bottleneck_avg = stats["avg_ms"]
bottleneck_comp = comp
avg_total = statistics.mean(recent_totals.values()) if recent_totals else 0
bottleneck_pct = (bottleneck_avg / avg_total * 100) if avg_total > 0 else 0
# Generate recommendations
recommendations = []
for comp, stats in component_stats.items():
if stats["budget_exceeded_pct"] > 5:
recommendations.append(
f"{comp}: {stats['budget_exceeded_pct']}% of requests exceed budget "
f"(avg={stats['avg_ms']}ms, p99={stats['p99_ms']}ms)"
)
if total_requests > 0 and (sla_met / total_requests) < 0.95:
recommendations.insert(
0,
f"SLA compliance is {round(sla_met / total_requests * 100, 1)}% — "
f"below 95% target. Focus on {bottleneck_comp} optimization.",
)
return LatencyReport(
window_start=cutoff_iso,
window_end=datetime.now(timezone.utc).isoformat(),
total_requests=total_requests,
sla_met_count=sla_met,
sla_met_pct=round(sla_met / total_requests * 100, 1) if total_requests > 0 else 0,
component_stats=component_stats,
bottleneck_component=bottleneck_comp,
bottleneck_avg_ms=round(bottleneck_avg, 2),
bottleneck_pct_of_total=round(bottleneck_pct, 1),
recommendations=recommendations,
)
def detect_anomalies(
self,
window_minutes: int = 30,
std_dev_threshold: float = 2.0,
) -> List[Dict[str, Any]]:
"""
Detect latency anomalies using statistical analysis.
A measurement is anomalous if it exceeds the mean + N standard
deviations for its component within the analysis window.
"""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
cutoff_iso = cutoff.isoformat()
anomalies = []
for comp_name, measurements in self._measurements.items():
recent = [m for m in measurements if m.timestamp >= cutoff_iso]
if len(recent) < 10: # Need minimum samples
continue
durations = [m.duration_ms for m in recent]
mean = statistics.mean(durations)
stdev = statistics.stdev(durations) if len(durations) > 1 else 0
threshold = mean + (std_dev_threshold * stdev)
for m in recent:
if m.duration_ms > threshold:
anomalies.append({
"component": comp_name,
"request_id": m.request_id,
"duration_ms": m.duration_ms,
"mean_ms": round(mean, 2),
"stdev_ms": round(stdev, 2),
"threshold_ms": round(threshold, 2),
"deviation_factor": round(
(m.duration_ms - mean) / stdev if stdev > 0 else 0, 2
),
"timestamp": m.timestamp,
})
anomalies.sort(key=lambda a: a.get("deviation_factor", 0), reverse=True)
return anomalies
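One refinement worth noting: the sorted-index percentile calculation in `generate_report` is a rough approximation. Since Python 3.8, `statistics.quantiles` gives interpolated percentiles directly; a minimal sketch of a drop-in helper:

```python
import statistics
from typing import Dict, List

def percentiles(durations: List[float]) -> Dict[str, float]:
    """p50/p90/p99 via statistics.quantiles (n=100 yields 99 cut points)."""
    if len(durations) < 2:
        d = durations[0] if durations else 0.0
        return {"p50": d, "p90": d, "p99": d}
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {"p50": cuts[49], "p90": cuts[89], "p99": cuts[98]}

# Example: 98 fast requests plus two slow outliers.
latencies = [100.0] * 98 + [500.0, 900.0]
```

Interpolated percentiles matter most at p99, where a naive index on a small sample can either miss the outlier entirely or report the single worst value.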
3. Common FM Error Patterns and Root Causes
3.1 Pattern Catalog
| # | Pattern Name | Error Signature | Root Cause | Frequency (MangaAssist) | Auto-Remediable? |
|---|---|---|---|---|---|
| 1 | Bedrock Burst Throttle | `ThrottlingException` in 5-min burst | Traffic spike exceeds on-demand token/min limit | 2-3x daily at peak | Yes (backoff + model fallback) |
| 2 | Long Context Timeout | `ReadTimeoutError` when input > 80K tokens | Large manga catalog context exceeds inference time | Weekly | Yes (truncate + retry) |
| 3 | Stale Embedding Mismatch | Low relevance scores (< 0.3) post-update | Product catalog re-indexed but embedding model version changed | After each reindex | No (requires reindex) |
| 4 | Session Item Size Exceeded | `ValidationException` on DynamoDB put | Conversation history exceeds 400KB item limit | Rare (long conversations) | Yes (archive + compress) |
| 5 | Redis Failover Gap | `CLUSTERDOWN` for 10-30 seconds | Multi-AZ failover during maintenance or node failure | Monthly | Yes (cache-aside fallback) |
| 6 | Content Filter False Positive | `AccessDeniedException` for manga terms | Guardrail overly aggressive on manga genre terminology | 2-5% of queries | No (guardrail config change) |
| 7 | WebSocket Premature Close | `GoneException` on PostToConnection | Client disconnected before response delivery | 1-2% of requests | Yes (graceful handling) |
| 8 | Model Access Not Enabled | `AccessDeniedException` on InvokeModel | Bedrock model access not enabled for new region/model | After deployments | No (console action needed) |
| 9 | Token Count Estimation Drift | Actual tokens 10-20% higher than estimated | Tokenizer version mismatch between estimator and model | Continuous (minor) | Yes (use model tokenizer) |
| 10 | Concurrent Session Write Conflict | `ConditionalCheckFailedException` | Multiple Lambda/ECS tasks writing same session item | Under high load | Yes (optimistic locking) |
3.2 Detailed Pattern: Bedrock Burst Throttle
```mermaid
sequenceDiagram
    participant U as Users (burst)
    participant O as Orchestrator
    participant B as Bedrock
    participant D as ErrorPatternDetector
    participant R as RemediationEngine
    U->>O: 50 concurrent requests
    O->>B: InvokeModel (Sonnet)
    B-->>O: ThrottlingException
    O->>D: detect("ThrottlingException")
    D->>D: Match EP-001, confidence=0.95
    D->>R: Remediation: RETRY_WITH_BACKOFF
    R->>O: Apply exponential backoff (1s, 2s, 4s)
    O->>B: Retry InvokeModel (Sonnet)
    B-->>O: ThrottlingException (still throttled)
    R->>O: Switch to Haiku fallback
    O->>B: InvokeModel (Haiku)
    B-->>O: Success (200ms faster)
    O->>U: Response (with Haiku quality caveat)
    Note over D: Log pattern trend: increasing throttle rate
    D->>R: Alert: sustained throttle > 5 min
    R->>R: Trigger quota increase request
```
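The remediation path in the diagram (exponential backoff, then Haiku fallback) can be sketched as follows. Here `invoke_fn` is a hypothetical stand-in for the real boto3 `bedrock-runtime` call, and `ThrottlingException` is a local stand-in for botocore's throttling error code:

```python
import time
from typing import Any, Callable, Tuple

class ThrottlingException(Exception):
    """Local stand-in for botocore's ThrottlingException error code."""

def invoke_with_fallback(
    invoke_fn: Callable[[str], Any],  # hypothetical wrapper around InvokeModel
    primary: str = "sonnet",
    fallback: str = "haiku",
    delays: Tuple[float, ...] = (1, 2, 4),
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[str, Any]:
    """Retry the primary model with exponential backoff, then fall back."""
    for delay in delays:
        try:
            return primary, invoke_fn(primary)
        except ThrottlingException:
            sleep(delay)  # back off: 1s, 2s, 4s
    # Still throttled after all retries: degrade to the cheaper, faster model.
    return fallback, invoke_fn(fallback)
```

The `sleep` parameter is injected so the backoff can be stubbed out in tests; in production code you would typically also add jitter to the delays to avoid synchronized retry storms.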
4. Amazon Q Developer Integration for Automated Diagnosis
4.1 AutoDiagnosisEngine
import json
import time
import re
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
class DiagnosisConfidence(Enum):
"""Confidence level for automated diagnoses."""
HIGH = "HIGH" # > 90% match, seen before, auto-remediate
MEDIUM = "MEDIUM" # 70-90% match, needs confirmation
LOW = "LOW" # < 70% match, suggest investigation
UNKNOWN = "UNKNOWN" # No pattern match
class DiagnosisStatus(Enum):
"""Status of a diagnosis workflow."""
PENDING = "PENDING"
ANALYZING = "ANALYZING"
DIAGNOSED = "DIAGNOSED"
REMEDIATED = "REMEDIATED"
ESCALATED = "ESCALATED"
FALSE_POSITIVE = "FALSE_POSITIVE"
@dataclass
class DiagnosisResult:
"""Complete result of an automated diagnosis."""
diagnosis_id: str
timestamp: str
request_id: str
error_message: str
error_code: str
confidence: DiagnosisConfidence
status: DiagnosisStatus
pattern_id: Optional[str]
pattern_name: Optional[str]
root_cause: str
impact_assessment: str
remediation_applied: Optional[str]
remediation_success: Optional[bool]
q_developer_query: str
q_developer_response: Optional[str]
time_to_diagnose_ms: float
time_to_remediate_ms: Optional[float]
related_request_ids: List[str]
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RemediationRule:
"""Rule for automated remediation of known error patterns."""
rule_id: str
pattern_id: str
condition: str
action: str
cooldown_seconds: int
max_auto_remediations_per_hour: int
requires_confirmation: bool
rollback_action: Optional[str]
class AutoDiagnosisEngine:
"""
Automated diagnosis engine for MangaAssist FM application errors.
Combines error pattern detection, log correlation, and trace
analysis to provide automated diagnosis and remediation. Integrates
with Amazon Q Developer for AI-assisted root cause analysis.
The engine operates in three modes:
1. AUTO: Fully automated detection, diagnosis, and remediation
2. ASSISTED: Automated detection and diagnosis, manual remediation
3. MANUAL: Generates Q Developer prompts for human investigation
Workflow:
1. Error detected in logs or X-Ray trace
2. ErrorPatternDetector classifies the error
3. LogCorrelator provides request context
4. LatencyAnalyzer identifies timing anomalies
5. AutoDiagnosisEngine synthesizes a diagnosis
6. If high confidence and auto-remediable: apply fix
7. If low confidence: generate Q Developer prompt for human review
Usage:
engine = AutoDiagnosisEngine(mode="ASSISTED")
result = engine.diagnose(
error_message="ThrottlingException: Rate exceeded",
error_code="ThrottlingException",
request_id="req-abc123",
context={...}
)
if result.confidence == DiagnosisConfidence.HIGH:
engine.auto_remediate(result)
"""
def __init__(
self,
mode: str = "ASSISTED",
max_auto_remediations_per_hour: int = 10,
):
self.mode = mode
self.max_auto_remediations = max_auto_remediations_per_hour
self._remediation_count: int = 0
self._remediation_reset_time: float = time.time()
self._diagnosis_history: List[DiagnosisResult] = []
self._remediation_rules: Dict[str, RemediationRule] = {}
self._false_positive_patterns: Dict[str, int] = defaultdict(int)
self._register_default_rules()
def _register_default_rules(self) -> None:
"""Register default remediation rules for known patterns."""
default_rules = [
RemediationRule(
rule_id="RR-001",
pattern_id="EP-001", # Bedrock Throttling
condition="error_count > 3 in 5 minutes",
action="enable_model_fallback(from='sonnet', to='haiku')",
cooldown_seconds=300,
max_auto_remediations_per_hour=5,
requires_confirmation=False,
rollback_action="disable_model_fallback()",
),
RemediationRule(
rule_id="RR-002",
pattern_id="EP-002", # Bedrock Timeout
condition="timeout_count > 2 for same session",
action="switch_to_streaming_mode()",
cooldown_seconds=60,
max_auto_remediations_per_hour=20,
requires_confirmation=False,
rollback_action="revert_to_sync_mode()",
),
RemediationRule(
rule_id="RR-003",
pattern_id="EP-003", # Token Limit Exceeded
condition="token_count > model_limit * 0.95",
action="truncate_context(keep_recent=5, max_rag_chunks=3)",
cooldown_seconds=0,
max_auto_remediations_per_hour=100,
requires_confirmation=False,
rollback_action=None,
),
RemediationRule(
rule_id="RR-004",
pattern_id="EP-004", # OpenSearch Failure
condition="opensearch_error_count > 1",
action="fallback_to_keyword_search()",
cooldown_seconds=120,
max_auto_remediations_per_hour=10,
requires_confirmation=False,
rollback_action="restore_vector_search()",
),
RemediationRule(
rule_id="RR-005",
pattern_id="EP-006", # Session Corruption
condition="session_error for user",
action="reset_session_with_summary()",
cooldown_seconds=0,
max_auto_remediations_per_hour=50,
requires_confirmation=True,
rollback_action=None,
),
RemediationRule(
rule_id="RR-006",
pattern_id="EP-007", # Redis Cache Failure
condition="redis_connection_error",
action="bypass_cache_for_request()",
cooldown_seconds=0,
max_auto_remediations_per_hour=1000,
requires_confirmation=False,
rollback_action="restore_cache_path()",
),
]
for rule in default_rules:
self._remediation_rules[rule.pattern_id] = rule
def diagnose(
self,
error_message: str,
error_code: str,
request_id: str,
context: Optional[Dict[str, Any]] = None,
) -> DiagnosisResult:
"""
Perform automated diagnosis of an FM application error.
Steps:
1. Pattern matching against known error signatures
2. Context enrichment from correlation data
3. Confidence scoring based on pattern match + context
4. Generate Q Developer query for further analysis
5. Return the result; the caller may invoke auto_remediate() when confidence is HIGH
"""
t0 = time.time()
ctx = context or {}
# Step 1: Pattern matching (simulated — in production, uses ErrorPatternDetector)
pattern_match = self._match_pattern(error_message, error_code)
# Step 2: Build diagnosis
diagnosis_id = f"diag-{int(time.time())}-{request_id[:8]}"
confidence = self._assess_confidence(pattern_match, ctx)
# Step 3: Generate Q Developer query
q_query = self._generate_q_developer_query(
error_message, error_code, pattern_match, ctx
)
# Step 4: Determine root cause
root_cause = (
pattern_match.get("root_cause", "Unknown — requires manual investigation")
if pattern_match
else "No matching pattern found. Error requires manual investigation."
)
# Step 5: Impact assessment
impact = self._assess_impact(pattern_match, ctx)
diagnosis_time = (time.time() - t0) * 1000
result = DiagnosisResult(
diagnosis_id=diagnosis_id,
timestamp=datetime.now(timezone.utc).isoformat(),
request_id=request_id,
error_message=error_message,
error_code=error_code,
confidence=confidence,
status=DiagnosisStatus.DIAGNOSED,
pattern_id=pattern_match.get("pattern_id") if pattern_match else None,
pattern_name=pattern_match.get("name") if pattern_match else None,
root_cause=root_cause,
impact_assessment=impact,
remediation_applied=None,
remediation_success=None,
q_developer_query=q_query,
q_developer_response=None,
time_to_diagnose_ms=round(diagnosis_time, 2),
time_to_remediate_ms=None,
related_request_ids=ctx.get("related_request_ids", []),
metadata=ctx,
)
self._diagnosis_history.append(result)
return result
def _match_pattern(
self, error_message: str, error_code: str
) -> Optional[Dict[str, Any]]:
"""Match error against known patterns (simplified)."""
patterns = {
"ThrottlingException": {
"pattern_id": "EP-001",
"name": "Bedrock Throttling",
"root_cause": "Request rate exceeds provisioned throughput",
"is_transient": True,
},
"ReadTimeoutError": {
"pattern_id": "EP-002",
"name": "Bedrock Invoke Timeout",
"root_cause": "Model inference exceeds client timeout",
"is_transient": True,
},
"ValidationException": {
"pattern_id": "EP-003",
"name": "Token Limit Exceeded",
"root_cause": "Input tokens exceed model context window",
"is_transient": False,
},
"OpenSearchException": {
"pattern_id": "EP-004",
"name": "OpenSearch Vector Search Failure",
"root_cause": "Vector search index unavailable or query timeout",
"is_transient": True,
},
"AccessDeniedException": {
"pattern_id": "EP-005",
"name": "Content Filter or Permission Error",
"root_cause": "Content policy violation or IAM permission missing",
"is_transient": False,
},
}
if error_code in patterns:
return patterns[error_code]
for code, pattern in patterns.items():
if code.lower() in error_message.lower():
return pattern
return None
def _assess_confidence(
self, pattern_match: Optional[Dict[str, Any]], context: Dict[str, Any]
) -> DiagnosisConfidence:
"""Assess diagnostic confidence based on pattern match and context."""
if not pattern_match:
return DiagnosisConfidence.UNKNOWN
pattern_id = pattern_match.get("pattern_id", "")
# Check for known false positives
if self._false_positive_patterns.get(pattern_id, 0) > 3:
return DiagnosisConfidence.LOW
# High confidence if error code matches and pattern seen before
history_matches = sum(
1
for d in self._diagnosis_history
if d.pattern_id == pattern_id and d.status != DiagnosisStatus.FALSE_POSITIVE
)
if history_matches > 5:
return DiagnosisConfidence.HIGH
# First-time and rarely-seen patterns both start at MEDIUM until history builds
return DiagnosisConfidence.MEDIUM
def _generate_q_developer_query(
self,
error_message: str,
error_code: str,
pattern_match: Optional[Dict[str, Any]],
context: Dict[str, Any],
) -> str:
"""
Generate a contextual prompt for Amazon Q Developer.
The prompt includes:
- The error details
- The matched pattern (if any)
- Relevant context (model, tokens, latency)
- Specific question for Q Developer to answer
"""
parts = [
"I am troubleshooting an error in my MangaAssist chatbot application.",
f"Error Code: {error_code}",
f"Error Message: {error_message}",
]
if pattern_match:
parts.append(f"Known Pattern: {pattern_match.get('name', 'Unknown')}")
parts.append(f"Root Cause (suspected): {pattern_match.get('root_cause', 'Unknown')}")
if context.get("model_id"):
parts.append(f"Model: {context['model_id']}")
if context.get("input_tokens"):
parts.append(f"Input Tokens: {context['input_tokens']}")
if context.get("latency_ms"):
parts.append(f"Latency: {context['latency_ms']}ms")
parts.append("")
parts.append(
"The application stack is: ECS Fargate (Python orchestrator) -> "
"Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless (RAG), "
"DynamoDB (sessions), ElastiCache Redis (cache), "
"API Gateway WebSocket. Target SLA: 3 seconds."
)
parts.append("")
parts.append("Questions for Q Developer:")
parts.append("1. What is the most likely root cause?")
parts.append("2. What is the recommended fix?")
parts.append("3. Are there any preventive measures?")
parts.append("4. Could this error be related to other recent issues?")
return "\n".join(parts)
def _assess_impact(
self,
pattern_match: Optional[Dict[str, Any]],
context: Dict[str, Any],
) -> str:
"""Assess the user and business impact of the error."""
if not pattern_match:
return "Unknown impact — requires manual assessment"
pattern_id = pattern_match.get("pattern_id", "")
impact_map = {
"EP-001": "Users may experience 2-5 second delays due to retry backoff. At scale, this affects burst traffic windows. Cost impact: retries consume additional API calls.",
"EP-002": "User receives no response for this message. WebSocket may timeout. Session state remains consistent but user must resend.",
"EP-003": "Request fails before inference. No cost incurred. User sees error message. Long conversations are most affected.",
"EP-004": "FM responds without context, leading to generic or hallucinated answers. Product recommendations will be absent.",
"EP-005": "User query is blocked with generic error. Legitimate manga queries may be incorrectly filtered.",
}
return impact_map.get(pattern_id, "Impact assessment requires manual review")
def auto_remediate(self, diagnosis: DiagnosisResult) -> bool:
"""
Apply automated remediation for a diagnosed error.
Only applies if:
1. Mode is AUTO, or ASSISTED (in ASSISTED, a human invokes this after reviewing the diagnosis)
2. Confidence is HIGH
3. A remediation rule exists for the pattern
4. The hourly remediation limit is not exceeded
5. The rule does not require a human confirmation that the current mode cannot provide
"""
if self.mode not in ("AUTO", "ASSISTED"):
diagnosis.status = DiagnosisStatus.ESCALATED
return False
if diagnosis.confidence != DiagnosisConfidence.HIGH:
diagnosis.status = DiagnosisStatus.ESCALATED
return False
if not diagnosis.pattern_id:
diagnosis.status = DiagnosisStatus.ESCALATED
return False
rule = self._remediation_rules.get(diagnosis.pattern_id)
if not rule:
diagnosis.status = DiagnosisStatus.ESCALATED
return False
# Check rate limit
now = time.time()
if now - self._remediation_reset_time > 3600:
self._remediation_count = 0
self._remediation_reset_time = now
if self._remediation_count >= self.max_auto_remediations:
diagnosis.status = DiagnosisStatus.ESCALATED
return False
# Confirmation-gated rules need a human in the loop; unattended AUTO mode
# has none, so escalate (an ASSISTED-mode call implies a human has already
# reviewed the diagnosis, which serves as the confirmation)
if rule.requires_confirmation and self.mode == "AUTO":
diagnosis.status = DiagnosisStatus.ESCALATED
return False
# Apply remediation (in production, these would be real actions)
t0 = time.time()
diagnosis.remediation_applied = rule.action
diagnosis.status = DiagnosisStatus.REMEDIATED
diagnosis.remediation_success = True
diagnosis.time_to_remediate_ms = round((time.time() - t0) * 1000, 2)
self._remediation_count += 1
return True
def mark_false_positive(self, diagnosis_id: str) -> None:
"""
Mark a diagnosis as a false positive.
Used to train the system to avoid incorrect classifications.
After 3 false positives for a pattern, confidence is downgraded.
"""
for d in self._diagnosis_history:
if d.diagnosis_id == diagnosis_id:
d.status = DiagnosisStatus.FALSE_POSITIVE
if d.pattern_id:
self._false_positive_patterns[d.pattern_id] += 1
break
def get_diagnosis_stats(self) -> Dict[str, Any]:
"""Return summary statistics for the diagnosis engine."""
total = len(self._diagnosis_history)
if total == 0:
return {"total_diagnoses": 0}
by_confidence = defaultdict(int)
by_status = defaultdict(int)
by_pattern = defaultdict(int)
diagnosis_times: List[float] = []
for d in self._diagnosis_history:
by_confidence[d.confidence.value] += 1
by_status[d.status.value] += 1
if d.pattern_id:
by_pattern[d.pattern_id] += 1
diagnosis_times.append(d.time_to_diagnose_ms)
return {
"total_diagnoses": total,
"by_confidence": dict(by_confidence),
"by_status": dict(by_status),
"by_pattern": dict(by_pattern),
"avg_diagnosis_time_ms": round(
sum(diagnosis_times) / len(diagnosis_times), 2
),
"auto_remediations_this_hour": self._remediation_count,
"false_positive_patterns": dict(self._false_positive_patterns),
}
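The hourly remediation cap above is a fixed-window counter: the count resets once a full hour has elapsed since the last reset. A minimal standalone sketch of that pattern (the class name and API here are illustrative, not part of the engine):

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` actions per `window_seconds` window."""

    def __init__(self, limit: int, window_seconds: float = 3600.0):
        self.limit = limit
        self.window_seconds = window_seconds
        self._count = 0
        self._window_start = time.time()

    def allow(self) -> bool:
        now = time.time()
        # Reset the counter once the current window has fully elapsed
        if now - self._window_start > self.window_seconds:
            self._count = 0
            self._window_start = now
        if self._count >= self.limit:
            return False
        self._count += 1
        return True

limiter = FixedWindowLimiter(limit=2, window_seconds=3600)
print([limiter.allow() for _ in range(3)])  # [True, True, False]
```

Note the trade-off: fixed windows permit a burst of up to 2× the limit straddling a window boundary; a sliding-window or token-bucket limiter is stricter if that matters for your remediation actions.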
5. Q Developer Integration Workflow
5.1 How Q Developer Fits Into the Troubleshooting Loop
flowchart TD
A[Error Detected<br/>in CloudWatch Logs] --> B{Pattern<br/>Matched?}
B -->|Yes, HIGH confidence| C[Auto-Remediate<br/>via Remediation Rule]
B -->|Yes, MEDIUM confidence| D[Generate Q Developer<br/>Diagnosis Query]
B -->|No match| E[Generate Q Developer<br/>Investigation Query]
C --> F{Remediation<br/>Successful?}
F -->|Yes| G[Log Resolution<br/>Update Dashboard]
F -->|No| D
D --> H[Q Developer<br/>Analyzes Error Context]
E --> H
H --> I[Q Developer Suggests<br/>Root Cause + Fix]
I --> J{Developer<br/>Confirms?}
J -->|Yes, correct| K[Apply Fix<br/>Add to Pattern Registry]
J -->|No, false positive| L[Mark False Positive<br/>Retrain Pattern Detector]
J -->|Partial| M[Refine Diagnosis<br/>Re-query Q Developer]
K --> G
L --> N[Update Error Pattern<br/>Confidence Scoring]
M --> H
style C fill:#c8e6c9,stroke:#2e7d32
style H fill:#e3f2fd,stroke:#1565c0
style K fill:#c8e6c9,stroke:#2e7d32
style L fill:#ffcdd2,stroke:#c62828
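The three branches at the top of the flowchart reduce to a small routing function; the return labels below mirror the diagram nodes and are illustrative only:

```python
def route_error(pattern_matched: bool, confidence: str) -> str:
    """Map a detection outcome to the next troubleshooting step (per the flowchart)."""
    if not pattern_matched:
        # No known signature: ask Q Developer to investigate from scratch
        return "q_developer_investigation_query"
    if confidence == "HIGH":
        # Known pattern with strong evidence: attempt rule-based remediation
        return "auto_remediate"
    # Known pattern but weaker evidence: ask Q Developer to confirm the diagnosis
    return "q_developer_diagnosis_query"

print(route_error(True, "HIGH"))      # auto_remediate
print(route_error(True, "MEDIUM"))    # q_developer_diagnosis_query
print(route_error(False, "UNKNOWN"))  # q_developer_investigation_query
```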
5.2 Q Developer Query Templates
| Scenario | Template |
|---|---|
| Bedrock API error | "My Bedrock InvokeModel call to {model_id} failed with {error_code}. Input was {input_tokens} tokens. How do I fix this in my boto3 Python application?" |
| Latency spike | "Bedrock response latency increased from {baseline_ms}ms to {current_ms}ms for Claude 3 Sonnet. What could cause this degradation?" |
| RAG quality drop | "OpenSearch vector search relevance scores dropped from {baseline_score} to {current_score}. My index uses {dimension}D embeddings. What should I check?" |
| Cost anomaly | "My daily Bedrock cost jumped from ${baseline_cost} to ${current_cost}. I'm using Claude 3 Sonnet at 1M messages/day. What could cause this?" |
| Session management error | "DynamoDB {error_code} when writing session data. Item size is {item_size_kb}KB. How do I handle large conversation histories?" |
| Deployment regression | "After deploying version {version}, error rate jumped from {baseline_pct}% to {current_pct}%. The only change was {change_description}." |
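These templates can be filled with plain `str.format` substitution. The registry below is a hypothetical sketch (there is no Q Developer template API; two of the table's templates are reproduced for illustration):

```python
# Hypothetical template registry keyed by scenario name
Q_TEMPLATES = {
    "bedrock_api_error": (
        "My Bedrock InvokeModel call to {model_id} failed with {error_code}. "
        "Input was {input_tokens} tokens. How do I fix this in my boto3 "
        "Python application?"
    ),
    "latency_spike": (
        "Bedrock response latency increased from {baseline_ms}ms to "
        "{current_ms}ms for Claude 3 Sonnet. What could cause this degradation?"
    ),
}

def build_q_prompt(scenario: str, **values: object) -> str:
    """Fill a scenario template; raises KeyError if a placeholder is missing."""
    return Q_TEMPLATES[scenario].format(**values)

prompt = build_q_prompt(
    "bedrock_api_error",
    model_id="anthropic.claude-3-sonnet",
    error_code="ThrottlingException",
    input_tokens=4200,
)
print(prompt)
```

Failing loudly on a missing placeholder (rather than sending a prompt with a literal `{input_tokens}` in it) keeps the generated queries trustworthy.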
6. Integrated Troubleshooting Flow — End-to-End Example
6.1 Scenario: User Reports Slow Manga Recommendation
Timeline:
T+0ms API Gateway receives WebSocket message
T+15ms Orchestrator picks up message
T+18ms Redis cache MISS (key not found)
T+35ms DynamoDB session lookup (conversation history)
T+220ms OpenSearch vector search (manga product embeddings)
T+240ms Orchestrator builds prompt (system + RAG context + history)
T+2900ms Bedrock InvokeModel returns (Claude 3 Sonnet)
T+2920ms Response parsed and formatted
T+2945ms WebSocket response delivered
Total: 2945ms — within 3s SLA but tight
Analysis:
- Bedrock consumed 2660ms (90.3% of total)
- Input tokens: 4,200 (system: 800, RAG: 2,400, history: 1,000)
- Output tokens: 350
- OpenSearch returned 5 chunks with avg relevance 0.72
Diagnosis:
- No error, but latency is within 55ms of SLA breach
- Bedrock latency is high due to 4,200 input tokens
- Recommendation: reduce RAG chunks from 5 to 3, compress history
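This recommendation matches rule RR-003's `truncate_context` action. A minimal sketch of such a trimmer, assuming conversation history is a list of message dicts and RAG chunks carry a `relevance` score (both shapes are assumptions, not the orchestrator's actual data model):

```python
from typing import Any, Dict, List, Tuple

def truncate_context(
    history: List[Dict[str, Any]],
    rag_chunks: List[Dict[str, Any]],
    keep_recent: int = 5,
    max_rag_chunks: int = 3,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Keep the most recent turns and the highest-relevance RAG chunks."""
    trimmed_history = history[-keep_recent:]
    trimmed_chunks = sorted(
        rag_chunks, key=lambda c: c.get("relevance", 0.0), reverse=True
    )[:max_rag_chunks]
    return trimmed_history, trimmed_chunks

history = [{"turn": i} for i in range(8)]
chunks = [{"id": i, "relevance": r} for i, r in enumerate([0.9, 0.5, 0.8, 0.7, 0.6])]
h, c = truncate_context(history, chunks)
print(len(h), [chunk["id"] for chunk in c])  # 5 [0, 2, 3]
```

Dropping the lowest-relevance chunks first (rather than the last-retrieved ones) preserves the most useful grounding while cutting input tokens.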
CloudWatch Logs Insights Query:
fields @timestamp, latency.bedrock_invoke_ms, prompt_metrics.total_input_tokens
| filter metadata.request_id = "req-abc123"
| limit 1
X-Ray Trace: Shows waterfall with Bedrock as 90.3% of total time
Q Developer: "Bedrock latency is 2660ms for 4200 input tokens. How do I reduce this?"
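The 90.3% figure can be reproduced by diffing adjacent timeline checkpoints; the stage names below are paraphrased from the timeline above:

```python
# Cumulative timestamps (ms) taken from the timeline above
checkpoints = [
    ("api_gateway", 0),
    ("orchestrator_pickup", 15),
    ("redis_check", 18),
    ("dynamodb_session", 35),
    ("opensearch_search", 220),
    ("prompt_build", 240),
    ("bedrock_invoke", 2900),
    ("response_parse", 2920),
    ("websocket_deliver", 2945),
]

total = checkpoints[-1][1]
# Each stage's cost is the delta from the previous checkpoint
for (stage, t), (_, prev) in zip(checkpoints[1:], checkpoints):
    delta = t - prev
    print(f"{stage:22s} {delta:5d}ms  {delta / total:6.1%}")
# bedrock_invoke accounts for 2660ms, i.e. ~90.3% of the 2945ms total
```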
7. Key Takeaways
- Correlation is the foundation: Without `request_id`, `session_id`, and `trace_id` flowing through every log entry, troubleshooting becomes guesswork. The `LogCorrelator` must be the first thing initialized in the request lifecycle.
- Latency budgets drive alerting: The `LatencyAnalyzer` assigns concrete budgets to each component. When Bedrock has 2300ms of a 3000ms total, even a 200ms regression in OpenSearch can break the SLA. Monitor budget utilization percentage, not just absolute values.
- Auto-diagnosis reduces MTTR: The `AutoDiagnosisEngine` combines pattern matching, context analysis, and Q Developer integration to reduce mean time to resolution from hours to minutes for known patterns.
- False positive tracking prevents incorrect remediation: When an auto-remediation is wrong, marking it as a false positive degrades that pattern's confidence score, preventing future incorrect actions.
- Q Developer prompts must be contextual: A generic "fix this error" prompt to Q Developer is useless. The system generates prompts with specific error codes, token counts, latency values, model IDs, and stack details. The more context, the better the suggestion.
- Statistical anomaly detection catches gradual degradation: Simple threshold alarms miss slow degradation (P99 creeping from 2500ms to 2800ms over a week). The `detect_anomalies` method uses standard deviation to flag gradual changes.
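The standard-deviation approach behind `detect_anomalies` can be sketched as a z-score check: flag any sample more than a threshold number of standard deviations from the series mean. This is a simplified standalone variant (the 2σ threshold and function shape are illustrative, not the production method's signature):

```python
import statistics
from typing import List, Tuple

def detect_anomalies(
    samples: List[float], threshold_sigma: float = 2.0
) -> List[Tuple[int, float]]:
    """Return (index, value) pairs deviating more than threshold_sigma
    standard deviations from the mean of the series."""
    if len(samples) < 2:
        return []  # not enough data to estimate spread
    mean = statistics.mean(samples)
    stdev = statistics.stdev(samples)
    if stdev == 0:
        return []  # perfectly flat series has no outliers
    return [
        (i, x) for i, x in enumerate(samples)
        if abs(x - mean) / stdev > threshold_sigma
    ]

# P99 latency samples (ms): a gradual creep plus one clear outlier
latencies = [2500, 2510, 2520, 2530, 2540, 2550, 3400]
print(detect_anomalies(latencies))  # [(6, 3400)]
```

Catching slow drift (rather than single outliers) additionally requires comparing a recent window's mean against an older baseline window, which is the spirit of the takeaway above.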