
Log Analysis, Trace Debugging, and Error Pattern Recognition

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Value |
| --- | --- |
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Development and Implementation of FM Applications |
| Task | 2.5 — Describe methods to integrate FM applications into existing systems |
| Skill | 2.5.6 — Improve troubleshooting efficiency for FM applications |
| Focus Areas | Log correlation across FM calls, trace analysis for latency debugging, common FM error patterns, Q Developer integration |

1. Log Correlation Across FM Calls

1.1 The Correlation Challenge

A single user message in MangaAssist triggers a chain of 5-8 service calls. Without correlation, a log entry from OpenSearch saying "query took 2100ms" is meaningless unless you can tie it back to the specific user request, the Bedrock call it fed into, and the end-to-end latency the user experienced.

Correlation dimensions in MangaAssist:

| Dimension | Scope | Example Value |
| --- | --- | --- |
| request_id | Single user message | req-a1b2c3d4-e5f6-7890 |
| session_id | Conversation (multi-turn) | sess-manga-user-9876 |
| user_id | All sessions for a user | user-jp-tokyo-001 |
| trace_id | X-Ray distributed trace | 1-65a8f7e2-abc123def456 |
| conversation_turn | Position in conversation | 3 (third message) |
| model_invocation_id | Bedrock-generated ID | inv-bedrock-xyz789 |
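Once every log line carries these identifiers, correlation becomes a query rather than a grep. A minimal sketch of building a CloudWatch Logs Insights query that stitches one request's entries together across all services — field names follow the `CorrelatedLogEntry.to_dict()` layout shown below, and the log group name in the comment is a placeholder assumption:

```python
# Sketch: a Logs Insights query pulling every log entry for one request.
# The dotted path correlation.request_id matches the nested JSON layout
# emitted by CorrelatedLogEntry.to_dict().
def build_correlation_query(request_id: str) -> str:
    return (
        "fields @timestamp, service, operation, duration_ms, status, log_level, message\n"
        f'| filter correlation.request_id = "{request_id}"\n'
        "| sort @timestamp asc"
    )

query = build_correlation_query("req-a1b2c3d4-e5f6-7890")
# Run it with boto3 against the orchestrator's log group (name assumed):
# logs = boto3.client("logs")
# logs.start_query(logGroupName="/ecs/manga-assist-orchestrator",
#                  startTime=..., endTime=..., queryString=query)
```

The same query with `correlation.session_id` instead of `correlation.request_id` widens the scope to a whole conversation.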

1.2 LogCorrelator Implementation

import json
import uuid
import time
import threading
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from collections import defaultdict
from contextlib import contextmanager
from enum import Enum


class CorrelationLevel(Enum):
    """Levels at which logs can be correlated."""
    REQUEST = "request"
    SESSION = "session"
    USER = "user"
    TRACE = "trace"


class ServiceName(Enum):
    """Downstream services in the MangaAssist call chain."""
    API_GATEWAY = "api-gateway"
    ORCHESTRATOR = "orchestrator"
    REDIS_CACHE = "redis-cache"
    DYNAMODB_SESSION = "dynamodb-session"
    DYNAMODB_PRODUCT = "dynamodb-product"
    OPENSEARCH_VECTOR = "opensearch-vector"
    BEDROCK_INVOKE = "bedrock-invoke"
    WEBSOCKET_SEND = "websocket-send"


@dataclass
class CorrelatedLogEntry:
    """A single log entry enriched with correlation identifiers."""
    timestamp: str
    request_id: str
    session_id: str
    user_id: str
    trace_id: str
    service: str
    operation: str
    duration_ms: float
    status: str
    log_level: str
    message: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    parent_span_id: Optional[str] = None
    span_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "timestamp": self.timestamp,
            "correlation": {
                "request_id": self.request_id,
                "session_id": self.session_id,
                "user_id": self.user_id,
                "trace_id": self.trace_id,
                "span_id": self.span_id,
                "parent_span_id": self.parent_span_id,
            },
            "service": self.service,
            "operation": self.operation,
            "duration_ms": self.duration_ms,
            "status": self.status,
            "log_level": self.log_level,
            "message": self.message,
            "metadata": self.metadata,
        }


@dataclass
class CorrelationContext:
    """
    Thread-safe correlation context passed through the call chain.

    Created at the API Gateway entry point and propagated through
    every downstream service call within the orchestrator.
    """
    request_id: str
    session_id: str
    user_id: str
    trace_id: str
    conversation_turn: int
    start_time: float
    _entries: List[CorrelatedLogEntry] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _span_stack: List[str] = field(default_factory=list)

    def generate_span_id(self) -> str:
        """Generate a unique span ID for a service call."""
        return uuid.uuid4().hex[:16]

    def current_span_id(self) -> Optional[str]:
        """Get the current (innermost) span ID."""
        return self._span_stack[-1] if self._span_stack else None

    def push_span(self, span_id: str) -> None:
        """Push a new span onto the stack."""
        self._span_stack.append(span_id)

    def pop_span(self) -> Optional[str]:
        """Pop the current span from the stack."""
        return self._span_stack.pop() if self._span_stack else None

    def add_entry(self, entry: CorrelatedLogEntry) -> None:
        """Thread-safe addition of a log entry."""
        with self._lock:
            self._entries.append(entry)

    def get_entries(self) -> List[CorrelatedLogEntry]:
        """Return all correlated entries for this request."""
        with self._lock:
            return list(self._entries)

    def get_call_chain(self) -> List[Dict[str, Any]]:
        """
        Return the ordered call chain with timing for this request.

        This is what you would see in an X-Ray trace view — the
        waterfall of service calls with their durations.
        """
        with self._lock:
            sorted_entries = sorted(self._entries, key=lambda e: e.timestamp)
            return [
                {
                    "service": e.service,
                    "operation": e.operation,
                    "duration_ms": e.duration_ms,
                    "status": e.status,
                    "span_id": e.span_id,
                    "parent_span_id": e.parent_span_id,
                }
                for e in sorted_entries
            ]

    def elapsed_ms(self) -> float:
        """Milliseconds elapsed since request start."""
        return (time.time() - self.start_time) * 1000


class LogCorrelator:
    """
    Correlates logs across multiple services for a single FM request chain.

    The correlator provides:
    1. Context propagation: Ensures request_id, session_id, and trace_id
       flow through every downstream call.
    2. Span management: Creates parent-child relationships between service
       calls for X-Ray-style waterfall visualization.
    3. Call chain reconstruction: Assembles the full call chain from
       individual log entries for troubleshooting.
    4. Cross-request correlation: Links multiple requests in a session
       to track conversation-level patterns.

    Usage:
        correlator = LogCorrelator(service_name="manga-assist-orchestrator")

        # At request entry point (API Gateway handler)
        ctx = correlator.create_context(
            session_id="sess-123",
            user_id="user-456",
            conversation_turn=3,
        )

        # In each service call
        with correlator.trace_service_call(ctx, ServiceName.REDIS_CACHE, "get") as span:
            result = redis_client.get(key)

        with correlator.trace_service_call(ctx, ServiceName.BEDROCK_INVOKE, "invoke_model") as span:
            response = bedrock.invoke_model(body=body)

        # After request completes
        chain = ctx.get_call_chain()
        correlator.emit_correlation_log(ctx)
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self._session_history: Dict[str, List[str]] = defaultdict(list)
        self._active_contexts: Dict[str, CorrelationContext] = {}
        self._lock = threading.Lock()

    def create_context(
        self,
        session_id: str,
        user_id: str,
        conversation_turn: int,
        trace_id: Optional[str] = None,
        request_id: Optional[str] = None,
    ) -> CorrelationContext:
        """
        Create a new correlation context for an incoming request.

        Called once at the API Gateway WebSocket handler when a
        new user message arrives.
        """
        req_id = request_id or f"req-{uuid.uuid4().hex[:12]}"
        t_id = trace_id or f"1-{uuid.uuid4().hex[:8]}-{uuid.uuid4().hex[:24]}"

        ctx = CorrelationContext(
            request_id=req_id,
            session_id=session_id,
            user_id=user_id,
            trace_id=t_id,
            conversation_turn=conversation_turn,
            start_time=time.time(),
        )

        with self._lock:
            self._active_contexts[req_id] = ctx
            self._session_history[session_id].append(req_id)

        return ctx

    @contextmanager
    def trace_service_call(
        self,
        ctx: CorrelationContext,
        service: ServiceName,
        operation: str,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Context manager that traces a single downstream service call.

        Creates a span with parent-child relationships and records
        timing, status, and error information.
        """
        span_id = ctx.generate_span_id()
        parent_span_id = ctx.current_span_id()
        ctx.push_span(span_id)

        start = time.time()
        status = "SUCCESS"
        error_msg = ""

        try:
            yield span_id
        except Exception as e:
            status = "ERROR"
            error_msg = str(e)
            raise
        finally:
            duration = (time.time() - start) * 1000
            ctx.pop_span()

            entry = CorrelatedLogEntry(
                timestamp=datetime.now(timezone.utc).isoformat(),
                request_id=ctx.request_id,
                session_id=ctx.session_id,
                user_id=ctx.user_id,
                trace_id=ctx.trace_id,
                service=service.value,
                operation=operation,
                duration_ms=round(duration, 2),
                status=status,
                log_level="ERROR" if status == "ERROR" else "INFO",
                message=error_msg if error_msg else f"{service.value}:{operation} completed",
                metadata=metadata or {},
                span_id=span_id,
                parent_span_id=parent_span_id,
            )
            ctx.add_entry(entry)

    def emit_correlation_log(self, ctx: CorrelationContext) -> Dict[str, Any]:
        """
        Emit a summary correlation log at the end of a request.

        This single log entry contains the full call chain and is
        optimized for CloudWatch Logs Insights correlation queries.
        """
        chain = ctx.get_call_chain()
        entries = ctx.get_entries()
        total_ms = ctx.elapsed_ms()

        has_errors = any(e.status == "ERROR" for e in entries)
        error_services = [e.service for e in entries if e.status == "ERROR"]

        # Sum durations per service so repeated calls to the same service
        # are not silently collapsed to the last entry
        timing_breakdown: Dict[str, float] = defaultdict(float)
        for e in entries:
            timing_breakdown[e.service] += e.duration_ms

        summary = {
            "log_type": "CORRELATION_SUMMARY",
            "request_id": ctx.request_id,
            "session_id": ctx.session_id,
            "user_id": ctx.user_id,
            "trace_id": ctx.trace_id,
            "conversation_turn": ctx.conversation_turn,
            "total_duration_ms": round(total_ms, 2),
            "service_call_count": len(entries),
            "has_errors": has_errors,
            "error_services": error_services,
            "call_chain": chain,
            "timing_breakdown": dict(timing_breakdown),
            "sla_met": total_ms <= 3000,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        # In production, this would be logger.info(json.dumps(summary))
        return summary

    def get_session_history(self, session_id: str) -> List[str]:
        """Return all request IDs for a session, in order."""
        with self._lock:
            return list(self._session_history.get(session_id, []))

    def find_correlated_errors(
        self,
        session_id: str,
    ) -> List[Dict[str, Any]]:
        """
        Find error patterns across all requests in a session.

        Useful for detecting recurring failures that affect the
        user's conversation experience.
        """
        request_ids = self.get_session_history(session_id)
        errors = []

        with self._lock:
            for req_id in request_ids:
                ctx = self._active_contexts.get(req_id)
                if not ctx:
                    continue
                for entry in ctx.get_entries():
                    if entry.status == "ERROR":
                        errors.append({
                            "request_id": req_id,
                            "conversation_turn": ctx.conversation_turn,
                            "service": entry.service,
                            "operation": entry.operation,
                            "error_message": entry.message,
                            "timestamp": entry.timestamp,
                        })

        return errors
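The span stack is what turns a flat list of log entries into a waterfall. A self-contained sketch of the same parent/child mechanic used by `trace_service_call`, with names simplified from the classes above:

```python
import time
import uuid
from contextlib import contextmanager

# Minimal sketch of the span-stack mechanic: nested `with` blocks record
# parent_span_id, so the flat entry list can be rebuilt into an
# X-Ray-style waterfall.
entries = []
_stack = []

@contextmanager
def span(service: str):
    span_id = uuid.uuid4().hex[:16]
    parent = _stack[-1] if _stack else None
    _stack.append(span_id)
    start = time.time()
    try:
        yield span_id
    finally:
        _stack.pop()
        entries.append({
            "service": service,
            "span_id": span_id,
            "parent_span_id": parent,
            "duration_ms": (time.time() - start) * 1000,
        })

with span("orchestrator"):
    with span("redis-cache"):
        pass
    with span("bedrock-invoke"):
        pass

# Children close first, so entries[-1] is the outer orchestrator span and
# both inner spans point at it via parent_span_id.
```

The production class adds thread safety and structured metadata, but the parent-child linkage is exactly this push/pop discipline.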

2. Trace Analysis for Latency Debugging

2.1 The Latency Budget

MangaAssist has a 3-second end-to-end SLA. Here is how the latency budget is typically allocated:

| Component | Budget (ms) | Typical (ms) | Alarm Threshold (ms) |
| --- | --- | --- | --- |
| API Gateway overhead | 50 | 15-30 | 100 |
| Orchestrator processing | 100 | 20-50 | 200 |
| Redis cache check | 20 | 2-5 | 50 |
| DynamoDB session lookup | 50 | 10-25 | 100 |
| OpenSearch vector search | 300 | 100-200 | 500 |
| Bedrock InvokeModel | 2300 | 800-1800 | 2500 |
| Response parsing | 50 | 5-15 | 100 |
| WebSocket delivery | 30 | 5-10 | 50 |
| Total | 2900 | 957-2135 | 3000 |

Bedrock's budget alone (2300 ms) is 77% of the 3-second SLA, and the component budgets together sum to 2900 ms, leaving only 100 ms of slack. Any degradation in the other components therefore quickly breaks the SLA.
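The budget arithmetic is worth sanity-checking; a few lines confirm the per-component budgets sum to 2900 ms, leaving 100 ms of headroom against the SLA:

```python
# Quick check of the latency budget: total component budgets, remaining
# slack against the 3 s SLA, and Bedrock's share of the SLA.
BUDGETS_MS = {
    "api_gateway": 50, "orchestrator": 100, "redis_cache": 20,
    "dynamodb_session": 50, "opensearch_vector": 300,
    "bedrock_invoke": 2300, "response_parsing": 50,
    "websocket_delivery": 30,
}
SLA_MS = 3000

total_budget = sum(BUDGETS_MS.values())                # 2900 ms
slack = SLA_MS - total_budget                          # 100 ms of headroom
bedrock_share = BUDGETS_MS["bedrock_invoke"] / SLA_MS  # ~0.77
```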

2.2 LatencyAnalyzer Implementation

import statistics
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from collections import defaultdict
from enum import Enum


class LatencyStatus(Enum):
    """Status of a latency measurement relative to its budget."""
    WITHIN_BUDGET = "WITHIN_BUDGET"
    WARNING = "WARNING"       # > 80% of budget
    BUDGET_EXCEEDED = "BUDGET_EXCEEDED"
    CRITICAL = "CRITICAL"     # > 150% of budget


@dataclass
class ComponentLatencyConfig:
    """Latency budget and thresholds for a single component."""
    component_name: str
    budget_ms: float
    warning_threshold_ms: float
    alarm_threshold_ms: float
    critical_threshold_ms: float


@dataclass
class LatencyMeasurement:
    """A single latency measurement for a component in a request."""
    request_id: str
    component: str
    duration_ms: float
    timestamp: str
    status: LatencyStatus
    budget_utilization_pct: float
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class LatencyReport:
    """Aggregated latency analysis for a time window."""
    window_start: str
    window_end: str
    total_requests: int
    sla_met_count: int
    sla_met_pct: float
    component_stats: Dict[str, Dict[str, float]]
    bottleneck_component: str
    bottleneck_avg_ms: float
    bottleneck_pct_of_total: float
    recommendations: List[str]


class LatencyAnalyzer:
    """
    Analyzes latency traces to identify bottlenecks and SLA violations
    in the MangaAssist FM call chain.

    Uses X-Ray trace data and structured logs to:
    1. Track latency per component against defined budgets
    2. Identify the bottleneck component in slow requests
    3. Detect latency trends (degradation over time)
    4. Generate optimization recommendations

    The analyzer maintains a sliding window of measurements for
    statistical analysis (percentiles, trends, correlations).

    Usage:
        analyzer = LatencyAnalyzer(sla_target_ms=3000)

        # Feed it trace data from X-Ray or structured logs
        analyzer.record_trace(trace_data)

        # Get analysis
        report = analyzer.generate_report(window_minutes=60)
        bottleneck = analyzer.identify_bottleneck(request_id)
        anomalies = analyzer.detect_anomalies(window_minutes=30)
    """

    # Default latency budgets for MangaAssist components
    DEFAULT_BUDGETS: Dict[str, ComponentLatencyConfig] = {
        "api_gateway": ComponentLatencyConfig(
            component_name="api_gateway",
            budget_ms=50,
            warning_threshold_ms=40,
            alarm_threshold_ms=100,
            critical_threshold_ms=150,
        ),
        "orchestrator": ComponentLatencyConfig(
            component_name="orchestrator",
            budget_ms=100,
            warning_threshold_ms=80,
            alarm_threshold_ms=200,
            critical_threshold_ms=300,
        ),
        "redis_cache": ComponentLatencyConfig(
            component_name="redis_cache",
            budget_ms=20,
            warning_threshold_ms=15,
            alarm_threshold_ms=50,
            critical_threshold_ms=100,
        ),
        "dynamodb_session": ComponentLatencyConfig(
            component_name="dynamodb_session",
            budget_ms=50,
            warning_threshold_ms=40,
            alarm_threshold_ms=100,
            critical_threshold_ms=200,
        ),
        "opensearch_vector": ComponentLatencyConfig(
            component_name="opensearch_vector",
            budget_ms=300,
            warning_threshold_ms=240,
            alarm_threshold_ms=500,
            critical_threshold_ms=800,
        ),
        "bedrock_invoke": ComponentLatencyConfig(
            component_name="bedrock_invoke",
            budget_ms=2300,
            warning_threshold_ms=1800,
            alarm_threshold_ms=2500,
            critical_threshold_ms=3000,
        ),
        "response_parsing": ComponentLatencyConfig(
            component_name="response_parsing",
            budget_ms=50,
            warning_threshold_ms=40,
            alarm_threshold_ms=100,
            critical_threshold_ms=150,
        ),
        "websocket_delivery": ComponentLatencyConfig(
            component_name="websocket_delivery",
            budget_ms=30,
            warning_threshold_ms=25,
            alarm_threshold_ms=50,
            critical_threshold_ms=100,
        ),
    }

    def __init__(
        self,
        sla_target_ms: float = 3000,
        max_history_size: int = 100_000,
        budgets: Optional[Dict[str, ComponentLatencyConfig]] = None,
    ):
        self.sla_target_ms = sla_target_ms
        self.max_history_size = max_history_size
        self.budgets = budgets or self.DEFAULT_BUDGETS
        self._measurements: Dict[str, List[LatencyMeasurement]] = defaultdict(list)
        self._request_totals: Dict[str, float] = {}
        self._request_timestamps: Dict[str, str] = {}

    def _classify_latency(
        self, component: str, duration_ms: float
    ) -> Tuple[LatencyStatus, float]:
        """Classify a latency measurement against its budget."""
        config = self.budgets.get(component)
        if not config:
            return LatencyStatus.WITHIN_BUDGET, 0.0

        utilization = (duration_ms / config.budget_ms) * 100 if config.budget_ms > 0 else 0

        if duration_ms >= config.critical_threshold_ms:
            status = LatencyStatus.CRITICAL
        elif duration_ms >= config.alarm_threshold_ms:
            status = LatencyStatus.BUDGET_EXCEEDED
        elif duration_ms >= config.warning_threshold_ms:
            status = LatencyStatus.WARNING
        else:
            status = LatencyStatus.WITHIN_BUDGET

        return status, round(utilization, 1)

    def record_trace(self, trace_data: Dict[str, Any]) -> List[LatencyMeasurement]:
        """
        Record latency measurements from an X-Ray trace or structured log.

        Expected trace_data format:
        {
            "request_id": "req-abc123",
            "total_e2e_ms": 2450,
            "components": {
                "api_gateway": 25,
                "orchestrator": 35,
                "redis_cache": 3,
                "dynamodb_session": 18,
                "opensearch_vector": 180,
                "bedrock_invoke": 2100,
                "response_parsing": 12,
                "websocket_delivery": 7,
            }
        }
        """
        request_id = trace_data["request_id"]
        total_ms = trace_data.get("total_e2e_ms", 0)
        components = trace_data.get("components", {})
        now_iso = datetime.now(timezone.utc).isoformat()

        self._request_totals[request_id] = total_ms
        self._request_timestamps[request_id] = now_iso

        measurements = []
        for component_name, duration_ms in components.items():
            status, utilization = self._classify_latency(component_name, duration_ms)

            measurement = LatencyMeasurement(
                request_id=request_id,
                component=component_name,
                duration_ms=duration_ms,
                timestamp=now_iso,
                status=status,
                budget_utilization_pct=utilization,
            )
            measurements.append(measurement)

            self._measurements[component_name].append(measurement)

            # Trim history if needed
            if len(self._measurements[component_name]) > self.max_history_size:
                self._measurements[component_name] = self._measurements[component_name][
                    -self.max_history_size:
                ]

        return measurements

    def identify_bottleneck(self, request_id: str) -> Dict[str, Any]:
        """
        Identify the bottleneck component for a specific request.

        Returns the component consuming the highest percentage of
        the total latency, along with optimization suggestions.
        """
        total_ms = self._request_totals.get(request_id, 0)
        if total_ms == 0:
            return {"error": f"No trace data for request {request_id}"}

        component_durations = {}
        for comp_name, measurements in self._measurements.items():
            for m in measurements:
                if m.request_id == request_id:
                    component_durations[comp_name] = m.duration_ms
                    break

        if not component_durations:
            return {"error": f"No component data for request {request_id}"}

        bottleneck = max(component_durations, key=component_durations.get)
        bottleneck_ms = component_durations[bottleneck]
        bottleneck_pct = (bottleneck_ms / total_ms) * 100 if total_ms > 0 else 0

        suggestions = self._generate_optimization_suggestions(bottleneck, bottleneck_ms)

        return {
            "request_id": request_id,
            "total_ms": total_ms,
            "sla_met": total_ms <= self.sla_target_ms,
            "bottleneck": {
                "component": bottleneck,
                "duration_ms": bottleneck_ms,
                "pct_of_total": round(bottleneck_pct, 1),
                "status": self._classify_latency(bottleneck, bottleneck_ms)[0].value,
            },
            "all_components": {
                k: {
                    "duration_ms": v,
                    "pct_of_total": round((v / total_ms) * 100, 1) if total_ms > 0 else 0,
                }
                for k, v in sorted(
                    component_durations.items(), key=lambda x: x[1], reverse=True
                )
            },
            "suggestions": suggestions,
        }

    def _generate_optimization_suggestions(
        self, component: str, duration_ms: float
    ) -> List[str]:
        """Generate component-specific optimization suggestions."""
        suggestions_map = {
            "bedrock_invoke": [
                "Consider switching to Haiku ($0.25/$1.25) for simple queries to reduce latency by 40-60%",
                "Implement InvokeModelWithResponseStream for perceived speed improvement",
                "Reduce input tokens by limiting RAG chunks from 5 to 3",
                "Add prompt caching to eliminate redundant system prompt processing",
                "Check if model-specific provisioned throughput would help",
            ],
            "opensearch_vector": [
                "Verify HNSW index parameters (ef_search, m) are tuned for latency vs recall",
                "Reduce k from 5 to 3 for faster approximate nearest neighbor search",
                "Check if OpenSearch Serverless OCU count needs scaling",
                "Add a relevance score threshold to skip low-quality results early",
                "Consider pre-computing embeddings for popular manga queries",
            ],
            "redis_cache": [
                "Check Redis node CPU utilization — may need larger instance type",
                "Verify connection pooling is configured (max_connections=50)",
                "Check for hot key patterns causing single-node bottleneck",
                "Consider pipeline mode for batch cache operations",
                "Ensure VPC endpoint is in same AZ as ECS tasks",
            ],
            "dynamodb_session": [
                "Verify DynamoDB table is using on-demand capacity mode for burst handling",
                "Check item size — large conversation histories may need compression",
                "Consider DAX for microsecond-latency session reads",
                "Implement read-through cache pattern with Redis",
                "Use projection expressions to fetch only needed attributes",
            ],
            "api_gateway": [
                "Check WebSocket connection idle timeout settings",
                "Verify integration timeout is not causing premature closure",
                "Monitor concurrent connection count against account limits",
                "Consider regional endpoint vs edge-optimized for JP traffic",
            ],
            "orchestrator": [
                "Profile Python code for CPU-bound bottlenecks",
                "Check ECS task CPU/memory allocation — may need scaling",
                "Verify async I/O is used for parallel service calls where possible",
                "Check for lock contention in shared state",
            ],
            "response_parsing": [
                "Optimize JSON parsing — use orjson instead of json for 3-10x speedup",
                "Pre-compile regex patterns used in response extraction",
                "Check for unnecessary response transformations",
            ],
            "websocket_delivery": [
                "Verify API Gateway WebSocket connection management API latency",
                "Check message size — compress large responses",
                "Ensure connection ID is still valid before sending",
            ],
        }

        return suggestions_map.get(component, [
            f"Component '{component}' took {duration_ms}ms — review configuration and scaling"
        ])

    def generate_report(self, window_minutes: int = 60) -> LatencyReport:
        """
        Generate a comprehensive latency report for a time window.

        Aggregates all measurements, calculates percentiles per
        component, identifies overall bottleneck, and provides
        actionable recommendations.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
        cutoff_iso = cutoff.isoformat()

        # Filter measurements within window
        component_stats = {}
        for comp_name, measurements in self._measurements.items():
            recent = [m for m in measurements if m.timestamp >= cutoff_iso]
            if not recent:
                continue

            durations = [m.duration_ms for m in recent]
            component_stats[comp_name] = {
                "avg_ms": round(statistics.mean(durations), 2),
                "p50_ms": round(statistics.median(durations), 2),
                "p90_ms": round(
                    sorted(durations)[int(len(durations) * 0.9)] if durations else 0, 2
                ),
                "p99_ms": round(
                    sorted(durations)[int(len(durations) * 0.99)] if durations else 0, 2
                ),
                "min_ms": round(min(durations), 2),
                "max_ms": round(max(durations), 2),
                "count": len(durations),
                "budget_exceeded_pct": round(
                    sum(1 for m in recent if m.status in (LatencyStatus.BUDGET_EXCEEDED, LatencyStatus.CRITICAL))
                    / len(recent)
                    * 100,
                    1,
                ),
            }

        # Calculate SLA compliance
        recent_totals = {
            rid: total
            for rid, total in self._request_totals.items()
            if self._request_timestamps.get(rid, "") >= cutoff_iso
        }
        total_requests = len(recent_totals)
        sla_met = sum(1 for t in recent_totals.values() if t <= self.sla_target_ms)

        # Identify overall bottleneck
        bottleneck_comp = ""
        bottleneck_avg = 0.0
        for comp, stats in component_stats.items():
            if stats["avg_ms"] > bottleneck_avg:
                bottleneck_avg = stats["avg_ms"]
                bottleneck_comp = comp

        avg_total = statistics.mean(recent_totals.values()) if recent_totals else 0
        bottleneck_pct = (bottleneck_avg / avg_total * 100) if avg_total > 0 else 0

        # Generate recommendations
        recommendations = []
        for comp, stats in component_stats.items():
            if stats["budget_exceeded_pct"] > 5:
                recommendations.append(
                    f"{comp}: {stats['budget_exceeded_pct']}% of requests exceed budget "
                    f"(avg={stats['avg_ms']}ms, p99={stats['p99_ms']}ms)"
                )

        if total_requests > 0 and (sla_met / total_requests) < 0.95:
            recommendations.insert(
                0,
                f"SLA compliance is {round(sla_met / total_requests * 100, 1)}% — "
                f"below 95% target. Focus on {bottleneck_comp} optimization.",
            )

        return LatencyReport(
            window_start=cutoff_iso,
            window_end=datetime.now(timezone.utc).isoformat(),
            total_requests=total_requests,
            sla_met_count=sla_met,
            sla_met_pct=round(sla_met / total_requests * 100, 1) if total_requests > 0 else 0,
            component_stats=component_stats,
            bottleneck_component=bottleneck_comp,
            bottleneck_avg_ms=round(bottleneck_avg, 2),
            bottleneck_pct_of_total=round(bottleneck_pct, 1),
            recommendations=recommendations,
        )

    def detect_anomalies(
        self,
        window_minutes: int = 30,
        std_dev_threshold: float = 2.0,
    ) -> List[Dict[str, Any]]:
        """
        Detect latency anomalies using statistical analysis.

        A measurement is anomalous if it exceeds the mean + N standard
        deviations for its component within the analysis window.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
        cutoff_iso = cutoff.isoformat()
        anomalies = []

        for comp_name, measurements in self._measurements.items():
            recent = [m for m in measurements if m.timestamp >= cutoff_iso]
            if len(recent) < 10:  # Need minimum samples
                continue

            durations = [m.duration_ms for m in recent]
            mean = statistics.mean(durations)
            stdev = statistics.stdev(durations) if len(durations) > 1 else 0
            threshold = mean + (std_dev_threshold * stdev)

            for m in recent:
                if m.duration_ms > threshold:
                    anomalies.append({
                        "component": comp_name,
                        "request_id": m.request_id,
                        "duration_ms": m.duration_ms,
                        "mean_ms": round(mean, 2),
                        "stdev_ms": round(stdev, 2),
                        "threshold_ms": round(threshold, 2),
                        "deviation_factor": round(
                            (m.duration_ms - mean) / stdev if stdev > 0 else 0, 2
                        ),
                        "timestamp": m.timestamp,
                    })

        anomalies.sort(key=lambda a: a.get("deviation_factor", 0), reverse=True)
        return anomalies
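
The mean-plus-N-standard-deviations rule that `detect_anomalies` applies can be isolated into a few lines. This standalone sketch uses illustrative latency samples (nine normal Bedrock invocations around 2300ms plus one spike):

```python
import statistics

def flag_anomalies(durations_ms, std_dev_threshold=2.0):
    """Return (threshold, outliers) for one component's latency samples."""
    mean = statistics.mean(durations_ms)
    stdev = statistics.stdev(durations_ms) if len(durations_ms) > 1 else 0.0
    threshold = mean + std_dev_threshold * stdev
    # Anything above mean + N standard deviations is anomalous.
    outliers = [d for d in durations_ms if d > threshold]
    return threshold, outliers

samples = [2310, 2290, 2305, 2280, 2320, 2295, 2315, 2300, 2285, 4000]
threshold, outliers = flag_anomalies(samples)
# Only the 4000ms sample exceeds mean + 2 stdev (threshold is roughly 3545ms).
```

Because the spike itself inflates both the mean and the standard deviation, a single extreme outlier raises the threshold; this is why the method requires a minimum sample count before judging.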

3. Common FM Error Patterns and Root Causes

3.1 Pattern Catalog

| # | Pattern Name | Error Signature | Root Cause | Frequency (MangaAssist) | Auto-Remediable? |
|---|---|---|---|---|---|
| 1 | Bedrock Burst Throttle | ThrottlingException in 5-min burst | Traffic spike exceeds on-demand token/min limit | 2-3x daily at peak | Yes (backoff + model fallback) |
| 2 | Long Context Timeout | ReadTimeoutError when input > 80K tokens | Large manga catalog context exceeds inference time | Weekly | Yes (truncate + retry) |
| 3 | Stale Embedding Mismatch | Low relevance scores (< 0.3) post-update | Product catalog re-indexed but embedding model version changed | After each reindex | No (requires reindex) |
| 4 | Session Item Size Exceeded | ValidationException on DynamoDB put | Conversation history exceeds 400KB item limit | Rare (long conversations) | Yes (archive + compress) |
| 5 | Redis Failover Gap | CLUSTERDOWN for 10-30 seconds | Multi-AZ failover during maintenance or node failure | Monthly | Yes (cache-aside fallback) |
| 6 | Content Filter False Positive | AccessDeniedException for manga terms | Guardrail overly aggressive on manga genre terminology | 2-5% of queries | No (guardrail config change) |
| 7 | WebSocket Premature Close | GoneException on PostToConnection | Client disconnected before response delivery | 1-2% of requests | Yes (graceful handling) |
| 8 | Model Access Not Enabled | AccessDeniedException on InvokeModel | Bedrock model access not enabled for new region/model | After deployments | No (console action needed) |
| 9 | Token Count Estimation Drift | Actual tokens 10-20% higher than estimated | Tokenizer version mismatch between estimator and model | Continuous (minor) | Yes (use model tokenizer) |
| 10 | Concurrent Session Write Conflict | ConditionalCheckFailedException | Multiple Lambda/ECS tasks writing same session item | Under high load | Yes (optimistic locking) |

3.2 Detailed Pattern: Bedrock Burst Throttle

sequenceDiagram
    participant U as Users (burst)
    participant O as Orchestrator
    participant B as Bedrock
    participant D as ErrorPatternDetector
    participant R as RemediationEngine

    U->>O: 50 concurrent requests
    O->>B: InvokeModel (Sonnet)
    B-->>O: ThrottlingException
    O->>D: detect("ThrottlingException")
    D->>D: Match EP-001, confidence=0.95
    D->>R: Remediation: RETRY_WITH_BACKOFF
    R->>O: Apply exponential backoff (1s, 2s, 4s)
    O->>B: Retry InvokeModel (Sonnet)
    B-->>O: ThrottlingException (still throttled)
    R->>O: Switch to Haiku fallback
    O->>B: InvokeModel (Haiku)
    B-->>O: Success (200ms faster)
    O->>U: Response (with Haiku quality caveat)
    Note over D: Log pattern trend: increasing throttle rate
    D->>R: Alert: sustained throttle > 5 min
    R->>R: Trigger quota increase request
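
The retry-then-fallback flow in the diagram can be sketched without any AWS dependency. Here `invoke` is any callable that raises `ThrottledError` (a hypothetical stand-in for Bedrock's ThrottlingException), and the sleep function is injectable so the example runs instantly:

```python
import time

class ThrottledError(Exception):
    """Stand-in for Bedrock's ThrottlingException."""

def invoke_with_fallback(invoke, primary="sonnet", fallback="haiku",
                         delays=(1, 2, 4), sleep=time.sleep):
    """Retry the primary model with exponential backoff, then fall back."""
    for delay in delays:
        try:
            return invoke(primary)
        except ThrottledError:
            sleep(delay)        # 1s, 2s, 4s as in the diagram
    return invoke(fallback)     # cheaper/faster model as a last resort

# Simulate a sustained throttle on Sonnet: Haiku ends up serving the request.
def fake_invoke(model):
    if model == "sonnet":
        raise ThrottledError()
    return f"response from {model}"

result = invoke_with_fallback(fake_invoke, sleep=lambda _: None)
# result == "response from haiku"
```

In production the fallback decision should also attach the quality caveat shown in the diagram, since Haiku responses may differ from Sonnet's.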

4. Amazon Q Developer Integration for Automated Diagnosis

4.1 AutoDiagnosisEngine

import time
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict


class DiagnosisConfidence(Enum):
    """Confidence level for automated diagnoses."""
    HIGH = "HIGH"           # > 90% match, seen before, auto-remediate
    MEDIUM = "MEDIUM"       # 70-90% match, needs confirmation
    LOW = "LOW"             # < 70% match, suggest investigation
    UNKNOWN = "UNKNOWN"     # No pattern match


class DiagnosisStatus(Enum):
    """Status of a diagnosis workflow."""
    PENDING = "PENDING"
    ANALYZING = "ANALYZING"
    DIAGNOSED = "DIAGNOSED"
    REMEDIATED = "REMEDIATED"
    ESCALATED = "ESCALATED"
    FALSE_POSITIVE = "FALSE_POSITIVE"


@dataclass
class DiagnosisResult:
    """Complete result of an automated diagnosis."""
    diagnosis_id: str
    timestamp: str
    request_id: str
    error_message: str
    error_code: str
    confidence: DiagnosisConfidence
    status: DiagnosisStatus
    pattern_id: Optional[str]
    pattern_name: Optional[str]
    root_cause: str
    impact_assessment: str
    remediation_applied: Optional[str]
    remediation_success: Optional[bool]
    q_developer_query: str
    q_developer_response: Optional[str]
    time_to_diagnose_ms: float
    time_to_remediate_ms: Optional[float]
    related_request_ids: List[str]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class RemediationRule:
    """Rule for automated remediation of known error patterns."""
    rule_id: str
    pattern_id: str
    condition: str
    action: str
    cooldown_seconds: int
    max_auto_remediations_per_hour: int
    requires_confirmation: bool
    rollback_action: Optional[str]


class AutoDiagnosisEngine:
    """
    Automated diagnosis engine for MangaAssist FM application errors.

    Combines error pattern detection, log correlation, and trace
    analysis to provide automated diagnosis and remediation. Integrates
    with Amazon Q Developer for AI-assisted root cause analysis.

    The engine operates in three modes:
    1. AUTO: Fully automated detection, diagnosis, and remediation
    2. ASSISTED: Automated detection and diagnosis, manual remediation
    3. MANUAL: Generates Q Developer prompts for human investigation

    Workflow:
    1. Error detected in logs or X-Ray trace
    2. ErrorPatternDetector classifies the error
    3. LogCorrelator provides request context
    4. LatencyAnalyzer identifies timing anomalies
    5. AutoDiagnosisEngine synthesizes a diagnosis
    6. If high confidence and auto-remediable: apply fix
    7. If low confidence: generate Q Developer prompt for human review

    Usage:
        engine = AutoDiagnosisEngine(mode="ASSISTED")
        result = engine.diagnose(
            error_message="ThrottlingException: Rate exceeded",
            error_code="ThrottlingException",
            request_id="req-abc123",
            context={...}
        )
        if result.confidence == DiagnosisConfidence.HIGH:
            engine.auto_remediate(result)
    """

    def __init__(
        self,
        mode: str = "ASSISTED",
        max_auto_remediations_per_hour: int = 10,
    ):
        self.mode = mode
        self.max_auto_remediations = max_auto_remediations_per_hour
        self._remediation_count: int = 0
        self._remediation_reset_time: float = time.time()
        self._diagnosis_history: List[DiagnosisResult] = []
        self._remediation_rules: Dict[str, RemediationRule] = {}
        self._false_positive_patterns: Dict[str, int] = defaultdict(int)
        self._register_default_rules()

    def _register_default_rules(self) -> None:
        """Register default remediation rules for known patterns."""
        default_rules = [
            RemediationRule(
                rule_id="RR-001",
                pattern_id="EP-001",  # Bedrock Throttling
                condition="error_count > 3 in 5 minutes",
                action="enable_model_fallback(from='sonnet', to='haiku')",
                cooldown_seconds=300,
                max_auto_remediations_per_hour=5,
                requires_confirmation=False,
                rollback_action="disable_model_fallback()",
            ),
            RemediationRule(
                rule_id="RR-002",
                pattern_id="EP-002",  # Bedrock Timeout
                condition="timeout_count > 2 for same session",
                action="switch_to_streaming_mode()",
                cooldown_seconds=60,
                max_auto_remediations_per_hour=20,
                requires_confirmation=False,
                rollback_action="revert_to_sync_mode()",
            ),
            RemediationRule(
                rule_id="RR-003",
                pattern_id="EP-003",  # Token Limit Exceeded
                condition="token_count > model_limit * 0.95",
                action="truncate_context(keep_recent=5, max_rag_chunks=3)",
                cooldown_seconds=0,
                max_auto_remediations_per_hour=100,
                requires_confirmation=False,
                rollback_action=None,
            ),
            RemediationRule(
                rule_id="RR-004",
                pattern_id="EP-004",  # OpenSearch Failure
                condition="opensearch_error_count > 1",
                action="fallback_to_keyword_search()",
                cooldown_seconds=120,
                max_auto_remediations_per_hour=10,
                requires_confirmation=False,
                rollback_action="restore_vector_search()",
            ),
            RemediationRule(
                rule_id="RR-005",
                pattern_id="EP-006",  # Session Corruption
                condition="session_error for user",
                action="reset_session_with_summary()",
                cooldown_seconds=0,
                max_auto_remediations_per_hour=50,
                requires_confirmation=True,
                rollback_action=None,
            ),
            RemediationRule(
                rule_id="RR-006",
                pattern_id="EP-007",  # Redis Cache Failure
                condition="redis_connection_error",
                action="bypass_cache_for_request()",
                cooldown_seconds=0,
                max_auto_remediations_per_hour=1000,
                requires_confirmation=False,
                rollback_action="restore_cache_path()",
            ),
        ]
        for rule in default_rules:
            self._remediation_rules[rule.pattern_id] = rule

    def diagnose(
        self,
        error_message: str,
        error_code: str,
        request_id: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> DiagnosisResult:
        """
        Perform automated diagnosis of an FM application error.

        Steps:
        1. Pattern matching against known error signatures
        2. Context enrichment from correlation data
        3. Confidence scoring based on pattern match + context
        4. Generate Q Developer query for further analysis
        5. Apply remediation if confidence is HIGH and mode allows
        """
        t0 = time.time()
        ctx = context or {}

        # Step 1: Pattern matching (simulated — in production, uses ErrorPatternDetector)
        pattern_match = self._match_pattern(error_message, error_code)

        # Step 2: Build diagnosis
        diagnosis_id = f"diag-{int(time.time())}-{request_id[:8]}"
        confidence = self._assess_confidence(pattern_match, ctx)

        # Step 3: Generate Q Developer query
        q_query = self._generate_q_developer_query(
            error_message, error_code, pattern_match, ctx
        )

        # Step 4: Determine root cause
        root_cause = (
            pattern_match.get("root_cause", "Unknown — requires manual investigation")
            if pattern_match
            else "No matching pattern found. Error requires manual investigation."
        )

        # Step 5: Impact assessment
        impact = self._assess_impact(pattern_match, ctx)

        diagnosis_time = (time.time() - t0) * 1000

        result = DiagnosisResult(
            diagnosis_id=diagnosis_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            request_id=request_id,
            error_message=error_message,
            error_code=error_code,
            confidence=confidence,
            status=DiagnosisStatus.DIAGNOSED,
            pattern_id=pattern_match.get("pattern_id") if pattern_match else None,
            pattern_name=pattern_match.get("name") if pattern_match else None,
            root_cause=root_cause,
            impact_assessment=impact,
            remediation_applied=None,
            remediation_success=None,
            q_developer_query=q_query,
            q_developer_response=None,
            time_to_diagnose_ms=round(diagnosis_time, 2),
            time_to_remediate_ms=None,
            related_request_ids=ctx.get("related_request_ids", []),
            metadata=ctx,
        )

        self._diagnosis_history.append(result)
        return result

    def _match_pattern(
        self, error_message: str, error_code: str
    ) -> Optional[Dict[str, Any]]:
        """Match error against known patterns (simplified)."""
        patterns = {
            "ThrottlingException": {
                "pattern_id": "EP-001",
                "name": "Bedrock Throttling",
                "root_cause": "Request rate exceeds provisioned throughput",
                "is_transient": True,
            },
            "ReadTimeoutError": {
                "pattern_id": "EP-002",
                "name": "Bedrock Invoke Timeout",
                "root_cause": "Model inference exceeds client timeout",
                "is_transient": True,
            },
            "ValidationException": {
                "pattern_id": "EP-003",
                "name": "Token Limit Exceeded",
                "root_cause": "Input tokens exceed model context window",
                "is_transient": False,
            },
            "OpenSearchException": {
                "pattern_id": "EP-004",
                "name": "OpenSearch Vector Search Failure",
                "root_cause": "Vector search index unavailable or query timeout",
                "is_transient": True,
            },
            "AccessDeniedException": {
                "pattern_id": "EP-005",
                "name": "Content Filter or Permission Error",
                "root_cause": "Content policy violation or IAM permission missing",
                "is_transient": False,
            },
        }

        if error_code in patterns:
            return patterns[error_code]

        for code, pattern in patterns.items():
            if code.lower() in error_message.lower():
                return pattern

        return None

    def _assess_confidence(
        self, pattern_match: Optional[Dict[str, Any]], context: Dict[str, Any]
    ) -> DiagnosisConfidence:
        """Assess diagnostic confidence based on pattern match and context."""
        if not pattern_match:
            return DiagnosisConfidence.UNKNOWN

        pattern_id = pattern_match.get("pattern_id", "")

        # Check for known false positives
        if self._false_positive_patterns.get(pattern_id, 0) > 3:
            return DiagnosisConfidence.LOW

        # High confidence if error code matches and pattern seen before
        history_matches = sum(
            1
            for d in self._diagnosis_history
            if d.pattern_id == pattern_id and d.status != DiagnosisStatus.FALSE_POSITIVE
        )

        if history_matches > 5:
            return DiagnosisConfidence.HIGH
        # First sighting or only a few prior matches: medium confidence.
        return DiagnosisConfidence.MEDIUM

    def _generate_q_developer_query(
        self,
        error_message: str,
        error_code: str,
        pattern_match: Optional[Dict[str, Any]],
        context: Dict[str, Any],
    ) -> str:
        """
        Generate a contextual prompt for Amazon Q Developer.

        The prompt includes:
        - The error details
        - The matched pattern (if any)
        - Relevant context (model, tokens, latency)
        - Specific question for Q Developer to answer
        """
        parts = [
            "I am troubleshooting an error in my MangaAssist chatbot application.",
            f"Error Code: {error_code}",
            f"Error Message: {error_message}",
        ]

        if pattern_match:
            parts.append(f"Known Pattern: {pattern_match.get('name', 'Unknown')}")
            parts.append(f"Root Cause (suspected): {pattern_match.get('root_cause', 'Unknown')}")

        if context.get("model_id"):
            parts.append(f"Model: {context['model_id']}")
        if context.get("input_tokens"):
            parts.append(f"Input Tokens: {context['input_tokens']}")
        if context.get("latency_ms"):
            parts.append(f"Latency: {context['latency_ms']}ms")

        parts.append("")
        parts.append(
            "The application stack is: ECS Fargate (Python orchestrator) -> "
            "Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless (RAG), "
            "DynamoDB (sessions), ElastiCache Redis (cache), "
            "API Gateway WebSocket. Target SLA: 3 seconds."
        )
        parts.append("")
        parts.append("Questions for Q Developer:")
        parts.append("1. What is the most likely root cause?")
        parts.append("2. What is the recommended fix?")
        parts.append("3. Are there any preventive measures?")
        parts.append("4. Could this error be related to other recent issues?")

        return "\n".join(parts)

    def _assess_impact(
        self,
        pattern_match: Optional[Dict[str, Any]],
        context: Dict[str, Any],
    ) -> str:
        """Assess the user and business impact of the error."""
        if not pattern_match:
            return "Unknown impact — requires manual assessment"

        pattern_id = pattern_match.get("pattern_id", "")
        impact_map = {
            "EP-001": "Users may experience 2-5 second delays due to retry backoff. At scale, this affects burst traffic windows. Cost impact: retries consume additional API calls.",
            "EP-002": "User receives no response for this message. WebSocket may timeout. Session state remains consistent but user must resend.",
            "EP-003": "Request fails before inference. No cost incurred. User sees error message. Long conversations are most affected.",
            "EP-004": "FM responds without context, leading to generic or hallucinated answers. Product recommendations will be absent.",
            "EP-005": "User query is blocked with generic error. Legitimate manga queries may be incorrectly filtered.",
        }
        return impact_map.get(pattern_id, "Impact assessment requires manual review")

    def auto_remediate(self, diagnosis: DiagnosisResult) -> bool:
        """
        Apply automated remediation for a diagnosed error.

        Only applies if:
        1. Mode is AUTO or ASSISTED
        2. Confidence is HIGH
        3. A remediation rule exists for the matched pattern
        4. The hourly remediation limit has not been exceeded
        5. The rule does not require confirmation (AUTO mode overrides this)

        Otherwise the diagnosis is escalated for manual handling.
        """
        if self.mode not in ("AUTO", "ASSISTED"):
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        if diagnosis.confidence != DiagnosisConfidence.HIGH:
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        if not diagnosis.pattern_id:
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        rule = self._remediation_rules.get(diagnosis.pattern_id)
        if not rule:
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        # Check rate limit
        now = time.time()
        if now - self._remediation_reset_time > 3600:
            self._remediation_count = 0
            self._remediation_reset_time = now

        if self._remediation_count >= self.max_auto_remediations:
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        if rule.requires_confirmation and self.mode != "AUTO":
            diagnosis.status = DiagnosisStatus.ESCALATED
            return False

        # Apply remediation (in production, these would be real actions)
        t0 = time.time()
        diagnosis.remediation_applied = rule.action
        diagnosis.status = DiagnosisStatus.REMEDIATED
        diagnosis.remediation_success = True
        diagnosis.time_to_remediate_ms = round((time.time() - t0) * 1000, 2)

        self._remediation_count += 1
        return True

    def mark_false_positive(self, diagnosis_id: str) -> None:
        """
        Mark a diagnosis as a false positive.

        Used to train the system to avoid incorrect classifications.
        After 3 false positives for a pattern, confidence is downgraded.
        """
        for d in self._diagnosis_history:
            if d.diagnosis_id == diagnosis_id:
                d.status = DiagnosisStatus.FALSE_POSITIVE
                if d.pattern_id:
                    self._false_positive_patterns[d.pattern_id] += 1
                break

    def get_diagnosis_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the diagnosis engine."""
        total = len(self._diagnosis_history)
        if total == 0:
            return {"total_diagnoses": 0}

        by_confidence = defaultdict(int)
        by_status = defaultdict(int)
        by_pattern = defaultdict(int)
        diagnosis_times: List[float] = []

        for d in self._diagnosis_history:
            by_confidence[d.confidence.value] += 1
            by_status[d.status.value] += 1
            if d.pattern_id:
                by_pattern[d.pattern_id] += 1
            diagnosis_times.append(d.time_to_diagnose_ms)

        return {
            "total_diagnoses": total,
            "by_confidence": dict(by_confidence),
            "by_status": dict(by_status),
            "by_pattern": dict(by_pattern),
            "avg_diagnosis_time_ms": round(
                sum(diagnosis_times) / len(diagnosis_times), 2
            ),
            "auto_remediations_this_hour": self._remediation_count,
            "false_positive_patterns": dict(self._false_positive_patterns),
        }
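
Each RemediationRule carries a `cooldown_seconds` field, but `auto_remediate` above only enforces the global hourly cap. A per-rule cooldown guard could be added along these lines (a sketch with illustrative names, not part of the engine above):

```python
import time

class CooldownGuard:
    """Track when each remediation rule last fired and enforce its cooldown."""

    def __init__(self):
        self._last_fired = {}  # rule_id -> monotonic timestamp

    def allow(self, rule_id, cooldown_seconds, now=None):
        """Return True if the rule may fire, recording the firing time."""
        now = time.monotonic() if now is None else now
        last = self._last_fired.get(rule_id)
        if last is not None and now - last < cooldown_seconds:
            return False  # still cooling down
        self._last_fired[rule_id] = now
        return True

guard = CooldownGuard()
guard.allow("RR-001", 300, now=0.0)    # True  - first firing
guard.allow("RR-001", 300, now=120.0)  # False - inside the 300s cooldown
guard.allow("RR-001", 300, now=360.0)  # True  - cooldown elapsed
```

Wiring this into `auto_remediate` would mean calling `guard.allow(rule.rule_id, rule.cooldown_seconds)` before applying the action, and escalating when it returns False.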

5. Q Developer Integration Workflow

5.1 How Q Developer Fits Into the Troubleshooting Loop

flowchart TD
    A[Error Detected<br/>in CloudWatch Logs] --> B{Pattern<br/>Matched?}

    B -->|Yes, HIGH confidence| C[Auto-Remediate<br/>via Remediation Rule]
    B -->|Yes, MEDIUM confidence| D[Generate Q Developer<br/>Diagnosis Query]
    B -->|No match| E[Generate Q Developer<br/>Investigation Query]

    C --> F{Remediation<br/>Successful?}
    F -->|Yes| G[Log Resolution<br/>Update Dashboard]
    F -->|No| D

    D --> H[Q Developer<br/>Analyzes Error Context]
    E --> H

    H --> I[Q Developer Suggests<br/>Root Cause + Fix]

    I --> J{Developer<br/>Confirms?}
    J -->|Yes, correct| K[Apply Fix<br/>Add to Pattern Registry]
    J -->|No, false positive| L[Mark False Positive<br/>Retrain Pattern Detector]
    J -->|Partial| M[Refine Diagnosis<br/>Re-query Q Developer]

    K --> G
    L --> N[Update Error Pattern<br/>Confidence Scoring]
    M --> H

    style C fill:#c8e6c9,stroke:#2e7d32
    style H fill:#e3f2fd,stroke:#1565c0
    style K fill:#c8e6c9,stroke:#2e7d32
    style L fill:#ffcdd2,stroke:#c62828

5.2 Q Developer Query Templates

| Scenario | Template |
|---|---|
| Bedrock API error | "My Bedrock InvokeModel call to {model_id} failed with {error_code}. Input was {input_tokens} tokens. How do I fix this in my boto3 Python application?" |
| Latency spike | "Bedrock response latency increased from {baseline_ms}ms to {current_ms}ms for Claude 3 Sonnet. What could cause this degradation?" |
| RAG quality drop | "OpenSearch vector search relevance scores dropped from {baseline_score} to {current_score}. My index uses {dimension}D embeddings. What should I check?" |
| Cost anomaly | "My daily Bedrock cost jumped from ${baseline_cost} to ${current_cost}. I'm using Claude 3 Sonnet at 1M messages/day. What could cause this?" |
| Session management error | "DynamoDB {error_code} when writing session data. Item size is {item_size_kb}KB. How do I handle large conversation histories?" |
| Deployment regression | "After deploying version {version}, error rate jumped from {baseline_pct}% to {current_pct}%. The only change was {change_description}." |
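
The templates are plain Python format strings, so rendering one is a single `.format` call. The sketch below (illustrative field values; only two templates shown) also fails loudly if a required placeholder was not supplied, which keeps half-filled prompts out of Q Developer:

```python
import string

TEMPLATES = {
    "latency_spike": (
        "Bedrock response latency increased from {baseline_ms}ms to "
        "{current_ms}ms for Claude 3 Sonnet. What could cause this degradation?"
    ),
    "cost_anomaly": (
        "My daily Bedrock cost jumped from ${baseline_cost} to ${current_cost}. "
        "I'm using Claude 3 Sonnet at 1M messages/day. What could cause this?"
    ),
}

def render_query(scenario, **fields):
    """Fill a template, raising if any placeholder is missing."""
    template = TEMPLATES[scenario]
    # string.Formatter().parse yields (literal, field_name, spec, conversion).
    required = {name for _, name, _, _ in string.Formatter().parse(template) if name}
    missing = required - fields.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return template.format(**fields)

query = render_query("latency_spike", baseline_ms=1800, current_ms=2900)
```

The placeholder check matters in practice: a prompt containing a literal `{current_ms}` gives Q Developer nothing concrete to reason about.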

6. Integrated Troubleshooting Flow — End-to-End Example

6.1 Scenario: User Reports Slow Manga Recommendation

Timeline:
  T+0ms     API Gateway receives WebSocket message
  T+15ms    Orchestrator picks up message
  T+18ms    Redis cache MISS (key not found)
  T+35ms    DynamoDB session lookup (conversation history)
  T+220ms   OpenSearch vector search (manga product embeddings)
  T+240ms   Orchestrator builds prompt (system + RAG context + history)
  T+2900ms  Bedrock InvokeModel returns (Claude 3 Sonnet)
  T+2920ms  Response parsed and formatted
  T+2945ms  WebSocket response delivered
  T+2945ms  Total: 2945ms — within 3s SLA but tight

Analysis:
  - Bedrock consumed 2660ms (90.3% of total)
  - Input tokens: 4,200 (system: 800, RAG: 2,400, history: 1,000)
  - Output tokens: 350
  - OpenSearch returned 5 chunks with avg relevance 0.72

Diagnosis:
  - No error, but latency is within 55ms of SLA breach
  - Bedrock latency is high due to 4,200 input tokens
  - Recommendation: reduce RAG chunks from 5 to 3, compress history

CloudWatch Logs Insights Query:
  fields @timestamp, latency.bedrock_invoke_ms, prompt_metrics.total_input_tokens
  | filter metadata.request_id = "req-abc123"
  | limit 1

X-Ray Trace: Shows waterfall with Bedrock as 90.3% of total time
Q Developer: "Bedrock latency is 2660ms for 4200 input tokens. How do I reduce this?"
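
The waterfall arithmetic above can be reproduced directly. The durations below are the deltas between consecutive timeline steps (component names are illustrative labels for those steps):

```python
# Per-component durations (ms) derived from the timeline deltas above.
timings = {
    "api_gateway_to_orchestrator": 15,   # T+0 -> T+15
    "redis_lookup": 3,                   # T+15 -> T+18
    "dynamodb_session": 17,              # T+18 -> T+35
    "opensearch_search": 185,            # T+35 -> T+220
    "prompt_build": 20,                  # T+220 -> T+240
    "bedrock_invoke": 2660,              # T+240 -> T+2900
    "parse_format": 20,                  # T+2900 -> T+2920
    "websocket_deliver": 25,             # T+2920 -> T+2945
}

total = sum(timings.values())
shares = {k: round(v / total * 100, 1) for k, v in timings.items()}
bottleneck = max(timings, key=timings.get)
# total == 2945, bottleneck == "bedrock_invoke", shares["bedrock_invoke"] == 90.3
```

Optimizing anything other than the 90.3% component is wasted effort here, which is why the diagnosis targets input token count rather than, say, the OpenSearch query.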

7. Key Takeaways

  1. Correlation is the foundation: Without request_id, session_id, and trace_id flowing through every log entry, troubleshooting becomes guesswork. The LogCorrelator must be the first thing initialized in the request lifecycle.

  2. Latency budgets drive alerting: The LatencyAnalyzer assigns concrete budgets to each component. When Bedrock has 2300ms of 3000ms total, even a 200ms regression in OpenSearch can break the SLA. Monitor budget utilization percentage, not just absolute values.

  3. Auto-diagnosis reduces MTTR: The AutoDiagnosisEngine combines pattern matching, context analysis, and Q Developer integration to reduce mean-time-to-resolution from hours to minutes for known patterns.

  4. False positive tracking prevents incorrect remediation: When an auto-remediation is wrong, marking it as a false positive degrades that pattern's confidence score, preventing future incorrect actions.

  5. Q Developer prompts must be contextual: A generic "fix this error" prompt to Q Developer is useless. The system generates prompts with specific error codes, token counts, latency values, model IDs, and stack details. The more context, the better the suggestion.

  6. Statistical anomaly detection catches gradual degradation: Simple threshold alarms miss slow degradation (P99 creeping from 2500ms to 2800ms over a week). The detect_anomalies method uses standard deviation to flag gradual changes.