
Integrated Observability Architecture for FM Applications

Skill Mapping

| AWS Certification | Task | Skill Focus |
|---|---|---|
| AIP-C01 | Task 4.3 — Monitor and observe FM applications | Skill 4.3.3 — Develop integrated observability solutions for actionable insights |

MangaAssist Context: This file designs the unified observability platform that connects operational metrics, business impact, compliance monitoring, forensic audit trails, user interaction tracking, and model behavior analysis into a single, multi-stakeholder system. All examples use the MangaAssist e-commerce chatbot stack: Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, and API Gateway WebSocket.


Mind Map — Integrated Observability Dimensions

mindmap
  root((Integrated<br/>Observability))
    **Operational Dashboards**
      Real-Time Widgets
        < 60s Refresh Cycle
        Headline KPI Cards
        Sparkline Trends
      SLA Tracking
        Availability vs Target
        Latency Budget Burn
        Error Budget Remaining
      Alert Correlation
        Group Related Alarms
        Suppress Duplicates
        Escalation Chains
      Infrastructure Health
        ECS Task Health
        API Gateway 4xx/5xx
        DynamoDB Capacity
    **Business Impact Visualizations**
      Revenue Attribution
        FM Quality → Cart Value
        Recommendation Lift
        Upsell Conversion
      Conversion Funnel
        FM Interaction Overlay
        Drop-Off Heatmap
        Intent-to-Purchase Path
      CSAT Correlation
        Quality Score → CSAT
        Response Time → CSAT
        Resolution Rate → CSAT
      Cost-Value Analysis
        Token Cost per Conversion
        Cost per Resolved Query
        ROI by Intent Category
    **Compliance Monitoring**
      PII Detection
        Comprehend Real-Time
        Auto-Redaction Pipeline
        Alert on PII Leakage
      Content Policy
        Hate/Violence/Adult
        Misinformation Flags
        Guardrail Block Rate
      GDPR Evidence
        Data Subject Access
        Right to Erasure
        Processing Records
      SOC2 Controls
        Access Audit Logs
        Change Management
        Incident Response
      Guardrail Effectiveness
        Block Rate by Category
        False Positive Rate
        Override Justification
    **Forensic Traceability**
      Immutable Audit Logs
        Hash-Chain Integrity
        Tamper Detection
        Retention Policy
      Request Replay
        Full Context Recreation
        Prompt + Context + Response
        Side-by-Side Comparison
      Incident Timeline
        Event Correlation
        Root Cause Mapping
        Impact Blast Radius
    **User Interaction Tracking**
      Session Analytics
        Turn Count Distribution
        Intent Transitions
        Time-to-Resolution
      Interaction Patterns
        Power User Profiles
        New User Friction
        Peak Usage Windows
      Abandonment Analysis
        Drop-Off Turn Number
        Pre-Abandonment Signals
        Recovery Opportunities
      Engagement Scoring
        Quality × Depth
        Resolution Weight
        Return Visit Factor
    **Model Behavior Patterns**
      Response Clustering
        Topic Grouping
        Style Consistency
        Length Distribution
      Behavioral Drift
        Embedding Similarity
        Vocabulary Shift
        Confidence Calibration
      Persona Consistency
        Tone Stability
        Knowledge Boundary
        Hallucination Zones
      Knowledge Mapping
        Strong Coverage Topics
        Weak Coverage Topics
        Emerging Gaps

Architecture — Unified Observability Platform

graph TB
    subgraph DS["🟠 Data Sources — MangaAssist Services"]
        style DS fill:#fff3e0,stroke:#e65100
        BEDROCK["Amazon Bedrock<br/>Claude 3 Sonnet/Haiku<br/>+ Guardrails"]
        OPENSEARCH["OpenSearch Serverless<br/>Product Embeddings<br/>Manga Knowledge Base"]
        ECS["ECS Fargate<br/>Orchestrator Service<br/>+ Intent Classifier"]
        APIGW["API Gateway<br/>WebSocket API<br/>User Connections"]
        DYNAMO["DynamoDB<br/>Sessions / Orders<br/>Product Catalog"]
        LAMBDA["Lambda Functions<br/>Auth / Routing<br/>Post-Processing"]
    end

    subgraph ETL["🔵 Processing & ETL Layer"]
        style ETL fill:#e3f2fd,stroke:#1565c0
        KDS["Kinesis Data Streams<br/>Real-Time Events<br/>< 1s Latency"]
        KDF["Kinesis Data Firehose<br/>Batch Delivery<br/>S3 Partitioned"]
        LTRANS["Lambda Transformers<br/>Enrichment<br/>PII Scrubbing"]
        COMPREHEND["Amazon Comprehend<br/>PII Detection<br/>Sentiment Analysis"]
        EB["EventBridge<br/>Event Router<br/>Rule Matching"]
    end

    subgraph STORE["🟢 Storage Layer"]
        style STORE fill:#e8f5e9,stroke:#2e7d32
        CW_METRICS["CloudWatch Metrics<br/>Hot Storage — 15-day High-Res<br/>1s/5s/60s Resolution"]
        S3_LAKE["S3 Data Lake<br/>Warm Storage — Partitioned<br/>year/month/day/hour"]
        DDB_AUDIT["DynamoDB Audit Table<br/>Immutable Hash-Chain<br/>TTL: 7 Years"]
        REDSHIFT["Redshift Serverless<br/>Cold Analytics<br/>Historical Trending"]
    end

    subgraph VIZ["🟢 Visualization & Serving Layer"]
        style VIZ fill:#f3e5f5,stroke:#6a1b9a
        CW_DASH["CloudWatch Dashboards<br/>Operations Team<br/>Real-Time < 60s"]
        QUICKSIGHT["Amazon QuickSight<br/>Business Stakeholders<br/>Revenue & Conversion"]
        GRAFANA["Amazon Managed Grafana<br/>ML Team<br/>Model Quality & Drift"]
        CUSTOM_UI["Custom React Dashboard<br/>Compliance Team<br/>Audit & Evidence"]
    end

    subgraph ACTION["🔴 Action Layer"]
        style ACTION fill:#ffebee,stroke:#c62828
        CW_ALARMS["CloudWatch Alarms<br/>Threshold + Anomaly"]
        SNS_TOPIC["SNS Topics<br/>PagerDuty / Slack<br/>Email Escalation"]
        REMEDIATE["Lambda Auto-Remediation<br/>Guardrail Tightening<br/>Model Fallback"]
        STEPFN["Step Functions<br/>Incident Workflow<br/>Evidence Collection"]
    end

    %% Data Source → ETL
    BEDROCK -->|"Invocation Logs"| KDS
    BEDROCK -->|"Guardrail Events"| EB
    OPENSEARCH -->|"Query Metrics"| KDS
    ECS -->|"App Logs + Traces"| KDS
    APIGW -->|"Connection Metrics"| CW_METRICS
    DYNAMO -->|"DDB Streams"| KDS
    LAMBDA -->|"Function Logs"| KDS

    %% ETL → Processing
    KDS -->|"Real-Time"| LTRANS
    LTRANS -->|"PII Check"| COMPREHEND
    LTRANS -->|"Enriched Events"| KDF
    EB -->|"Compliance Events"| STEPFN

    %% ETL → Storage
    LTRANS -->|"Metrics"| CW_METRICS
    KDF -->|"Partitioned Parquet"| S3_LAKE
    LTRANS -->|"Audit Entries"| DDB_AUDIT
    S3_LAKE -->|"COPY Command"| REDSHIFT

    %% Storage → Visualization
    CW_METRICS --> CW_DASH
    CW_METRICS --> GRAFANA
    S3_LAKE --> QUICKSIGHT
    REDSHIFT --> QUICKSIGHT
    DDB_AUDIT --> CUSTOM_UI
    CW_METRICS --> CUSTOM_UI

    %% Action Layer
    CW_METRICS --> CW_ALARMS
    CW_ALARMS --> SNS_TOPIC
    SNS_TOPIC --> REMEDIATE
    EB --> STEPFN

Multi-Stakeholder Dashboard Strategy

flowchart LR
    subgraph SOURCES["Data Sources"]
        CW["CloudWatch Metrics"]
        S3["S3 Data Lake"]
        RS["Redshift"]
        AUDIT["DynamoDB Audit"]
    end

    subgraph EXEC["Tier 1: Executive Dashboard<br/>👔 C-Suite / VP"]
        E1["Revenue Impact<br/>of FM Quality"]
        E2["Total Cost of<br/>FM Operations"]
        E3["Overall Quality<br/>Score (composite)"]
        E4["System<br/>Availability %"]
        E5["User Satisfaction<br/>Score (CSAT)"]
    end

    subgraph OPS["Tier 2: Operational Dashboard<br/>🔧 SRE / DevOps"]
        O1["P95 Latency<br/>by Intent"]
        O2["Error Rate<br/>5xx / Guardrail"]
        O3["Token Usage<br/>& Cost Rate"]
        O4["Guardrail<br/>Block Rate"]
        O5["Active WebSocket<br/>Sessions"]
        O6["Infrastructure<br/>Utilization"]
    end

    subgraph ML["Tier 3: ML Team Dashboard<br/>🧪 Data Scientists"]
        M1["Quality Score<br/>by Intent"]
        M2["Hallucination<br/>Rate Trend"]
        M3["Drift Metrics<br/>(Embedding Sim)"]
        M4["A/B Test<br/>Results"]
        M5["Prompt Version<br/>Comparison"]
    end

    subgraph COMP["Tier 4: Compliance Dashboard<br/>🛡️ Legal / Security"]
        C1["PII Detection<br/>Events"]
        C2["Content Policy<br/>Violations"]
        C3["Access Audit<br/>Log Review"]
        C4["GDPR/SOC2<br/>Evidence Status"]
    end

    CW --> OPS
    CW --> ML
    CW --> EXEC
    S3 --> ML
    S3 --> EXEC
    RS --> EXEC
    AUDIT --> COMP
    CW --> COMP

Dashboard Access Control Matrix

| Dashboard Tier | Refresh Rate | Data Retention | Access Group | Authentication |
|---|---|---|---|---|
| Executive | 5 min | 90 days displayed, 2 years archived | manga-exec-readonly | SSO + MFA |
| Operational | < 60 sec | 15 days at 1-min resolution, 63 days at 5-min | manga-sre-team | SSO + VPN |
| ML Team | 5 min | 30 days in dashboards, S3 for historical | manga-ml-team | SSO |
| Compliance | 15 min | 7 years (regulatory requirement) | manga-compliance-audit | SSO + MFA + IP restrict |

Six Integration Points Deep Dive

1. Operational Metric Dashboards

Design Principle: Every widget must answer a single question in < 3 seconds.

Headline Widget Specification

| Widget | Metric | Green | Yellow | Red | Refresh |
|---|---|---|---|---|---|
| P95 Latency | MangaAssist/Operations/ResponseLatency | < 2.0s | 2.0–4.0s | > 4.0s | 10s |
| Error Rate | MangaAssist/Operations/ErrorRate | < 1% | 1–5% | > 5% | 10s |
| Token Cost/hr | MangaAssist/Cost/TokenCostPerHour | < $50 | $50–100 | > $100 | 60s |
| Quality Score | MangaAssist/Quality/CompositeScore | > 0.85 | 0.70–0.85 | < 0.70 | 60s |
| Active Sessions | MangaAssist/Operations/ActiveSessions | < 5000 | 5000–8000 | > 8000 | 10s |
| Guardrail Blocks | MangaAssist/Guardrails/BlockRate | < 2% | 2–5% | > 5% | 30s |
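
A minimal sketch of how one headline card could be provisioned with the CloudWatch `PutDashboard` API; the dashboard name, layout, and region are illustrative assumptions, and `period=10` presumes the latency metric is published at high resolution.

```python
import json
import boto3

cloudwatch = boto3.client("cloudwatch")

# Hypothetical single-value widget for the "P95 Latency" headline card.
dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "x": 0, "y": 0, "width": 6, "height": 4,
            "properties": {
                "title": "P95 Latency",
                "view": "singleValue",
                "region": "us-east-1",
                "stat": "p95",
                "period": 10,
                "metrics": [["MangaAssist/Operations", "ResponseLatency"]],
            },
        }
    ]
}

cloudwatch.put_dashboard(
    DashboardName="MangaAssist-Operations",        # placeholder name
    DashboardBody=json.dumps(dashboard_body),
)
```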

Alert Correlation Strategy

Reduce alert noise by grouping related signals:

Correlation Window: 5 minutes
Grouping Rules:
  - Group by: service + intent
  - Suppress child alerts if parent alarm active
  - Example: If "Bedrock Throttling" alarm fires, suppress:
    - "High Latency" alarms (effect of throttling)
    - "Quality Degradation" alarms (effect of fallback model)
    - "Token Cost Spike" alarms (effect of retry logic)
  - Only page once: "Bedrock Throttling → Cascading Impact"
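
One way to encode the "page once" rule is a CloudWatch composite alarm whose rule requires the parent throttling alarm plus at least one downstream symptom; a sketch under the assumption that the child alarms already exist with no actions of their own, using placeholder alarm names and SNS topic ARN.

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

# Child alarms (HighLatency, QualityDegradation, TokenCostSpike) are assumed to
# exist with no AlarmActions, so only this composite alarm pages the on-call.
cloudwatch.put_composite_alarm(
    AlarmName="MangaAssist-BedrockThrottling-CascadingImpact",
    AlarmDescription="Single page for Bedrock throttling and its downstream effects",
    AlarmRule=(
        'ALARM("MangaAssist-BedrockThrottling") AND ('
        'ALARM("MangaAssist-HighLatency") OR '
        'ALARM("MangaAssist-QualityDegradation") OR '
        'ALARM("MangaAssist-TokenCostSpike"))'
    ),
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:manga-oncall"],  # placeholder ARN
)
```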

Widget Dependency Graph

graph LR
    subgraph "Real-Time Metrics Pipeline"
        KDS_W["Kinesis Data Stream"]
        LAMBDA_W["Enrichment Lambda<br/>Intent + Model Tags"]
        CW_W["CloudWatch Metrics<br/>Custom Namespace"]
    end

    subgraph "Headline Widgets"
        W1["🟢 P95 Latency"]
        W2["🟢 Error Rate"]
        W3["🟡 Token Cost/hr"]
        W4["🟢 Quality Score"]
        W5["🟢 Active Sessions"]
        W6["🟢 Guardrail Blocks"]
    end

    subgraph "Drill-Down Panels"
        D1["Latency by Intent<br/>Time Series"]
        D2["Error Breakdown<br/>by Type"]
        D3["Cost by Model<br/>Stacked Bar"]
        D4["Quality Trend<br/>7-Day Rolling"]
        D5["Session Heatmap<br/>by Hour"]
        D6["Block Categories<br/>Pie Chart"]
    end

    KDS_W --> LAMBDA_W --> CW_W
    CW_W --> W1 & W2 & W3 & W4 & W5 & W6
    W1 -->|"Click"| D1
    W2 -->|"Click"| D2
    W3 -->|"Click"| D3
    W4 -->|"Click"| D4
    W5 -->|"Click"| D5
    W6 -->|"Click"| D6

2. Business Impact Visualizations

Core Question: How does FM quality translate to revenue?

Revenue Attribution Model

FM Quality Score → Recommendation Relevance → Cart Value Lift → Revenue Delta

Causal Chain (MangaAssist):
  1. Quality Score drops from 0.90 → 0.75 (–17%)
  2. Product recommendation relevance drops: Click-through rate –22%
  3. Average cart value drops: $42 → $35 (–17%)
  4. Daily orders affected: ~2,400 sessions × 12% conversion = 288 orders
  5. Revenue impact: 288 × $7 loss = $2,016/day potential loss
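
The same arithmetic expressed as a small helper so the chain can be re-run with live inputs; the defaults are the illustrative figures above, not measured constants.

```python
def estimate_daily_revenue_impact(
    sessions_per_day: int = 2_400,
    conversion_rate: float = 0.12,
    baseline_cart_value: float = 42.0,
    degraded_cart_value: float = 35.0,
) -> float:
    """Estimate daily revenue loss from a quality-driven cart-value drop."""
    affected_orders = sessions_per_day * conversion_rate        # 288 orders
    loss_per_order = baseline_cart_value - degraded_cart_value  # $7
    return affected_orders * loss_per_order


print(estimate_daily_revenue_impact())  # 2016.0 → ~$2,016/day potential loss
```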

Business Attribution Pipeline

flowchart TB
    subgraph "FM Events"
        E1["Bedrock Invocation<br/>request_id, quality_score"]
        E2["Guardrail Event<br/>block/pass, category"]
        E3["Session End<br/>turns, satisfaction"]
    end

    subgraph "Business Events"
        B1["Product View<br/>from FM recommendation"]
        B2["Add to Cart<br/>correlated to session"]
        B3["Purchase Complete<br/>order value"]
    end

    subgraph "Attribution Pipeline"
        JOIN["Join on session_id<br/>within 30-min window"]
        ENRICH["Enrich with<br/>FM quality metrics"]
        ATTRIBUTE["Calculate<br/>attribution weight"]
    end

    subgraph "Visualizations"
        V1["Revenue by FM Quality<br/>Scatter Plot"]
        V2["Conversion Funnel<br/>with FM Overlay"]
        V3["CSAT × Quality<br/>Correlation Matrix"]
    end

    E1 & E2 & E3 --> JOIN
    B1 & B2 & B3 --> JOIN
    JOIN --> ENRICH --> ATTRIBUTE
    ATTRIBUTE --> V1 & V2 & V3

CSAT Correlation Analysis

| FM Metric | CSAT Correlation (r) | Actionable Threshold |
|---|---|---|
| Quality Score | +0.72 | Score < 0.80 → investigate prompt/retrieval |
| Response Latency | –0.58 | P95 > 3.5s → scale compute or use Haiku |
| Turn Count (resolution) | –0.45 | > 4 turns avg → improve first-response quality |
| Guardrail Block Rate | –0.31 | > 3% → review guardrail sensitivity |
| Recommendation Click-Through | +0.66 | CTR < 15% → check retrieval relevance |
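
The correlation column can be reproduced with a plain Pearson r over joined session records; a sketch assuming the attribution pipeline has already emitted per-session quality_score and csat fields.

```python
import numpy as np

# Hypothetical joined session records produced by the attribution pipeline.
sessions = [
    {"quality_score": 0.91, "csat": 5},
    {"quality_score": 0.74, "csat": 3},
    {"quality_score": 0.88, "csat": 4},
    # ... thousands more rows in practice
]

quality = np.array([s["quality_score"] for s in sessions])
csat = np.array([s["csat"] for s in sessions])

# Pearson correlation coefficient (r) between FM quality and CSAT.
r = np.corrcoef(quality, csat)[0, 1]
print(f"Quality <-> CSAT correlation: r = {r:+.2f}")
```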

3. Compliance Monitoring

Regulatory Requirement: Every FM interaction involving PII must be detected, logged, and optionally redacted within 30 seconds.

Compliance Monitoring Pipeline

flowchart TB
    subgraph "Detection"
        INPUT["FM Response Text"]
        COMPREHEND_C["Amazon Comprehend<br/>detect_pii_entities()"]
        GUARDRAIL_C["Bedrock Guardrails<br/>Content Filter"]
    end

    subgraph "Classification"
        PII_CLASS["PII Classifier<br/>NAME / EMAIL / SSN / PHONE"]
        CONTENT_CLASS["Content Classifier<br/>HATE / VIOLENCE / ADULT<br/>MISINFORMATION"]
        SEVERITY["Severity Scorer<br/>INFO / WARNING / CRITICAL"]
    end

    subgraph "Action"
        REDACT["Auto-Redact<br/>Replace PII with [REDACTED]"]
        LOG_COMP["Compliance Event Log<br/>DynamoDB + S3"]
        ALERT_COMP["Alert Pipeline<br/>CRITICAL → PagerDuty<br/>WARNING → Slack"]
        CW_COMP["CloudWatch Metric<br/>MangaAssist/Compliance/*"]
    end

    subgraph "Evidence"
        S3_EV["S3 Evidence Bucket<br/>Encrypted, WORM Lock"]
        REPORT["Compliance Report<br/>Weekly Automated"]
        DSAR["GDPR DSAR Processor<br/>Data Subject Requests"]
    end

    INPUT --> COMPREHEND_C & GUARDRAIL_C
    COMPREHEND_C --> PII_CLASS
    GUARDRAIL_C --> CONTENT_CLASS
    PII_CLASS & CONTENT_CLASS --> SEVERITY

    SEVERITY -->|"PII Found"| REDACT
    SEVERITY -->|"All Events"| LOG_COMP
    SEVERITY -->|"WARNING/CRITICAL"| ALERT_COMP
    SEVERITY -->|"Metrics"| CW_COMP

    LOG_COMP --> S3_EV
    S3_EV --> REPORT
    S3_EV --> DSAR

SOC2 Control Evidence Automation

| SOC2 Control | Evidence Source | Collection Method | Frequency |
|---|---|---|---|
| CC6.1 Logical Access | IAM CloudTrail logs | EventBridge → S3 | Real-time |
| CC6.3 Data Encryption | KMS key usage logs | CloudTrail → Athena | Daily |
| CC7.2 Monitoring | CloudWatch alarm history | API → S3 archive | Weekly |
| CC7.3 Incident Response | Step Functions execution logs | Direct S3 export | Per incident |
| CC8.1 Change Management | CodePipeline deployment logs | EventBridge → S3 | Per deployment |
| Custom FM Content Safety | Guardrail block/pass metrics | CloudWatch → S3 | Real-time |

4. Forensic Traceability & Audit Logging

Design Principle: Every FM interaction must be reconstructable months later, with tamper-proof integrity.

Immutable Audit Log — Hash-Chain Design

Entry N:
  entry_id: "audit-req-00042"
  previous_hash: "<hash of Entry N-1>"
  content: {request_id, session_id, model_id, intent, prompt_hash, response_hash, quality_score, ...}
  entry_hash: SHA256(entry_id + timestamp + previous_hash + content)

Entry N+1:
  previous_hash: "<entry_hash of Entry N>"   ← Links to previous
  ...

Tamper Detection:
  - Recalculate hash chain from any point
  - If computed hash ≠ stored hash → TAMPERING DETECTED
  - Genesis entry: previous_hash = "GENESIS"
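
A minimal standalone sketch of that recomputation step (the fuller DynamoDB-backed version appears in the LLD below); field names mirror the entry layout above, and entries are assumed to be supplied in chain order.

```python
import hashlib


def recompute_entry_hash(entry: dict) -> str:
    """Recompute an entry's hash from its own fields plus previous_hash."""
    content = (
        f"{entry['entry_id']}|{entry['timestamp']}|"
        f"{entry['previous_hash']}|{entry['content']}"
    )
    return hashlib.sha256(content.encode()).hexdigest()


def verify_chain(entries: list) -> bool:
    """Walk entries in order; any modification breaks the chain."""
    previous = "GENESIS"
    for entry in entries:
        if entry["previous_hash"] != previous:
            return False  # link to prior entry broken → tampering
        if recompute_entry_hash(entry) != entry["entry_hash"]:
            return False  # entry content changed after write → tampering
        previous = entry["entry_hash"]
    return True
```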

Forensic Investigation Workflow

sequenceDiagram
    participant ALERT as CloudWatch Alarm
    participant SRE as SRE Engineer
    participant DASH as Ops Dashboard
    participant AUDIT as Audit Trail<br/>(DynamoDB)
    participant S3 as Evidence Store<br/>(S3)
    participant REPLAY as Request Replayer<br/>(Lambda)
    participant RCA as Root Cause<br/>Analysis

    ALERT->>SRE: "Quality Score < 0.70<br/>for intent: product_recommend"
    SRE->>DASH: Open Operational Dashboard
    DASH->>SRE: Show quality trend —<br/>drop started 14:32 UTC

    SRE->>AUDIT: Query: intent=product_recommend<br/>time_range=14:30-15:00
    AUDIT->>SRE: Return 847 audit entries<br/>with request_ids

    SRE->>SRE: Filter: quality_score < 0.50<br/>→ 23 entries identified

    SRE->>S3: Retrieve full request context<br/>for worst 5 entries
    S3->>SRE: Return: prompt, retrieved_docs,<br/>response, guardrail_action

    SRE->>REPLAY: Replay request req-00847<br/>with original context
    REPLAY->>SRE: Replayed response matches<br/>original — model consistent

    SRE->>SRE: Compare retrieved docs:<br/>stale product data found

    SRE->>RCA: Root Cause: OpenSearch index<br/>has stale embeddings from<br/>failed 14:00 re-index job
    RCA->>SRE: Resolution: Re-trigger index<br/>rebuild, add index health check

5. User Interaction Tracking

Core Question: Where do users succeed, struggle, and abandon?

Session Analytics Dimensions

| Dimension | Metric | What It Reveals |
|---|---|---|
| Turn Count | avg = 3.2, p95 = 8 | Most queries resolve in 3 turns; long sessions may indicate confusion |
| Intent Transitions | product_search → order_status (12%) | Users often check order status after searching products |
| Time Between Turns | median = 8s, p95 = 45s | Long gaps may indicate user reading or frustration |
| Satisfaction Signal | 👍 23%, 👎 4%, none 73% | Low signal rate — most users don't rate |
| Abandonment Turn | mode = 2, mean = 2.7 | Users who abandon typically do so at turn 2 |
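
Several of these dimensions can be computed directly from structured session-end logs; a sketch using CloudWatch Logs Insights, where the log group name and the turn_count / abandoned / event_type fields are assumptions about how the orchestrator logs sessions.

```python
import time
import boto3

logs = boto3.client("logs")

# Assumed log group and fields for session-end records emitted by the orchestrator.
query = """
fields @timestamp, turn_count, abandoned
| filter event_type = "session_end"
| stats avg(turn_count) as avg_turns,
        pct(turn_count, 95) as p95_turns,
        sum(abandoned) as abandoned_sessions,
        count(*) as total_sessions
"""

now = int(time.time())
start = logs.start_query(
    logGroupName="/mangaassist/orchestrator",
    startTime=now - 24 * 3600,
    endTime=now,
    queryString=query,
)

# Poll logs.get_query_results(queryId=start["queryId"]) until status is "Complete".
```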

User Journey Funnel

flowchart TB
    START["Session Start<br/>100% of users<br/>(~8,400/day)"]
    T1["Turn 1: Initial Query<br/>98% continue<br/>(2% connection drops)"]
    T2["Turn 2: FM Response<br/>82% continue<br/>(16% abandon here)"]
    T3["Turn 3: Follow-Up<br/>71% continue<br/>(11% satisfied & leave)"]
    T4["Turn 4+: Deep Interaction<br/>45% continue<br/>(26% resolve or abandon)"]
    RESOLVE["Resolution<br/>38% resolve successfully"]
    ESCALATE["Escalation<br/>7% → Human Agent"]
    ABANDON["Abandonment<br/>55% leave without resolution"]

    START --> T1
    T1 --> T2
    T2 --> T3
    T3 --> T4
    T4 --> RESOLVE
    T4 --> ESCALATE
    T1 --> ABANDON
    T2 --> ABANDON
    T3 --> ABANDON
    T4 --> ABANDON

    style ABANDON fill:#ffcdd2,stroke:#c62828
    style RESOLVE fill:#c8e6c9,stroke:#2e7d32
    style ESCALATE fill:#fff9c4,stroke:#f9a825

Abandonment Analysis — MangaAssist Findings

Top Abandonment Reasons (from session analysis):
  1. Turn 2 — FM asks clarifying question instead of answering (38% of abandonments)
     → Action: Improve first-response completeness for top-5 intents
  2. Turn 2 — Response too long (> 400 words) for simple query (22%)
     → Action: Add response length guardrails by intent
  3. Turn 3 — FM repeats information from Turn 1 (15%)
     → Action: Improve session context injection into prompt
  4. Turn 1 — Latency > 5s, user leaves before response (14%)
     → Action: Streaming responses, precomputed answers for FAQs
  5. Turn 4+ — FM unable to resolve, no escalation offered (11%)
     → Action: Auto-offer human escalation after 3 failed turns

6. Model Behavior Pattern Tracking

Core Question: Is the FM behaving consistently and within expected boundaries?

Response Clustering

Group FM responses into behavioral clusters to detect emergent patterns:

Clustering Method:
  1. Embed each FM response using a small embedder (e.g., Titan Embed v2)
  2. Apply UMAP dimensionality reduction (n_neighbors=15, min_dist=0.1)
  3. Cluster with HDBSCAN (min_cluster_size=50)
  4. Label clusters by inspecting sample responses

MangaAssist Discovered Clusters:
  Cluster 0: "Product Description" — detailed manga descriptions (34% of responses)
  Cluster 1: "Order Status" — structured order info (22%)
  Cluster 2: "Recommendation" — personalized suggestions (18%)
  Cluster 3: "Clarifying Question" — asks for more info (12%)
  Cluster 4: "Apology/Limitation" — cannot help responses (8%)
  Cluster 5: "Off-Topic" — responses outside intended scope (4%)
  Cluster 6: "Price/Availability" — specific product data (2%)

Alert: If Cluster 5 ("Off-Topic") grows > 6% → investigate prompt drift
Alert: If Cluster 4 ("Apology") grows > 12% → knowledge gap expanding
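
A sketch of steps 1–4 using the umap-learn and hdbscan packages; the embeddings file is a stand-in for response vectors (e.g., Titan Embed v2) exported from the S3 data lake.

```python
import numpy as np
import umap      # pip install umap-learn
import hdbscan   # pip install hdbscan

# Assumed input: one row per FM response embedding; the file name is illustrative.
embeddings = np.load("response_embeddings.npy")

# Step 2: UMAP dimensionality reduction with the parameters listed above.
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=10, random_state=42)
reduced = reducer.fit_transform(embeddings)

# Step 3: density-based clustering; label -1 marks noise points.
labels = hdbscan.HDBSCAN(min_cluster_size=50).fit_predict(reduced)

# Step 4: cluster share per label, feeding the growth alerts above.
unique, counts = np.unique(labels[labels >= 0], return_counts=True)
distribution = {int(c): round(100 * n / len(labels), 1) for c, n in zip(unique, counts)}
print(distribution)
```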

Behavioral Drift Detection

Method: Track cosine similarity of response embeddings over time

Baseline: Rolling 7-day window of mean response embeddings per intent
Current: Today's mean response embedding per intent
Drift Score: 1 - cosine_similarity(baseline, current)

Thresholds:
  drift < 0.05  → Normal variation
  0.05 ≤ drift < 0.15 → Monitor (log but don't alert)
  drift ≥ 0.15 → Alert: Significant behavioral shift detected

Common Drift Causes in MangaAssist:
  - Knowledge base update changed retrieval distribution
  - Prompt template change altered response style
  - Model version update (Claude 3 minor version)
  - Seasonal product catalog changes
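
A compact sketch of the drift score and its thresholds; the baseline and current mean embeddings are assumed to come from the per-intent snapshots described here (see ModelBehaviorSnapshot in the HLD below).

```python
import numpy as np


def drift_score(baseline: np.ndarray, current: np.ndarray) -> float:
    """Drift = 1 - cosine similarity of the two mean response embeddings."""
    cosine = float(
        np.dot(baseline, current)
        / (np.linalg.norm(baseline) * np.linalg.norm(current))
    )
    return 1.0 - cosine


def classify_drift(score: float) -> str:
    if score < 0.05:
        return "normal"      # normal variation
    if score < 0.15:
        return "monitor"     # log but don't alert
    return "alert"           # significant behavioral shift detected
```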

Persona Consistency Monitoring

MangaAssist FM Persona: "Friendly manga expert, knowledgeable about series,
  volumes, genres. Professional but enthusiastic. Never uses slang.
  Always provides specific volume/chapter references."

Monitoring Dimensions:
  1. Tone Score: sentiment analysis should show 70-85% positive
  2. Domain Vocabulary: manga-specific terms should appear in 60%+ of responses
  3. Specificity: named entities (series, volumes) should appear in 50%+ of product responses
  4. Length Consistency: std dev of response length should be < 40% of mean

Alert if: Tone drops below 60% positive OR domain vocabulary drops below 50%
  → Likely cause: prompt template change or context window overflow
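
Two of the four dimensions (domain vocabulary and length consistency) sketched as simple checks; the vocabulary list and helper names are illustrative, and tone scoring would come from Comprehend sentiment in practice.

```python
import statistics

# Illustrative domain vocabulary; a production list would be curated.
MANGA_VOCAB = {"volume", "chapter", "series", "manga", "genre", "shonen", "seinen"}


def domain_vocabulary_ratio(responses: list) -> float:
    """Share of responses containing at least one manga-domain term."""
    hits = sum(
        1 for r in responses if any(term in r.lower() for term in MANGA_VOCAB)
    )
    return hits / len(responses)


def length_consistency_ok(responses: list) -> bool:
    """Std dev of response length (words) should stay under 40% of the mean."""
    lengths = [len(r.split()) for r in responses]
    return statistics.pstdev(lengths) < 0.4 * statistics.mean(lengths)
```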

Knowledge Boundary Mapping

| Topic Area | Confidence | Evidence | Action |
|---|---|---|---|
| Popular manga series (top 100) | High (0.92) | Accurate details, correct volumes | Maintain |
| New releases (< 30 days) | Medium (0.71) | Sometimes missing latest volume | Increase reindex frequency |
| Pricing & availability | High (0.88) | Real-time tool calling works well | Maintain |
| Shipping & logistics | Medium (0.65) | Sometimes gives outdated policies | Update knowledge base |
| Mature/adult content policy | Low (0.52) | Inconsistent guardrail application | Tighten guardrails + prompt |
| Competitor comparisons | Low (0.41) | FM hallucinates competitor data | Add explicit refusal in prompt |

HLD: Integrated Platform Data Model

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum


class StakeholderType(Enum):
    EXECUTIVE = "executive"
    OPERATIONS = "operations"
    ML_TEAM = "ml_team"
    COMPLIANCE = "compliance"


class DashboardWidget(Enum):
    SINGLE_VALUE = "single_value"
    TIME_SERIES = "time_series"
    BAR_CHART = "bar_chart"
    PIE_CHART = "pie_chart"
    HEATMAP = "heatmap"
    FUNNEL = "funnel"
    TABLE = "table"


class ComplianceCategory(Enum):
    PII_DETECTED = "pii_detected"
    CONTENT_VIOLATION = "content_violation"
    ACCESS_AUDIT = "access_audit"
    DATA_SUBJECT_REQUEST = "data_subject_request"
    GUARDRAIL_BLOCK = "guardrail_block"


class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AuditLogEntry:
    """Immutable audit log entry with hash-chain integrity.

    Each entry links to the previous via previous_hash, forming
    a tamper-evident chain. Any modification breaks the chain.
    """
    entry_id: str
    timestamp: datetime
    previous_hash: str
    request_id: str
    session_id: str
    user_id_hash: str          # NEVER store raw user ID
    action: str                # "fm_invocation", "guardrail_block", "tool_call"
    model_id: str              # "anthropic.claude-3-sonnet-v1"
    intent: str                # "product_search", "order_status", etc.
    prompt_hash: str           # SHA256 of sanitized prompt (never raw text)
    response_hash: str         # SHA256 of response (never raw text in audit)
    quality_score: float       # 0.0 - 1.0 composite score
    guardrail_action: str      # "pass", "block", "redact"
    latency_ms: int = 0
    token_input: int = 0
    token_output: int = 0
    compliance_flags: List[str] = field(default_factory=list)
    entry_hash: str = ""       # Computed: SHA256(core fields + previous_hash); see compute_hash()

    def compute_hash(self) -> str:
        import hashlib
        content = (
            f"{self.entry_id}|{self.timestamp.isoformat()}|"
            f"{self.previous_hash}|{self.request_id}|"
            f"{self.session_id}|{self.action}|"
            f"{self.model_id}|{self.intent}|"
            f"{self.prompt_hash}|{self.response_hash}|"
            f"{self.quality_score}|{self.guardrail_action}"
        )
        self.entry_hash = hashlib.sha256(content.encode()).hexdigest()
        return self.entry_hash


@dataclass
class ComplianceEvent:
    """Compliance monitoring event for regulatory tracking."""
    event_id: str
    timestamp: datetime
    category: ComplianceCategory
    severity: Severity
    request_id: str
    description: str
    details: Dict[str, str] = field(default_factory=dict)
    auto_remediated: bool = False
    remediation_action: Optional[str] = None
    evidence_s3_key: Optional[str] = None
    reviewer: Optional[str] = None
    reviewed_at: Optional[datetime] = None


@dataclass
class UserInteractionMetrics:
    """Session-level user interaction tracking."""
    session_id: str
    user_id_hash: str
    start_time: datetime
    end_time: Optional[datetime] = None
    turn_count: int = 0
    intents_used: List[str] = field(default_factory=list)
    satisfaction_signal: Optional[str] = None  # "thumbs_up", "thumbs_down"
    abandoned: bool = False
    abandonment_turn: Optional[int] = None
    escalated: bool = False
    engagement_score: float = 0.0
    avg_quality_score: float = 0.0
    total_latency_ms: int = 0


@dataclass
class ModelBehaviorSnapshot:
    """Periodic snapshot of model behavior patterns."""
    snapshot_id: str
    timestamp: datetime
    intent: str
    period_hours: int = 24
    response_count: int = 0
    mean_embedding: Optional[List[float]] = None  # Centroid of response embeddings
    drift_score: float = 0.0                       # vs previous snapshot
    cluster_distribution: Dict[str, float] = field(default_factory=dict)
    avg_response_length: float = 0.0
    tone_score: float = 0.0
    domain_vocabulary_ratio: float = 0.0

LLD: Integrated Observability Platform

import boto3
import json
import hashlib
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional
from decimal import Decimal

logger = logging.getLogger(__name__)


class IntegratedObservabilityPlatform:
    """Unified platform connecting all observability signals to stakeholder dashboards.

    Processes every FM interaction through 4 integration points:
      1. Operational metrics → CloudWatch → Dashboards + Alarms
      2. Compliance checks → Comprehend PII + Guardrail events → Evidence
      3. Immutable audit log → DynamoDB hash-chain → Forensic queries
      4. User interaction tracking → Session metrics → Engagement scoring
    """

    def __init__(self, config: dict):
        self.cloudwatch = boto3.client("cloudwatch")
        self.dynamodb = boto3.resource("dynamodb")
        self.s3 = boto3.client("s3")
        self.comprehend = boto3.client("comprehend")
        self.audit_table = self.dynamodb.Table(config["audit_table_name"])
        self.compliance_table = self.dynamodb.Table(config["compliance_table_name"])
        self.evidence_bucket = config["evidence_bucket"]
        self._last_audit_hash = "GENESIS"

    # ------------------------------------------------------------------ #
    #  Main Entry Point
    # ------------------------------------------------------------------ #

    def process_interaction(self, event: dict) -> dict:
        """Process a complete user interaction through all integration points.

        Args:
            event: Dict with keys: request_id, session_id, model_id, intent,
                   response_text, quality_score, latency_ms, turn_number,
                   guardrail_action, satisfaction_signal, token_input, token_output

        Returns:
            Dict with results from each integration point.
        """
        results = {}

        # 1. Publish operational metrics (real-time dashboards)
        results["operational"] = self._publish_operational_metrics(event)

        # 2. Check compliance (PII detection, content policy)
        results["compliance"] = self._check_compliance(event)

        # 3. Write immutable audit log (hash-chain linked)
        results["audit"] = self._write_audit_log(event, results["compliance"])

        # 4. Track user interaction (session-level metrics)
        results["interaction"] = self._track_user_interaction(event)

        return results

    # ------------------------------------------------------------------ #
    #  Integration Point 1: Operational Metrics
    # ------------------------------------------------------------------ #

    def _publish_operational_metrics(self, event: dict) -> dict:
        """Publish real-time metrics to CloudWatch for operational dashboards."""
        dimensions = [
            {"Name": "Intent", "Value": event.get("intent", "unknown")},
            {"Name": "ModelId", "Value": event.get("model_id", "unknown")},
        ]

        metric_data = [
            {
                "MetricName": "ResponseLatency",
                "Value": event.get("latency_ms", 0),
                "Unit": "Milliseconds",
                "Dimensions": dimensions,
                "Timestamp": datetime.now(timezone.utc),
            },
            {
                "MetricName": "QualityScore",
                "Value": event.get("quality_score", 0),
                "Unit": "None",
                "Dimensions": dimensions,
                "Timestamp": datetime.now(timezone.utc),
            },
            {
                "MetricName": "TokensInput",
                "Value": event.get("token_input", 0),
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "TokensOutput",
                "Value": event.get("token_output", 0),
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "ActiveSessions",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": [{"Name": "Intent", "Value": event.get("intent", "unknown")}],
            },
        ]

        # Guardrail block metric
        if event.get("guardrail_action") == "block":
            metric_data.append({
                "MetricName": "GuardrailBlocks",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": dimensions,
            })

        self.cloudwatch.put_metric_data(
            Namespace="MangaAssist/Operations",
            MetricData=metric_data,
        )
        return {"published": True, "metric_count": len(metric_data)}

    # ------------------------------------------------------------------ #
    #  Integration Point 2: Compliance Monitoring
    # ------------------------------------------------------------------ #

    def _check_compliance(self, event: dict) -> dict:
        """Run compliance checks: PII detection + content policy."""
        response_text = event.get("response_text", "")
        compliance_result = {
            "pii_detected": False,
            "pii_types": [],
            "guardrail_action": event.get("guardrail_action", "pass"),
            "events": [],
        }

        # PII detection using Comprehend (truncate to 5000 char limit)
        if response_text:
            text_to_check = response_text[:5000]
            pii_result = self.comprehend.detect_pii_entities(
                Text=text_to_check, LanguageCode="en"
            )
            pii_entities = pii_result.get("Entities", [])

            if pii_entities:
                compliance_result["pii_detected"] = True
                compliance_result["pii_types"] = list(
                    set(e["Type"] for e in pii_entities)
                )

                # Publish PII metric
                self.cloudwatch.put_metric_data(
                    Namespace="MangaAssist/Compliance",
                    MetricData=[{
                        "MetricName": "PIIDetected",
                        "Value": len(pii_entities),
                        "Unit": "Count",
                        "Dimensions": [
                            {"Name": "Intent", "Value": event.get("intent", "unknown")}
                        ],
                    }],
                )

                # Create compliance event
                comp_event = {
                    "event_id": f"pii-{event['request_id']}",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "category": "pii_detected",
                    "severity": "critical" if "SSN" in compliance_result["pii_types"] else "warning",
                    "request_id": event["request_id"],
                    "pii_types": compliance_result["pii_types"],
                    "auto_remediated": True,
                    "remediation_action": "response_redacted",
                }
                compliance_result["events"].append(comp_event)
                self._store_compliance_event(comp_event)

        # Track guardrail blocks as compliance events
        if event.get("guardrail_action") == "block":
            block_event = {
                "event_id": f"block-{event['request_id']}",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "category": "guardrail_block",
                "severity": "info",
                "request_id": event["request_id"],
                "auto_remediated": True,
            }
            compliance_result["events"].append(block_event)
            self._store_compliance_event(block_event)

        return compliance_result

    def _store_compliance_event(self, event: dict):
        """Persist compliance event to DynamoDB + S3 evidence bucket."""
        self.compliance_table.put_item(Item=event)

        # Archive to S3 for long-term evidence retention
        s3_key = (
            f"compliance/{event['category']}/"
            f"{datetime.now(timezone.utc).strftime('%Y/%m/%d')}/"
            f"{event['event_id']}.json"
        )
        self.s3.put_object(
            Bucket=self.evidence_bucket,
            Key=s3_key,
            Body=json.dumps(event, default=str),
            ServerSideEncryption="aws:kms",
        )

    # ------------------------------------------------------------------ #
    #  Integration Point 3: Immutable Audit Log
    # ------------------------------------------------------------------ #

    def _write_audit_log(self, event: dict, compliance: dict) -> dict:
        """Write an immutable, hash-chain-linked audit log entry."""
        entry_id = f"audit-{event['request_id']}"
        timestamp = datetime.now(timezone.utc).isoformat()

        # Hash the prompt and response (never store raw text in audit)
        prompt_hash = hashlib.sha256(
            event.get("prompt_text", "").encode()
        ).hexdigest()
        response_hash = hashlib.sha256(
            event.get("response_text", "").encode()
        ).hexdigest()

        # Build hash-chain content
        content = (
            f"{entry_id}|{timestamp}|{self._last_audit_hash}|"
            f"{event['request_id']}|{event.get('session_id', '')}|"
            f"{event.get('model_id', '')}|{event.get('intent', 'unknown')}|"
            f"{prompt_hash}|{response_hash}|"
            f"{event.get('quality_score', 0)}|{event.get('guardrail_action', 'pass')}"
        )
        entry_hash = hashlib.sha256(content.encode()).hexdigest()

        item = {
            "entry_id": entry_id,
            "timestamp": timestamp,
            "previous_hash": self._last_audit_hash,
            "entry_hash": entry_hash,
            "request_id": event["request_id"],
            "session_id": event.get("session_id", ""),
            "model_id": event.get("model_id", ""),
            "intent": event.get("intent", "unknown"),
            "prompt_hash": prompt_hash,
            "response_hash": response_hash,
            "quality_score": Decimal(str(event.get("quality_score", 0))),
            "latency_ms": event.get("latency_ms", 0),
            "token_input": event.get("token_input", 0),
            "token_output": event.get("token_output", 0),
            "compliance_flags": compliance.get("pii_types", []),
            "guardrail_action": event.get("guardrail_action", "pass"),
        }
        self.audit_table.put_item(Item=item)
        self._last_audit_hash = entry_hash

        return {"entry_id": entry_id, "hash": entry_hash}

    # ------------------------------------------------------------------ #
    #  Integration Point 4: User Interaction Tracking
    # ------------------------------------------------------------------ #

    def _track_user_interaction(self, event: dict) -> dict:
        """Track session-level user interaction metrics and engagement score."""
        session_id = event.get("session_id", "")
        turn = event.get("turn_number", 1)
        satisfaction = event.get("satisfaction_signal")
        quality = event.get("quality_score", 0)
        latency = event.get("latency_ms", 0)

        # Engagement scoring formula (weighted sum):
        #   0.4 × quality + 0.3 × interaction depth + 0.3 × satisfaction
        quality_component = quality * 0.4
        depth_component = min(turn / 5.0, 1.0) * 0.3
        satisfaction_component = (
            1.0 if satisfaction == "thumbs_up"
            else 0.0 if satisfaction == "thumbs_down"
            else 0.5  # neutral/no signal
        ) * 0.3
        engagement = min(1.0, quality_component + depth_component + satisfaction_component)

        # Publish engagement metric
        self.cloudwatch.put_metric_data(
            Namespace="MangaAssist/UserInteraction",
            MetricData=[
                {
                    "MetricName": "EngagementScore",
                    "Value": engagement,
                    "Unit": "None",
                    "Dimensions": [
                        {"Name": "Intent", "Value": event.get("intent", "unknown")}
                    ],
                },
                {
                    "MetricName": "TurnCount",
                    "Value": turn,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "SessionId", "Value": session_id}
                    ],
                },
            ],
        )

        return {
            "session_id": session_id,
            "turn": turn,
            "engagement_score": round(engagement, 3),
            "satisfaction": satisfaction,
        }

    # ------------------------------------------------------------------ #
    #  Forensic Query Utilities
    # ------------------------------------------------------------------ #

    def verify_audit_chain(self, start_entry_id: str, count: int = 100) -> dict:
        """Verify integrity of the audit hash chain starting from a given entry.

        entry_id is the table's partition key, so DynamoDB cannot Query a
        range of entry_ids directly; a filtered Scan is used here and the
        results are sorted by timestamp to restore chain order.
        """
        response = self.audit_table.scan(
            FilterExpression="entry_id >= :start",
            ExpressionAttributeValues={":start": start_entry_id},
        )

        entries = sorted(
            response.get("Items", []), key=lambda e: e["timestamp"]
        )[:count]
        if not entries:
            return {"valid": False, "error": "No entries found"}

        chain_valid = True
        broken_at = None

        for i in range(1, len(entries)):
            expected_prev = entries[i - 1]["entry_hash"]
            actual_prev = entries[i]["previous_hash"]
            if expected_prev != actual_prev:
                chain_valid = False
                broken_at = entries[i]["entry_id"]
                break

        return {
            "valid": chain_valid,
            "entries_checked": len(entries),
            "broken_at": broken_at,
        }

    def replay_request(self, request_id: str) -> dict:
        """Retrieve full context of a past request for forensic investigation."""
        # Get audit entry
        audit_response = self.audit_table.get_item(
            Key={"entry_id": f"audit-{request_id}"}
        )
        audit_entry = audit_response.get("Item", {})

        # Get compliance events
        comp_response = self.compliance_table.query(
            IndexName="request-id-index",
            KeyConditionExpression="request_id = :rid",
            ExpressionAttributeValues={":rid": request_id},
        )
        compliance_events = comp_response.get("Items", [])

        # Get full request context from S3
        s3_key = f"interactions/{request_id}.json"
        try:
            s3_obj = self.s3.get_object(Bucket=self.evidence_bucket, Key=s3_key)
            full_context = json.loads(s3_obj["Body"].read())
        except self.s3.exceptions.NoSuchKey:
            full_context = None

        return {
            "audit_entry": audit_entry,
            "compliance_events": compliance_events,
            "full_context": full_context,
        }

Integration with Existing Content

| This File's Section | Existing File | Relationship |
|---|---|---|
| Operational Dashboards (§1) | 08-reporting-visualization-systems.md | Extends dashboard patterns with FM-specific real-time widgets and alert correlation |
| Business Impact Visualizations (§2) | 13-metrics.md | Adds a visualization layer and revenue attribution on top of metric definitions |
| Compliance Monitoring (§3) | 12-security-privacy.md | Operationalizes security requirements into a real-time compliance monitoring pipeline |
| Forensic Audit Logging (§4) | 02-application-logging.md | Builds a tamper-proof audit trail beyond standard application logging |
| User Interaction Tracking (§5) | 03-use-cases.md | Instruments user journeys defined in use cases with measurable tracking |
| Model Behavior Patterns (§6) | llmops-user-stories.md | Adds continuous behavior monitoring to the LLMOps lifecycle |

Key Design Decisions

| Decision | Choice | Rationale | Trade-Off |
|---|---|---|---|
| Dashboard refresh rate | 10s ops, 5 min exec, 15 min compliance | Ops needs real-time; executives need stable trends; compliance needs accuracy over speed | Higher CloudWatch API costs for real-time (~$3/month per custom metric) |
| Audit log immutability | Hash-chain (SHA256) in DynamoDB | Software-level tamper evidence without blockchain overhead | Not cryptographically provable like a real blockchain; sufficient for SOC2 |
| PII detection approach | Real-time Comprehend per response | Sub-second detection enables auto-redaction before the user sees PII | Comprehend cost at ~$0.0001/request; ~$72/month at 1M requests |
| Compliance evidence retention | 7 years in S3 Glacier Deep Archive | Regulatory requirement (SOC2, GDPR); Glacier DA costs ~$1/TB/month | Retrieval takes 12–48 hours; keep a 90-day hot copy in S3 Standard |
| User tracking privacy | Hash user IDs, never store raw PII | GDPR-compliant by design; anonymized analytics still useful | Cannot identify individual users from analytics; requires separate identity mapping for DSAR |
| Stakeholder access control | IAM groups per dashboard tier + SSO | Least-privilege by stakeholder role | Requires IAM group management overhead; mitigate with IaC |
| Alert correlation window | 5-minute grouping window | Balances noise reduction with detection speed | May delay alerting for fast-moving cascading failures by up to 5 min |
| Behavioral drift threshold | drift ≥ 0.15 triggers alert | Empirically tuned: 0.10 was too noisy, 0.20 missed real drift events | May need seasonal adjustment for product catalog changes |
| Engagement scoring weights | 40% quality, 30% depth, 30% satisfaction | Quality is most controllable; depth and satisfaction provide user signal | Low satisfaction signal rate (27%) makes that component noisy |

Cross-References

Within Skill 4.3.3 (Integrated Observability)

Other Skills in Task 4.3

  • Skill-4.3.1 — Foundational metrics/traces/logs that feed this integrated platform
  • Skill-4.3.2 — GenAI-specific monitoring signals consumed here
  • Skill-4.3.4 — Tool call metrics flow into operational dashboards
  • Skill-4.3.5 — Vector store health feeds infrastructure widget
  • Skill-4.3.6 — Troubleshooting workflows consume audit trail + forensic replay

Existing Content


Key Takeaways

  1. Unified > Siloed: Integrated observability connects operational metrics, business impact, compliance, and user behavior into a single platform — siloed dashboards hide the correlations that matter most (e.g., quality drop → revenue impact → user abandonment).

  2. Compliance is a First-Class Signal: PII detection, content policy enforcement, and regulatory evidence collection must be real-time, automated, and auditable — not an afterthought bolted on after launch.

  3. Tamper-Proof Audit Trail: Hash-chain linking of audit entries provides software-level integrity verification sufficient for SOC2 — any modification to historical entries breaks the chain and triggers immediate alerting.

  4. Multi-Stakeholder Access: Executives need 5 KPIs on revenue impact; SREs need sub-minute operational metrics; ML teams need quality drift trends; compliance needs 7-year evidence archives — one platform, four views, strict access control.

  5. Business Attribution is the Ultimate Justification: Connecting FM quality scores to conversion rates and revenue provides the business case for continued FM investment — without this attribution, FM quality improvements are invisible to executives.

  6. Behavioral Drift Catches What Metrics Miss: Traditional metrics (latency, errors) won't detect when an FM subtly changes its response style, loses domain vocabulary, or drifts out of persona — embedding-based behavioral drift detection fills this gap.

  7. User Abandonment is the Hidden Cost: 55% of MangaAssist sessions end without resolution — tracking exactly where and why users abandon (Turn 2 clarifying questions being the #1 cause) provides the most actionable improvement signal.