Troubleshooting Efficiency Architecture for FM Applications
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
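The pricing and scale figures above imply very different daily spend depending on model choice, which is why the cost-tracking fields below exist. A back-of-envelope sketch; the per-message token counts are illustrative assumptions, not measurements:

```python
# Back-of-envelope daily Bedrock spend for MangaAssist.
# Pricing is from the context above; token counts per message are assumed.
PRICING = {  # USD per 1M tokens: (input, output)
    "sonnet": (3.00, 15.00),
    "haiku": (0.25, 1.25),
}

def daily_cost(model: str, messages: int, in_tokens: int, out_tokens: int) -> float:
    """Estimated daily cost in USD if one model handled all traffic."""
    price_in, price_out = PRICING[model]
    per_msg = (in_tokens / 1e6) * price_in + (out_tokens / 1e6) * price_out
    return round(per_msg * messages, 2)

# Assuming ~2,000 input and ~300 output tokens per message at 1M messages/day:
print(daily_cost("sonnet", 1_000_000, 2000, 300))  # 10500.0
print(daily_cost("haiku", 1_000_000, 2000, 300))   # 875.0
```

The roughly 12x gap between the two models is what makes per-call cost logging and model-routing decisions worth instrumenting.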
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Development and Implementation of FM Applications |
| Task | 2.5 — Describe methods to integrate FM applications into existing systems |
| Skill | 2.5.6 — Improve troubleshooting efficiency for FM applications |
| Focus Areas | CloudWatch Logs Insights for prompt/response analysis, X-Ray for FM API call tracing, Amazon Q Developer for GenAI-specific error pattern recognition |
1. Troubleshooting Efficiency Mindmap
mindmap
root((FM Troubleshooting<br/>Efficiency))
CloudWatch Logs Insights
Prompt/Response Analysis
Token Count Tracking
Latency Distribution
Error Rate Correlation
Log Query Patterns
fields @timestamp, @message
filter by model_id
stats aggregation
Structured Log Parsing
JSON Path Extraction
Regex Pattern Matching
Metric Filters
Dashboards
Real-Time Error Panels
Latency Percentile Graphs
Token Usage Heatmaps
X-Ray Tracing
FM API Call Tracing
Bedrock InvokeModel Segments
OpenSearch Vector Query Subsegments
DynamoDB Session Lookup Subsegments
Service Map
End-to-End Call Visualization
Latency Hotspot Detection
Error Propagation Paths
Trace Analysis
Cold Start Identification
Throttle Pattern Detection
Retry Storm Analysis
Sampling Rules
Dynamic Rate Adjustment
Error-Biased Sampling
High-Latency Capture
Amazon Q Developer
GenAI Error Pattern Recognition
Model Timeout Classification
Token Limit Violations
Content Filter Triggers
Code Analysis
SDK Misconfiguration Detection
Prompt Template Errors
Response Parsing Failures
Automated Suggestions
Fix Recommendations
Performance Optimization Tips
Cost Reduction Patterns
Learning Loop
Historical Pattern Matching
Team Knowledge Aggregation
Runbook Automation
Error Taxonomy
Model Errors
ThrottlingException
ModelTimeoutException
ValidationException
Infrastructure Errors
NetworkTimeout
ServiceUnavailable
MemoryExhaustion
Application Errors
PromptTooLarge
ResponseParseFailure
ContextWindowExceeded
Data Errors
VectorSearchEmpty
SessionCorruption
CacheStaleData
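The error taxonomy above can be encoded as a small lookup that an orchestrator consults before deciding whether to retry. A minimal sketch; the retryability flags are assumptions for illustration, not AWS guidance:

```python
# Map raw error names to the taxonomy in the mindmap above.
# Category names mirror the mindmap; the retryable flags are assumptions.
TAXONOMY = {
    "ThrottlingException": ("Model Errors", True),      # (category, retryable)
    "ModelTimeoutException": ("Model Errors", True),
    "ValidationException": ("Model Errors", False),
    "NetworkTimeout": ("Infrastructure Errors", True),
    "ServiceUnavailable": ("Infrastructure Errors", True),
    "PromptTooLarge": ("Application Errors", False),
    "ResponseParseFailure": ("Application Errors", False),
    "VectorSearchEmpty": ("Data Errors", False),
}

def classify(error_name: str) -> dict:
    """Classify an error name; unknown errors default to non-retryable."""
    category, retryable = TAXONOMY.get(error_name, ("Unknown", False))
    return {"error": error_name, "category": category, "retryable": retryable}

print(classify("ThrottlingException"))
# {'error': 'ThrottlingException', 'category': 'Model Errors', 'retryable': True}
```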
2. Architecture Flowchart — MangaAssist Observability and Troubleshooting Stack
flowchart TB
subgraph UserLayer["User Layer"]
U[Manga Store<br/>Customer] -->|WebSocket| APIGW[API Gateway<br/>WebSocket]
end
subgraph Orchestration["ECS Fargate — Orchestrator"]
APIGW --> ORC[Orchestrator<br/>Service]
ORC --> SL[Structured<br/>FM Logger]
ORC --> XT[X-Ray<br/>Tracer]
end
subgraph FMLayer["FM & Data Layer"]
ORC -->|InvokeModel| BR[Amazon Bedrock<br/>Claude 3 Sonnet/Haiku]
ORC -->|Vector Search| OS[OpenSearch<br/>Serverless]
ORC -->|Session R/W| DDB[DynamoDB]
ORC -->|Cache| RC[ElastiCache<br/>Redis]
end
subgraph Observability["Observability Stack"]
SL -->|Structured JSON| CWL[CloudWatch<br/>Logs]
XT -->|Trace Segments| XR[AWS X-Ray]
BR -->|Model Metrics| CWM[CloudWatch<br/>Metrics]
CWL --> CLI[CloudWatch<br/>Logs Insights]
XR --> SM[X-Ray<br/>Service Map]
CWM --> CWD[CloudWatch<br/>Dashboards]
end
subgraph Troubleshooting["Troubleshooting & Alerting"]
CLI --> EPD[Error Pattern<br/>Detector]
SM --> LA[Latency<br/>Analyzer]
CWD --> AL[CloudWatch<br/>Alarms]
EPD --> QD[Amazon Q<br/>Developer]
LA --> QD
AL --> SNS[SNS<br/>Notifications]
QD --> RB[Runbook<br/>Automation]
RB --> SSM[Systems Manager<br/>Automation]
end
subgraph ResponseLoop["Feedback Loop"]
SSM -->|Auto-Remediate| ORC
SNS -->|Alert Ops Team| OPS[Operations<br/>Team]
OPS -->|Manual Intervention| ORC
end
style UserLayer fill:#e8f4f8,stroke:#2196F3
style Orchestration fill:#fff3e0,stroke:#FF9800
style FMLayer fill:#f3e5f5,stroke:#9C27B0
style Observability fill:#e8f5e9,stroke:#4CAF50
style Troubleshooting fill:#fce4ec,stroke:#E91E63
style ResponseLoop fill:#fffde7,stroke:#FFC107
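The CloudWatch Alarms to SNS path in the flowchart needs a metric to alarm on. One way to derive it from the structured logs is a metric filter on non-SUCCESS FM_CALL entries; the log group name, threshold, and topic ARN below are assumptions:

```python
# Sketch: metric filter + alarm wiring for the FM_CALL error path shown in
# the flowchart. Names, threshold, and ARNs are illustrative assumptions.
def error_metric_filter(log_group: str) -> dict:
    """Request body for logs.put_metric_filter: count non-SUCCESS FM calls."""
    return {
        "logGroupName": log_group,
        "filterName": "fm-call-errors",
        # Matches structured JSON entries where status is not SUCCESS
        "filterPattern": '{ $.log_type = "FM_CALL" && $.status != "SUCCESS" }',
        "metricTransformations": [{
            "metricName": "FMCallErrors",
            "metricNamespace": "MangaAssist",
            "metricValue": "1",
            "defaultValue": 0.0,
        }],
    }

def error_alarm(sns_topic_arn: str) -> dict:
    """Request body for cloudwatch.put_metric_alarm: page ops on error spikes."""
    return {
        "AlarmName": "mangaassist-fm-error-spike",
        "Namespace": "MangaAssist",
        "MetricName": "FMCallErrors",
        "Statistic": "Sum",
        "Period": 300,               # 5-minute windows
        "EvaluationPeriods": 2,      # two consecutive breaches before alarming
        "Threshold": 50.0,           # assumed tolerance at 1M msgs/day
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [sns_topic_arn],
        "TreatMissingData": "notBreaching",
    }

# In production these dicts would be passed to boto3, e.g.:
# boto3.client("logs").put_metric_filter(**error_metric_filter("/ecs/manga-assist"))
# boto3.client("cloudwatch").put_metric_alarm(**error_alarm(topic_arn))
```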
3. CloudWatch Logs Insights for Prompt/Response Analysis
3.1 Structured Log Format for FM Interactions
Every FM call in MangaAssist produces a structured JSON log entry with fields designed for Logs Insights queries.
import json
import time
import hashlib
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field, asdict
from enum import Enum
class FMCallStatus(Enum):
"""Status values for FM API calls."""
SUCCESS = "SUCCESS"
ERROR = "ERROR"
TIMEOUT = "TIMEOUT"
THROTTLED = "THROTTLED"
CONTENT_FILTERED = "CONTENT_FILTERED"
PARTIAL = "PARTIAL"
class ErrorSeverity(Enum):
"""Severity levels for FM errors."""
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
@dataclass
class FMCallMetadata:
"""Metadata captured for every FM API call."""
request_id: str
session_id: str
user_id: str
conversation_turn: int
model_id: str
model_version: str
region: str
timestamp_iso: str
timestamp_epoch_ms: int
@dataclass
class PromptMetrics:
"""Metrics extracted from the prompt side of an FM call."""
prompt_template_id: str
prompt_token_count: int
system_prompt_token_count: int
context_token_count: int
user_message_token_count: int
total_input_tokens: int
prompt_hash: str
contains_manga_title: bool
contains_author_name: bool
language: str
rag_chunks_included: int
rag_relevance_scores: List[float]
@dataclass
class ResponseMetrics:
"""Metrics extracted from the response side of an FM call."""
output_token_count: int
stop_reason: str
response_length_chars: int
response_language: str
contains_product_recommendation: bool
recommendation_count: int
confidence_score: Optional[float]
content_filter_triggered: bool
content_filter_reason: Optional[str]
@dataclass
class LatencyBreakdown:
"""Granular latency breakdown for an FM call chain."""
total_e2e_ms: float
api_gateway_ms: float
orchestrator_processing_ms: float
rag_retrieval_ms: float
opensearch_query_ms: float
dynamodb_session_lookup_ms: float
redis_cache_check_ms: float
bedrock_invoke_ms: float
bedrock_first_token_ms: float
response_parsing_ms: float
websocket_delivery_ms: float
@dataclass
class ErrorDetails:
"""Detailed error information for failed FM calls."""
error_type: str
error_code: str
error_message: str
error_severity: str
retry_count: int
is_retryable: bool
root_cause_category: str
stack_trace_hash: str
upstream_service: str
@dataclass
class CostMetrics:
"""Cost tracking for each FM call."""
input_cost_usd: float
output_cost_usd: float
total_cost_usd: float
model_pricing_tier: str
was_cached: bool
cache_hit_savings_usd: float
@dataclass
class FMCallLogEntry:
"""Complete structured log entry for a single FM interaction."""
log_type: str = "FM_CALL"
metadata: Optional[FMCallMetadata] = None
status: str = FMCallStatus.SUCCESS.value
prompt_metrics: Optional[PromptMetrics] = None
response_metrics: Optional[ResponseMetrics] = None
latency: Optional[LatencyBreakdown] = None
error: Optional[ErrorDetails] = None
cost: Optional[CostMetrics] = None
tags: Dict[str, str] = field(default_factory=dict)
def to_json(self) -> str:
"""Serialize the log entry to JSON for CloudWatch ingestion."""
data = {}
for k, v in asdict(self).items():
if v is not None:
data[k] = v
return json.dumps(data, default=str)
class StructuredFMLogger:
"""
Structured logger for FM interactions in MangaAssist.
Produces JSON log entries optimized for CloudWatch Logs Insights
queries. Every FM call, whether successful or failed, is logged
with consistent fields to enable efficient troubleshooting.
Usage:
logger = StructuredFMLogger(service_name="manga-assist-orchestrator")
entry = logger.log_fm_call(
request_id="req-abc123",
session_id="sess-xyz",
user_id="user-001",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
...
)
"""
# Pricing per 1M tokens for supported models
MODEL_PRICING = {
"anthropic.claude-3-sonnet": {"input": 3.00, "output": 15.00},
"anthropic.claude-3-haiku": {"input": 0.25, "output": 1.25},
}
def __init__(self, service_name: str, region: str = "ap-northeast-1"):
self.service_name = service_name
self.region = region
self.logger = logging.getLogger(f"fm.{service_name}")
self._configure_handler()
def _configure_handler(self) -> None:
"""Configure JSON handler for CloudWatch agent pickup."""
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def _compute_prompt_hash(self, prompt_text: str) -> str:
"""Hash prompt text for deduplication analysis without storing PII."""
return hashlib.sha256(prompt_text.encode("utf-8")).hexdigest()[:16]
def _resolve_model_key(self, model_id: str) -> str:
"""Resolve full model ID to pricing key."""
for key in self.MODEL_PRICING:
if key in model_id:
return key
return "anthropic.claude-3-haiku"
def _calculate_cost(
self, model_id: str, input_tokens: int, output_tokens: int, was_cached: bool
) -> CostMetrics:
"""Calculate cost metrics for an FM call."""
model_key = self._resolve_model_key(model_id)
pricing = self.MODEL_PRICING[model_key]
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
total_cost = input_cost + output_cost
# Assumption: serving from cache avoids ~90% of the model cost;
# tune this factor to the actual caching strategy.
cache_savings = total_cost * 0.9 if was_cached else 0.0
return CostMetrics(
input_cost_usd=round(input_cost, 8),
output_cost_usd=round(output_cost, 8),
total_cost_usd=round(total_cost if not was_cached else total_cost * 0.1, 8),
model_pricing_tier=model_key,
was_cached=was_cached,
cache_hit_savings_usd=round(cache_savings, 8),
)
def log_fm_call(
self,
request_id: str,
session_id: str,
user_id: str,
conversation_turn: int,
model_id: str,
model_version: str,
prompt_template_id: str,
prompt_text: str,
system_prompt_tokens: int,
context_tokens: int,
user_message_tokens: int,
output_tokens: int,
stop_reason: str,
response_text: str,
latency_breakdown: Dict[str, float],
rag_chunks: int,
rag_scores: List[float],
status: FMCallStatus = FMCallStatus.SUCCESS,
error_details: Optional[Dict[str, Any]] = None,
was_cached: bool = False,
tags: Optional[Dict[str, str]] = None,
) -> FMCallLogEntry:
"""
Log a complete FM call with all metrics.
This is the primary logging method called by the orchestrator
after every Bedrock InvokeModel call completes (or fails).
"""
now = datetime.now(timezone.utc)
total_input = system_prompt_tokens + context_tokens + user_message_tokens
metadata = FMCallMetadata(
request_id=request_id,
session_id=session_id,
user_id=user_id,
conversation_turn=conversation_turn,
model_id=model_id,
model_version=model_version,
region=self.region,
timestamp_iso=now.isoformat(),
timestamp_epoch_ms=int(now.timestamp() * 1000),
)
prompt_metrics = PromptMetrics(
prompt_template_id=prompt_template_id,
prompt_token_count=total_input,
system_prompt_token_count=system_prompt_tokens,
context_token_count=context_tokens,
user_message_token_count=user_message_tokens,
total_input_tokens=total_input,
prompt_hash=self._compute_prompt_hash(prompt_text),
contains_manga_title=any(
tag in prompt_text.lower()
for tag in ["manga", "comic", "volume", "chapter"]
),
contains_author_name=False,
language="ja" if any("\u3040" <= c <= "\u309f" for c in prompt_text) else "en",
rag_chunks_included=rag_chunks,
rag_relevance_scores=rag_scores,
)
response_metrics = ResponseMetrics(
output_token_count=output_tokens,
stop_reason=stop_reason,
response_length_chars=len(response_text),
response_language="ja" if any("\u3040" <= c <= "\u309f" for c in response_text) else "en",
contains_product_recommendation="isbn" in response_text.lower()
or "price" in response_text.lower(),
recommendation_count=response_text.lower().count("isbn"),
confidence_score=None,
content_filter_triggered=status == FMCallStatus.CONTENT_FILTERED,
content_filter_reason=None,
)
latency = LatencyBreakdown(
total_e2e_ms=latency_breakdown.get("total_e2e_ms", 0),
api_gateway_ms=latency_breakdown.get("api_gateway_ms", 0),
orchestrator_processing_ms=latency_breakdown.get("orchestrator_processing_ms", 0),
rag_retrieval_ms=latency_breakdown.get("rag_retrieval_ms", 0),
opensearch_query_ms=latency_breakdown.get("opensearch_query_ms", 0),
dynamodb_session_lookup_ms=latency_breakdown.get("dynamodb_session_lookup_ms", 0),
redis_cache_check_ms=latency_breakdown.get("redis_cache_check_ms", 0),
bedrock_invoke_ms=latency_breakdown.get("bedrock_invoke_ms", 0),
bedrock_first_token_ms=latency_breakdown.get("bedrock_first_token_ms", 0),
response_parsing_ms=latency_breakdown.get("response_parsing_ms", 0),
websocket_delivery_ms=latency_breakdown.get("websocket_delivery_ms", 0),
)
cost = self._calculate_cost(model_id, total_input, output_tokens, was_cached)
error = None
if error_details:
error = ErrorDetails(
error_type=error_details.get("error_type", "Unknown"),
error_code=error_details.get("error_code", "UNKNOWN"),
error_message=error_details.get("error_message", ""),
error_severity=error_details.get("error_severity", ErrorSeverity.MEDIUM.value),
retry_count=error_details.get("retry_count", 0),
is_retryable=error_details.get("is_retryable", False),
root_cause_category=error_details.get("root_cause_category", "unknown"),
stack_trace_hash=error_details.get("stack_trace_hash", ""),
upstream_service=error_details.get("upstream_service", ""),
)
entry = FMCallLogEntry(
log_type="FM_CALL",
metadata=metadata,
status=status.value,
prompt_metrics=prompt_metrics,
response_metrics=response_metrics,
latency=latency,
error=error,
cost=cost,
tags=tags or {},
)
self.logger.info(entry.to_json())
return entry
def log_rag_retrieval(
self,
request_id: str,
session_id: str,
query_text: str,
opensearch_index: str,
vector_dimension: int,
k_results: int,
results_returned: int,
relevance_scores: List[float],
query_latency_ms: float,
filter_applied: Optional[str] = None,
) -> None:
"""Log RAG vector retrieval step separately for granular analysis."""
log_entry = {
"log_type": "RAG_RETRIEVAL",
"request_id": request_id,
"session_id": session_id,
"query_hash": hashlib.sha256(query_text.encode()).hexdigest()[:16],
"opensearch_index": opensearch_index,
"vector_dimension": vector_dimension,
"k_requested": k_results,
"results_returned": results_returned,
"relevance_scores": relevance_scores,
"avg_relevance": round(sum(relevance_scores) / len(relevance_scores), 4)
if relevance_scores
else 0,
"min_relevance": min(relevance_scores) if relevance_scores else 0,
"max_relevance": max(relevance_scores) if relevance_scores else 0,
"query_latency_ms": query_latency_ms,
"filter_applied": filter_applied,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self.logger.info(json.dumps(log_entry))
def log_session_event(
self,
session_id: str,
user_id: str,
event_type: str,
details: Dict[str, Any],
) -> None:
"""Log session lifecycle events (start, end, timeout, error)."""
log_entry = {
"log_type": "SESSION_EVENT",
"session_id": session_id,
"user_id": user_id,
"event_type": event_type,
"details": details,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self.logger.info(json.dumps(log_entry))
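Before writing queries against these logs, it helps to see the emitted shape. A self-contained sketch of one FM_CALL line (values illustrative) showing the nested fields that Logs Insights discovers as dotted paths like `latency.bedrock_invoke_ms`:

```python
# Illustrative example of the JSON shape StructuredFMLogger emits.
# One JSON object per log line; values here are made up for demonstration.
import json

entry = {
    "log_type": "FM_CALL",
    "status": "SUCCESS",
    "metadata": {
        "request_id": "req-abc123",
        "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
    },
    "prompt_metrics": {"total_input_tokens": 1843},
    "response_metrics": {"output_token_count": 212},
    "latency": {"bedrock_invoke_ms": 1120.4, "total_e2e_ms": 1890.7},
    "cost": {"total_cost_usd": 0.00072575},
}

line = json.dumps(entry)        # what reaches CloudWatch Logs
parsed = json.loads(line)       # what Logs Insights parses per event
# Logs Insights exposes nested keys as dotted fields,
# e.g. latency.bedrock_invoke_ms or cost.total_cost_usd
print(parsed["latency"]["bedrock_invoke_ms"])  # 1120.4
```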
3.2 CloudWatch Logs Insights Query Examples
These queries are designed for the MangaAssist log group and leverage the structured JSON fields produced by StructuredFMLogger.
# ── Query 1: P50/P90/P99 Bedrock Invoke Latency Over Time ──
# Use: Identify latency degradation trends for Bedrock calls.
fields @timestamp, latency.bedrock_invoke_ms as bedrock_ms
| filter log_type = "FM_CALL"
| filter status = "SUCCESS"
| stats
avg(bedrock_ms) as avg_latency,
percentile(bedrock_ms, 50) as p50,
percentile(bedrock_ms, 90) as p90,
percentile(bedrock_ms, 99) as p99,
count(*) as call_count
by bin(5m) as time_bucket
| sort time_bucket desc
# ── Query 2: Error Rate by Model and Error Type ──
# Use: Pinpoint which model variant generates the most errors.
fields @timestamp, metadata.model_id, error.error_type, error.error_code
| filter log_type = "FM_CALL"
| filter status != "SUCCESS"
| stats count(*) as error_count by metadata.model_id, error.error_type
| sort error_count desc
| limit 20
# ── Query 3: Token Usage Distribution (Input vs Output) ──
# Use: Detect prompt bloat or unexpectedly long responses.
fields @timestamp,
prompt_metrics.total_input_tokens as input_tokens,
response_metrics.output_token_count as output_tokens,
metadata.model_id
| filter log_type = "FM_CALL"
| stats
avg(input_tokens) as avg_input,
max(input_tokens) as max_input,
avg(output_tokens) as avg_output,
max(output_tokens) as max_output,
sum(input_tokens) as total_input,
sum(output_tokens) as total_output
by bin(1h)
# ── Query 4: Slow RAG Retrieval Correlation with Response Latency ──
# Use: Determine if OpenSearch is the bottleneck.
fields @timestamp,
latency.opensearch_query_ms as os_ms,
latency.bedrock_invoke_ms as bedrock_ms,
latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL"
| filter latency.total_e2e_ms > 3000
| stats
avg(os_ms) as avg_os,
avg(bedrock_ms) as avg_bedrock,
avg(total_ms) as avg_total,
count(*) as slow_count
by bin(15m)
| sort slow_count desc
# ── Query 5: Content Filter Trigger Analysis ──
# Use: Understand what inputs trigger content filters.
fields @timestamp,
metadata.request_id,
metadata.session_id,
prompt_metrics.prompt_template_id,
prompt_metrics.language,
response_metrics.content_filter_reason
| filter log_type = "FM_CALL"
| filter status = "CONTENT_FILTERED"
| stats count(*) as filter_count by prompt_metrics.prompt_template_id, prompt_metrics.language
| sort filter_count desc
# ── Query 6: Cost Analysis by Model Over Time ──
# Use: Track daily spend and identify cost anomalies.
fields @timestamp,
metadata.model_id,
cost.total_cost_usd,
cost.was_cached,
cost.cache_hit_savings_usd
| filter log_type = "FM_CALL"
| stats
sum(cost.total_cost_usd) as total_spend,
sum(cost.cache_hit_savings_usd) as total_savings,
count(*) as call_count,
avg(cost.total_cost_usd) as avg_cost_per_call
by bin(1d), metadata.model_id
| sort total_spend desc
# ── Query 7: RAG Retrieval Quality Over Time ──
# Use: Detect degradation in vector search relevance.
fields @timestamp, avg_relevance, results_returned, query_latency_ms
| filter log_type = "RAG_RETRIEVAL"
| stats
avg(avg_relevance) as mean_relevance,
min(min_relevance) as worst_relevance,
avg(query_latency_ms) as avg_latency,
count(*) as retrieval_count
by bin(1h) as hour
| sort hour desc
# ── Query 8: Throttling Pattern Detection ──
# Use: Identify time windows with high throttle rates.
fields @timestamp, metadata.model_id, error.retry_count
| filter log_type = "FM_CALL"
| filter status = "THROTTLED"
| stats
count(*) as throttle_count,
avg(error.retry_count) as avg_retries,
max(error.retry_count) as max_retries
by bin(5m), metadata.model_id
| sort throttle_count desc
| limit 50
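These queries can also be run programmatically, for example from an automated error-pattern detector, using the CloudWatch Logs `StartQuery`/`GetQueryResults` API via boto3. A sketch; the log group name and query are illustrative, and the client calls are left commented:

```python
# Sketch: running a Logs Insights query programmatically.
# The log group name is an assumption; boto3 calls are shown but commented.
import time

THROTTLE_QUERY = """
fields @timestamp
| filter log_type = "FM_CALL" and status = "THROTTLED"
| stats count(*) as throttles by bin(5m)
"""

def query_params(log_group: str, start_epoch: int, end_epoch: int) -> dict:
    """Parameters for logs.start_query (CloudWatch Logs Insights API)."""
    return {
        "logGroupName": log_group,
        "startTime": start_epoch,
        "endTime": end_epoch,
        "queryString": THROTTLE_QUERY.strip(),
        "limit": 1000,
    }

# logs = boto3.client("logs")
# qid = logs.start_query(**query_params("/ecs/manga-assist", t0, t1))["queryId"]
# while True:
#     resp = logs.get_query_results(queryId=qid)
#     if resp["status"] in ("Complete", "Failed", "Cancelled"):
#         break
#     time.sleep(1)   # queries are asynchronous; poll until done

params = query_params("/ecs/manga-assist", 1_700_000_000, 1_700_003_600)
print(params["limit"])  # 1000
```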
4. X-Ray Tracing for FM API Call Chains
4.1 X-Ray FM Tracer
import time
import json
import uuid
import traceback
from typing import Optional, Dict, Any, List, Callable
from dataclasses import dataclass, field
from contextlib import contextmanager
from functools import wraps
@dataclass
class TraceAnnotation:
"""Annotations attached to X-Ray segments for filtering."""
key: str
value: Any # str, int, or bool only for X-Ray annotations
@dataclass
class TraceMetadataEntry:
"""Metadata attached to X-Ray segments for detailed debug info."""
namespace: str
key: str
value: Any
@dataclass
class SubsegmentTiming:
"""Timing record for a subsegment within a trace."""
name: str
start_time: float
end_time: float
duration_ms: float
is_error: bool
is_throttle: bool
is_fault: bool
annotations: List[TraceAnnotation]
metadata: List[TraceMetadataEntry]
class XRayFMTracer:
"""
X-Ray tracing wrapper tailored for FM API call chains in MangaAssist.
Provides segment/subsegment management for the full request lifecycle:
API Gateway -> Orchestrator -> Redis -> DynamoDB -> OpenSearch -> Bedrock -> Response
Each service call becomes a subsegment with typed annotations for
efficient filtering in the X-Ray console and analytics.
Usage:
tracer = XRayFMTracer(service_name="manga-assist-orchestrator")
with tracer.trace_fm_request(request_id, session_id, model_id) as ctx:
with ctx.trace_redis_cache("cache-check"):
result = redis_client.get(cache_key)
with ctx.trace_bedrock_invoke("claude-sonnet"):
response = bedrock.invoke_model(...)
"""
def __init__(self, service_name: str, sampling_rate: float = 0.05):
self.service_name = service_name
self.sampling_rate = sampling_rate
self._xray_recorder = self._init_recorder()
self._active_segments: Dict[str, Any] = {}
def _init_recorder(self):
"""
Initialize the X-Ray recorder with daemon address and plugins.
In production, aws_xray_sdk.core.xray_recorder is used.
Here we show the configuration pattern.
"""
try:
from aws_xray_sdk.core import xray_recorder, patch_all
xray_recorder.configure(
service=self.service_name,
sampling=True,
context_missing="LOG_ERROR",
plugins=("ECSPlugin",),
daemon_address="127.0.0.1:2000",
)
# Patch boto3, requests, etc. for automatic subsegment creation
patch_all()
return xray_recorder
except ImportError:
return None
@contextmanager
def trace_fm_request(self, request_id: str, session_id: str, model_id: str):
"""
Create a top-level segment for an FM request.
Yields a TraceContext that provides subsegment helpers.
"""
ctx = FMTraceContext(
tracer=self,
request_id=request_id,
session_id=session_id,
model_id=model_id,
)
segment_name = f"{self.service_name}##fm-request"
if self._xray_recorder:
segment = self._xray_recorder.begin_segment(
name=segment_name,
traceid=None, # auto-generate
sampling=True,
)
segment.put_annotation("request_id", request_id)
segment.put_annotation("session_id", session_id)
segment.put_annotation("model_id", model_id)
segment.put_annotation("service", self.service_name)
ctx._segment = segment
ctx._start_time = time.time()
try:
yield ctx
except Exception as e:
ctx._is_error = True
ctx._error_message = str(e)
if self._xray_recorder and ctx._segment:
ctx._segment.add_exception(e, traceback.extract_stack())
raise
finally:
ctx._end_time = time.time()
ctx._duration_ms = (ctx._end_time - ctx._start_time) * 1000
if self._xray_recorder and ctx._segment:
ctx._segment.put_metadata(
"timing_summary",
ctx.get_timing_summary(),
"fm_trace",
)
self._xray_recorder.end_segment()
def create_sampling_rule(
self,
rule_name: str,
priority: int,
fixed_rate: float,
reservoir_size: int,
service_name: str,
url_path: str = "*",
http_method: str = "*",
) -> Dict[str, Any]:
"""
Generate an X-Ray sampling rule definition.
For MangaAssist, we want higher sampling on error paths and
lower sampling on healthy paths to control cost.
"""
return {
"SamplingRule": {
"RuleName": rule_name,
"Priority": priority,
"FixedRate": fixed_rate,
"ReservoirSize": reservoir_size,
"ServiceName": service_name,
"ServiceType": "AWS::ECS::Container",
"Host": "*",
"HTTPMethod": http_method,
"URLPath": url_path,
"ResourceARN": "*",
"Version": 1,
"Attributes": {},
}
}
def get_sampling_rules_for_mangaassist(self) -> List[Dict[str, Any]]:
"""
Return recommended sampling rules for MangaAssist.
At 1M messages/day, full sampling would be cost-prohibitive.
Note: X-Ray sampling is head-based. The decision is made when a
request starts, before its outcome is known, so a rule cannot match
on "errors" or "slow requests" directly. To bias captures, route
retry/fallback traffic through a distinguishing URL path (assumed
below) and point a high-rate rule at it; outcome-based capture
otherwise has to happen in the application, not in the sampling rule.
Matchers must also differ between rules: if every rule used Host and
URLPath "*", the highest-priority rule would sample all traffic.
"""
return [
self.create_sampling_rule(
rule_name="MangaAssist-RetryPath",
priority=1,
fixed_rate=1.0, # Sample all retry/fallback traffic (proxy for errors)
reservoir_size=100,
service_name=self.service_name,
url_path="/retry*", # assumed dedicated path for retried requests
),
self.create_sampling_rule(
rule_name="MangaAssist-LongContext",
priority=2,
fixed_rate=0.5, # Heavier sampling where slow requests are likely
reservoir_size=50,
service_name=self.service_name,
url_path="/chat/long*", # assumed path for large-context requests
),
self.create_sampling_rule(
rule_name="MangaAssist-Normal",
priority=10,
fixed_rate=0.01, # Sample 1% of remaining traffic
reservoir_size=10,
service_name=self.service_name,
),
]
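Rule definitions like these are applied with the X-Ray `CreateSamplingRule` API. A sketch with a small validation helper; the helper itself is an assumption for illustration, and the boto3 call is left commented:

```python
# Sketch: pushing a generated sampling rule to X-Ray.
# validate_rule is an illustrative helper, not part of any AWS SDK.
def validate_rule(rule: dict) -> dict:
    """Basic sanity checks before calling xray.create_sampling_rule(**rule)."""
    body = rule["SamplingRule"]
    assert 0.0 <= body["FixedRate"] <= 1.0, "FixedRate must be a probability"
    assert body["ReservoirSize"] >= 0
    assert 1 <= body["Priority"] <= 9999, "lower number = higher priority"
    return rule

example = {
    "SamplingRule": {
        "RuleName": "MangaAssist-Normal", "Priority": 10, "FixedRate": 0.01,
        "ReservoirSize": 10, "ServiceName": "manga-assist-orchestrator",
        "ServiceType": "AWS::ECS::Container", "Host": "*", "HTTPMethod": "*",
        "URLPath": "*", "ResourceARN": "*", "Version": 1, "Attributes": {},
    }
}

validated = validate_rule(example)
# xray = boto3.client("xray")
# xray.create_sampling_rule(**validated)
print(validated["SamplingRule"]["RuleName"])  # MangaAssist-Normal
```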
class FMTraceContext:
"""
Context object yielded by XRayFMTracer.trace_fm_request().
Provides typed subsegment helpers for each downstream service call.
"""
def __init__(
self,
tracer: XRayFMTracer,
request_id: str,
session_id: str,
model_id: str,
):
self._tracer = tracer
self._request_id = request_id
self._session_id = session_id
self._model_id = model_id
self._segment = None
self._start_time: float = 0
self._end_time: float = 0
self._duration_ms: float = 0
self._is_error: bool = False
self._error_message: str = ""
self._subsegment_timings: List[SubsegmentTiming] = []
@contextmanager
def _trace_subsegment(
self,
name: str,
service_type: str,
annotations: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
):
"""Generic subsegment tracer for any downstream call."""
start = time.time()
is_error = False
is_throttle = False
is_fault = False
recorder = self._tracer._xray_recorder
subsegment = None
if recorder:
subsegment = recorder.begin_subsegment(name)
subsegment.put_annotation("service_type", service_type)
subsegment.put_annotation("request_id", self._request_id)
if annotations:
for k, v in annotations.items():
subsegment.put_annotation(k, v)
if metadata:
for k, v in metadata.items():
subsegment.put_metadata(k, v, "fm_trace")
try:
yield subsegment
except Exception as e:
is_error = True
error_str = str(e).lower()
if "throttl" in error_str:
is_throttle = True
if "internal" in error_str or "fault" in error_str:
is_fault = True
if subsegment:
subsegment.add_exception(e, traceback.extract_stack())
raise
finally:
end = time.time()
duration = (end - start) * 1000
if subsegment:
recorder.end_subsegment()
timing = SubsegmentTiming(
name=name,
start_time=start,
end_time=end,
duration_ms=duration,
is_error=is_error,
is_throttle=is_throttle,
is_fault=is_fault,
annotations=[
TraceAnnotation(k, v) for k, v in (annotations or {}).items()
],
metadata=[
TraceMetadataEntry("fm_trace", k, v) for k, v in (metadata or {}).items()
],
)
self._subsegment_timings.append(timing)
@contextmanager
def trace_redis_cache(self, operation: str = "get"):
"""Trace a Redis cache check/set operation."""
with self._trace_subsegment(
name="ElastiCache-Redis",
service_type="AWS::ElastiCache::Redis",
annotations={"cache_operation": operation},
) as subseg:
yield subseg
@contextmanager
def trace_dynamodb(self, table_name: str, operation: str = "GetItem"):
"""Trace a DynamoDB operation (session lookup, product fetch)."""
with self._trace_subsegment(
name=f"DynamoDB-{table_name}",
service_type="AWS::DynamoDB::Table",
annotations={"table_name": table_name, "ddb_operation": operation},
) as subseg:
yield subseg
@contextmanager
def trace_opensearch(self, index_name: str, k_results: int = 5):
"""Trace an OpenSearch vector search query."""
with self._trace_subsegment(
name="OpenSearch-VectorSearch",
service_type="AWS::OpenSearchServerless",
annotations={"index_name": index_name, "k_results": k_results},
) as subseg:
yield subseg
@contextmanager
def trace_bedrock_invoke(
self, model_id: str, input_tokens: int = 0, max_tokens: int = 1024
):
"""Trace a Bedrock InvokeModel call with model-specific annotations."""
with self._trace_subsegment(
name="Bedrock-InvokeModel",
service_type="AWS::Bedrock::InvokeModel",
annotations={
"model_id": model_id,
"input_tokens": input_tokens,
"max_tokens": max_tokens,
},
metadata={"model_config": {"model_id": model_id, "max_tokens": max_tokens}},
) as subseg:
yield subseg
@contextmanager
def trace_websocket_response(self):
"""Trace the WebSocket response delivery to the client."""
with self._trace_subsegment(
name="APIGateway-WebSocket-Send",
service_type="AWS::ApiGatewayV2::WebSocket",
) as subseg:
yield subseg
def get_timing_summary(self) -> Dict[str, Any]:
"""
Return a summary of all subsegment timings for this request.
Used for both X-Ray metadata and structured logging.
"""
summary = {
"request_id": self._request_id,
"total_duration_ms": self._duration_ms,
"is_error": self._is_error,
"subsegments": [],
}
for t in self._subsegment_timings:
summary["subsegments"].append({
"name": t.name,
"duration_ms": round(t.duration_ms, 2),
"is_error": t.is_error,
"is_throttle": t.is_throttle,
})
total_subseg = sum(t.duration_ms for t in self._subsegment_timings)
summary["subsegment_total_ms"] = round(total_subseg, 2)
summary["orchestrator_overhead_ms"] = round(self._duration_ms - total_subseg, 2)
return summary
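Downstream consumers of `get_timing_summary()` typically want the dominant latency contributor for a slow request. A self-contained sketch; the summary dict here is illustrative:

```python
# Sketch: interpreting a timing summary to find the latency hotspot.
# The summary dict mirrors get_timing_summary() output; values are made up.
def slowest_subsegment(summary: dict) -> tuple:
    """Return (name, duration_ms) of the dominant subsegment."""
    worst = max(summary["subsegments"], key=lambda s: s["duration_ms"])
    return worst["name"], worst["duration_ms"]

summary = {
    "request_id": "req-abc123",
    "total_duration_ms": 2890.0,
    "subsegments": [
        {"name": "ElastiCache-Redis", "duration_ms": 4.2, "is_error": False},
        {"name": "DynamoDB-sessions", "duration_ms": 11.8, "is_error": False},
        {"name": "OpenSearch-VectorSearch", "duration_ms": 310.5, "is_error": False},
        {"name": "Bedrock-InvokeModel", "duration_ms": 2405.0, "is_error": False},
    ],
}

name, ms = slowest_subsegment(summary)
print(name, ms)  # Bedrock-InvokeModel 2405.0
```

In this (made-up) trace the Bedrock call dominates end-to-end latency, which would point the investigation at model choice or context size rather than the data layer.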
5. Amazon Q Developer for GenAI-Specific Error Pattern Recognition
5.1 Error Pattern Detector
import re
import json
import time
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import Counter, defaultdict
class ErrorCategory(Enum):
"""Top-level error categories for FM applications."""
MODEL_THROTTLE = "MODEL_THROTTLE"
MODEL_TIMEOUT = "MODEL_TIMEOUT"
MODEL_VALIDATION = "MODEL_VALIDATION"
CONTENT_FILTER = "CONTENT_FILTER"
TOKEN_LIMIT = "TOKEN_LIMIT"
CONTEXT_WINDOW = "CONTEXT_WINDOW"
RAG_RETRIEVAL = "RAG_RETRIEVAL"
SESSION_CORRUPTION = "SESSION_CORRUPTION"
CACHE_FAILURE = "CACHE_FAILURE"
NETWORK = "NETWORK"
INFRASTRUCTURE = "INFRASTRUCTURE"
SDK_MISCONFIGURATION = "SDK_MISCONFIGURATION"
PROMPT_TEMPLATE = "PROMPT_TEMPLATE"
RESPONSE_PARSE = "RESPONSE_PARSE"
UNKNOWN = "UNKNOWN"
class RemediationAction(Enum):
"""Possible automated remediation actions."""
RETRY_WITH_BACKOFF = "RETRY_WITH_BACKOFF"
SWITCH_MODEL = "SWITCH_MODEL"
REDUCE_INPUT_TOKENS = "REDUCE_INPUT_TOKENS"
CLEAR_CACHE = "CLEAR_CACHE"
RESET_SESSION = "RESET_SESSION"
SCALE_UP = "SCALE_UP"
ALERT_OPS = "ALERT_OPS"
NO_ACTION = "NO_ACTION"
TRUNCATE_CONTEXT = "TRUNCATE_CONTEXT"
FALLBACK_RESPONSE = "FALLBACK_RESPONSE"
@dataclass
class ErrorPattern:
"""
Defines a recognizable error pattern with its classification,
root cause analysis, and recommended remediation.
"""
pattern_id: str
category: ErrorCategory
name: str
description: str
regex_patterns: List[str]
error_codes: List[str]
root_cause: str
impact: str
remediation: RemediationAction
remediation_steps: List[str]
severity: str
is_transient: bool
expected_frequency: str
q_developer_prompt: str
@dataclass
class PatternMatch:
"""Result of matching an error against known patterns."""
pattern: ErrorPattern
confidence: float
matched_by: str
error_message: str
timestamp: str
request_id: str
additional_context: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ErrorTrend:
"""Aggregated error trend over a time window."""
category: ErrorCategory
count: int
time_window_minutes: int
trend_direction: str # "increasing", "decreasing", "stable"
rate_per_minute: float
first_seen: str
last_seen: str
affected_sessions: int
affected_models: List[str]
class ErrorPatternDetector:
"""
Detects and classifies error patterns in FM application logs.
Maintains a registry of known error patterns specific to
MangaAssist's GenAI stack (Bedrock, OpenSearch, DynamoDB,
Redis, API Gateway). Uses regex matching, error code lookup,
and contextual analysis to classify errors and recommend
automated remediation.
Integration with Amazon Q Developer:
The detector generates Q Developer prompts for each detected
pattern, enabling AI-assisted diagnosis and fix suggestions.
"""
def __init__(self):
self._patterns: List[ErrorPattern] = []
self._match_history: List[PatternMatch] = []
self._error_counts: Counter = Counter()
self._error_timeline: defaultdict = defaultdict(list)
self._register_default_patterns()
def _register_default_patterns(self) -> None:
"""Register all known error patterns for MangaAssist."""
patterns = [
ErrorPattern(
pattern_id="EP-001",
category=ErrorCategory.MODEL_THROTTLE,
name="Bedrock Throttling",
description="InvokeModel requests being throttled due to rate limits",
regex_patterns=[
r"ThrottlingException",
r"Rate exceeded",
r"Too many requests",
r"TooManyRequestsException",
],
error_codes=["ThrottlingException", "TooManyRequestsException"],
root_cause="Request rate exceeds provisioned throughput or on-demand limits for the selected model",
impact="Users experience delayed responses or timeout errors. At 1M msgs/day, burst traffic can easily hit limits.",
remediation=RemediationAction.RETRY_WITH_BACKOFF,
remediation_steps=[
"1. Implement exponential backoff with jitter (base=1s, max=30s)",
"2. Check current provisioned throughput in Bedrock console",
"3. Request quota increase via AWS Support if sustained",
"4. Consider routing overflow to Haiku ($0.25/$1.25) as fallback",
"5. Enable request queuing in orchestrator with priority levels",
],
severity="HIGH",
is_transient=True,
expected_frequency="< 1% of requests under normal load",
q_developer_prompt="Analyze this Bedrock ThrottlingException in my ECS Fargate service calling Claude 3. Show me how to implement exponential backoff with jitter and a fallback to Haiku model.",
),
ErrorPattern(
pattern_id="EP-002",
category=ErrorCategory.MODEL_TIMEOUT,
name="Bedrock Invoke Timeout",
description="InvokeModel call exceeds configured timeout",
regex_patterns=[
r"ReadTimeoutError",
r"ConnectTimeoutError",
r"ModelTimeoutException",
r"Connection reset by peer",
],
error_codes=["ReadTimeoutError", "ModelTimeoutException"],
root_cause="Model inference takes longer than client timeout, often with large input contexts or during service degradation",
impact="User gets no response. WebSocket connection may close. Session state may be inconsistent.",
remediation=RemediationAction.SWITCH_MODEL,
remediation_steps=[
"1. Check input token count — if > 100K, truncate context",
"2. If Sonnet times out, retry with Haiku for faster response",
"3. Increase client timeout to 60s for large contexts",
"4. Implement streaming (InvokeModelWithResponseStream) for perceived speed",
"5. Add circuit breaker to prevent cascading timeouts",
],
severity="HIGH",
is_transient=True,
expected_frequency="< 0.5% under normal conditions",
q_developer_prompt="My Bedrock InvokeModel call is timing out. The model is Claude 3 Sonnet with large manga context. How do I implement streaming with InvokeModelWithResponseStream and add a circuit breaker?",
),
ErrorPattern(
pattern_id="EP-003",
category=ErrorCategory.TOKEN_LIMIT,
name="Token Limit Exceeded",
description="Input or output token count exceeds model limits",
regex_patterns=[
r"ValidationException.*token",
r"maximum.*token.*exceeded",
r"input.*too.*long",
r"context.*length.*exceeded",
],
error_codes=["ValidationException"],
root_cause="Prompt with system instructions + RAG context + conversation history exceeds model token limit",
impact="Request fails before inference. User sees error. No cost incurred for failed call.",
remediation=RemediationAction.TRUNCATE_CONTEXT,
remediation_steps=[
"1. Count tokens before sending (use tiktoken or model tokenizer)",
"2. Implement sliding window for conversation history",
"3. Limit RAG chunks to top-3 by relevance score",
"4. Summarize older conversation turns",
"5. Set max_tokens output limit to reserve space",
],
severity="MEDIUM",
is_transient=False,
expected_frequency="1-2% for long manga discussion threads",
q_developer_prompt="I'm getting ValidationException for token limits on Claude 3 Sonnet. My prompt includes system instructions, RAG chunks from OpenSearch, and conversation history from DynamoDB. How do I implement a token budget manager?",
),
ErrorPattern(
pattern_id="EP-004",
category=ErrorCategory.RAG_RETRIEVAL,
name="OpenSearch Vector Search Failure",
description="Vector similarity search returns empty or irrelevant results",
regex_patterns=[
r"OpenSearchException",
r"ConnectionTimeout.*opensearch",
r"index_not_found_exception",
r"search_phase_execution_exception",
],
error_codes=["OpenSearchException", "ConnectionTimeout"],
root_cause="OpenSearch Serverless collection unreachable, index not built, or embedding dimension mismatch",
impact="FM receives no context, producing hallucinated or generic answers about manga products",
remediation=RemediationAction.FALLBACK_RESPONSE,
remediation_steps=[
"1. Check OpenSearch Serverless collection status",
"2. Verify VPC endpoint connectivity from ECS tasks",
"3. Confirm embedding dimension matches index mapping (1536 for Titan)",
"4. Fall back to keyword search if vector search fails",
"5. Return cached popular recommendations as fallback",
],
severity="HIGH",
is_transient=True,
expected_frequency="< 0.1% under normal conditions",
q_developer_prompt="OpenSearch Serverless vector search is failing in my RAG pipeline. The index uses 1536-dimension Titan embeddings. How do I add a keyword search fallback and verify index health?",
),
ErrorPattern(
pattern_id="EP-005",
category=ErrorCategory.CONTENT_FILTER,
name="Content Filter Triggered",
description="Bedrock content filter blocks input or output",
regex_patterns=[
r"content.*filter",
r"blocked.*content.*policy",
r"AccessDeniedException.*content",
r"guardrail.*blocked",
],
error_codes=["AccessDeniedException"],
root_cause="User query or FM response contains content that violates Bedrock guardrails or content policies",
impact="User receives generic error instead of helpful response. May frustrate legitimate queries about mature manga titles.",
remediation=RemediationAction.FALLBACK_RESPONSE,
remediation_steps=[
"1. Log the blocked content category for pattern analysis",
"2. Review guardrail configuration for false positives",
"3. Adjust content filter sensitivity for manga-specific terms",
"4. Provide user-friendly message explaining the limitation",
"5. Offer alternative query suggestions",
],
severity="MEDIUM",
is_transient=False,
expected_frequency="2-5% depending on manga genre queries",
q_developer_prompt="Bedrock content filters are triggering too often for my manga store chatbot. How do I configure Bedrock Guardrails to allow manga-specific terminology while maintaining safety?",
),
ErrorPattern(
pattern_id="EP-006",
category=ErrorCategory.SESSION_CORRUPTION,
name="DynamoDB Session Corruption",
description="Session data is corrupted or inconsistent in DynamoDB",
regex_patterns=[
r"ConditionalCheckFailedException",
r"ValidationException.*attribute",
r"SerializationException",
r"item.*size.*exceeded",
],
error_codes=["ConditionalCheckFailedException", "SerializationException"],
root_cause="Concurrent writes to session, item size exceeding 400KB limit, or schema mismatch after deployment",
impact="Conversation context is lost or incorrect. User may need to restart conversation.",
remediation=RemediationAction.RESET_SESSION,
remediation_steps=[
"1. Implement optimistic locking with version numbers",
"2. Compress conversation history before storage",
"3. Archive old turns to S3 if item approaches 400KB",
"4. Add schema validation before writes",
"5. Reset session gracefully with user notification",
],
severity="MEDIUM",
is_transient=False,
expected_frequency="< 0.01% with proper locking",
q_developer_prompt="DynamoDB ConditionalCheckFailedException on my session table. Multiple ECS tasks are writing to the same session item. How do I implement optimistic locking and handle concurrent writes?",
),
ErrorPattern(
pattern_id="EP-007",
category=ErrorCategory.CACHE_FAILURE,
name="Redis Cache Connection Failure",
description="ElastiCache Redis connection lost or timeout",
regex_patterns=[
r"ConnectionError.*redis",
r"TimeoutError.*redis",
r"CLUSTERDOWN",
r"MOVED",
r"redis.*connection.*refused",
],
error_codes=["ConnectionError", "TimeoutError", "CLUSTERDOWN"],
root_cause="Redis node failure, network partition, or cluster rebalancing. Also possible during maintenance windows.",
impact="Cache misses increase Bedrock calls and latency. At 1M msgs/day, each miss adds roughly $0.003-$0.015 in extra Bedrock spend, depending on token counts.",
remediation=RemediationAction.RETRY_WITH_BACKOFF,
remediation_steps=[
"1. Configure Redis client with connection pooling and retry logic",
"2. Set socket_timeout=1s, socket_connect_timeout=1s",
"3. Implement cache-aside pattern with graceful degradation",
"4. Monitor Redis CloudWatch metrics (CacheHitRate, EngineCPUUtilization)",
"5. Use Multi-AZ replication for high availability",
],
severity="MEDIUM",
is_transient=True,
expected_frequency="< 0.01% with Multi-AZ",
q_developer_prompt="ElastiCache Redis connection is dropping intermittently from my ECS Fargate tasks. How do I configure connection pooling with retry logic and implement graceful cache-aside degradation?",
),
ErrorPattern(
pattern_id="EP-008",
category=ErrorCategory.SDK_MISCONFIGURATION,
name="Boto3 Client Misconfiguration",
description="AWS SDK client improperly configured for Bedrock calls",
regex_patterns=[
r"NoCredentialsError",
r"EndpointConnectionError",
r"ClientError.*UnrecognizedClientException",
r"botocore.*ParamValidationError",
r"InvalidRegionError",
],
error_codes=["NoCredentialsError", "EndpointConnectionError", "ParamValidationError"],
root_cause="ECS task role missing Bedrock permissions, incorrect region, or SDK version too old for latest model IDs",
impact="All FM calls fail. Complete service outage for chatbot functionality.",
remediation=RemediationAction.ALERT_OPS,
remediation_steps=[
"1. Verify ECS task role has bedrock:InvokeModel permission",
"2. Confirm Bedrock region matches client config (ap-northeast-1 for JP)",
"3. Update boto3/botocore to latest version",
"4. Check model access is enabled in Bedrock console",
"5. Validate IAM policy resource ARN for specific model access",
],
severity="CRITICAL",
is_transient=False,
expected_frequency="Only after deployments or IAM changes",
q_developer_prompt="Getting NoCredentialsError when calling Bedrock from ECS Fargate. My task definition uses a task role. How do I debug IAM permission issues for bedrock:InvokeModel in ap-northeast-1?",
),
]
for pattern in patterns:
self._patterns.append(pattern)
def detect(
self,
error_message: str,
error_code: Optional[str] = None,
request_id: str = "",
context: Optional[Dict[str, Any]] = None,
) -> List[PatternMatch]:
"""
Match an error against all registered patterns.
Returns a list of PatternMatch objects sorted by confidence,
highest first. Multiple patterns may match (e.g., a throttle
that also causes a timeout).
"""
matches: List[PatternMatch] = []
now_iso = datetime.now(timezone.utc).isoformat()
for pattern in self._patterns:
confidence = 0.0
matched_by = ""
# Check error code match (highest confidence)
if error_code and error_code in pattern.error_codes:
confidence = max(confidence, 0.95)
matched_by = f"error_code:{error_code}"
# Check regex patterns
for regex in pattern.regex_patterns:
if re.search(regex, error_message, re.IGNORECASE):
regex_confidence = 0.85
if confidence < regex_confidence:
confidence = regex_confidence
matched_by = f"regex:{regex}"
if confidence > 0.5:
match = PatternMatch(
pattern=pattern,
confidence=confidence,
matched_by=matched_by,
error_message=error_message,
timestamp=now_iso,
request_id=request_id,
additional_context=context or {},
)
matches.append(match)
self._error_counts[pattern.category.value] += 1
self._error_timeline[pattern.category.value].append(now_iso)
self._match_history.append(match)
matches.sort(key=lambda m: m.confidence, reverse=True)
return matches
def get_trend(
self,
category: ErrorCategory,
window_minutes: int = 60,
) -> Optional[ErrorTrend]:
"""
Analyze error trend for a specific category over a time window.
Used by the auto-remediation engine to decide whether to
escalate or self-heal.
"""
timestamps = self._error_timeline.get(category.value, [])
if not timestamps:
return None
cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
recent = [t for t in timestamps if t >= cutoff.isoformat()]
count = len(recent)
if count == 0:
return None
rate = count / window_minutes if window_minutes > 0 else 0
# Simple trend detection: compare first half vs second half
mid = window_minutes // 2
mid_cutoff = datetime.now(timezone.utc) - timedelta(minutes=mid)
first_half = [t for t in recent if t < mid_cutoff.isoformat()]
second_half = [t for t in recent if t >= mid_cutoff.isoformat()]
if len(second_half) > len(first_half) * 1.5:
direction = "increasing"
elif len(first_half) > len(second_half) * 1.5:
direction = "decreasing"
else:
direction = "stable"
return ErrorTrend(
category=category,
count=count,
time_window_minutes=window_minutes,
trend_direction=direction,
rate_per_minute=round(rate, 4),
first_seen=recent[0] if recent else "",
last_seen=recent[-1] if recent else "",
affected_sessions=0,
affected_models=[],
)
def generate_q_developer_prompt(self, match: PatternMatch) -> str:
"""
Generate a contextual prompt for Amazon Q Developer
based on the detected error pattern.
"""
prompt_parts = [
f"Error Pattern: {match.pattern.name} ({match.pattern.pattern_id})",
f"Category: {match.pattern.category.value}",
f"Error Message: {match.error_message}",
f"Root Cause: {match.pattern.root_cause}",
f"Severity: {match.pattern.severity}",
"",
f"Q Developer Query: {match.pattern.q_developer_prompt}",
"",
"Additional Context:",
]
for k, v in match.additional_context.items():
prompt_parts.append(f" - {k}: {v}")
return "\n".join(prompt_parts)
def get_error_summary(self) -> Dict[str, Any]:
"""Return a summary of all detected errors for dashboard display."""
return {
"total_errors_detected": sum(self._error_counts.values()),
"by_category": dict(self._error_counts),
"pattern_count": len(self._patterns),
"match_history_size": len(self._match_history),
"top_3_categories": self._error_counts.most_common(3),
}
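A quick way to sanity-check the detector's scoring rules is to reproduce them in isolation. The sketch below hard-codes a trimmed stand-in for EP-001; the names `classify` and `THROTTLE` are illustrative, not part of the detector's API:

```python
import re

# Minimal stand-in for ErrorPattern EP-001 (Bedrock throttling); fields trimmed
THROTTLE = {
    "error_codes": {"ThrottlingException", "TooManyRequestsException"},
    "regexes": [r"ThrottlingException", r"Rate exceeded", r"Too many requests"],
}

def classify(message, error_code=None, pattern=THROTTLE):
    """Mirror the detector's scoring: error-code match = 0.95, regex match = 0.85."""
    confidence, matched_by = 0.0, ""
    if error_code in pattern["error_codes"]:
        confidence, matched_by = 0.95, f"error_code:{error_code}"
    for rx in pattern["regexes"]:
        if re.search(rx, message, re.IGNORECASE) and confidence < 0.85:
            confidence, matched_by = 0.85, f"regex:{rx}"
    return confidence, matched_by

print(classify("An error occurred: Rate exceeded for model"))
# → (0.85, 'regex:Rate exceeded')
print(classify("boom", error_code="ThrottlingException"))
# → (0.95, 'error_code:ThrottlingException')
```

Because an error code is a stronger signal than a message substring, a code match always wins over a regex match, matching the ordering in `detect()`.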
6. Error Taxonomy for GenAI Applications
flowchart TD
subgraph ModelErrors["Model Errors"]
ME1[ThrottlingException<br/>Rate limit exceeded]
ME2[ModelTimeoutException<br/>Inference too slow]
ME3[ValidationException<br/>Invalid parameters]
ME4[ModelNotReadyException<br/>Model loading]
ME5[ServiceUnavailableException<br/>Bedrock outage]
end
subgraph ContentErrors["Content & Safety Errors"]
CE1[ContentFilterBlocked<br/>Input/output filtered]
CE2[GuardrailViolation<br/>Custom guardrail triggered]
CE3[TokenLimitExceeded<br/>Context too large]
CE4[ContextWindowOverflow<br/>History too long]
end
subgraph DataErrors["Data Layer Errors"]
DE1[VectorSearchEmpty<br/>No RAG results]
DE2[VectorSearchTimeout<br/>OpenSearch slow]
DE3[SessionNotFound<br/>DynamoDB miss]
DE4[SessionCorrupted<br/>Schema mismatch]
DE5[CacheMiss<br/>Redis unavailable]
DE6[StaleCache<br/>Outdated data served]
end
subgraph InfraErrors["Infrastructure Errors"]
IE1[ECSTaskOOM<br/>Memory exhaustion]
IE2[ECSTaskCrash<br/>Container failure]
IE3[WebSocketDisconnect<br/>Client dropped]
IE4[VPCNetworkError<br/>Connectivity loss]
IE5[IAMPermissionDenied<br/>Role misconfigured]
end
subgraph AppErrors["Application Logic Errors"]
AE1[PromptTemplateError<br/>Bad template render]
AE2[ResponseParseError<br/>Unexpected output format]
AE3[SDKVersionMismatch<br/>Old boto3]
AE4[ConfigMissing<br/>Environment variable absent]
AE5[RetryExhausted<br/>All retries failed]
end
ME1 --> R1[Retry with backoff]
ME2 --> R2[Switch to Haiku]
ME3 --> R3[Fix parameters]
CE1 --> R4[Adjust guardrails]
CE3 --> R5[Truncate context]
DE1 --> R6[Keyword fallback]
DE5 --> R7[Skip cache]
IE1 --> R8[Scale up task]
IE5 --> R9[Fix IAM role]
AE1 --> R10[Fix template]
style ModelErrors fill:#ffebee,stroke:#c62828
style ContentErrors fill:#fff3e0,stroke:#e65100
style DataErrors fill:#e8f5e9,stroke:#2e7d32
style InfraErrors fill:#e3f2fd,stroke:#1565c0
style AppErrors fill:#f3e5f5,stroke:#6a1b9a
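The two most common remediation edges above (R1: retry with backoff, R2: switch to Haiku) combine naturally into one policy. A minimal sketch follows; the model IDs are the public Anthropic identifiers but should be verified for your region, and `invoke_with_fallback`/`Throttled` are illustrative names, not SDK types:

```python
import random
import time

# Illustrative model IDs; confirm against your Bedrock region's model list.
PRIMARY = "anthropic.claude-3-sonnet-20240229-v1:0"
FALLBACK = "anthropic.claude-3-haiku-20240307-v1:0"

class Throttled(Exception):
    """Stand-in for the SDK's throttling error."""

def invoke_with_fallback(invoke, prompt, max_retries=3, base=1.0, cap=30.0):
    """Retry PRIMARY with exponential backoff + full jitter (EP-001 steps 1 and 4),
    then route the overflow request to the cheaper FALLBACK model."""
    for attempt in range(max_retries):
        try:
            return invoke(PRIMARY, prompt)
        except Throttled:
            time.sleep(random.uniform(0, min(cap, base * 2 ** attempt)))
    return invoke(FALLBACK, prompt)  # overflow to Haiku

# Simulated invoker: Sonnet is throttled, Haiku answers.
def fake_invoke(model_id, prompt):
    if model_id == PRIMARY:
        raise Throttled()
    return f"[{model_id}] answer"

print(invoke_with_fallback(fake_invoke, "recommend a seinen title", base=0.01))
```

Injecting the `invoke` callable keeps the policy testable without AWS credentials; in production it would wrap `bedrock_runtime.invoke_model`.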
7. FMLogAnalyzer — CloudWatch Insights Automation
import json
import time
import boto3
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass
@dataclass
class InsightsQueryResult:
"""Result from a CloudWatch Logs Insights query."""
query_id: str
status: str
results: List[Dict[str, str]]
statistics: Dict[str, float]
query_text: str
duration_ms: float
class FMLogAnalyzer:
"""
Automated CloudWatch Logs Insights query engine for FM applications.
Provides pre-built queries for common MangaAssist troubleshooting
scenarios and the ability to run custom queries. Results are
formatted for dashboard consumption or alert rule evaluation.
Usage:
analyzer = FMLogAnalyzer(
log_group="/ecs/manga-assist-orchestrator",
region="ap-northeast-1"
)
# Check for latency spikes
result = analyzer.query_latency_percentiles(window_minutes=30)
# Check error rates by model
result = analyzer.query_error_distribution()
"""
def __init__(
self,
log_group: str,
region: str = "ap-northeast-1",
max_results: int = 1000,
):
self.log_group = log_group
self.region = region
self.max_results = max_results
self.client = boto3.client("logs", region_name=region)
def _run_query(
self,
query: str,
start_time: datetime,
end_time: datetime,
) -> InsightsQueryResult:
"""Execute a CloudWatch Logs Insights query and wait for results."""
start_epoch = int(start_time.timestamp())
end_epoch = int(end_time.timestamp())
t0 = time.time()
response = self.client.start_query(
logGroupName=self.log_group,
startTime=start_epoch,
endTime=end_epoch,
queryString=query,
limit=self.max_results,
)
query_id = response["queryId"]
# Poll for query completion
while True:
result = self.client.get_query_results(queryId=query_id)
if result["status"] in ("Complete", "Failed", "Cancelled", "Timeout"):
break
time.sleep(0.5)
duration = (time.time() - t0) * 1000
formatted_results = []
for row in result.get("results", []):
entry = {}
for field_obj in row:
entry[field_obj["field"]] = field_obj["value"]
formatted_results.append(entry)
return InsightsQueryResult(
query_id=query_id,
status=result["status"],
results=formatted_results,
statistics=result.get("statistics", {}),
query_text=query,
duration_ms=round(duration, 2),
)
def query_latency_percentiles(
self,
window_minutes: int = 60,
bucket_minutes: int = 5,
) -> InsightsQueryResult:
"""Query P50/P90/P99 latency for Bedrock calls."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = f"""
fields @timestamp, latency.bedrock_invoke_ms as bedrock_ms, latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL" and status = "SUCCESS"
| stats
count(*) as call_count,
avg(bedrock_ms) as avg_bedrock,
pct(bedrock_ms, 50) as p50_bedrock,
pct(bedrock_ms, 90) as p90_bedrock,
pct(bedrock_ms, 99) as p99_bedrock,
avg(total_ms) as avg_total,
pct(total_ms, 99) as p99_total
by bin({bucket_minutes}m)
| sort @timestamp desc
"""
return self._run_query(query, start_time, end_time)
def query_error_distribution(
self,
window_minutes: int = 60,
) -> InsightsQueryResult:
"""Query error distribution by category and model."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = """
fields @timestamp, metadata.model_id as model, error.error_type as err_type, error.error_code as err_code, error.root_cause_category as root_cause
| filter log_type = "FM_CALL" and status != "SUCCESS"
| stats count(*) as error_count by model, err_type, root_cause
| sort error_count desc
| limit 25
"""
return self._run_query(query, start_time, end_time)
def query_token_usage(
self,
window_minutes: int = 1440, # 24 hours default
) -> InsightsQueryResult:
"""Query token usage patterns to detect prompt bloat."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = """
fields @timestamp, prompt_metrics.total_input_tokens as input_tok, response_metrics.output_token_count as output_tok, metadata.model_id as model
| filter log_type = "FM_CALL"
| stats
avg(input_tok) as avg_input,
max(input_tok) as max_input,
pct(input_tok, 95) as p95_input,
avg(output_tok) as avg_output,
max(output_tok) as max_output,
sum(input_tok) as total_input,
sum(output_tok) as total_output,
count(*) as calls
by bin(1h), model
| sort @timestamp desc
"""
return self._run_query(query, start_time, end_time)
def query_cost_analysis(
self,
window_minutes: int = 1440,
) -> InsightsQueryResult:
"""Query cost breakdown by model and cache utilization."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = """
fields @timestamp, metadata.model_id as model, cost.total_cost_usd as cost_usd, cost.was_cached as cached, cost.cache_hit_savings_usd as savings
| filter log_type = "FM_CALL"
| stats
sum(cost_usd) as total_spend,
sum(savings) as total_savings,
count(*) as call_count,
avg(cost_usd) as avg_cost_per_call,
sum(cached) as cache_hits
by bin(1h), model
| sort @timestamp desc
"""
return self._run_query(query, start_time, end_time)
def query_rag_quality(
self,
window_minutes: int = 360,
) -> InsightsQueryResult:
"""Query RAG retrieval quality metrics."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = """
fields @timestamp, avg_relevance, results_returned, query_latency_ms, opensearch_index
| filter log_type = "RAG_RETRIEVAL"
| stats
avg(avg_relevance) as mean_relevance,
min(min_relevance) as worst_relevance,
pct(query_latency_ms, 50) as p50_latency,
pct(query_latency_ms, 99) as p99_latency,
avg(results_returned) as avg_results,
count(*) as retrieval_count
by bin(30m)
| sort @timestamp desc
"""
return self._run_query(query, start_time, end_time)
def query_throttle_patterns(
self,
window_minutes: int = 120,
) -> InsightsQueryResult:
"""Detect throttling bursts and patterns."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = """
fields @timestamp, metadata.model_id as model, error.retry_count as retries, latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL" and status = "THROTTLED"
| stats
count(*) as throttle_count,
avg(retries) as avg_retries,
max(retries) as max_retries,
avg(total_ms) as avg_total_latency
by bin(5m), model
| sort throttle_count desc
"""
return self._run_query(query, start_time, end_time)
def query_slow_requests(
self,
threshold_ms: float = 3000,
window_minutes: int = 60,
) -> InsightsQueryResult:
"""Find requests exceeding the 3-second SLA target."""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=window_minutes)
query = f"""
fields @timestamp, metadata.request_id, metadata.session_id, metadata.model_id as model,
latency.total_e2e_ms as total_ms, latency.bedrock_invoke_ms as bedrock_ms,
latency.opensearch_query_ms as os_ms, latency.redis_cache_check_ms as redis_ms,
prompt_metrics.total_input_tokens as input_tokens
| filter log_type = "FM_CALL" and latency.total_e2e_ms > {threshold_ms}
| sort total_ms desc
| limit 50
"""
return self._run_query(query, start_time, end_time)
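One practical gotcha: Logs Insights returns every field value as a string, so downstream alert logic must parse before comparing. Below is a hypothetical helper (`breaches_sla` is not part of FMLogAnalyzer) that an alert rule could apply to the `results` of `query_latency_percentiles`:

```python
def breaches_sla(rows, p99_field="p99_total", sla_ms=3000.0):
    """Flag time buckets whose P99 end-to-end latency exceeds the 3-second SLA.
    Logs Insights returns all values as strings, so parse before comparing."""
    breaches = []
    for row in rows:
        try:
            p99 = float(row.get(p99_field, "nan"))  # NaN never exceeds the SLA
        except ValueError:
            continue  # skip malformed rows rather than fail the alert path
        if p99 > sla_ms:
            breaches.append(row)
    return breaches

# Example rows shaped like InsightsQueryResult.results
rows = [
    {"bin(5m)": "2024-06-01 09:00:00.000", "p99_total": "2650.0"},
    {"bin(5m)": "2024-06-01 09:05:00.000", "p99_total": "4120.5"},
]
print(breaches_sla(rows))  # only the 4120.5 ms bucket survives
```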
8. Summary Table — Troubleshooting Tool Mapping
| Tool | Purpose | MangaAssist Application | Key Metric |
|---|---|---|---|
| CloudWatch Logs Insights | Prompt/response analysis | Query structured FM call logs for patterns | Query latency < 5s for 1h window |
| AWS X-Ray | FM API call tracing | End-to-end trace through Bedrock, OpenSearch, DynamoDB | Trace sampling at 1% normal, 100% errors |
| Amazon Q Developer | GenAI error pattern recognition | AI-assisted diagnosis from detected error patterns | Pattern match confidence > 85% |
| CloudWatch Metrics | Real-time operational dashboards | Latency percentiles, error rates, token usage | Dashboard refresh < 60s |
| CloudWatch Alarms | Proactive alerting | Alert on P99 latency > 3s, error rate > 5% | Alarm evaluation period: 5m |
| SSM Automation | Auto-remediation | Restart tasks, clear caches, switch model fallback | MTTR target < 5 minutes |
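The CloudWatch Alarms row above (P99 latency > 3s, 5-minute evaluation) maps directly onto `put_metric_alarm`. The sketch below only builds the request kwargs; the namespace and metric names are assumptions that must match whatever metrics your orchestrator actually emits:

```python
def p99_latency_alarm(namespace="MangaAssist/FM", metric="TotalE2ELatencyMs",
                      threshold_ms=3000.0, periods=1, period_s=300):
    """Build kwargs for cloudwatch.put_metric_alarm() matching the table row:
    alarm when P99 end-to-end latency exceeds 3s over one 5-minute period.
    Namespace/metric names are illustrative."""
    return {
        "AlarmName": "manga-assist-p99-latency-breach",
        "Namespace": namespace,
        "MetricName": metric,
        "ExtendedStatistic": "p99",          # percentile stats use ExtendedStatistic
        "Period": period_s,
        "EvaluationPeriods": periods,
        "Threshold": threshold_ms,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",  # quiet hours should not alarm
    }

kwargs = p99_latency_alarm()
# boto3.client("cloudwatch").put_metric_alarm(**kwargs)
print(kwargs["ExtendedStatistic"], kwargs["Threshold"])
```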
9. Key Takeaways
- Structured logging is non-negotiable: Every FM call must produce a consistent JSON log entry with metadata, prompt metrics, response metrics, latency breakdown, error details, and cost. Without this, CloudWatch Logs Insights queries return nothing useful.
- X-Ray sampling must be error-biased: At 1M messages/day, sampling 100% of traces would be cost-prohibitive and would overwhelm the X-Ray backend. Sample 100% of errors, 50% of slow requests (> 3s), and 1% of normal traffic.
- Error pattern detection enables automation: By classifying errors into known patterns, the system can automatically apply the correct remediation (retry, fallback model, truncate context) without human intervention for transient issues.
- Q Developer integration closes the feedback loop: When patterns are detected that the system cannot auto-remediate, generating a contextual Q Developer prompt with error details, root cause, and stack context helps developers diagnose faster.
- Cost tracking per call is essential: At MangaAssist's scale ($3/$15 per 1M tokens for Sonnet), a single prompt template bug that doubles input tokens could cost thousands of dollars per day. Every log entry must include cost metrics.
- The 3-second SLA drives trace granularity: The latency breakdown must cover every component (API Gateway, orchestrator, Redis, DynamoDB, OpenSearch, Bedrock, response parsing, WebSocket delivery) to identify which subsystem is causing SLA breaches.
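On the error-biased sampling point: native X-Ray sampling rules decide at the *start* of a request, before its outcome or latency is known, so this policy has to be enforced at request end by the application or a collector. A standalone sketch of that tail decision (the function name is illustrative):

```python
import random

def should_keep_trace(is_error, latency_ms,
                      slow_ms=3000.0, slow_rate=0.5, base_rate=0.01):
    """Tail-based keep/drop decision implementing the error-biased policy:
    100% of errors, 50% of slow (>3s) requests, 1% of normal traffic.
    Head-based X-Ray sampling rules cannot express this, since they fire
    before the request's status or duration exists."""
    if is_error:
        return True
    if latency_ms > slow_ms:
        return random.random() < slow_rate
    return random.random() < base_rate

kept = sum(should_keep_trace(False, 120.0) for _ in range(10_000))
print(kept)  # ≈ 1% of 10,000 normal requests
```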