
Troubleshooting Efficiency Architecture for FM Applications

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Certification: AWS Certified AI Practitioner (AIF-C01)
Domain: 2 — Development and Implementation of FM Applications
Task: 2.5 — Describe methods to integrate FM applications into existing systems
Skill: 2.5.6 — Improve troubleshooting efficiency for FM applications
Focus Areas: CloudWatch Logs Insights for prompt/response analysis, X-Ray for FM API call tracing, Amazon Q Developer for GenAI-specific error pattern recognition

1. Troubleshooting Efficiency Mindmap

mindmap
  root((FM Troubleshooting<br/>Efficiency))
    CloudWatch Logs Insights
      Prompt/Response Analysis
        Token Count Tracking
        Latency Distribution
        Error Rate Correlation
      Log Query Patterns
        fields @timestamp, @message
        filter by model_id
        stats aggregation
      Structured Log Parsing
        JSON Path Extraction
        Regex Pattern Matching
        Metric Filters
      Dashboards
        Real-Time Error Panels
        Latency Percentile Graphs
        Token Usage Heatmaps
    X-Ray Tracing
      FM API Call Tracing
        Bedrock InvokeModel Segments
        OpenSearch Vector Query Subsegments
        DynamoDB Session Lookup Subsegments
      Service Map
        End-to-End Call Visualization
        Latency Hotspot Detection
        Error Propagation Paths
      Trace Analysis
        Cold Start Identification
        Throttle Pattern Detection
        Retry Storm Analysis
      Sampling Rules
        Dynamic Rate Adjustment
        Error-Biased Sampling
        High-Latency Capture
    Amazon Q Developer
      GenAI Error Pattern Recognition
        Model Timeout Classification
        Token Limit Violations
        Content Filter Triggers
      Code Analysis
        SDK Misconfiguration Detection
        Prompt Template Errors
        Response Parsing Failures
      Automated Suggestions
        Fix Recommendations
        Performance Optimization Tips
        Cost Reduction Patterns
      Learning Loop
        Historical Pattern Matching
        Team Knowledge Aggregation
        Runbook Automation
    Error Taxonomy
      Model Errors
        ThrottlingException
        ModelTimeoutException
        ValidationException
      Infrastructure Errors
        NetworkTimeout
        ServiceUnavailable
        MemoryExhaustion
      Application Errors
        PromptTooLarge
        ResponseParseFailure
        ContextWindowExceeded
      Data Errors
        VectorSearchEmpty
        SessionCorruption
        CacheStaleData
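
The error taxonomy above can be encoded as a small lookup that tags each exception with its category and a default severity, so downstream logging and alerting stay consistent. A minimal sketch — the severity assignments are illustrative defaults, not values from the MangaAssist codebase:

```python
# Map exception names from the taxonomy to a (category, severity) pair.
# Names mirror the mindmap; severities are illustrative defaults.
ERROR_TAXONOMY = {
    "ThrottlingException":   ("model",          "MEDIUM"),
    "ModelTimeoutException": ("model",          "HIGH"),
    "ValidationException":   ("model",          "LOW"),
    "NetworkTimeout":        ("infrastructure", "HIGH"),
    "ServiceUnavailable":    ("infrastructure", "CRITICAL"),
    "MemoryExhaustion":      ("infrastructure", "CRITICAL"),
    "PromptTooLarge":        ("application",    "MEDIUM"),
    "ResponseParseFailure":  ("application",    "MEDIUM"),
    "ContextWindowExceeded": ("application",    "HIGH"),
    "VectorSearchEmpty":     ("data",           "LOW"),
    "SessionCorruption":     ("data",           "HIGH"),
    "CacheStaleData":        ("data",           "LOW"),
}


def classify_error(error_type: str) -> dict:
    """Return the taxonomy category and severity for an error type name."""
    category, severity = ERROR_TAXONOMY.get(error_type, ("unknown", "MEDIUM"))
    return {"error_type": error_type, "category": category, "severity": severity}
```

A classifier like this keeps the `root_cause_category` field (used later in the structured logs) aligned with the taxonomy rather than free-text.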

2. Architecture Flowchart — MangaAssist Observability and Troubleshooting Stack

flowchart TB
    subgraph UserLayer["User Layer"]
        U[Manga Store<br/>Customer] -->|WebSocket| APIGW[API Gateway<br/>WebSocket]
    end

    subgraph Orchestration["ECS Fargate — Orchestrator"]
        APIGW --> ORC[Orchestrator<br/>Service]
        ORC --> SL[Structured<br/>FM Logger]
        ORC --> XT[X-Ray<br/>Tracer]
    end

    subgraph FMLayer["FM & Data Layer"]
        ORC -->|InvokeModel| BR[Amazon Bedrock<br/>Claude 3 Sonnet/Haiku]
        ORC -->|Vector Search| OS[OpenSearch<br/>Serverless]
        ORC -->|Session R/W| DDB[DynamoDB]
        ORC -->|Cache| RC[ElastiCache<br/>Redis]
    end

    subgraph Observability["Observability Stack"]
        SL -->|Structured JSON| CWL[CloudWatch<br/>Logs]
        XT -->|Trace Segments| XR[AWS X-Ray]
        BR -->|Model Metrics| CWM[CloudWatch<br/>Metrics]
        CWL --> CLI[CloudWatch<br/>Logs Insights]
        XR --> SM[X-Ray<br/>Service Map]
        CWM --> CWD[CloudWatch<br/>Dashboards]
    end

    subgraph Troubleshooting["Troubleshooting & Alerting"]
        CLI --> EPD[Error Pattern<br/>Detector]
        SM --> LA[Latency<br/>Analyzer]
        CWD --> AL[CloudWatch<br/>Alarms]
        EPD --> QD[Amazon Q<br/>Developer]
        LA --> QD
        AL --> SNS[SNS<br/>Notifications]
        QD --> RB[Runbook<br/>Automation]
        RB --> SSM[Systems Manager<br/>Automation]
    end

    subgraph ResponseLoop["Feedback Loop"]
        SSM -->|Auto-Remediate| ORC
        SNS -->|Alert Ops Team| OPS[Operations<br/>Team]
        OPS -->|Manual Intervention| ORC
    end

    style UserLayer fill:#e8f4f8,stroke:#2196F3
    style Orchestration fill:#fff3e0,stroke:#FF9800
    style FMLayer fill:#f3e5f5,stroke:#9C27B0
    style Observability fill:#e8f5e9,stroke:#4CAF50
    style Troubleshooting fill:#fce4ec,stroke:#E91E63
    style ResponseLoop fill:#fffde7,stroke:#FFC107
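
The CloudWatch Alarms → SNS path in the flowchart could be wired up as follows. A minimal sketch that builds a `put_metric_alarm` request: the metric name, namespace, and thresholds are illustrative assumptions (the metric itself would come from a metric filter on the FM call logs), not values defined elsewhere in this document:

```python
def build_error_rate_alarm(topic_arn: str) -> dict:
    """Build a put_metric_alarm request that alerts when FM errors spike.

    Metric name and namespace are assumed to come from a metric filter on
    the structured FM_CALL logs; the thresholds are illustrative.
    """
    return {
        "AlarmName": "mangaassist-fm-error-spike",
        "Namespace": "MangaAssist/FM",        # assumed custom namespace
        "MetricName": "FMCallErrors",         # assumed metric-filter output
        "Statistic": "Sum",
        "Period": 300,                        # 5-minute windows
        "EvaluationPeriods": 2,               # two consecutive breaches
        "Threshold": 50.0,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",   # quiet periods are not failures
        "AlarmActions": [topic_arn],          # SNS topic from the flowchart
    }


# In production the dict is passed to the CloudWatch API:
#   boto3.client("cloudwatch").put_metric_alarm(**build_error_rate_alarm(arn))
```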

3. CloudWatch Logs Insights for Prompt/Response Analysis

3.1 Structured Log Format for FM Interactions

Every FM call in MangaAssist produces a structured JSON log entry with fields designed for Logs Insights queries.

import json
import time
import hashlib
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field, asdict
from enum import Enum


class FMCallStatus(Enum):
    """Status values for FM API calls."""
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"
    THROTTLED = "THROTTLED"
    CONTENT_FILTERED = "CONTENT_FILTERED"
    PARTIAL = "PARTIAL"


class ErrorSeverity(Enum):
    """Severity levels for FM errors."""
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"


@dataclass
class FMCallMetadata:
    """Metadata captured for every FM API call."""
    request_id: str
    session_id: str
    user_id: str
    conversation_turn: int
    model_id: str
    model_version: str
    region: str
    timestamp_iso: str
    timestamp_epoch_ms: int


@dataclass
class PromptMetrics:
    """Metrics extracted from the prompt side of an FM call."""
    prompt_template_id: str
    prompt_token_count: int
    system_prompt_token_count: int
    context_token_count: int
    user_message_token_count: int
    total_input_tokens: int
    prompt_hash: str
    contains_manga_title: bool
    contains_author_name: bool
    language: str
    rag_chunks_included: int
    rag_relevance_scores: List[float]


@dataclass
class ResponseMetrics:
    """Metrics extracted from the response side of an FM call."""
    output_token_count: int
    stop_reason: str
    response_length_chars: int
    response_language: str
    contains_product_recommendation: bool
    recommendation_count: int
    confidence_score: Optional[float]
    content_filter_triggered: bool
    content_filter_reason: Optional[str]


@dataclass
class LatencyBreakdown:
    """Granular latency breakdown for an FM call chain."""
    total_e2e_ms: float
    api_gateway_ms: float
    orchestrator_processing_ms: float
    rag_retrieval_ms: float
    opensearch_query_ms: float
    dynamodb_session_lookup_ms: float
    redis_cache_check_ms: float
    bedrock_invoke_ms: float
    bedrock_first_token_ms: float
    response_parsing_ms: float
    websocket_delivery_ms: float


@dataclass
class ErrorDetails:
    """Detailed error information for failed FM calls."""
    error_type: str
    error_code: str
    error_message: str
    error_severity: str
    retry_count: int
    is_retryable: bool
    root_cause_category: str
    stack_trace_hash: str
    upstream_service: str


@dataclass
class CostMetrics:
    """Cost tracking for each FM call."""
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float
    model_pricing_tier: str
    was_cached: bool
    cache_hit_savings_usd: float


@dataclass
class FMCallLogEntry:
    """Complete structured log entry for a single FM interaction."""
    log_type: str = "FM_CALL"
    metadata: Optional[FMCallMetadata] = None
    status: str = FMCallStatus.SUCCESS.value
    prompt_metrics: Optional[PromptMetrics] = None
    response_metrics: Optional[ResponseMetrics] = None
    latency: Optional[LatencyBreakdown] = None
    error: Optional[ErrorDetails] = None
    cost: Optional[CostMetrics] = None
    tags: Dict[str, str] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the log entry to JSON for CloudWatch ingestion."""
        data = {}
        for k, v in asdict(self).items():
            if v is not None:
                data[k] = v
        return json.dumps(data, default=str)


class StructuredFMLogger:
    """
    Structured logger for FM interactions in MangaAssist.

    Produces JSON log entries optimized for CloudWatch Logs Insights
    queries. Every FM call, whether successful or failed, is logged
    with consistent fields to enable efficient troubleshooting.

    Usage:
        logger = StructuredFMLogger(service_name="manga-assist-orchestrator")
        entry = logger.log_fm_call(
            request_id="req-abc123",
            session_id="sess-xyz",
            user_id="user-001",
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            ...
        )
    """

    # Pricing per 1M tokens for supported models
    MODEL_PRICING = {
        "anthropic.claude-3-sonnet": {"input": 3.00, "output": 15.00},
        "anthropic.claude-3-haiku": {"input": 0.25, "output": 1.25},
    }

    def __init__(self, service_name: str, region: str = "ap-northeast-1"):
        self.service_name = service_name
        self.region = region
        self.logger = logging.getLogger(f"fm.{service_name}")
        self._configure_handler()

    def _configure_handler(self) -> None:
        """Configure JSON handler for CloudWatch agent pickup."""
        if self.logger.handlers:
            return  # avoid duplicate handlers (and duplicate log lines) on re-instantiation
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False  # keep raw JSON lines out of the root logger

    def _compute_prompt_hash(self, prompt_text: str) -> str:
        """Hash prompt text for deduplication analysis without storing PII."""
        return hashlib.sha256(prompt_text.encode("utf-8")).hexdigest()[:16]

    def _resolve_model_key(self, model_id: str) -> str:
        """Resolve a full model ID to its pricing key."""
        for key in self.MODEL_PRICING:
            if key in model_id:
                return key
        # Unknown model IDs fall back to the cheapest tier so cost metrics
        # are never silently inflated.
        return "anthropic.claude-3-haiku"

    def _calculate_cost(
        self, model_id: str, input_tokens: int, output_tokens: int, was_cached: bool
    ) -> CostMetrics:
        """Calculate cost metrics for an FM call."""
        model_key = self._resolve_model_key(model_id)
        pricing = self.MODEL_PRICING[model_key]

        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        total_cost = input_cost + output_cost

        cache_savings = total_cost * 0.9 if was_cached else 0.0

        return CostMetrics(
            input_cost_usd=round(input_cost, 8),
            output_cost_usd=round(output_cost, 8),
            total_cost_usd=round(total_cost if not was_cached else total_cost * 0.1, 8),
            model_pricing_tier=model_key,
            was_cached=was_cached,
            cache_hit_savings_usd=round(cache_savings, 8),
        )

    def log_fm_call(
        self,
        request_id: str,
        session_id: str,
        user_id: str,
        conversation_turn: int,
        model_id: str,
        model_version: str,
        prompt_template_id: str,
        prompt_text: str,
        system_prompt_tokens: int,
        context_tokens: int,
        user_message_tokens: int,
        output_tokens: int,
        stop_reason: str,
        response_text: str,
        latency_breakdown: Dict[str, float],
        rag_chunks: int,
        rag_scores: List[float],
        status: FMCallStatus = FMCallStatus.SUCCESS,
        error_details: Optional[Dict[str, Any]] = None,
        was_cached: bool = False,
        tags: Optional[Dict[str, str]] = None,
    ) -> FMCallLogEntry:
        """
        Log a complete FM call with all metrics.

        This is the primary logging method called by the orchestrator
        after every Bedrock InvokeModel call completes (or fails).
        """
        now = datetime.now(timezone.utc)
        total_input = system_prompt_tokens + context_tokens + user_message_tokens

        metadata = FMCallMetadata(
            request_id=request_id,
            session_id=session_id,
            user_id=user_id,
            conversation_turn=conversation_turn,
            model_id=model_id,
            model_version=model_version,
            region=self.region,
            timestamp_iso=now.isoformat(),
            timestamp_epoch_ms=int(now.timestamp() * 1000),
        )

        prompt_metrics = PromptMetrics(
            prompt_template_id=prompt_template_id,
            prompt_token_count=total_input,
            system_prompt_token_count=system_prompt_tokens,
            context_token_count=context_tokens,
            user_message_token_count=user_message_tokens,
            total_input_tokens=total_input,
            prompt_hash=self._compute_prompt_hash(prompt_text),
            contains_manga_title=any(
                tag in prompt_text.lower()
                for tag in ["manga", "comic", "volume", "chapter"]
            ),
            contains_author_name=False,
            language="ja" if any("\u3040" <= c <= "\u309f" for c in prompt_text) else "en",
            rag_chunks_included=rag_chunks,
            rag_relevance_scores=rag_scores,
        )

        response_metrics = ResponseMetrics(
            output_token_count=output_tokens,
            stop_reason=stop_reason,
            response_length_chars=len(response_text),
            response_language="ja" if any("\u3040" <= c <= "\u309f" for c in response_text) else "en",
            contains_product_recommendation="isbn" in response_text.lower()
            or "price" in response_text.lower(),
            recommendation_count=response_text.lower().count("isbn"),
            confidence_score=None,
            content_filter_triggered=status == FMCallStatus.CONTENT_FILTERED,
            content_filter_reason=None,
        )

        latency = LatencyBreakdown(
            total_e2e_ms=latency_breakdown.get("total_e2e_ms", 0),
            api_gateway_ms=latency_breakdown.get("api_gateway_ms", 0),
            orchestrator_processing_ms=latency_breakdown.get("orchestrator_processing_ms", 0),
            rag_retrieval_ms=latency_breakdown.get("rag_retrieval_ms", 0),
            opensearch_query_ms=latency_breakdown.get("opensearch_query_ms", 0),
            dynamodb_session_lookup_ms=latency_breakdown.get("dynamodb_session_lookup_ms", 0),
            redis_cache_check_ms=latency_breakdown.get("redis_cache_check_ms", 0),
            bedrock_invoke_ms=latency_breakdown.get("bedrock_invoke_ms", 0),
            bedrock_first_token_ms=latency_breakdown.get("bedrock_first_token_ms", 0),
            response_parsing_ms=latency_breakdown.get("response_parsing_ms", 0),
            websocket_delivery_ms=latency_breakdown.get("websocket_delivery_ms", 0),
        )

        cost = self._calculate_cost(model_id, total_input, output_tokens, was_cached)

        error = None
        if error_details:
            error = ErrorDetails(
                error_type=error_details.get("error_type", "Unknown"),
                error_code=error_details.get("error_code", "UNKNOWN"),
                error_message=error_details.get("error_message", ""),
                error_severity=error_details.get("error_severity", ErrorSeverity.MEDIUM.value),
                retry_count=error_details.get("retry_count", 0),
                is_retryable=error_details.get("is_retryable", False),
                root_cause_category=error_details.get("root_cause_category", "unknown"),
                stack_trace_hash=error_details.get("stack_trace_hash", ""),
                upstream_service=error_details.get("upstream_service", ""),
            )

        entry = FMCallLogEntry(
            log_type="FM_CALL",
            metadata=metadata,
            status=status.value,
            prompt_metrics=prompt_metrics,
            response_metrics=response_metrics,
            latency=latency,
            error=error,
            cost=cost,
            tags=tags or {},
        )

        self.logger.info(entry.to_json())
        return entry

    def log_rag_retrieval(
        self,
        request_id: str,
        session_id: str,
        query_text: str,
        opensearch_index: str,
        vector_dimension: int,
        k_results: int,
        results_returned: int,
        relevance_scores: List[float],
        query_latency_ms: float,
        filter_applied: Optional[str] = None,
    ) -> None:
        """Log RAG vector retrieval step separately for granular analysis."""
        log_entry = {
            "log_type": "RAG_RETRIEVAL",
            "request_id": request_id,
            "session_id": session_id,
            "query_hash": hashlib.sha256(query_text.encode()).hexdigest()[:16],
            "opensearch_index": opensearch_index,
            "vector_dimension": vector_dimension,
            "k_requested": k_results,
            "results_returned": results_returned,
            "relevance_scores": relevance_scores,
            "avg_relevance": round(sum(relevance_scores) / len(relevance_scores), 4)
            if relevance_scores
            else 0,
            "min_relevance": min(relevance_scores) if relevance_scores else 0,
            "max_relevance": max(relevance_scores) if relevance_scores else 0,
            "query_latency_ms": query_latency_ms,
            "filter_applied": filter_applied,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.logger.info(json.dumps(log_entry))

    def log_session_event(
        self,
        session_id: str,
        user_id: str,
        event_type: str,
        details: Dict[str, Any],
    ) -> None:
        """Log session lifecycle events (start, end, timeout, error)."""
        log_entry = {
            "log_type": "SESSION_EVENT",
            "session_id": session_id,
            "user_id": user_id,
            "event_type": event_type,
            "details": details,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.logger.info(json.dumps(log_entry))

3.2 CloudWatch Logs Insights Query Examples

These queries are designed for the MangaAssist log group and leverage the structured JSON fields produced by StructuredFMLogger.

# ── Query 1: P50/P90/P99 Bedrock Invoke Latency Over Time ──
# Use: Identify latency degradation trends for Bedrock calls.
fields @timestamp, latency.bedrock_invoke_ms as bedrock_ms
| filter log_type = "FM_CALL"
| filter status = "SUCCESS"
| stats
    avg(bedrock_ms) as avg_latency,
    percentile(bedrock_ms, 50) as p50,
    percentile(bedrock_ms, 90) as p90,
    percentile(bedrock_ms, 99) as p99,
    count(*) as call_count
  by bin(5m) as time_bucket
| sort time_bucket desc

# ── Query 2: Error Rate by Model and Error Type ──
# Use: Pinpoint which model variant generates the most errors.
fields @timestamp, metadata.model_id, error.error_type, error.error_code
| filter log_type = "FM_CALL"
| filter status != "SUCCESS"
| stats count(*) as error_count by metadata.model_id, error.error_type
| sort error_count desc
| limit 20

# ── Query 3: Token Usage Distribution (Input vs Output) ──
# Use: Detect prompt bloat or unexpectedly long responses.
fields @timestamp,
       prompt_metrics.total_input_tokens as input_tokens,
       response_metrics.output_token_count as output_tokens,
       metadata.model_id
| filter log_type = "FM_CALL"
| stats
    avg(input_tokens) as avg_input,
    max(input_tokens) as max_input,
    avg(output_tokens) as avg_output,
    max(output_tokens) as max_output,
    sum(input_tokens) as total_input,
    sum(output_tokens) as total_output
  by bin(1h)

# ── Query 4: Slow RAG Retrieval Correlation with Response Latency ──
# Use: Determine if OpenSearch is the bottleneck.
fields @timestamp,
       latency.opensearch_query_ms as os_ms,
       latency.bedrock_invoke_ms as bedrock_ms,
       latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL"
| filter latency.total_e2e_ms > 3000
| stats
    avg(os_ms) as avg_os,
    avg(bedrock_ms) as avg_bedrock,
    avg(total_ms) as avg_total,
    count(*) as slow_count
  by bin(15m)
| sort slow_count desc

# ── Query 5: Content Filter Trigger Analysis ──
# Use: Understand what inputs trigger content filters.
fields @timestamp,
       metadata.request_id,
       metadata.session_id,
       prompt_metrics.prompt_template_id,
       prompt_metrics.language,
       response_metrics.content_filter_reason
| filter log_type = "FM_CALL"
| filter status = "CONTENT_FILTERED"
| stats count(*) as filter_count by prompt_metrics.prompt_template_id, prompt_metrics.language
| sort filter_count desc

# ── Query 6: Cost Analysis by Model Over Time ──
# Use: Track daily spend and identify cost anomalies.
fields @timestamp,
       metadata.model_id,
       cost.total_cost_usd,
       cost.was_cached,
       cost.cache_hit_savings_usd
| filter log_type = "FM_CALL"
| stats
    sum(cost.total_cost_usd) as total_spend,
    sum(cost.cache_hit_savings_usd) as total_savings,
    count(*) as call_count,
    avg(cost.total_cost_usd) as avg_cost_per_call
  by bin(1d), metadata.model_id
| sort total_spend desc

# ── Query 7: RAG Retrieval Quality Over Time ──
# Use: Detect degradation in vector search relevance.
fields @timestamp, avg_relevance, results_returned, query_latency_ms
| filter log_type = "RAG_RETRIEVAL"
| stats
    avg(avg_relevance) as mean_relevance,
    min(min_relevance) as worst_relevance,
    avg(query_latency_ms) as avg_latency,
    count(*) as retrieval_count
  by bin(1h) as hour
| sort hour desc

# ── Query 8: Throttling Pattern Detection ──
# Use: Identify time windows with high throttle rates.
fields @timestamp, metadata.model_id, error.retry_count
| filter log_type = "FM_CALL"
| filter status = "THROTTLED"
| stats
    count(*) as throttle_count,
    avg(error.retry_count) as avg_retries,
    max(error.retry_count) as max_retries
  by bin(5m), metadata.model_id
| sort throttle_count desc
| limit 50
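
Queries like these can also be executed programmatically — for example, by the error-pattern detector shown in the architecture flowchart — using the standard CloudWatch Logs `start_query` / `get_query_results` APIs. A minimal sketch; the client is injected (e.g. `boto3.client("logs")`) and the log group name is whatever MangaAssist actually uses:

```python
import time


def run_insights_query(logs_client, log_group: str, query: str,
                       lookback_seconds: int = 3600) -> list:
    """Run a Logs Insights query and poll until it finishes.

    Logs Insights queries are asynchronous: start_query returns a queryId,
    and get_query_results must be polled until the status is terminal.
    """
    now = int(time.time())
    resp = logs_client.start_query(
        logGroupName=log_group,
        startTime=now - lookback_seconds,
        endTime=now,
        queryString=query,
    )
    query_id = resp["queryId"]
    while True:
        result = logs_client.get_query_results(queryId=query_id)
        if result["status"] in ("Complete", "Failed", "Cancelled"):
            return result.get("results", [])
        time.sleep(1)  # back off briefly between polls
```

Injecting the client keeps the helper testable and lets the same code run against any region or account.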

4. X-Ray Tracing for FM API Call Chains

4.1 X-Ray FM Tracer

import time
import json
import uuid
import traceback
from typing import Optional, Dict, Any, List, Callable
from dataclasses import dataclass, field
from contextlib import contextmanager
from functools import wraps


@dataclass
class TraceAnnotation:
    """Annotations attached to X-Ray segments for filtering."""
    key: str
    value: Any  # str, int, or bool only for X-Ray annotations


@dataclass
class TraceMetadataEntry:
    """Metadata attached to X-Ray segments for detailed debug info."""
    namespace: str
    key: str
    value: Any


@dataclass
class SubsegmentTiming:
    """Timing record for a subsegment within a trace."""
    name: str
    start_time: float
    end_time: float
    duration_ms: float
    is_error: bool
    is_throttle: bool
    is_fault: bool
    annotations: List[TraceAnnotation]
    metadata: List[TraceMetadataEntry]


class XRayFMTracer:
    """
    X-Ray tracing wrapper tailored for FM API call chains in MangaAssist.

    Provides segment/subsegment management for the full request lifecycle:
      API Gateway -> Orchestrator -> Redis -> DynamoDB -> OpenSearch -> Bedrock -> Response

    Each service call becomes a subsegment with typed annotations for
    efficient filtering in the X-Ray console and analytics.

    Usage:
        tracer = XRayFMTracer(service_name="manga-assist-orchestrator")
        with tracer.trace_fm_request(request_id, session_id, model_id) as ctx:
            with ctx.trace_redis_cache("cache-check"):
                result = redis_client.get(cache_key)
            with ctx.trace_bedrock_invoke("claude-sonnet"):
                response = bedrock.invoke_model(...)
    """

    def __init__(self, service_name: str, sampling_rate: float = 0.05):
        self.service_name = service_name
        self.sampling_rate = sampling_rate
        self._xray_recorder = self._init_recorder()
        self._active_segments: Dict[str, Any] = {}

    def _init_recorder(self):
        """
        Initialize the X-Ray recorder with daemon address and plugins.

        In production, aws_xray_sdk.core.xray_recorder is used.
        Here we show the configuration pattern.
        """
        try:
            from aws_xray_sdk.core import xray_recorder, patch_all

            xray_recorder.configure(
                service=self.service_name,
                sampling=True,
                context_missing="LOG_ERROR",
                plugins=("ECSPlugin",),
                daemon_address="127.0.0.1:2000",
            )
            # Patch boto3, requests, etc. for automatic subsegment creation
            patch_all()
            return xray_recorder
        except ImportError:
            return None

    @contextmanager
    def trace_fm_request(self, request_id: str, session_id: str, model_id: str):
        """
        Create a top-level segment for an FM request.

        Yields a TraceContext that provides subsegment helpers.
        """
        ctx = FMTraceContext(
            tracer=self,
            request_id=request_id,
            session_id=session_id,
            model_id=model_id,
        )
        segment_name = f"{self.service_name}##fm-request"

        if self._xray_recorder:
            segment = self._xray_recorder.begin_segment(
                name=segment_name,
                traceid=None,  # auto-generate
                sampling=True,
            )
            segment.put_annotation("request_id", request_id)
            segment.put_annotation("session_id", session_id)
            segment.put_annotation("model_id", model_id)
            segment.put_annotation("service", self.service_name)
            ctx._segment = segment

        ctx._start_time = time.time()

        try:
            yield ctx
        except Exception as e:
            ctx._is_error = True
            ctx._error_message = str(e)
            if self._xray_recorder and ctx._segment:
                ctx._segment.add_exception(e, traceback.extract_stack())
            raise
        finally:
            ctx._end_time = time.time()
            ctx._duration_ms = (ctx._end_time - ctx._start_time) * 1000
            if self._xray_recorder and ctx._segment:
                ctx._segment.put_metadata(
                    "timing_summary",
                    ctx.get_timing_summary(),
                    "fm_trace",
                )
                self._xray_recorder.end_segment()

    def create_sampling_rule(
        self,
        rule_name: str,
        priority: int,
        fixed_rate: float,
        reservoir_size: int,
        service_name: str,
        url_path: str = "*",
        http_method: str = "*",
    ) -> Dict[str, Any]:
        """
        Generate an X-Ray sampling rule definition.

        For MangaAssist, we want higher sampling on error paths and
        lower sampling on healthy paths to control cost.
        """
        return {
            "SamplingRule": {
                "RuleName": rule_name,
                "Priority": priority,
                "FixedRate": fixed_rate,
                "ReservoirSize": reservoir_size,
                "ServiceName": service_name,
                "ServiceType": "AWS::ECS::Container",
                "Host": "*",
                "HTTPMethod": http_method,
                "URLPath": url_path,
                "ResourceARN": "*",
                "Version": 1,
                "Attributes": {},
            }
        }

    def get_sampling_rules_for_mangaassist(self) -> List[Dict[str, Any]]:
        """
        Return recommended sampling rules for MangaAssist.

        At 1M messages/day, full sampling would be cost-prohibitive.
        These rules balance observability with cost.
        """
        return [
            self.create_sampling_rule(
                rule_name="MangaAssist-Errors",
                priority=1,
                fixed_rate=1.0,       # Sample 100% of errors
                reservoir_size=100,
                service_name=self.service_name,
            ),
            self.create_sampling_rule(
                rule_name="MangaAssist-SlowRequests",
                priority=2,
                fixed_rate=0.5,       # Sample 50% of slow requests
                reservoir_size=50,
                service_name=self.service_name,
            ),
            self.create_sampling_rule(
                rule_name="MangaAssist-Normal",
                priority=10,
                fixed_rate=0.01,      # Sample 1% of normal traffic
                reservoir_size=10,
                service_name=self.service_name,
            ),
        ]
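
The rule definitions returned by get_sampling_rules_for_mangaassist() can be registered with the X-Ray control plane via the standard CreateSamplingRule API. A minimal sketch, with the client injected for testability (in production, `boto3.client("xray")`):

```python
def register_sampling_rules(xray_client, rules: list) -> list:
    """Register generated sampling rules with X-Ray.

    Each item is the dict produced by create_sampling_rule(), whose
    {"SamplingRule": {...}} shape matches the CreateSamplingRule API.
    Returns the names of the rules that were submitted.
    """
    submitted = []
    for rule in rules:
        xray_client.create_sampling_rule(**rule)  # unpacks SamplingRule=...
        submitted.append(rule["SamplingRule"]["RuleName"])
    return submitted
```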


class FMTraceContext:
    """
    Context object yielded by XRayFMTracer.trace_fm_request().

    Provides typed subsegment helpers for each downstream service call.
    """

    def __init__(
        self,
        tracer: XRayFMTracer,
        request_id: str,
        session_id: str,
        model_id: str,
    ):
        self._tracer = tracer
        self._request_id = request_id
        self._session_id = session_id
        self._model_id = model_id
        self._segment = None
        self._start_time: float = 0
        self._end_time: float = 0
        self._duration_ms: float = 0
        self._is_error: bool = False
        self._error_message: str = ""
        self._subsegment_timings: List[SubsegmentTiming] = []

    @contextmanager
    def _trace_subsegment(
        self,
        name: str,
        service_type: str,
        annotations: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Generic subsegment tracer for any downstream call."""
        start = time.time()
        is_error = False
        is_throttle = False
        is_fault = False

        recorder = self._tracer._xray_recorder
        subsegment = None
        if recorder:
            subsegment = recorder.begin_subsegment(name)
            subsegment.put_annotation("service_type", service_type)
            subsegment.put_annotation("request_id", self._request_id)
            if annotations:
                for k, v in annotations.items():
                    subsegment.put_annotation(k, v)
            if metadata:
                for k, v in metadata.items():
                    subsegment.put_metadata(k, v, "fm_trace")

        try:
            yield subsegment
        except Exception as e:
            is_error = True
            error_str = str(e).lower()
            if "throttl" in error_str:
                is_throttle = True
            if "internal" in error_str or "fault" in error_str:
                is_fault = True
            if subsegment:
                subsegment.add_exception(e, traceback.extract_stack())
            raise
        finally:
            end = time.time()
            duration = (end - start) * 1000
            if subsegment:
                recorder.end_subsegment()
            timing = SubsegmentTiming(
                name=name,
                start_time=start,
                end_time=end,
                duration_ms=duration,
                is_error=is_error,
                is_throttle=is_throttle,
                is_fault=is_fault,
                annotations=[
                    TraceAnnotation(k, v) for k, v in (annotations or {}).items()
                ],
                metadata=[
                    TraceMetadataEntry("fm_trace", k, v) for k, v in (metadata or {}).items()
                ],
            )
            self._subsegment_timings.append(timing)

    @contextmanager
    def trace_redis_cache(self, operation: str = "get"):
        """Trace a Redis cache check/set operation."""
        with self._trace_subsegment(
            name="ElastiCache-Redis",
            service_type="AWS::ElastiCache::Redis",
            annotations={"cache_operation": operation},
        ) as subseg:
            yield subseg

    @contextmanager
    def trace_dynamodb(self, table_name: str, operation: str = "GetItem"):
        """Trace a DynamoDB operation (session lookup, product fetch)."""
        with self._trace_subsegment(
            name=f"DynamoDB-{table_name}",
            service_type="AWS::DynamoDB::Table",
            annotations={"table_name": table_name, "ddb_operation": operation},
        ) as subseg:
            yield subseg

    @contextmanager
    def trace_opensearch(self, index_name: str, k_results: int = 5):
        """Trace an OpenSearch vector search query."""
        with self._trace_subsegment(
            name="OpenSearch-VectorSearch",
            service_type="AWS::OpenSearchServerless",
            annotations={"index_name": index_name, "k_results": k_results},
        ) as subseg:
            yield subseg

    @contextmanager
    def trace_bedrock_invoke(
        self, model_id: str, input_tokens: int = 0, max_tokens: int = 1024
    ):
        """Trace a Bedrock InvokeModel call with model-specific annotations."""
        with self._trace_subsegment(
            name="Bedrock-InvokeModel",
            service_type="AWS::Bedrock::InvokeModel",
            annotations={
                "model_id": model_id,
                "input_tokens": input_tokens,
                "max_tokens": max_tokens,
            },
            metadata={"model_config": {"model_id": model_id, "max_tokens": max_tokens}},
        ) as subseg:
            yield subseg

    @contextmanager
    def trace_websocket_response(self):
        """Trace the WebSocket response delivery to the client."""
        with self._trace_subsegment(
            name="APIGateway-WebSocket-Send",
            service_type="AWS::ApiGatewayV2::WebSocket",
        ) as subseg:
            yield subseg

    def get_timing_summary(self) -> Dict[str, Any]:
        """
        Return a summary of all subsegment timings for this request.

        Used for both X-Ray metadata and structured logging.
        """
        summary = {
            "request_id": self._request_id,
            "total_duration_ms": self._duration_ms,
            "is_error": self._is_error,
            "subsegments": [],
        }
        for t in self._subsegment_timings:
            summary["subsegments"].append({
                "name": t.name,
                "duration_ms": round(t.duration_ms, 2),
                "is_error": t.is_error,
                "is_throttle": t.is_throttle,
            })
        total_subseg = sum(t.duration_ms for t in self._subsegment_timings)
        summary["subsegment_total_ms"] = round(total_subseg, 2)
        summary["orchestrator_overhead_ms"] = round(self._duration_ms - total_subseg, 2)
        return summary
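
The summary above can feed an alerting hook directly. A minimal, self-contained sketch of consuming it to flag the slowest subsegment and excessive orchestrator overhead; the dict shape mirrors get_timing_summary(), while the find_hotspot name and the 200 ms overhead budget are illustrative assumptions, not MangaAssist requirements:

```python
def find_hotspot(summary: dict, overhead_budget_ms: float = 200.0) -> dict:
    """Pick the slowest subsegment and check orchestrator overhead."""
    subsegments = summary.get("subsegments", [])
    slowest = max(subsegments, key=lambda s: s["duration_ms"], default=None)
    overhead = summary.get("orchestrator_overhead_ms", 0.0)
    return {
        "slowest_subsegment": slowest["name"] if slowest else None,
        "slowest_duration_ms": slowest["duration_ms"] if slowest else 0.0,
        "overhead_exceeded": overhead > overhead_budget_ms,
    }

# Example summary shaped like get_timing_summary() output.
summary = {
    "request_id": "req-123",
    "total_duration_ms": 2100.0,
    "subsegments": [
        {"name": "ElastiCache-Redis", "duration_ms": 4.2},
        {"name": "OpenSearch-VectorSearch", "duration_ms": 180.5},
        {"name": "Bedrock-InvokeModel", "duration_ms": 1650.0},
    ],
    "subsegment_total_ms": 1834.7,
    "orchestrator_overhead_ms": 265.3,
}
print(find_hotspot(summary))
```

Against the 3-second end-to-end target, a check like this makes it obvious whether time is going to Bedrock inference or to orchestrator glue code.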

5. Amazon Q Developer for GenAI-Specific Error Pattern Recognition

5.1 Error Pattern Detector

import re
import json
import time
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import Counter, defaultdict


class ErrorCategory(Enum):
    """Top-level error categories for FM applications."""
    MODEL_THROTTLE = "MODEL_THROTTLE"
    MODEL_TIMEOUT = "MODEL_TIMEOUT"
    MODEL_VALIDATION = "MODEL_VALIDATION"
    CONTENT_FILTER = "CONTENT_FILTER"
    TOKEN_LIMIT = "TOKEN_LIMIT"
    CONTEXT_WINDOW = "CONTEXT_WINDOW"
    RAG_RETRIEVAL = "RAG_RETRIEVAL"
    SESSION_CORRUPTION = "SESSION_CORRUPTION"
    CACHE_FAILURE = "CACHE_FAILURE"
    NETWORK = "NETWORK"
    INFRASTRUCTURE = "INFRASTRUCTURE"
    SDK_MISCONFIGURATION = "SDK_MISCONFIGURATION"
    PROMPT_TEMPLATE = "PROMPT_TEMPLATE"
    RESPONSE_PARSE = "RESPONSE_PARSE"
    UNKNOWN = "UNKNOWN"


class RemediationAction(Enum):
    """Possible automated remediation actions."""
    RETRY_WITH_BACKOFF = "RETRY_WITH_BACKOFF"
    SWITCH_MODEL = "SWITCH_MODEL"
    REDUCE_INPUT_TOKENS = "REDUCE_INPUT_TOKENS"
    CLEAR_CACHE = "CLEAR_CACHE"
    RESET_SESSION = "RESET_SESSION"
    SCALE_UP = "SCALE_UP"
    ALERT_OPS = "ALERT_OPS"
    NO_ACTION = "NO_ACTION"
    TRUNCATE_CONTEXT = "TRUNCATE_CONTEXT"
    FALLBACK_RESPONSE = "FALLBACK_RESPONSE"


@dataclass
class ErrorPattern:
    """
    Defines a recognizable error pattern with its classification,
    root cause analysis, and recommended remediation.
    """
    pattern_id: str
    category: ErrorCategory
    name: str
    description: str
    regex_patterns: List[str]
    error_codes: List[str]
    root_cause: str
    impact: str
    remediation: RemediationAction
    remediation_steps: List[str]
    severity: str
    is_transient: bool
    expected_frequency: str
    q_developer_prompt: str


@dataclass
class PatternMatch:
    """Result of matching an error against known patterns."""
    pattern: ErrorPattern
    confidence: float
    matched_by: str
    error_message: str
    timestamp: str
    request_id: str
    additional_context: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ErrorTrend:
    """Aggregated error trend over a time window."""
    category: ErrorCategory
    count: int
    time_window_minutes: int
    trend_direction: str  # "increasing", "decreasing", "stable"
    rate_per_minute: float
    first_seen: str
    last_seen: str
    affected_sessions: int
    affected_models: List[str]


class ErrorPatternDetector:
    """
    Detects and classifies error patterns in FM application logs.

    Maintains a registry of known error patterns specific to
    MangaAssist's GenAI stack (Bedrock, OpenSearch, DynamoDB,
    Redis, API Gateway). Uses regex matching, error code lookup,
    and contextual analysis to classify errors and recommend
    automated remediation.

    Integration with Amazon Q Developer:
      The detector generates Q Developer prompts for each detected
      pattern, enabling AI-assisted diagnosis and fix suggestions.
    """

    def __init__(self):
        self._patterns: List[ErrorPattern] = []
        self._match_history: List[PatternMatch] = []
        self._error_counts: Counter = Counter()
        self._error_timeline: defaultdict = defaultdict(list)
        self._register_default_patterns()

    def _register_default_patterns(self) -> None:
        """Register all known error patterns for MangaAssist."""
        patterns = [
            ErrorPattern(
                pattern_id="EP-001",
                category=ErrorCategory.MODEL_THROTTLE,
                name="Bedrock Throttling",
                description="InvokeModel requests being throttled due to rate limits",
                regex_patterns=[
                    r"ThrottlingException",
                    r"Rate exceeded",
                    r"Too many requests",
                    r"TooManyRequestsException",
                ],
                error_codes=["ThrottlingException", "TooManyRequestsException"],
                root_cause="Request rate exceeds provisioned throughput or on-demand limits for the selected model",
                impact="Users experience delayed responses or timeout errors. At 1M msgs/day, burst traffic can easily hit limits.",
                remediation=RemediationAction.RETRY_WITH_BACKOFF,
                remediation_steps=[
                    "1. Implement exponential backoff with jitter (base=1s, max=30s)",
                    "2. Check current provisioned throughput in Bedrock console",
                    "3. Request quota increase via AWS Support if sustained",
                    "4. Consider routing overflow to Haiku ($0.25/$1.25) as fallback",
                    "5. Enable request queuing in orchestrator with priority levels",
                ],
                severity="HIGH",
                is_transient=True,
                expected_frequency="< 1% of requests under normal load",
                q_developer_prompt="Analyze this Bedrock ThrottlingException in my ECS Fargate service calling Claude 3. Show me how to implement exponential backoff with jitter and a fallback to Haiku model.",
            ),
            ErrorPattern(
                pattern_id="EP-002",
                category=ErrorCategory.MODEL_TIMEOUT,
                name="Bedrock Invoke Timeout",
                description="InvokeModel call exceeds configured timeout",
                regex_patterns=[
                    r"ReadTimeoutError",
                    r"ConnectTimeoutError",
                    r"ModelTimeoutException",
                    r"Connection reset by peer",
                ],
                error_codes=["ReadTimeoutError", "ModelTimeoutException"],
                root_cause="Model inference takes longer than client timeout, often with large input contexts or during service degradation",
                impact="User gets no response. WebSocket connection may close. Session state may be inconsistent.",
                remediation=RemediationAction.SWITCH_MODEL,
                remediation_steps=[
                    "1. Check input token count — if > 100K, truncate context",
                    "2. If Sonnet times out, retry with Haiku for faster response",
                    "3. Increase client timeout to 60s for large contexts",
                    "4. Implement streaming (InvokeModelWithResponseStream) for perceived speed",
                    "5. Add circuit breaker to prevent cascading timeouts",
                ],
                severity="HIGH",
                is_transient=True,
                expected_frequency="< 0.5% under normal conditions",
                q_developer_prompt="My Bedrock InvokeModel call is timing out. The model is Claude 3 Sonnet with large manga context. How do I implement streaming with InvokeModelWithResponseStream and add a circuit breaker?",
            ),
            ErrorPattern(
                pattern_id="EP-003",
                category=ErrorCategory.TOKEN_LIMIT,
                name="Token Limit Exceeded",
                description="Input or output token count exceeds model limits",
                regex_patterns=[
                    r"ValidationException.*token",
                    r"maximum.*token.*exceeded",
                    r"input.*too.*long",
                    r"context.*length.*exceeded",
                ],
                error_codes=["ValidationException"],
                root_cause="Prompt with system instructions + RAG context + conversation history exceeds model token limit",
                impact="Request fails before inference. User sees error. No cost incurred for failed call.",
                remediation=RemediationAction.TRUNCATE_CONTEXT,
                remediation_steps=[
                    "1. Count tokens before sending (use the model provider's tokenizer, not a generic one)",
                    "2. Implement sliding window for conversation history",
                    "3. Limit RAG chunks to top-3 by relevance score",
                    "4. Summarize older conversation turns",
                    "5. Set max_tokens output limit to reserve space",
                ],
                severity="MEDIUM",
                is_transient=False,
                expected_frequency="1-2% for long manga discussion threads",
                q_developer_prompt="I'm getting ValidationException for token limits on Claude 3 Sonnet. My prompt includes system instructions, RAG chunks from OpenSearch, and conversation history from DynamoDB. How do I implement a token budget manager?",
            ),
            ErrorPattern(
                pattern_id="EP-004",
                category=ErrorCategory.RAG_RETRIEVAL,
                name="OpenSearch Vector Search Failure",
                description="Vector similarity search returns empty or irrelevant results",
                regex_patterns=[
                    r"OpenSearchException",
                    r"ConnectionTimeout.*opensearch",
                    r"index_not_found_exception",
                    r"search_phase_execution_exception",
                ],
                error_codes=["OpenSearchException", "ConnectionTimeout"],
                root_cause="OpenSearch Serverless collection unreachable, index not built, or embedding dimension mismatch",
                impact="FM receives no context, producing hallucinated or generic answers about manga products",
                remediation=RemediationAction.FALLBACK_RESPONSE,
                remediation_steps=[
                    "1. Check OpenSearch Serverless collection status",
                    "2. Verify VPC endpoint connectivity from ECS tasks",
                    "3. Confirm embedding dimension matches index mapping (1536 for Titan)",
                    "4. Fall back to keyword search if vector search fails",
                    "5. Return cached popular recommendations as fallback",
                ],
                severity="HIGH",
                is_transient=True,
                expected_frequency="< 0.1% under normal conditions",
                q_developer_prompt="OpenSearch Serverless vector search is failing in my RAG pipeline. The index uses 1536-dimension Titan embeddings. How do I add a keyword search fallback and verify index health?",
            ),
            ErrorPattern(
                pattern_id="EP-005",
                category=ErrorCategory.CONTENT_FILTER,
                name="Content Filter Triggered",
                description="Bedrock content filter blocks input or output",
                regex_patterns=[
                    r"content.*filter",
                    r"blocked.*content.*policy",
                    r"AccessDeniedException.*content",
                    r"guardrail.*blocked",
                ],
                error_codes=["AccessDeniedException"],
                root_cause="User query or FM response contains content that violates Bedrock guardrails or content policies",
                impact="User receives generic error instead of helpful response. May frustrate legitimate queries about mature manga titles.",
                remediation=RemediationAction.FALLBACK_RESPONSE,
                remediation_steps=[
                    "1. Log the blocked content category for pattern analysis",
                    "2. Review guardrail configuration for false positives",
                    "3. Adjust content filter sensitivity for manga-specific terms",
                    "4. Provide user-friendly message explaining the limitation",
                    "5. Offer alternative query suggestions",
                ],
                severity="MEDIUM",
                is_transient=False,
                expected_frequency="2-5% depending on manga genre queries",
                q_developer_prompt="Bedrock content filters are triggering too often for my manga store chatbot. How do I configure Bedrock Guardrails to allow manga-specific terminology while maintaining safety?",
            ),
            ErrorPattern(
                pattern_id="EP-006",
                category=ErrorCategory.SESSION_CORRUPTION,
                name="DynamoDB Session Corruption",
                description="Session data is corrupted or inconsistent in DynamoDB",
                regex_patterns=[
                    r"ConditionalCheckFailedException",
                    r"ValidationException.*attribute",
                    r"SerializationException",
                    r"item.*size.*exceeded",
                ],
                error_codes=["ConditionalCheckFailedException", "SerializationException"],
                root_cause="Concurrent writes to session, item size exceeding 400KB limit, or schema mismatch after deployment",
                impact="Conversation context is lost or incorrect. User may need to restart conversation.",
                remediation=RemediationAction.RESET_SESSION,
                remediation_steps=[
                    "1. Implement optimistic locking with version numbers",
                    "2. Compress conversation history before storage",
                    "3. Archive old turns to S3 if item approaches 400KB",
                    "4. Add schema validation before writes",
                    "5. Reset session gracefully with user notification",
                ],
                severity="MEDIUM",
                is_transient=False,
                expected_frequency="< 0.01% with proper locking",
                q_developer_prompt="DynamoDB ConditionalCheckFailedException on my session table. Multiple ECS tasks are writing to the same session item. How do I implement optimistic locking and handle concurrent writes?",
            ),
            ErrorPattern(
                pattern_id="EP-007",
                category=ErrorCategory.CACHE_FAILURE,
                name="Redis Cache Connection Failure",
                description="ElastiCache Redis connection lost or timeout",
                regex_patterns=[
                    r"ConnectionError.*redis",
                    r"TimeoutError.*redis",
                    r"CLUSTERDOWN",
                    r"MOVED",
                    r"redis.*connection.*refused",
                ],
                error_codes=["ConnectionError", "TimeoutError", "CLUSTERDOWN"],
                root_cause="Redis node failure, network partition, or cluster rebalancing. Also possible during maintenance windows.",
                impact="Cache misses increase Bedrock calls and latency. At 1M msgs/day, every cache miss costs $0.003-$0.015 per call.",
                remediation=RemediationAction.RETRY_WITH_BACKOFF,
                remediation_steps=[
                    "1. Configure Redis client with connection pooling and retry logic",
                    "2. Set socket_timeout=1s, socket_connect_timeout=1s",
                    "3. Implement cache-aside pattern with graceful degradation",
                    "4. Monitor Redis CloudWatch metrics (CacheHitRate, EngineCPUUtilization)",
                    "5. Use Multi-AZ replication for high availability",
                ],
                severity="MEDIUM",
                is_transient=True,
                expected_frequency="< 0.01% with Multi-AZ",
                q_developer_prompt="ElastiCache Redis connection is dropping intermittently from my ECS Fargate tasks. How do I configure connection pooling with retry logic and implement graceful cache-aside degradation?",
            ),
            ErrorPattern(
                pattern_id="EP-008",
                category=ErrorCategory.SDK_MISCONFIGURATION,
                name="Boto3 Client Misconfiguration",
                description="AWS SDK client improperly configured for Bedrock calls",
                regex_patterns=[
                    r"NoCredentialsError",
                    r"EndpointConnectionError",
                    r"ClientError.*UnrecognizedClientException",
                    r"botocore.*ParamValidationError",
                    r"InvalidRegionError",
                ],
                error_codes=["NoCredentialsError", "EndpointConnectionError", "ParamValidationError"],
                root_cause="ECS task role missing Bedrock permissions, incorrect region, or SDK version too old for latest model IDs",
                impact="All FM calls fail. Complete service outage for chatbot functionality.",
                remediation=RemediationAction.ALERT_OPS,
                remediation_steps=[
                    "1. Verify ECS task role has bedrock:InvokeModel permission",
                    "2. Confirm Bedrock region matches client config (ap-northeast-1 for JP)",
                    "3. Update boto3/botocore to latest version",
                    "4. Check model access is enabled in Bedrock console",
                    "5. Validate IAM policy resource ARN for specific model access",
                ],
                severity="CRITICAL",
                is_transient=False,
                expected_frequency="Only after deployments or IAM changes",
                q_developer_prompt="Getting NoCredentialsError when calling Bedrock from ECS Fargate. My task definition uses a task role. How do I debug IAM permission issues for bedrock:InvokeModel in ap-northeast-1?",
            ),
        ]

        self._patterns.extend(patterns)

    def detect(
        self,
        error_message: str,
        error_code: Optional[str] = None,
        request_id: str = "",
        context: Optional[Dict[str, Any]] = None,
    ) -> List[PatternMatch]:
        """
        Match an error against all registered patterns.

        Returns a list of PatternMatch objects sorted by confidence,
        highest first. Multiple patterns may match (e.g., a throttle
        that also causes a timeout).
        """
        matches: List[PatternMatch] = []
        now_iso = datetime.now(timezone.utc).isoformat()

        for pattern in self._patterns:
            confidence = 0.0
            matched_by = ""

            # Check error code match (highest confidence)
            if error_code and error_code in pattern.error_codes:
                confidence = max(confidence, 0.95)
                matched_by = f"error_code:{error_code}"

            # Check regex patterns
            for regex in pattern.regex_patterns:
                if re.search(regex, error_message, re.IGNORECASE):
                    regex_confidence = 0.85
                    if confidence < regex_confidence:
                        confidence = regex_confidence
                        matched_by = f"regex:{regex}"

            if confidence > 0.5:
                match = PatternMatch(
                    pattern=pattern,
                    confidence=confidence,
                    matched_by=matched_by,
                    error_message=error_message,
                    timestamp=now_iso,
                    request_id=request_id,
                    additional_context=context or {},
                )
                matches.append(match)
                self._error_counts[pattern.category.value] += 1
                self._error_timeline[pattern.category.value].append(now_iso)
                self._match_history.append(match)

        matches.sort(key=lambda m: m.confidence, reverse=True)
        return matches

    def get_trend(
        self,
        category: ErrorCategory,
        window_minutes: int = 60,
    ) -> Optional[ErrorTrend]:
        """
        Analyze error trend for a specific category over a time window.

        Used by the auto-remediation engine to decide whether to
        escalate or self-heal.
        """
        timestamps = self._error_timeline.get(category.value, [])
        if not timestamps:
            return None

        cutoff_iso = (datetime.now(timezone.utc) - timedelta(minutes=window_minutes)).isoformat()
        # ISO-8601 UTC timestamps produced by the same formatter sort
        # lexicographically, so string comparison against the cutoff is safe.
        recent = [t for t in timestamps if t >= cutoff_iso]
        count = len(recent)

        if count == 0:
            return None

        rate = count / window_minutes if window_minutes > 0 else 0

        # Simple trend detection: compare first half vs second half
        mid = window_minutes // 2
        mid_cutoff = datetime.now(timezone.utc) - timedelta(minutes=mid)
        first_half = [t for t in recent if t < mid_cutoff.isoformat()]
        second_half = [t for t in recent if t >= mid_cutoff.isoformat()]

        if len(second_half) > len(first_half) * 1.5:
            direction = "increasing"
        elif len(first_half) > len(second_half) * 1.5:
            direction = "decreasing"
        else:
            direction = "stable"

        return ErrorTrend(
            category=category,
            count=count,
            time_window_minutes=window_minutes,
            trend_direction=direction,
            rate_per_minute=round(rate, 4),
            first_seen=recent[0] if recent else "",
            last_seen=recent[-1] if recent else "",
            affected_sessions=0,   # not tracked in this minimal implementation
            affected_models=[],    # not tracked in this minimal implementation
        )

    def generate_q_developer_prompt(self, match: PatternMatch) -> str:
        """
        Generate a contextual prompt for Amazon Q Developer
        based on the detected error pattern.
        """
        prompt_parts = [
            f"Error Pattern: {match.pattern.name} ({match.pattern.pattern_id})",
            f"Category: {match.pattern.category.value}",
            f"Error Message: {match.error_message}",
            f"Root Cause: {match.pattern.root_cause}",
            f"Severity: {match.pattern.severity}",
            "",
            f"Q Developer Query: {match.pattern.q_developer_prompt}",
            "",
            "Additional Context:",
        ]
        for k, v in match.additional_context.items():
            prompt_parts.append(f"  - {k}: {v}")

        return "\n".join(prompt_parts)

    def get_error_summary(self) -> Dict[str, Any]:
        """Return a summary of all detected errors for dashboard display."""
        return {
            "total_errors_detected": sum(self._error_counts.values()),
            "by_category": dict(self._error_counts),
            "pattern_count": len(self._patterns),
            "match_history_size": len(self._match_history),
            "top_3_categories": self._error_counts.most_common(3),
        }
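
The confidence weighting in detect() can be illustrated with a stripped-down, self-contained version of the same logic: an exact error-code hit outranks a regex hit on the message body, and results sort by confidence. The pattern table and the classify name are illustrative; the 0.95/0.85 scores mirror the class above.

```python
import re
from typing import Optional

# Two-category excerpt of the pattern registry, for illustration only.
PATTERNS = {
    "MODEL_THROTTLE": {
        "regexes": [r"ThrottlingException", r"Rate exceeded"],
        "codes": ["ThrottlingException"],
    },
    "TOKEN_LIMIT": {
        "regexes": [r"ValidationException.*token", r"context.*length.*exceeded"],
        "codes": ["ValidationException"],
    },
}

def classify(message: str, code: Optional[str] = None) -> list:
    matches = []
    for category, spec in PATTERNS.items():
        confidence = 0.0
        if code and code in spec["codes"]:
            confidence = 0.95  # exact error-code match: highest confidence
        elif any(re.search(rx, message, re.IGNORECASE) for rx in spec["regexes"]):
            confidence = 0.85  # regex match on the message body
        if confidence > 0.5:
            matches.append((category, confidence))
    return sorted(matches, key=lambda m: m[1], reverse=True)

print(classify("ThrottlingException: Rate exceeded for model", code="ThrottlingException"))
```

The same threshold (confidence > 0.5) gates what gets recorded, so low-signal partial matches never pollute the error timeline.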

6. Error Taxonomy for GenAI Applications

flowchart TD
    subgraph ModelErrors["Model Errors"]
        ME1[ThrottlingException<br/>Rate limit exceeded]
        ME2[ModelTimeoutException<br/>Inference too slow]
        ME3[ValidationException<br/>Invalid parameters]
        ME4[ModelNotReadyException<br/>Model loading]
        ME5[ServiceUnavailableException<br/>Bedrock outage]
    end

    subgraph ContentErrors["Content & Safety Errors"]
        CE1[ContentFilterBlocked<br/>Input/output filtered]
        CE2[GuardrailViolation<br/>Custom guardrail triggered]
        CE3[TokenLimitExceeded<br/>Context too large]
        CE4[ContextWindowOverflow<br/>History too long]
    end

    subgraph DataErrors["Data Layer Errors"]
        DE1[VectorSearchEmpty<br/>No RAG results]
        DE2[VectorSearchTimeout<br/>OpenSearch slow]
        DE3[SessionNotFound<br/>DynamoDB miss]
        DE4[SessionCorrupted<br/>Schema mismatch]
        DE5[CacheMiss<br/>Redis unavailable]
        DE6[StaleCache<br/>Outdated data served]
    end

    subgraph InfraErrors["Infrastructure Errors"]
        IE1[ECSTaskOOM<br/>Memory exhaustion]
        IE2[ECSTaskCrash<br/>Container failure]
        IE3[WebSocketDisconnect<br/>Client dropped]
        IE4[VPCNetworkError<br/>Connectivity loss]
        IE5[IAMPermissionDenied<br/>Role misconfigured]
    end

    subgraph AppErrors["Application Logic Errors"]
        AE1[PromptTemplateError<br/>Bad template render]
        AE2[ResponseParseError<br/>Unexpected output format]
        AE3[SDKVersionMismatch<br/>Old boto3]
        AE4[ConfigMissing<br/>Environment variable absent]
        AE5[RetryExhausted<br/>All retries failed]
    end

    ME1 --> R1[Retry with backoff]
    ME2 --> R2[Switch to Haiku]
    ME3 --> R3[Fix parameters]
    CE1 --> R4[Adjust guardrails]
    CE3 --> R5[Truncate context]
    DE1 --> R6[Keyword fallback]
    DE5 --> R7[Skip cache]
    IE1 --> R8[Scale up task]
    IE5 --> R9[Fix IAM role]
    AE1 --> R10[Fix template]

    style ModelErrors fill:#ffebee,stroke:#c62828
    style ContentErrors fill:#fff3e0,stroke:#e65100
    style DataErrors fill:#e8f5e9,stroke:#2e7d32
    style InfraErrors fill:#e3f2fd,stroke:#1565c0
    style AppErrors fill:#f3e5f5,stroke:#6a1b9a

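The leaf-to-remediation edges in the flowchart amount to a dispatch table in the orchestrator. A hedged sketch of that routing; the route name and the string keys and values are illustrative, mirroring the diagram nodes rather than a fixed API:

```python
# First-line remediation per error family, taken from the taxonomy edges.
REMEDIATION_ROUTES = {
    "ThrottlingException": "retry_with_backoff",
    "ModelTimeoutException": "switch_to_haiku",
    "ContentFilterBlocked": "adjust_guardrails",
    "TokenLimitExceeded": "truncate_context",
    "VectorSearchEmpty": "keyword_fallback",
    "CacheMiss": "skip_cache",
    "ECSTaskOOM": "scale_up_task",
    "IAMPermissionDenied": "fix_iam_role",
}

def route(error_name: str) -> str:
    # Unknown errors escalate to a human instead of guessing a remediation.
    return REMEDIATION_ROUTES.get(error_name, "alert_ops")

print(route("TokenLimitExceeded"))  # truncate_context
print(route("SomethingNovel"))      # alert_ops
```

Keeping the routing declarative makes the taxonomy and the code auditable side by side: adding an error family is a one-line table change.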
7. FMLogAnalyzer — CloudWatch Insights Automation

import json
import time
import boto3
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass


@dataclass
class InsightsQueryResult:
    """Result from a CloudWatch Logs Insights query."""
    query_id: str
    status: str
    results: List[Dict[str, str]]
    statistics: Dict[str, float]
    query_text: str
    duration_ms: float


class FMLogAnalyzer:
    """
    Automated CloudWatch Logs Insights query engine for FM applications.

    Provides pre-built queries for common MangaAssist troubleshooting
    scenarios and the ability to run custom queries. Results are
    formatted for dashboard consumption or alert rule evaluation.

    Usage:
        analyzer = FMLogAnalyzer(
            log_group="/ecs/manga-assist-orchestrator",
            region="ap-northeast-1"
        )
        # Check for latency spikes
        result = analyzer.query_latency_percentiles(window_minutes=30)
        # Check error rates by model
        result = analyzer.query_error_distribution()
    """

    def __init__(
        self,
        log_group: str,
        region: str = "ap-northeast-1",
        max_results: int = 1000,
    ):
        self.log_group = log_group
        self.region = region
        self.max_results = max_results
        self.client = boto3.client("logs", region_name=region)

    def _run_query(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
    ) -> InsightsQueryResult:
        """Execute a CloudWatch Logs Insights query and wait for results."""
        start_epoch = int(start_time.timestamp())
        end_epoch = int(end_time.timestamp())
        t0 = time.time()

        response = self.client.start_query(
            logGroupName=self.log_group,
            startTime=start_epoch,
            endTime=end_epoch,
            queryString=query,
            limit=self.max_results,
        )
        query_id = response["queryId"]

        # Poll for completion; Insights queries run asynchronously.
        # Bail out after ~60s instead of polling forever.
        deadline = time.time() + 60
        while True:
            result = self.client.get_query_results(queryId=query_id)
            if result["status"] in ("Complete", "Failed", "Cancelled", "Timeout"):
                break
            if time.time() > deadline:
                self.client.stop_query(queryId=query_id)
                break
            time.sleep(0.5)

        duration = (time.time() - t0) * 1000

        formatted_results = []
        for row in result.get("results", []):
            entry = {}
            for field_obj in row:
                entry[field_obj["field"]] = field_obj["value"]
            formatted_results.append(entry)

        return InsightsQueryResult(
            query_id=query_id,
            status=result["status"],
            results=formatted_results,
            statistics=result.get("statistics", {}),
            query_text=query,
            duration_ms=round(duration, 2),
        )

    def query_latency_percentiles(
        self,
        window_minutes: int = 60,
        bucket_minutes: int = 5,
    ) -> InsightsQueryResult:
        """Query P50/P90/P99 latency for Bedrock calls."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = f"""
fields @timestamp, latency.bedrock_invoke_ms as bedrock_ms, latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL" and status = "SUCCESS"
| stats
    count(*) as call_count,
    avg(bedrock_ms) as avg_bedrock,
    percentile(bedrock_ms, 50) as p50_bedrock,
    percentile(bedrock_ms, 90) as p90_bedrock,
    percentile(bedrock_ms, 99) as p99_bedrock,
    avg(total_ms) as avg_total,
    percentile(total_ms, 99) as p99_total
  by bin({bucket_minutes}m) as time_bucket
| sort time_bucket desc
"""
        return self._run_query(query, start_time, end_time)

    def query_error_distribution(
        self,
        window_minutes: int = 60,
    ) -> InsightsQueryResult:
        """Query error distribution by category and model."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = """
fields @timestamp, metadata.model_id as model, error.error_type as err_type, error.error_code as err_code, error.root_cause_category as root_cause
| filter log_type = "FM_CALL" and status != "SUCCESS"
| stats count(*) as error_count by model, err_type, root_cause
| sort error_count desc
| limit 25
"""
        return self._run_query(query, start_time, end_time)

    def query_token_usage(
        self,
        window_minutes: int = 1440,  # 24 hours default
    ) -> InsightsQueryResult:
        """Query token usage patterns to detect prompt bloat."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = """
fields @timestamp, prompt_metrics.total_input_tokens as input_tok, response_metrics.output_token_count as output_tok, metadata.model_id as model
| filter log_type = "FM_CALL"
| stats
    avg(input_tok) as avg_input,
    max(input_tok) as max_input,
    pct(input_tok, 95) as p95_input,
    avg(output_tok) as avg_output,
    max(output_tok) as max_output,
    sum(input_tok) as total_input,
    sum(output_tok) as total_output,
    count(*) as calls
  by bin(1h), model
| sort @timestamp desc
"""
        return self._run_query(query, start_time, end_time)

    def query_cost_analysis(
        self,
        window_minutes: int = 1440,
    ) -> InsightsQueryResult:
        """Query cost breakdown by model and cache utilization."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = """
fields @timestamp, metadata.model_id as model, cost.total_cost_usd as cost_usd, cost.was_cached as cached, cost.cache_hit_savings_usd as savings
| filter log_type = "FM_CALL"
| stats
    sum(cost_usd) as total_spend,
    sum(savings) as total_savings,
    count(*) as call_count,
    avg(cost_usd) as avg_cost_per_call,
    sum(cached) as cache_hits
  by bin(1h), model
| sort @timestamp desc
"""
        return self._run_query(query, start_time, end_time)

    def query_rag_quality(
        self,
        window_minutes: int = 360,
    ) -> InsightsQueryResult:
        """Query RAG retrieval quality metrics."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = """
fields @timestamp, avg_relevance, results_returned, query_latency_ms, opensearch_index
| filter log_type = "RAG_RETRIEVAL"
| stats
    avg(avg_relevance) as mean_relevance,
    min(min_relevance) as worst_relevance,
    pct(query_latency_ms, 50) as p50_latency,
    pct(query_latency_ms, 99) as p99_latency,
    avg(results_returned) as avg_results,
    count(*) as retrieval_count
  by bin(30m)
| sort @timestamp desc
"""
        return self._run_query(query, start_time, end_time)

    def query_throttle_patterns(
        self,
        window_minutes: int = 120,
    ) -> InsightsQueryResult:
        """Detect throttling bursts and patterns."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = """
fields @timestamp, metadata.model_id as model, error.retry_count as retries, latency.total_e2e_ms as total_ms
| filter log_type = "FM_CALL" and status = "THROTTLED"
| stats
    count(*) as throttle_count,
    avg(retries) as avg_retries,
    max(retries) as max_retries,
    avg(total_ms) as avg_total_latency
  by bin(5m), model
| sort throttle_count desc
"""
        return self._run_query(query, start_time, end_time)

    def query_slow_requests(
        self,
        threshold_ms: float = 3000,
        window_minutes: int = 60,
    ) -> InsightsQueryResult:
        """Find requests exceeding the 3-second SLA target."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=window_minutes)

        query = f"""
fields @timestamp, metadata.request_id, metadata.session_id, metadata.model_id as model,
       latency.total_e2e_ms as total_ms, latency.bedrock_invoke_ms as bedrock_ms,
       latency.opensearch_query_ms as os_ms, latency.redis_cache_check_ms as redis_ms,
       prompt_metrics.total_input_tokens as input_tokens
| filter log_type = "FM_CALL" and latency.total_e2e_ms > {threshold_ms}
| sort total_ms desc
| limit 50
"""
        return self._run_query(query, start_time, end_time)

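The `_run_query` helper used throughout the class presumably wraps the CloudWatch Logs `StartQuery`/`GetQueryResults` API pair. A minimal standalone sketch of that pattern follows; the log group name and the one-second polling interval are assumptions, not values from the MangaAssist stack:

```python
import time
from datetime import datetime, timedelta, timezone


def build_query_window(window_minutes: int) -> tuple:
    """Return (start, end) as epoch seconds, the form StartQuery expects."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(minutes=window_minutes)
    return int(start.timestamp()), int(end.timestamp())


def run_insights_query(query: str, window_minutes: int = 60,
                       log_group: str = "/mangaassist/fm-calls") -> list:
    """Start a Logs Insights query and poll until it reaches a terminal state.

    The log group name above is a placeholder; substitute your own.
    """
    import boto3  # imported lazily so build_query_window stays usable offline

    logs = boto3.client("logs")
    start, end = build_query_window(window_minutes)
    query_id = logs.start_query(
        logGroupName=log_group,
        startTime=start,
        endTime=end,
        queryString=query,
    )["queryId"]

    while True:
        result = logs.get_query_results(queryId=query_id)
        if result["status"] in ("Complete", "Failed", "Cancelled", "Timeout"):
            return result["results"]
        time.sleep(1)  # Logs Insights queries typically complete within seconds
```

Note that `StartQuery` is asynchronous by design: the polling loop is unavoidable, which is why the summary table below budgets "query latency < 5s" rather than treating Logs Insights as a real-time API.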
8. Summary Table — Troubleshooting Tool Mapping

| Tool | Purpose | MangaAssist Application | Key Metric |
|------|---------|-------------------------|------------|
| CloudWatch Logs Insights | Prompt/response analysis | Query structured FM call logs for patterns | Query latency < 5s for 1h window |
| AWS X-Ray | FM API call tracing | End-to-end trace through Bedrock, OpenSearch, DynamoDB | Trace sampling at 1% normal, 100% errors |
| Amazon Q Developer | GenAI error pattern recognition | AI-assisted diagnosis from detected error patterns | Pattern match confidence > 85% |
| CloudWatch Metrics | Real-time operational dashboards | Latency percentiles, error rates, token usage | Dashboard refresh < 60s |
| CloudWatch Alarms | Proactive alerting | Alert on P99 latency > 3s, error rate > 5% | Alarm evaluation period: 5m |
| SSM Automation | Auto-remediation | Restart tasks, clear caches, switch model fallback | MTTR target < 5 minutes |
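The X-Ray sampling policy in the table (1% normal, 100% errors, plus 50% of slow requests) can be stated as a small decision function. In practice these thresholds live in X-Ray sampling rules evaluated by the SDK/daemon; the sketch below only illustrates the policy, and the function names are hypothetical:

```python
import random
from typing import Optional

# Policy from the table and takeaway 2: keep 100% of errors, 50% of
# slow requests (> 3 s end-to-end, the SLA threshold), 1% of the rest.
SLOW_THRESHOLD_MS = 3000.0


def sample_probability(is_error: bool, latency_ms: float) -> float:
    """Probability that a trace is retained under the error-biased policy."""
    if is_error:
        return 1.0
    if latency_ms > SLOW_THRESHOLD_MS:
        return 0.5
    return 0.01


def should_sample(is_error: bool, latency_ms: float,
                  rng: Optional[random.Random] = None) -> bool:
    """Draw a sampling decision for a single trace."""
    rng = rng or random.Random()
    return rng.random() < sample_probability(is_error, latency_ms)
```

At 1M messages/day with a 1% error rate, this keeps every failing trace while reducing normal-traffic trace volume by roughly two orders of magnitude.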

9. Key Takeaways

  1. Structured logging is non-negotiable: Every FM call must produce a consistent JSON log entry with metadata, prompt metrics, response metrics, latency breakdown, error details, and cost. Without this, CloudWatch Logs Insights queries return nothing useful.

  2. X-Ray sampling must be error-biased: At 1M messages/day, sampling 100% of traces would be cost-prohibitive and would bury the traces that matter in noise. Sample 100% of errors, 50% of slow requests (> 3s), and 1% of normal traffic.

  3. Error pattern detection enables automation: By classifying errors into known patterns, the system can automatically apply the correct remediation (retry, fallback model, truncate context) without human intervention for transient issues.

  4. Q Developer integration closes the feedback loop: When patterns are detected that the system cannot auto-remediate, generating a contextual Q Developer prompt with error details, root cause, and stack context helps developers diagnose faster.

  5. Cost tracking per call is essential: At MangaAssist's scale ($3/$15 per 1M tokens for Sonnet), a single prompt template bug that doubles input tokens could cost thousands of dollars per day. Every log entry must include cost metrics.

  6. The 3-second SLA drives trace granularity: The latency breakdown must cover every component (API Gateway, orchestrator, Redis, DynamoDB, OpenSearch, Bedrock, response parsing, WebSocket delivery) to identify which subsystem is causing SLA breaches.
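The cost math behind takeaway 5 is worth making concrete. Using the Sonnet and Haiku pricing from the context ($3/$15 and $0.25/$1.25 per 1M input/output tokens), a hypothetical helper for the `cost.total_cost_usd` log field, plus the prompt-bloat scenario worked through:

```python
# Bedrock Claude 3 pricing from the MangaAssist context, USD per 1M tokens.
PRICING = {
    "claude-3-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-haiku": {"input": 0.25, "output": 1.25},
}


def call_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of a single FM call, suitable for the cost.total_cost_usd field."""
    p = PRICING[model]
    return (input_tokens / 1_000_000 * p["input"]
            + output_tokens / 1_000_000 * p["output"])


# A template bug that doubles a 2,000-token prompt adds $0.006 per Sonnet
# call -- $6,000/day at the 1M messages/day target, which is why cost must
# be logged per call, not estimated monthly.
baseline = call_cost_usd("claude-3-sonnet", 2_000, 500)   # 0.0135
bloated = call_cost_usd("claude-3-sonnet", 4_000, 500)    # 0.0195
daily_waste = (bloated - baseline) * 1_000_000            # 6000.0
```

The same arithmetic shows why the fallback-to-Haiku remediation in the summary table is attractive: the identical call on Haiku costs roughly 8% of the Sonnet price.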