
Holistic Observability Architecture for GenAI Applications

AWS AIP-C01 Task 4.3 — Skill 4.3.1: Create holistic observability systems for FM application performance
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3, OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Responsible AI & Monitoring | Task 4.3 — Monitor FM applications | Skill 4.3.1 — Create holistic observability systems for FM application performance |

Skill scope: Design and implement end-to-end observability covering metrics, traces, logs, and events across every layer of a GenAI application — from infrastructure through FM invocation to business outcomes.


Mind Map — Holistic Observability Topology

mindmap
  root((Holistic<br/>Observability))
    Metrics
      Infrastructure
        ECS CPU & Memory
        Lambda Concurrency
        API Gateway Request Count
        OpenSearch IOPS
      Service
        Orchestrator Latency
        Cache Hit Rate
        Circuit Breaker State
        Queue Depth
      FM-Specific
        Tokens per Second
        Model Latency P50/P95/P99
        Throttle Rate
        Cost per Request
        Guardrail Trigger Rate
      Business
        Conversion Rate
        CSAT Score
        Revenue per Session
        Cart Additions via Chat
    Traces
      Distributed Traces
        X-Ray Segments
        Cross-Service Spans
        Async Correlation
      FM Interaction Traces
        Prompt Assembly
        RAG Retrieval
        Model Invocation
        Guardrail Check
        Response Formatting
      Business Correlation
        Trace-to-Session Link
        Trace-to-Outcome Link
        Revenue Attribution
    Logs
      Structured Application Logs
        JSON with Correlation IDs
        Error Context
        Request/Response Pairs
      Bedrock Invocation Logs
        Input/Output Tokens
        Model Latency
        Model ID & Version
        Prompt Text (sampled)
      Guardrail Trace Logs
        Filter Actions
        Blocked Content
        Policy Violations
        PII Detection Events
    Events
      State Changes
        Deployment Events
        Config Updates
        Prompt Version Changes
      Anomalies
        Drift Detection
        Threshold Breaches
        Cost Spikes
      Business Events
        Flash Sale Start/End
        Catalog Updates
        Inventory Changes

The 4 Pillars of GenAI Observability

Pillar 1 — Metrics: Quantitative Measurements

Metrics are numeric values collected at regular intervals. They are the cheapest form of observability data and the first thing you check during an incident. In GenAI, metrics extend far beyond CPU and memory.

Infrastructure Metrics (the foundation)

| Metric | Source | CloudWatch Namespace | Why It Matters for GenAI |
|---|---|---|---|
| ECS CPU Utilization | ECS Agent | AWS/ECS | Embedding generation and prompt assembly are CPU-intensive |
| ECS Memory Utilization | ECS Agent | AWS/ECS | Large context windows require substantial memory for prompt building |
| Lambda Concurrent Executions | Lambda Service | AWS/Lambda | Throttling here blocks guardrail checks or post-processing |
| API Gateway WebSocket Connections | API GW | AWS/ApiGateway | Active user count; WebSocket disconnects degrade UX |
| OpenSearch Search Latency | OS Serverless | AWS/AOSS | Slow vector search = slow RAG = slow response |
| DynamoDB Read/Write Capacity | DynamoDB | AWS/DynamoDB | Session history reads scale with conversation length |

Service Metrics (the middleware)

| Metric | Type | Description |
|---|---|---|
| orchestrator.latency_ms | Histogram | End-to-end time from request receipt to response dispatch |
| cache.hit_rate | Gauge | Percentage of requests served from semantic cache |
| cache.semantic_similarity | Histogram | Cosine similarity scores for cache matches |
| circuit_breaker.state | Gauge | 0=closed, 0.5=half-open, 1=open (per downstream service) |
| circuit_breaker.trip_count | Counter | Number of times circuit opened in the window |
| retry.count | Counter | Retries to Bedrock, OpenSearch, DynamoDB |
| queue.depth | Gauge | Pending async requests (for streaming responses) |

FM-Specific Metrics (the differentiator)

These metrics do not exist in traditional applications. They are the core of GenAI observability:

| Metric | Type | Unit | Alert Threshold | Description |
|---|---|---|---|---|
| fm.input_tokens | Counter | Count | | Tokens sent to model per invocation |
| fm.output_tokens | Counter | Count | | Tokens received from model per invocation |
| fm.total_tokens | Counter | Count | >4000/request | Combined token count (cost driver) |
| fm.latency_ms | Histogram | ms | P95 > 5000 | Model invocation latency |
| fm.tokens_per_second | Gauge | Count/s | <20 tps | Generation speed (UX quality) |
| fm.throttle_count | Counter | Count | >5/min | Bedrock throttling events |
| fm.cost_per_request | Gauge | USD | >$0.05 | Estimated cost per invocation |
| fm.cost_per_hour | Gauge | USD | >$50/hr | Rolling hourly spend |
| fm.guardrail_block_rate | Gauge | Percent | >10% | Percentage of responses blocked by guardrails |
| fm.quality_score | Gauge | 0-1 | <0.7 | Automated relevance/quality score |
| fm.hallucination_rate | Gauge | Percent | >5% | Responses containing ungrounded claims |
| fm.cache_hit_rate | Gauge | Percent | | Semantic cache effectiveness |
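The token counts above come straight from the model response. A minimal sketch of harvesting them, assuming the Anthropic Messages API shape that Claude 3 models return through InvokeModel (the prompt is illustrative, and the per-1K-token rates mirror the pricing table in the LLD below):

import json

import boto3

bedrock = boto3.client("bedrock-runtime")

response = bedrock.invoke_model(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "messages": [{"role": "user", "content": "Recommend manga similar to One Piece."}],
    }),
)
payload = json.loads(response["body"].read())

# Claude 3 responses report token usage directly; no client-side tokenization needed.
input_tokens = payload["usage"]["input_tokens"]
output_tokens = payload["usage"]["output_tokens"]

# fm.cost_per_request, estimated from published per-1K-token rates
cost_usd = (input_tokens * 0.003 + output_tokens * 0.015) / 1000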

Business Metrics (the purpose)

| Metric | Calculation | Target | Observability Source |
|---|---|---|---|
| Conversion Rate | Orders via chat / total chat sessions | >3% | DynamoDB session + order events |
| CSAT | Post-chat survey score | >4.2/5 | Survey Lambda → CloudWatch |
| Revenue per Chat Session | Total revenue / chat sessions | >$2.50 | DynamoDB aggregation |
| Cart Addition Rate | Add-to-cart events via chat / recommendations shown | >15% | Frontend events → Kinesis |
| Deflection Rate | Issues resolved without human / total issues | >80% | Escalation service metrics |
| Time to Resolution | Median turns to fulfill user intent | <4 turns | Session log analysis |
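Business metrics ride the same pipeline as technical ones. A hedged sketch of publishing the first row, assuming the order and session counts have already been aggregated from DynamoDB (the function name and the MangaAssist/Business namespace are illustrative):

import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_conversion_rate(orders_via_chat: int, chat_sessions: int) -> None:
    """Publish chat conversion rate as a percentage; counts come from DynamoDB aggregation."""
    if chat_sessions == 0:
        return  # quiet period; avoid division by zero
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Business",
        MetricData=[{
            "MetricName": "ConversionRate",
            "Value": orders_via_chat / chat_sessions * 100,
            "Unit": "Percent",
        }],
    )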

Pillar 2 — Traces: Request Path Tracking

Traces reveal the full journey of a single request through all services. For GenAI, traces must capture both infrastructure hops and the internal FM interaction pipeline.

Distributed Traces via AWS X-Ray

[User WebSocket Message]
  └─ API Gateway (segment: 12ms)
      └─ ECS Orchestrator (segment: 4200ms)
          ├─ DynamoDB GetItem — session history (subsegment: 8ms)
          ├─ Intent Classifier — Bedrock Haiku (subsegment: 180ms)
          ├─ OpenSearch kNN — vector retrieval (subsegment: 95ms)
          ├─ Prompt Assembly (subsegment: 5ms)
          ├─ Bedrock InvokeModel — Claude 3 Sonnet (subsegment: 3400ms) ★ dominant
          ├─ Guardrail Check (subsegment: 45ms)
          ├─ DynamoDB PutItem — save turn (subsegment: 6ms)
          └─ Response Formatting (subsegment: 3ms)
      └─ API Gateway Response (segment: 8ms)

Key X-Ray annotations for FM traces:

| Annotation Key | Example Value | Purpose |
|---|---|---|
| intent | product_search | Filter traces by user intent |
| model_id | anthropic.claude-3-sonnet-20240229-v1:0 | Filter by model version |
| input_tokens | 1847 | Identify expensive invocations |
| cache_hit | true | Measure cache effectiveness |
| guardrail_action | pass | Find blocked requests |
| quality_score | 0.85 | Correlate quality with latency/cost |
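A minimal sketch of attaching these annotations with the X-Ray SDK for Python, assuming the recorder is already configured in the orchestrator; all values shown are illustrative:

from aws_xray_sdk.core import xray_recorder

# Runs inside an instrumented request handler, where a segment is already open.
with xray_recorder.in_subsegment("bedrock.invoke_model") as subsegment:
    # ... invoke Bedrock here ...
    subsegment.put_annotation("intent", "product_search")
    subsegment.put_annotation("model_id", "anthropic.claude-3-sonnet-20240229-v1:0")
    subsegment.put_annotation("input_tokens", 1847)
    subsegment.put_annotation("cache_hit", False)
    # Annotations are indexed and filterable; bulky context (e.g., RAG chunks)
    # belongs in metadata, which is stored but not indexed.
    subsegment.put_metadata("rag_chunks", [{"id": "chunk-1", "score": 0.89}])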

FM Interaction Trace — Internal Pipeline

Unlike traditional traces that track service-to-service calls, FM traces track the reasoning pipeline:

[FM Interaction Trace]
├─ 1. Context Loading (8ms)
│     Load 5 prior turns from DynamoDB
├─ 2. Intent Classification (180ms)  
│     Bedrock Haiku → "product_search" (confidence: 0.94)
├─ 3. Query Understanding (12ms)
│     Extract: genre=similar_to_one_piece, price<20, format=manga
├─ 4. RAG Retrieval (95ms)
│     OpenSearch kNN: 8 chunks, max_score=0.89
├─ 5. Prompt Assembly (5ms)
│     System prompt + context (5 turns) + RAG chunks (8) + user query
│     Total tokens: 1,847 input
├─ 6. Model Invocation (3,400ms) ← 80% of total time
│     Bedrock Claude 3 Sonnet
│     Output tokens: 342
├─ 7. Guardrail Check (45ms)
│     Content filter: pass, PII check: pass, Topic guard: pass
└─ 8. Response Delivery (3ms)
      Format + stream to WebSocket

Business Correlation — Trace → Outcome

trace_id: abc-123-def
  → session_id: sess-456
    → user_id: user-789
      → outcome: purchase ($18.99)
      → satisfaction: 5/5
      → turns_to_convert: 3

This correlation lets you answer: "Requests that go through the slow path (P95 latency) — do they convert less?"
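A hedged sketch of answering that question with Athena, assuming tables named fm_invocations and session_outcomes have been built over the S3 logs from the aggregation layer (the table, database, and bucket names are illustrative):

import boto3

athena = boto3.client("athena")

QUERY = """
SELECT o.outcome,
       COUNT(*) AS requests,
       APPROX_PERCENTILE(i.latency_ms, 0.95) AS p95_latency_ms
FROM fm_invocations i
JOIN session_outcomes o ON i.session_id = o.session_id
GROUP BY o.outcome
"""

athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "mangaassist_observability"},
    ResultConfiguration={"OutputLocation": "s3://mangaassist-observability-logs/athena-results/"},
)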


Pillar 3 — Logs: Structured Event Records

Application Logs — JSON Structured

{
  "timestamp": "2025-03-15T14:22:33.456Z",
  "level": "INFO",
  "service": "mangaassist-orchestrator",
  "correlation_id": "corr-abc-123",
  "session_id": "sess-456",
  "trace_id": "1-abc-def",
  "span_id": "span-789",
  "event": "fm_invocation_complete",
  "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
  "intent": "product_search",
  "input_tokens": 1847,
  "output_tokens": 342,
  "latency_ms": 3400,
  "cache_hit": false,
  "guardrail_action": "pass",
  "quality_score": 0.85,
  "cost_usd": 0.01068,
  "rag_chunks_retrieved": 8,
  "rag_max_similarity": 0.89
}

Critical rule: Every log line includes correlation_id, session_id, and trace_id. This is non-negotiable for GenAI debugging.
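One way to enforce the rule is a single helper that every service calls instead of the raw logger. A minimal sketch (the log_event name is illustrative; in practice the three IDs come from the request context):

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("mangaassist-orchestrator")

def log_event(event: str, correlation_id: str, session_id: str,
              trace_id: str, level: int = logging.INFO, **attributes) -> None:
    """Emit one JSON log line that always carries the three correlation fields."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": logging.getLevelName(level),
        "service": "mangaassist-orchestrator",
        "correlation_id": correlation_id,
        "session_id": session_id,
        "trace_id": trace_id,
        "event": event,
        **attributes,
    }
    logger.log(level, json.dumps(record))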

Bedrock Model Invocation Logs

Enabled via Bedrock → Model Invocation Logging → S3 + CloudWatch:

{
  "schemaType": "ModelInvocationLog",
  "schemaVersion": "1.0",
  "timestamp": "2025-03-15T14:22:33.000Z",
  "accountId": "123456789012",
  "identity": { "arn": "arn:aws:sts::123456789012:assumed-role/MangaAssistECSRole/..." },
  "region": "us-east-1",
  "requestId": "req-abc-123",
  "operation": "InvokeModel",
  "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
  "input": {
    "inputContentType": "application/json",
    "inputTokenCount": 1847,
    "inputBodyJson": { "...": "prompt text (if logging enabled)" }
  },
  "output": {
    "outputContentType": "application/json",
    "outputTokenCount": 342,
    "outputBodyJson": { "...": "response text (if logging enabled)" }
  }
}

Note: Input/output body logging is optional and should be sampled (e.g., 5%) in production to control costs and avoid storing PII.
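The sampling decision itself lives in the application, before bodies are written to logs. A sketch under one stated design assumption: hashing the correlation ID makes the decision deterministic, so a request's prompt and response are always logged together or not at all:

import hashlib

PROMPT_BODY_SAMPLE_RATE = 0.05  # mirrors ObservabilityConfig.prompt_body_sample_rate below

def should_log_prompt_body(correlation_id: str) -> bool:
    """Deterministic 5% sampling keyed on the correlation ID."""
    digest = hashlib.sha256(correlation_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") / 2**32 < PROMPT_BODY_SAMPLE_RATE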

Guardrail Trace Logs

{
  "timestamp": "2025-03-15T14:22:33.500Z",
  "guardrail_id": "guard-manga-001",
  "action": "BLOCKED",
  "trace": {
    "contentFilter": { "hate": "NONE", "violence": "LOW", "sexual": "NONE" },
    "topicPolicy": { "topics": ["competitor_promotion"], "action": "BLOCKED" },
    "wordPolicy": { "matchedWords": [], "action": "NONE" },
    "piiDetection": { "entities": ["EMAIL"], "action": "ANONYMIZED" }
  },
  "input_assessment": "PASS",
  "output_assessment": "BLOCKED",
  "correlation_id": "corr-abc-123"
}
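Guardrails can also be invoked standalone through the ApplyGuardrail API, which returns the same assessment structure for logging. A hedged sketch (the guardrail ID, version, and helper name are illustrative):

import json

import boto3

bedrock = boto3.client("bedrock-runtime")

def check_output(text: str) -> bool:
    """Run a standalone guardrail check; return True if the text may be shipped."""
    result = bedrock.apply_guardrail(
        guardrailIdentifier="guard-manga-001",
        guardrailVersion="1",
        source="OUTPUT",
        content=[{"text": {"text": text}}],
    )
    # Each assessment mirrors the trace log above (content filters, topic,
    # word, and sensitive-information policies); emit them as structured logs.
    for assessment in result.get("assessments", []):
        print(json.dumps(assessment, default=str))  # replace with structured logger
    return result["action"] == "NONE"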

Pillar 4 — Events: State Changes and Signals

Events are discrete occurrences that provide context for metric anomalies.

Deployment Events

{
  "event_type": "deployment",
  "timestamp": "2025-03-15T10:00:00Z",
  "details": {
    "service": "mangaassist-orchestrator",
    "version": "v2.3.1",
    "change_type": "prompt_update",
    "description": "Updated product search system prompt — added price comparison instructions",
    "deployer": "ci-pipeline",
    "rollback_id": "v2.3.0"
  }
}

Anomaly Events

{
  "event_type": "anomaly",
  "timestamp": "2025-03-15T14:30:00Z",
  "details": {
    "metric": "fm.cost_per_hour",
    "expected_value": 32.50,
    "actual_value": 78.20,
    "deviation_band": 2.4,
    "detection_model": "CloudWatch Anomaly Detection",
    "probable_cause": "New prompt version increased average token count by 40%"
  }
}

Business Events

{
  "event_type": "business",
  "timestamp": "2025-03-15T08:00:00Z",
  "details": {
    "event": "flash_sale_start",
    "category": "manga_shonen",
    "expected_traffic_multiplier": 3.5,
    "duration_hours": 4,
    "auto_scaling_triggered": true
  }
}

Overlay deployment and business events on metric dashboards so operators can immediately correlate "cost spiked when we deployed the new prompt" or "latency increased when the flash sale started."
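A minimal sketch of emitting such an event to the default EventBridge bus, from which rules can fan out to the S3 events bucket and whatever tooling annotates the dashboards (the mangaassist.deployments source name is an assumption):

import json
from datetime import datetime, timezone

import boto3

events = boto3.client("events")

events.put_events(
    Entries=[{
        "Time": datetime.now(timezone.utc),
        "Source": "mangaassist.deployments",
        "DetailType": "deployment",
        "Detail": json.dumps({
            "service": "mangaassist-orchestrator",
            "version": "v2.3.1",
            "change_type": "prompt_update",
            "rollback_id": "v2.3.0",
        }),
    }]
)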


How GenAI Observability Differs from Traditional App Monitoring

| Dimension | Traditional Application | GenAI Application | Why It Matters |
|---|---|---|---|
| Determinism | Same input → same output (usually) | Same input → different output each time | Cannot test with simple assertion-based monitoring; need statistical quality checks |
| Cost Model | Fixed infra cost, predictable | Per-token variable cost, can spike 10x | Must monitor cost in real time; a single bad prompt can cost $100s |
| Quality Measurement | HTTP 200 = success | HTTP 200 but answer is wrong/hallucinated | Need content-level quality metrics (relevance, groundedness, faithfulness) |
| Failure Modes | Crash, timeout, error code | Silent degradation, hallucination, drift | Traditional alerting misses GenAI-specific failures; need semantic monitoring |
| Debugging Approach | Read stack trace, reproduce bug | Analyze prompt, context, retrieval quality, model behavior | Logs must capture full prompt chain, RAG context, and model reasoning |
| Latency Profile | ms range, predictable | Seconds range, high variance (P50 vs P99 can differ 5x) | Streaming UX critical; must monitor time-to-first-token separately |
| Scaling Behavior | CPU/memory proportional | Token throughput limited by model quotas | Scaling is constrained by Bedrock TPM/RPM limits, not just compute |
| Security Concerns | SQL injection, XSS | Prompt injection, data exfiltration via prompts, PII leakage | Guardrail monitoring is a first-class observability concern |
| Compliance | Data retention, access logs | Model input/output auditing, bias monitoring, content filtering | Must log model interactions for audit; retention policies differ |
| User Experience | Page load time, click-through | Response relevance, conversation coherence, personality consistency | UX metrics require NLP-based evaluation, not just timing |
| Dependency Chain | DB → Cache → API | DB → Cache → Vector Store → Embedding Model → LLM → Guardrails | Longer chain = more failure points; each link needs distinct monitoring |
| Capacity Planning | Forecast from request volume | Forecast from token volume × model pricing | Capacity planning requires predicting token consumption patterns |

Architecture — Data Collection to Action

graph TB
    subgraph Sources["Data Sources"]
        style Sources fill:#f59e0b,stroke:#d97706,color:#000
        ECS["ECS Fargate<br/>Orchestrator"]
        BDK["Bedrock<br/>Claude 3"]
        OSS["OpenSearch<br/>Serverless"]
        DDB["DynamoDB<br/>Sessions"]
        APIGW["API Gateway<br/>WebSocket"]
        GRL["Bedrock<br/>Guardrails"]
        FE["Frontend<br/>React App"]
    end

    subgraph Collection["Collection Layer"]
        CWAgent["CloudWatch Agent<br/>(ECS sidecar)"]
        XRaySDK["X-Ray SDK<br/>(instrumented code)"]
        BDKLog["Bedrock Invocation<br/>Logging (native)"]
        CustPub["Custom Metric<br/>Publisher (Python)"]
        KDS["Kinesis Data<br/>Stream"]
        FEBeacon["Frontend<br/>Beacon API"]
    end

    subgraph Aggregation["Aggregation Layer"]
        CWMetrics["CloudWatch<br/>Metrics"]
        CWLogs["CloudWatch<br/>Logs"]
        XRayTraces["X-Ray<br/>Traces"]
        S3Raw["S3 Raw Logs<br/>(Bedrock invocations)"]
        S3Events["S3 Events<br/>(business signals)"]
    end

    subgraph Processing["Processing Layer"]
        CWInsights["CloudWatch<br/>Logs Insights"]
        Athena["Athena<br/>(S3 query)"]
        LambdaProc["Lambda<br/>Processors"]
        CWAnomaly["CloudWatch<br/>Anomaly Detection"]
        MetricMath["CloudWatch<br/>Metric Math"]
    end

    subgraph Visualization["Visualization Layer"]
        CWDash["CloudWatch<br/>Dashboards"]
        Grafana["Grafana<br/>(optional)"]
        QS["QuickSight<br/>(business reports)"]
    end

    subgraph Actions["Action Layer"]
        style Actions fill:#ef4444,stroke:#dc2626,color:#fff
        CWAlarm["CloudWatch<br/>Alarms"]
        SNS["SNS<br/>Notifications"]
        LambdaAuto["Lambda<br/>Auto-Remediation"]
        PD["PagerDuty /<br/>OpsGenie"]
        ASG["Auto Scaling<br/>Actions"]
        Runbook["SSM<br/>Runbooks"]
    end

    %% Source → Collection
    ECS --> CWAgent
    ECS --> XRaySDK
    ECS --> CustPub
    BDK --> BDKLog
    BDK --> XRaySDK
    OSS --> CWAgent
    DDB --> CWAgent
    APIGW --> CWAgent
    GRL --> BDKLog
    FE --> FEBeacon

    %% Collection → Aggregation
    CWAgent --> CWMetrics
    CWAgent --> CWLogs
    XRaySDK --> XRayTraces
    BDKLog --> CWLogs
    BDKLog --> S3Raw
    CustPub --> CWMetrics
    KDS --> S3Events
    FEBeacon --> KDS

    %% Aggregation → Processing
    CWLogs --> CWInsights
    S3Raw --> Athena
    S3Events --> Athena
    CWMetrics --> CWAnomaly
    CWMetrics --> MetricMath
    CWLogs --> LambdaProc

    %% Processing → Visualization
    CWMetrics --> CWDash
    CWInsights --> CWDash
    XRayTraces --> CWDash
    CWMetrics --> Grafana
    Athena --> QS
    MetricMath --> CWDash

    %% Processing → Actions
    CWAnomaly --> CWAlarm
    MetricMath --> CWAlarm
    CWAlarm --> SNS
    SNS --> PD
    SNS --> LambdaAuto
    LambdaAuto --> ASG
    LambdaAuto --> Runbook

MangaAssist Observability Stack Mapping

sequenceDiagram
    actor User
    participant APIGW as API Gateway<br/>WebSocket
    participant ECS as ECS Orchestrator
    participant DDB as DynamoDB
    participant INTENT as Bedrock Haiku<br/>(Intent Classifier)
    participant OSS as OpenSearch<br/>Serverless
    participant BDK as Bedrock Claude 3<br/>Sonnet
    participant GRL as Bedrock Guardrails
    participant CW as CloudWatch

    User->>APIGW: "Find me manga similar to One Piece under $20"
    Note over APIGW: 📊 Metric: connection_count++<br/>📝 Log: access_log (requestId, IP)

    APIGW->>ECS: Route message (12ms)
    Note over ECS: 🔍 Trace: X-Ray segment starts<br/>📝 Log: request_received (correlation_id)

    ECS->>DDB: GetItem — session history (8ms)
    Note over DDB: 📊 Metric: read_capacity_units<br/>🔍 Trace: DDB subsegment (8ms)<br/>📝 Log: context_loaded (5 prior turns)

    ECS->>INTENT: Classify intent (180ms)
    Note over INTENT: 📊 Metric: intent_latency=180ms<br/>📊 Metric: input_tokens=120, output_tokens=15<br/>🔍 Trace: intent subsegment<br/>📝 Log: intent=product_search (conf=0.94)

    ECS->>OSS: kNN vector search (95ms)
    Note over OSS: 📊 Metric: search_latency=95ms<br/>📊 Metric: results_count=8<br/>🔍 Trace: OSS subsegment<br/>📝 Log: rag_retrieval (8 chunks, max_score=0.89)

    ECS->>ECS: Prompt assembly (5ms)
    Note over ECS: 📝 Log: prompt_assembled (1847 input tokens)<br/>🔍 Trace: assembly subsegment

    ECS->>BDK: InvokeModel — Claude 3 Sonnet (3400ms)
    Note over BDK: 📊 Metric: fm.latency=3400ms<br/>📊 Metric: fm.input_tokens=1847<br/>📊 Metric: fm.output_tokens=342<br/>📊 Metric: fm.cost=$0.01068<br/>🔍 Trace: bedrock subsegment (dominant)<br/>📝 Log: invocation_log (model_id, tokens)

    BDK->>GRL: Check response (45ms)
    Note over GRL: 📊 Metric: guardrail_latency=45ms<br/>📊 Metric: guardrail_action=pass<br/>🔍 Trace: guardrail subsegment<br/>📝 Log: guardrail_trace (all policy results)

    GRL-->>ECS: Response approved

    ECS->>DDB: PutItem — save turn (6ms)
    Note over DDB: 📊 Metric: write_capacity_units<br/>📝 Log: turn_saved

    ECS->>APIGW: Formatted response (3ms)
    APIGW->>User: Manga recommendations displayed

    Note over ECS: 📊 Metric: e2e_latency=3754ms<br/>📊 Metric: quality_score=0.85<br/>🔍 Trace: segment complete<br/>📝 Log: request_complete<br/>🎯 Event: publish all metrics to CW

    ECS->>CW: Publish metrics batch
    Note over CW: 📊 All metrics aggregated<br/>🚨 Anomaly detection evaluates<br/>📈 Dashboards update (60s refresh)

Total observability signals for a single request: ~15 metrics, 1 distributed trace (8 spans), ~10 structured log entries, 1 Bedrock invocation log.


Observability Maturity Model

graph LR
    L0["Level 0<br/>🚫 No Observability<br/>Flying blind"]
    L1["Level 1<br/>📝 Basic Logs<br/>Console output only"]
    L2["Level 2<br/>📊 Metrics + Alerts<br/>CloudWatch basics"]
    L3["Level 3<br/>🔍 Distributed Tracing<br/>X-Ray across services"]
    L4["Level 4<br/>🤖 FM-Specific Monitoring<br/>Token, cost, quality metrics"]
    L5["Level 5<br/>🧠 Predictive & Self-Healing<br/>Anomaly detection + auto-remediation"]

    L0 --> L1 --> L2 --> L3 --> L4 --> L5

    style L0 fill:#dc2626,color:#fff
    style L1 fill:#ea580c,color:#fff
    style L2 fill:#d97706,color:#fff
    style L3 fill:#65a30d,color:#fff
    style L4 fill:#0891b2,color:#fff
    style L5 fill:#7c3aed,color:#fff

| Level | Capabilities | AWS Tools Used | Gaps at This Level |
|---|---|---|---|
| L0 — None | No monitoring; learn about issues from user complaints | None | Everything — complete blind spot |
| L1 — Basic Logs | Console.log / print statements; basic CloudWatch log groups | CloudWatch Logs | No metrics, no alerting, no correlation; reactive debugging only |
| L2 — Metrics + Alerts | CloudWatch metrics for infra (CPU, memory); basic alarms (5xx rate, high CPU) | CloudWatch Metrics, Alarms, SNS | No request-level visibility; cannot trace a single user journey |
| L3 — Distributed Tracing | X-Ray traces across ECS, API Gateway, DynamoDB, OpenSearch; can follow a single request | X-Ray, CloudWatch ServiceLens | No FM-specific insights; model latency is a black box; no cost tracking |
| L4 — FM Monitoring | Token counting, model latency percentiles, cost per request, quality scores, guardrail metrics; Bedrock invocation logging | Bedrock Logging, Custom CloudWatch Metrics, S3, Athena | Reactive — alerts fire after the problem; no prediction; manual remediation |
| L5 — Predictive | Anomaly detection on cost/latency/quality; auto-remediation (switch models, enable cache, scale); drift detection; business correlation | CloudWatch Anomaly Detection, Lambda auto-remediation, EventBridge, ML-based alerting | Continuous improvement needed; model reliability depends on historical data quality |

MangaAssist target: Level 4 at MVP, Level 5 within 6 months.


HLD: Observability Data Model

from dataclasses import dataclass, field
from typing import Optional, Dict, List
from datetime import datetime
from enum import Enum


class ObservabilityPillar(Enum):
    METRICS = "metrics"
    TRACES = "traces"
    LOGS = "logs"
    EVENTS = "events"


class AlertSeverity(Enum):
    CRITICAL = "critical"   # Page on-call immediately
    HIGH = "high"           # Notify team within 15 min
    MEDIUM = "medium"       # Investigate within 1 hour
    LOW = "low"             # Review in next business day


@dataclass
class ObservabilitySignal:
    """Base class for all observability data.

    Every signal emitted by MangaAssist carries these fields,
    enabling cross-pillar correlation (join a metric spike to
    the trace that caused it, to the log that explains it).
    """
    timestamp: datetime
    source_service: str          # e.g., "mangaassist-orchestrator"
    correlation_id: str          # unique per user request
    session_id: str              # unique per chat session
    pillar: ObservabilityPillar
    environment: str = "production"
    metadata: Dict[str, str] = field(default_factory=dict)


@dataclass
class FMMetricSignal(ObservabilitySignal):
    """FM-specific metric signal — the core of GenAI observability.

    Captures everything needed to understand cost, performance,
    and quality of a single model invocation.
    """
    model_id: str = ""
    intent: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    quality_score: float = 0.0       # 0.0 - 1.0, from automated evaluator
    cache_hit: bool = False
    guardrail_action: str = "pass"   # pass | block | anonymize
    rag_chunks_retrieved: int = 0
    rag_max_similarity: float = 0.0
    time_to_first_token_ms: float = 0.0  # streaming UX metric


@dataclass
class TraceSpan(ObservabilitySignal):
    """Individual span within a distributed trace.

    Maps to an X-Ray subsegment. The span tree reconstructs
    the full request path through MangaAssist services.
    """
    trace_id: str = ""
    span_id: str = ""
    parent_span_id: Optional[str] = None
    operation_name: str = ""         # e.g., "bedrock.invoke_model"
    duration_ms: float = 0.0
    status: str = "OK"               # OK | ERROR | THROTTLED
    annotations: Dict[str, str] = field(default_factory=dict)
    # annotations carry indexed, filterable key-value pairs
    # e.g., {"intent": "product_search", "model_id": "claude-3-sonnet"}


@dataclass
class StructuredLogEntry(ObservabilitySignal):
    """Structured log entry — always JSON, always correlated."""
    level: str = "INFO"             # DEBUG | INFO | WARN | ERROR
    event: str = ""                 # e.g., "fm_invocation_complete"
    message: str = ""
    trace_id: str = ""
    span_id: str = ""
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    attributes: Dict[str, str] = field(default_factory=dict)


@dataclass
class ObservabilityEvent(ObservabilitySignal):
    """Discrete event — deployment, anomaly, or business signal."""
    event_type: str = ""            # deployment | anomaly | business
    severity: AlertSeverity = AlertSeverity.LOW
    details: Dict[str, str] = field(default_factory=dict)
    related_metric: Optional[str] = None
    auto_remediation_triggered: bool = False


@dataclass
class ObservabilityConfig:
    """Configuration for the MangaAssist observability stack.

    These values control cost vs. granularity trade-offs.
    """
    metrics_namespace: str = "MangaAssist"
    trace_sampling_rate: float = 0.05      # 5% in production (cost control)
    log_retention_days: int = 30           # CloudWatch Logs retention
    s3_log_retention_days: int = 90        # Raw Bedrock invocation logs
    alarm_evaluation_periods: int = 3      # 3 consecutive breaches before alarm
    alarm_datapoints_to_alarm: int = 2     # 2 of 3 periods must breach
    dashboard_refresh_seconds: int = 60    # CloudWatch dashboard auto-refresh
    anomaly_detection_band: int = 2        # Standard deviations for anomaly band
    enable_invocation_logging: bool = True  # Bedrock invocation logging on/off
    log_prompt_body: bool = False          # Log full prompt text (cost + privacy)
    prompt_body_sample_rate: float = 0.05  # If logging bodies, sample 5%
    s3_log_bucket: str = "mangaassist-observability-logs"
    cost_alert_hourly_threshold: float = 50.0   # USD
    cost_alert_daily_threshold: float = 800.0   # USD

LLD: CloudWatch Metric Publisher

import boto3
import time
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional
from contextlib import contextmanager

logger = logging.getLogger(__name__)


class FMObservabilityPublisher:
    """Publishes FM observability signals to CloudWatch.

    Design decisions:
    - Buffers up to 20 metric data points (CloudWatch PutMetricData limit)
    - Auto-flushes when buffer is full or on explicit flush()
    - Uses 2 dimensions (ModelId, Intent) to balance cardinality vs. queryability
    - Estimates cost based on known Bedrock pricing tiers

    Usage:
        publisher = FMObservabilityPublisher()
        publisher.record_invocation(
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            intent="product_search",
            input_tokens=1847,
            output_tokens=342,
            latency_ms=3400.0,
            quality_score=0.85
        )
        publisher.flush()  # ensure remaining buffer is sent
    """

    CLOUDWATCH_BATCH_LIMIT = 20

    # Bedrock pricing per 1K tokens (us-east-1, on-demand, as of 2025)
    PRICING: Dict[str, Dict[str, float]] = {
        "anthropic.claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "anthropic.claude-3-haiku": {"input": 0.00025, "output": 0.00125},
        "anthropic.claude-3-opus": {"input": 0.015, "output": 0.075},
        "amazon.titan-embed-text": {"input": 0.0001, "output": 0.0},
    }

    def __init__(self, namespace: str = "MangaAssist/FM"):
        self.cloudwatch = boto3.client("cloudwatch")
        self.namespace = namespace
        self._buffer: List[dict] = []

    def record_invocation(
        self,
        model_id: str,
        intent: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        quality_score: float,
        cache_hit: bool = False,
        guardrail_action: str = "pass",
        time_to_first_token_ms: Optional[float] = None,
    ) -> None:
        """Record a single FM invocation with all observability dimensions."""
        timestamp = datetime.now(timezone.utc)
        dimensions = [
            {"Name": "ModelId", "Value": self._short_model_id(model_id)},
            {"Name": "Intent", "Value": intent},
        ]

        metrics = [
            {"MetricName": "InputTokens", "Value": input_tokens, "Unit": "Count"},
            {"MetricName": "OutputTokens", "Value": output_tokens, "Unit": "Count"},
            {"MetricName": "TotalTokens", "Value": input_tokens + output_tokens, "Unit": "Count"},
            {"MetricName": "InvocationLatency", "Value": latency_ms, "Unit": "Milliseconds"},
            {"MetricName": "QualityScore", "Value": quality_score, "Unit": "None"},
            {"MetricName": "CacheHit", "Value": 1 if cache_hit else 0, "Unit": "Count"},
            {"MetricName": "GuardrailBlock", "Value": 1 if guardrail_action == "block" else 0, "Unit": "Count"},
            {"MetricName": "InvocationCount", "Value": 1, "Unit": "Count"},
        ]

        if time_to_first_token_ms is not None:
            metrics.append({
                "MetricName": "TimeToFirstToken",
                "Value": time_to_first_token_ms,
                "Unit": "Milliseconds",
            })

        # Cost estimation
        cost = self._estimate_cost(model_id, input_tokens, output_tokens)
        metrics.append({"MetricName": "EstimatedCostMicro", "Value": cost * 1_000_000, "Unit": "Count"})
        # Store as micro-dollars to avoid CloudWatch floating-point precision issues

        for metric in metrics:
            self._buffer.append({
                "MetricName": metric["MetricName"],
                "Timestamp": timestamp,
                "Value": metric["Value"],
                "Unit": metric["Unit"],
                "Dimensions": dimensions,
            })

        if len(self._buffer) >= self.CLOUDWATCH_BATCH_LIMIT:
            self._flush()

    @contextmanager
    def invocation_timer(self, model_id: str, intent: str, **kwargs):
        """Context manager to automatically time and record an invocation.

        Usage:
            with publisher.invocation_timer("anthropic.claude-3-sonnet...", "product_search") as ctx:
                response = bedrock.invoke_model(...)
                ctx["input_tokens"] = response["usage"]["input_tokens"]
                ctx["output_tokens"] = response["usage"]["output_tokens"]
                ctx["quality_score"] = evaluate(response)
        """
        ctx = {"input_tokens": 0, "output_tokens": 0, "quality_score": 0.0, **kwargs}
        start = time.monotonic()
        try:
            yield ctx
        finally:
            elapsed_ms = (time.monotonic() - start) * 1000
            self.record_invocation(
                model_id=model_id,
                intent=intent,
                input_tokens=ctx["input_tokens"],
                output_tokens=ctx["output_tokens"],
                latency_ms=elapsed_ms,
                quality_score=ctx["quality_score"],
                cache_hit=ctx.get("cache_hit", False),
                guardrail_action=ctx.get("guardrail_action", "pass"),
                time_to_first_token_ms=ctx.get("time_to_first_token_ms"),
            )

    def _estimate_cost(self, model_id: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate invocation cost in USD based on known pricing."""
        model_key = next(
            (k for k in self.PRICING if k in model_id),
            "anthropic.claude-3-haiku",  # conservative default
        )
        rates = self.PRICING[model_key]
        return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1000

    def _short_model_id(self, model_id: str) -> str:
        """Shorten model ID for CloudWatch dimension (reduce cardinality)."""
        # "anthropic.claude-3-sonnet-20240229-v1:0" → "claude-3-sonnet"
        parts = model_id.split(".")
        if len(parts) > 1:
            name_parts = []
            for part in parts[1].split("-"):
                # Stop at the version date (e.g., "20240229") or a version
                # suffix such as "v1:0"; keep short numeric parts like "3".
                if (part.isdigit() and len(part) >= 8) or (part.startswith("v") and ":" in part):
                    break
                name_parts.append(part)
            return "-".join(name_parts) or model_id[:50]
        return model_id[:50]

    def flush(self) -> None:
        """Flush remaining buffered metrics to CloudWatch."""
        self._flush()

    def _flush(self) -> None:
        """Internal flush — sends buffered metrics in batches of 20."""
        if not self._buffer:
            return
        for i in range(0, len(self._buffer), self.CLOUDWATCH_BATCH_LIMIT):
            batch = self._buffer[i : i + self.CLOUDWATCH_BATCH_LIMIT]
            try:
                self.cloudwatch.put_metric_data(
                    Namespace=self.namespace,
                    MetricData=batch,
                )
            except Exception:
                logger.exception("Failed to publish metrics batch to CloudWatch")
                # In production: push to a dead-letter queue or local file
                # Do NOT raise — observability failures must not break the application
        self._buffer.clear()

Key Design Decisions

| # | Decision | Choice | Rationale | Trade-off |
|---|---|---|---|---|
| 1 | Metric granularity | Per-intent + per-model dimensions | Enables drill-down by use case (product_search vs. order_status); keeps cardinality manageable (~20 intents × 3 models = 60 dimension combos) | Higher cardinality = higher CloudWatch cost; mitigated by limiting to 2 dimensions |
| 2 | Trace sampling rate | 5% in production, 100% in staging | Cost control — X-Ray charges per trace recorded; 5% gives statistical significance at MangaAssist scale (~200K requests/day → 10K traces/day) | May miss rare edge-case traces; compensate by force-sampling error paths at 100% |
| 3 | Log retention | 30 days CloudWatch, 90 days S3 | CloudWatch is expensive for long retention; S3 + Athena provides cheap long-term query; 90 days covers monthly business reviews | Querying S3 via Athena is slower than CloudWatch Logs Insights; acceptable for non-urgent analysis |
| 4 | Alert thresholds | 3 evaluation periods, 2-of-3 datapoints | Avoids alert fatigue from transient spikes; 3-period window (3 × 60s = 3 min) balances responsiveness with stability | Delays detection by up to 3 minutes; acceptable because GenAI issues rarely need sub-minute response |
| 5 | Dashboard refresh | 60 seconds | Balances real-time visibility with CloudWatch API cost; during incidents, operators can manually refresh | Not real-time; for live debugging, operators use CloudWatch Logs Live Tail |
| 6 | Anomaly detection | CloudWatch Anomaly Detection (2σ band) | Native AWS service, no ML infrastructure to manage; a 2σ band catches significant deviations without excessive false positives | Less flexible than custom models; supplement with static thresholds for known hard limits |
| 7 | Cost monitoring | Micro-dollar metric + hourly aggregate | CloudWatch does not reliably preserve precision for values below $0.01; storing as micro-dollars (×1,000,000) preserves precision; hourly aggregate for alerting | Requires Metric Math to convert back to USD for display; small added complexity |
| 8 | Multi-account strategy | Single account with namespace isolation | MangaAssist is a single-team product; multi-account adds operational overhead not justified at current scale | If MangaAssist grows to multi-team, migrate to AWS Organizations with cross-account CloudWatch |
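Decisions 4, 6, and 7 meet in a single alarm definition. A hedged sketch, assuming the EstimatedCostMicro metric from the publisher above and an existing SNS topic (the ARN is illustrative); the hourly period follows decision 7, whereas latency alarms would use 60-second periods per decision 4:

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-fm-cost-anomaly",
    ComparisonOperator="GreaterThanUpperThreshold",
    EvaluationPeriods=3,
    DatapointsToAlarm=2,                      # 2-of-3 datapoints (decision 4)
    ThresholdMetricId="band",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:mangaassist-alerts"],
    Metrics=[
        {
            "Id": "cost",
            "MetricStat": {
                "Metric": {"Namespace": "MangaAssist/FM", "MetricName": "EstimatedCostMicro"},
                "Period": 3600,               # hourly cost aggregate (decision 7)
                "Stat": "Sum",
            },
            "ReturnData": True,
        },
        # CloudWatch trains the anomaly model automatically for this expression.
        {"Id": "band", "Expression": "ANOMALY_DETECTION_BAND(cost, 2)"},
    ],
)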

Cross-References

| Related Document | Topic | Connection |
|---|---|---|
| Debugging/01-bedrock-logging.md | Bedrock invocation logging setup | Detailed Bedrock logging configuration; this doc references those logs as a data source |
| Debugging/02-application-logging.md | Structured logging patterns | Application log format and correlation ID patterns used in Pillar 3 |
| 13-metrics.md | Business metric definitions | Business metrics referenced in Pillar 1 are defined in detail here |
| LLMOps/llmops-user-stories.md | LLMOps lifecycle monitoring | Observability feeds into the LLMOps feedback loop for model updates |
| Skill 4.3.2 — CloudWatch Implementation | CloudWatch setup | Hands-on implementation of the CloudWatch metrics/alarms designed here |
| Skill 4.3.3 — Distributed Tracing | X-Ray deep dive | Detailed X-Ray instrumentation code for the traces described here |
| Skill 4.3.4 — Cost Monitoring | Cost observability | Deep dive into the cost metric pipeline sketched in this architecture |
| Skill 4.3.5 — Quality Monitoring | Quality metrics | Automated quality evaluation that feeds the quality_score metric |
| Skill 4.3.6 — Alerting & Remediation | Alert design | Alert rules and auto-remediation Lambda code for the action layer |

Key Takeaways

  1. GenAI observability requires 4 pillars: Metrics, traces, logs, and events work together. No single pillar is sufficient — a cost spike (metric) needs a trace to find the expensive request, a log to see the prompt, and an event to identify the deployment that caused it.

  2. FM-specific metrics don't exist in traditional monitoring: Token counts, cost-per-request, quality scores, guardrail trigger rates, and hallucination rates are entirely new metric categories that must be explicitly instrumented.

  3. Correlation IDs are non-negotiable: Every observability signal — metric, trace, log, event — must carry correlation_id and session_id. Without them, debugging a GenAI issue is impossible because you need to reconstruct the full prompt chain.

  4. Cost is a first-class operational metric: Unlike traditional apps where infra costs are relatively fixed, GenAI costs scale with token volume and can spike unexpectedly. Real-time cost monitoring with anomaly detection prevents billing surprises.

  5. Quality monitoring replaces simple health checks: A GenAI app can return HTTP 200 with a completely wrong answer. Observability must include content-level quality evaluation, not just availability and latency.

  6. Observability must not break the application: The metric publisher swallows exceptions, uses buffering, and never blocks the request path. A failure in observability should be logged but must never degrade user experience.

  7. Maturity is a journey — start at Level 2, target Level 4: Don't attempt to build Level 5 (predictive) before you have solid Level 3 (tracing). Each level provides compounding value and reduces the mean-time-to-resolution for the next class of issues.