Integrated Observability Architecture for FM Applications
Skill Mapping
| AWS Certification | Task | Skill | Focus |
|---|---|---|---|
| AIP-C01 | Task 4.3 — Monitor and observe FM applications | Skill 4.3.3 | Develop integrated observability solutions for actionable insights |
MangaAssist Context: This file designs the unified observability platform that connects operational metrics, business impact, compliance monitoring, forensic audit trails, user interaction tracking, and model behavior analysis into a single, multi-stakeholder system. All examples use the MangaAssist e-commerce chatbot stack: Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, and API Gateway WebSocket.
Mind Map — Integrated Observability Dimensions
mindmap
root((Integrated<br/>Observability))
**Operational Dashboards**
Real-Time Widgets
< 60s Refresh Cycle
Headline KPI Cards
Sparkline Trends
SLA Tracking
Availability vs Target
Latency Budget Burn
Error Budget Remaining
Alert Correlation
Group Related Alarms
Suppress Duplicates
Escalation Chains
Infrastructure Health
ECS Task Health
API Gateway 4xx/5xx
DynamoDB Capacity
**Business Impact Visualizations**
Revenue Attribution
FM Quality → Cart Value
Recommendation Lift
Upsell Conversion
Conversion Funnel
FM Interaction Overlay
Drop-Off Heatmap
Intent-to-Purchase Path
CSAT Correlation
Quality Score → CSAT
Response Time → CSAT
Resolution Rate → CSAT
Cost-Value Analysis
Token Cost per Conversion
Cost per Resolved Query
ROI by Intent Category
**Compliance Monitoring**
PII Detection
Comprehend Real-Time
Auto-Redaction Pipeline
Alert on PII Leakage
Content Policy
Hate/Violence/Adult
Misinformation Flags
Guardrail Block Rate
GDPR Evidence
Data Subject Access
Right to Erasure
Processing Records
SOC2 Controls
Access Audit Logs
Change Management
Incident Response
Guardrail Effectiveness
Block Rate by Category
False Positive Rate
Override Justification
**Forensic Traceability**
Immutable Audit Logs
Hash-Chain Integrity
Tamper Detection
Retention Policy
Request Replay
Full Context Recreation
Prompt + Context + Response
Side-by-Side Comparison
Incident Timeline
Event Correlation
Root Cause Mapping
Impact Blast Radius
**User Interaction Tracking**
Session Analytics
Turn Count Distribution
Intent Transitions
Time-to-Resolution
Interaction Patterns
Power User Profiles
New User Friction
Peak Usage Windows
Abandonment Analysis
Drop-Off Turn Number
Pre-Abandonment Signals
Recovery Opportunities
Engagement Scoring
Quality × Depth
Resolution Weight
Return Visit Factor
**Model Behavior Patterns**
Response Clustering
Topic Grouping
Style Consistency
Length Distribution
Behavioral Drift
Embedding Similarity
Vocabulary Shift
Confidence Calibration
Persona Consistency
Tone Stability
Knowledge Boundary
Hallucination Zones
Knowledge Mapping
Strong Coverage Topics
Weak Coverage Topics
Emerging Gaps
Architecture — Unified Observability Platform
graph TB
subgraph DS["🟠 Data Sources — MangaAssist Services"]
style DS fill:#fff3e0,stroke:#e65100
BEDROCK["Amazon Bedrock<br/>Claude 3 Sonnet/Haiku<br/>+ Guardrails"]
OPENSEARCH["OpenSearch Serverless<br/>Product Embeddings<br/>Manga Knowledge Base"]
ECS["ECS Fargate<br/>Orchestrator Service<br/>+ Intent Classifier"]
APIGW["API Gateway<br/>WebSocket API<br/>User Connections"]
DYNAMO["DynamoDB<br/>Sessions / Orders<br/>Product Catalog"]
LAMBDA["Lambda Functions<br/>Auth / Routing<br/>Post-Processing"]
end
subgraph ETL["🔵 Processing & ETL Layer"]
style ETL fill:#e3f2fd,stroke:#1565c0
KDS["Kinesis Data Streams<br/>Real-Time Events<br/>< 1s Latency"]
KDF["Kinesis Data Firehose<br/>Batch Delivery<br/>S3 Partitioned"]
LTRANS["Lambda Transformers<br/>Enrichment<br/>PII Scrubbing"]
COMPREHEND["Amazon Comprehend<br/>PII Detection<br/>Sentiment Analysis"]
EB["EventBridge<br/>Event Router<br/>Rule Matching"]
end
subgraph STORE["🟢 Storage Layer"]
style STORE fill:#e8f5e9,stroke:#2e7d32
CW_METRICS["CloudWatch Metrics<br/>Hot Storage — 15-day High-Res<br/>1s/5s/60s Resolution"]
S3_LAKE["S3 Data Lake<br/>Warm Storage — Partitioned<br/>year/month/day/hour"]
DDB_AUDIT["DynamoDB Audit Table<br/>Immutable Hash-Chain<br/>TTL: 7 Years"]
REDSHIFT["Redshift Serverless<br/>Cold Analytics<br/>Historical Trending"]
end
subgraph VIZ["🟢 Visualization & Serving Layer"]
style VIZ fill:#f3e5f5,stroke:#6a1b9a
CW_DASH["CloudWatch Dashboards<br/>Operations Team<br/>Real-Time < 60s"]
QUICKSIGHT["Amazon QuickSight<br/>Business Stakeholders<br/>Revenue & Conversion"]
GRAFANA["Amazon Managed Grafana<br/>ML Team<br/>Model Quality & Drift"]
CUSTOM_UI["Custom React Dashboard<br/>Compliance Team<br/>Audit & Evidence"]
end
subgraph ACTION["🔴 Action Layer"]
style ACTION fill:#ffebee,stroke:#c62828
CW_ALARMS["CloudWatch Alarms<br/>Threshold + Anomaly"]
SNS_TOPIC["SNS Topics<br/>PagerDuty / Slack<br/>Email Escalation"]
REMEDIATE["Lambda Auto-Remediation<br/>Guardrail Tightening<br/>Model Fallback"]
STEPFN["Step Functions<br/>Incident Workflow<br/>Evidence Collection"]
end
%% Data Source → ETL
BEDROCK -->|"Invocation Logs"| KDS
BEDROCK -->|"Guardrail Events"| EB
OPENSEARCH -->|"Query Metrics"| KDS
ECS -->|"App Logs + Traces"| KDS
APIGW -->|"Connection Metrics"| CW_METRICS
DYNAMO -->|"DDB Streams"| KDS
LAMBDA -->|"Function Logs"| KDS
%% ETL → Processing
KDS -->|"Real-Time"| LTRANS
LTRANS -->|"PII Check"| COMPREHEND
LTRANS -->|"Enriched Events"| KDF
EB -->|"Compliance Events"| STEPFN
%% ETL → Storage
LTRANS -->|"Metrics"| CW_METRICS
KDF -->|"Partitioned Parquet"| S3_LAKE
LTRANS -->|"Audit Entries"| DDB_AUDIT
S3_LAKE -->|"COPY Command"| REDSHIFT
%% Storage → Visualization
CW_METRICS --> CW_DASH
CW_METRICS --> GRAFANA
S3_LAKE --> QUICKSIGHT
REDSHIFT --> QUICKSIGHT
DDB_AUDIT --> CUSTOM_UI
CW_METRICS --> CUSTOM_UI
%% Action Layer
CW_METRICS --> CW_ALARMS
CW_ALARMS --> SNS_TOPIC
SNS_TOPIC --> REMEDIATE
EB --> STEPFN
Multi-Stakeholder Dashboard Strategy
flowchart LR
subgraph SOURCES["Data Sources"]
CW["CloudWatch Metrics"]
S3["S3 Data Lake"]
RS["Redshift"]
AUDIT["DynamoDB Audit"]
end
subgraph EXEC["Tier 1: Executive Dashboard<br/>👔 C-Suite / VP"]
E1["Revenue Impact<br/>of FM Quality"]
E2["Total Cost of<br/>FM Operations"]
E3["Overall Quality<br/>Score (composite)"]
E4["System<br/>Availability %"]
E5["User Satisfaction<br/>Score (CSAT)"]
end
subgraph OPS["Tier 2: Operational Dashboard<br/>🔧 SRE / DevOps"]
O1["P95 Latency<br/>by Intent"]
O2["Error Rate<br/>5xx / Guardrail"]
O3["Token Usage<br/>& Cost Rate"]
O4["Guardrail<br/>Block Rate"]
O5["Active WebSocket<br/>Sessions"]
O6["Infrastructure<br/>Utilization"]
end
subgraph ML["Tier 3: ML Team Dashboard<br/>🧪 Data Scientists"]
M1["Quality Score<br/>by Intent"]
M2["Hallucination<br/>Rate Trend"]
M3["Drift Metrics<br/>(Embedding Sim)"]
M4["A/B Test<br/>Results"]
M5["Prompt Version<br/>Comparison"]
end
subgraph COMP["Tier 4: Compliance Dashboard<br/>🛡️ Legal / Security"]
C1["PII Detection<br/>Events"]
C2["Content Policy<br/>Violations"]
C3["Access Audit<br/>Log Review"]
C4["GDPR/SOC2<br/>Evidence Status"]
end
CW --> OPS
CW --> ML
CW --> EXEC
S3 --> ML
S3 --> EXEC
RS --> EXEC
AUDIT --> COMP
CW --> COMP
Dashboard Access Control Matrix
| Dashboard Tier | Refresh Rate | Data Retention | Access Group | Authentication |
|---|---|---|---|---|
| Executive | 5 min | 90 days displayed, 2 years archived | manga-exec-readonly | SSO + MFA |
| Operational | < 60 sec | 15 days high-res, 63 days standard | manga-sre-team | SSO + VPN |
| ML Team | 5 min | 30 days dashboards, S3 for historical | manga-ml-team | SSO |
| Compliance | 15 min | 7 years (regulatory requirement) | manga-compliance-audit | SSO + MFA + IP restrict |
Six Integration Points Deep Dive
1. Operational Metric Dashboards
Design Principle: Every widget must answer a single question in < 3 seconds.
Headline Widget Specification
| Widget | Metric | Green | Yellow | Red | Refresh |
|---|---|---|---|---|---|
| P95 Latency | MangaAssist/Operations/ResponseLatency | < 2.0s | 2.0–4.0s | > 4.0s | 10s |
| Error Rate | MangaAssist/Operations/ErrorRate | < 1% | 1–5% | > 5% | 10s |
| Token Cost/hr | MangaAssist/Cost/TokenCostPerHour | < $50 | $50–100 | > $100 | 60s |
| Quality Score | MangaAssist/Quality/CompositeScore | > 0.85 | 0.70–0.85 | < 0.70 | 60s |
| Active Sessions | MangaAssist/Operations/ActiveSessions | < 5000 | 5000–8000 | > 8000 | 10s |
| Guardrail Blocks | MangaAssist/Guardrails/BlockRate | < 2% | 2–5% | > 5% | 30s |
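The threshold bands above can be encoded as a small status classifier — a sketch, with widget keys and bounds taken from the table; note Quality Score is the one metric where higher is better, so its comparison direction flips:

```python
# (green_bound, red_bound, higher_is_better) per headline widget,
# transcribed from the specification table above.
WIDGETS = {
    "p95_latency_s": (2.0, 4.0, False),
    "error_rate": (0.01, 0.05, False),
    "token_cost_per_hr": (50, 100, False),
    "quality_score": (0.85, 0.70, True),
    "active_sessions": (5000, 8000, False),
    "guardrail_block_rate": (0.02, 0.05, False),
}

def status(widget: str, value: float) -> str:
    """Map a raw metric value to its green/yellow/red band."""
    green, red, higher_better = WIDGETS[widget]
    if higher_better:
        return "green" if value > green else "red" if value < red else "yellow"
    return "green" if value < green else "red" if value > red else "yellow"
```

A dashboard renderer can call `status()` per widget on each refresh tick and color the KPI card accordingly.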
Alert Correlation Strategy
Reduce alert noise by grouping related signals:
Correlation Window: 5 minutes
Grouping Rules:
- Group by: service + intent
- Suppress child alerts if parent alarm active
- Example: If "Bedrock Throttling" alarm fires, suppress:
- "High Latency" alarms (effect of throttling)
- "Quality Degradation" alarms (effect of fallback model)
- "Token Cost Spike" alarms (effect of retry logic)
- Only page once: "Bedrock Throttling → Cascading Impact"
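The suppression rule above can be sketched as a pure-Python correlator. The parent-to-children mapping and the alarm dict shape (`name`, `fired_at`) are illustrative assumptions, not a real AWS API:

```python
from datetime import datetime, timedelta

# Hypothetical parent → suppressed-children mapping from the example above.
SUPPRESSION_RULES = {
    "BedrockThrottling": {"HighLatency", "QualityDegradation", "TokenCostSpike"},
}
WINDOW = timedelta(minutes=5)  # correlation window

def correlate(alarms: list[dict]) -> list[dict]:
    """Return only the alarms that should page; children firing within the
    correlation window of an active parent are suppressed."""
    pages = []
    for alarm in alarms:
        suppressed = any(
            alarm["name"] in children
            and other["name"] == parent
            and abs(alarm["fired_at"] - other["fired_at"]) <= WINDOW
            for other in alarms
            for parent, children in SUPPRESSION_RULES.items()
        )
        if not suppressed:
            pages.append(alarm)
    return pages
```

With the Bedrock example, a throttling alarm plus its latency and cost children collapse into a single page for the parent.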
Widget Dependency Graph
graph LR
subgraph "Real-Time Metrics Pipeline"
KDS_W["Kinesis Data Stream"]
LAMBDA_W["Enrichment Lambda<br/>Intent + Model Tags"]
CW_W["CloudWatch Metrics<br/>Custom Namespace"]
end
subgraph "Headline Widgets"
W1["🟢 P95 Latency"]
W2["🟢 Error Rate"]
W3["🟡 Token Cost/hr"]
W4["🟢 Quality Score"]
W5["🟢 Active Sessions"]
W6["🟢 Guardrail Blocks"]
end
subgraph "Drill-Down Panels"
D1["Latency by Intent<br/>Time Series"]
D2["Error Breakdown<br/>by Type"]
D3["Cost by Model<br/>Stacked Bar"]
D4["Quality Trend<br/>7-Day Rolling"]
D5["Session Heatmap<br/>by Hour"]
D6["Block Categories<br/>Pie Chart"]
end
KDS_W --> LAMBDA_W --> CW_W
CW_W --> W1 & W2 & W3 & W4 & W5 & W6
W1 -->|"Click"| D1
W2 -->|"Click"| D2
W3 -->|"Click"| D3
W4 -->|"Click"| D4
W5 -->|"Click"| D5
W6 -->|"Click"| D6
2. Business Impact Visualizations
Core Question: How does FM quality translate to revenue?
Revenue Attribution Model
FM Quality Score → Recommendation Relevance → Cart Value Lift → Revenue Delta
Causal Chain (MangaAssist):
1. Quality Score drops from 0.90 → 0.75 (–17%)
2. Product recommendation relevance drops: Click-through rate –22%
3. Average cart value drops: $42 → $35 (–17%)
4. Daily orders affected: ~2,400 sessions × 12% conversion = 288 orders
5. Revenue impact: 288 × $7 loss = $2,016/day potential loss
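The arithmetic in the causal chain reduces to a one-line estimate; a sketch using the example figures above:

```python
def revenue_impact(sessions: int, conversion_rate: float, cart_value_loss: float) -> float:
    """Daily revenue at risk = affected orders × per-order cart-value loss."""
    affected_orders = sessions * conversion_rate
    return affected_orders * cart_value_loss

# Per the chain above: 2,400 sessions × 12% conversion = 288 orders,
# each losing $7 of cart value, ≈ $2,016/day at risk.
```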
Business Attribution Pipeline
flowchart TB
subgraph "FM Events"
E1["Bedrock Invocation<br/>request_id, quality_score"]
E2["Guardrail Event<br/>block/pass, category"]
E3["Session End<br/>turns, satisfaction"]
end
subgraph "Business Events"
B1["Product View<br/>from FM recommendation"]
B2["Add to Cart<br/>correlated to session"]
B3["Purchase Complete<br/>order value"]
end
subgraph "Attribution Pipeline"
JOIN["Join on session_id<br/>within 30-min window"]
ENRICH["Enrich with<br/>FM quality metrics"]
ATTRIBUTE["Calculate<br/>attribution weight"]
end
subgraph "Visualizations"
V1["Revenue by FM Quality<br/>Scatter Plot"]
V2["Conversion Funnel<br/>with FM Overlay"]
V3["CSAT × Quality<br/>Correlation Matrix"]
end
E1 & E2 & E3 --> JOIN
B1 & B2 & B3 --> JOIN
JOIN --> ENRICH --> ATTRIBUTE
ATTRIBUTE --> V1 & V2 & V3
CSAT Correlation Analysis
| FM Metric | CSAT Correlation (r) | Actionable Threshold |
|---|---|---|
| Quality Score | +0.72 | Score < 0.80 → investigate prompt/retrieval |
| Response Latency | –0.58 | P95 > 3.5s → scale compute or use Haiku |
| Turn Count (resolution) | –0.45 | > 4 turns avg → improve first-response quality |
| Guardrail Block Rate | –0.31 | > 3% → review guardrail sensitivity |
| Recommendation Click-Through | +0.66 | CTR < 15% → check retrieval relevance |
3. Compliance Monitoring
Regulatory Requirement: Every FM interaction involving PII must be detected, logged, and optionally redacted within 30 seconds.
Compliance Monitoring Pipeline
flowchart TB
subgraph "Detection"
INPUT["FM Response Text"]
COMPREHEND_C["Amazon Comprehend<br/>detect_pii_entities()"]
GUARDRAIL_C["Bedrock Guardrails<br/>Content Filter"]
end
subgraph "Classification"
PII_CLASS["PII Classifier<br/>NAME / EMAIL / SSN / PHONE"]
CONTENT_CLASS["Content Classifier<br/>HATE / VIOLENCE / ADULT<br/>MISINFORMATION"]
SEVERITY["Severity Scorer<br/>INFO / WARNING / CRITICAL"]
end
subgraph "Action"
REDACT["Auto-Redact<br/>Replace PII with [REDACTED]"]
LOG_COMP["Compliance Event Log<br/>DynamoDB + S3"]
ALERT_COMP["Alert Pipeline<br/>CRITICAL → PagerDuty<br/>WARNING → Slack"]
CW_COMP["CloudWatch Metric<br/>MangaAssist/Compliance/*"]
end
subgraph "Evidence"
S3_EV["S3 Evidence Bucket<br/>Encrypted, WORM Lock"]
REPORT["Compliance Report<br/>Weekly Automated"]
DSAR["GDPR DSAR Processor<br/>Data Subject Requests"]
end
INPUT --> COMPREHEND_C & GUARDRAIL_C
COMPREHEND_C --> PII_CLASS
GUARDRAIL_C --> CONTENT_CLASS
PII_CLASS & CONTENT_CLASS --> SEVERITY
SEVERITY -->|"PII Found"| REDACT
SEVERITY -->|"All Events"| LOG_COMP
SEVERITY -->|"WARNING/CRITICAL"| ALERT_COMP
SEVERITY -->|"Metrics"| CW_COMP
LOG_COMP --> S3_EV
S3_EV --> REPORT
S3_EV --> DSAR
SOC2 Control Evidence Automation
| SOC2 Control | Evidence Source | Collection Method | Frequency |
|---|---|---|---|
| CC6.1 Logical Access | IAM CloudTrail logs | EventBridge → S3 | Real-time |
| CC6.3 Data Encryption | KMS key usage logs | CloudTrail → Athena | Daily |
| CC7.2 Monitoring | CloudWatch alarm history | API → S3 archive | Weekly |
| CC7.3 Incident Response | Step Functions execution logs | Direct S3 export | Per incident |
| CC8.1 Change Management | CodePipeline deployment logs | EventBridge → S3 | Per deployment |
| Custom FM Content Safety | Guardrail block/pass metrics | CloudWatch → S3 | Real-time |
4. Forensic Traceability & Audit Logging
Design Principle: Every FM interaction must be reconstructable months later, with tamper-proof integrity.
Immutable Audit Log — Hash-Chain Design
Entry N:
entry_id: "audit-req-00042"
previous_hash: "<hash of Entry N-1>"
content: {request_id, session_id, model_id, intent, prompt_hash, response_hash, quality_score, ...}
entry_hash: SHA256(entry_id + timestamp + previous_hash + content)
Entry N+1:
previous_hash: "<entry_hash of Entry N>" ← Links to previous
...
Tamper Detection:
- Recalculate hash chain from any point
- If computed hash ≠ stored hash → TAMPERING DETECTED
- Genesis entry: previous_hash = "GENESIS"
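The chain and tamper check can be sketched in a few lines. Field names here are a simplified subset for illustration; the production entry hashes the full content listed above:

```python
import hashlib

def compute_entry_hash(entry: dict, previous_hash: str) -> str:
    # The hash covers the previous hash, so editing any earlier entry
    # invalidates every hash that follows it.
    content = f"{entry['entry_id']}|{entry['timestamp']}|{previous_hash}|{entry['content']}"
    return hashlib.sha256(content.encode()).hexdigest()

def verify_chain(entries: list[dict]) -> bool:
    """Recompute the chain from GENESIS; any mismatch means tampering."""
    previous = "GENESIS"
    for entry in entries:
        if compute_entry_hash(entry, previous) != entry["entry_hash"]:
            return False  # tampering detected at this entry
        previous = entry["entry_hash"]
    return True
```

Verification can start from any trusted checkpoint rather than GENESIS, which keeps periodic integrity sweeps cheap.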
Forensic Investigation Workflow
sequenceDiagram
participant ALERT as CloudWatch Alarm
participant SRE as SRE Engineer
participant DASH as Ops Dashboard
participant AUDIT as Audit Trail<br/>(DynamoDB)
participant S3 as Evidence Store<br/>(S3)
participant REPLAY as Request Replayer<br/>(Lambda)
participant RCA as Root Cause<br/>Analysis
ALERT->>SRE: "Quality Score < 0.70<br/>for intent: product_recommend"
SRE->>DASH: Open Operational Dashboard
DASH->>SRE: Show quality trend —<br/>drop started 14:32 UTC
SRE->>AUDIT: Query: intent=product_recommend<br/>time_range=14:30-15:00
AUDIT->>SRE: Return 847 audit entries<br/>with request_ids
SRE->>SRE: Filter: quality_score < 0.50<br/>→ 23 entries identified
SRE->>S3: Retrieve full request context<br/>for worst 5 entries
S3->>SRE: Return: prompt, retrieved_docs,<br/>response, guardrail_action
SRE->>REPLAY: Replay request req-00847<br/>with original context
REPLAY->>SRE: Replayed response matches<br/>original — model consistent
SRE->>SRE: Compare retrieved docs:<br/>stale product data found
SRE->>RCA: Root Cause: OpenSearch index<br/>has stale embeddings from<br/>failed 14:00 re-index job
RCA->>SRE: Resolution: Re-trigger index<br/>rebuild, add index health check
5. User Interaction Tracking
Core Question: Where do users succeed, struggle, and abandon?
Session Analytics Dimensions
| Dimension | Metric | What It Reveals |
|---|---|---|
| Turn Count | avg = 3.2, p95 = 8 | Most queries resolve in 3 turns; long sessions may indicate confusion |
| Intent Transitions | product_search → order_status (12%) | Users often check order status after searching products |
| Time Between Turns | median = 8s, p95 = 45s | Long gaps may indicate user reading or frustration |
| Satisfaction Signal | 👍 23%, 👎 4%, none 73% | Low signal rate — most users don't rate |
| Abandonment Turn | mode = 2, mean = 2.7 | Users who abandon typically do so at turn 2 |
User Journey Funnel
flowchart TB
START["Session Start<br/>100% of users<br/>(~8,400/day)"]
T1["Turn 1: Initial Query<br/>98% continue<br/>(2% connection drops)"]
T2["Turn 2: FM Response<br/>82% continue<br/>(16% abandon here)"]
T3["Turn 3: Follow-Up<br/>71% continue<br/>(11% satisfied & leave)"]
T4["Turn 4+: Deep Interaction<br/>45% continue<br/>(26% resolve or abandon)"]
RESOLVE["Resolution<br/>38% resolve successfully"]
ESCALATE["Escalation<br/>7% → Human Agent"]
ABANDON["Abandonment<br/>55% leave without resolution"]
START --> T1
T1 --> T2
T2 --> T3
T3 --> T4
T4 --> RESOLVE
T4 --> ESCALATE
T1 --> ABANDON
T2 --> ABANDON
T3 --> ABANDON
T4 --> ABANDON
style ABANDON fill:#ffcdd2,stroke:#c62828
style RESOLVE fill:#c8e6c9,stroke:#2e7d32
style ESCALATE fill:#fff9c4,stroke:#f9a825
Abandonment Analysis — MangaAssist Findings
Top Abandonment Reasons (from session analysis):
1. Turn 2 — FM asks clarifying question instead of answering (38% of abandonments)
→ Action: Improve first-response completeness for top-5 intents
2. Turn 2 — Response too long (> 400 words) for simple query (22%)
→ Action: Add response length guardrails by intent
3. Turn 3 — FM repeats information from Turn 1 (15%)
→ Action: Improve session context injection into prompt
4. Turn 1 — Latency > 5s, user leaves before response (14%)
→ Action: Streaming responses, precomputed answers for FAQs
5. Turn 4+ — FM unable to resolve, no escalation offered (11%)
→ Action: Auto-offer human escalation after 3 failed turns
6. Model Behavior Pattern Tracking
Core Question: Is the FM behaving consistently and within expected boundaries?
Response Clustering
Group FM responses into behavioral clusters to detect emergent patterns:
Clustering Method:
1. Embed each FM response using a small embedder (e.g., Titan Embed v2)
2. Apply UMAP dimensionality reduction (n_neighbors=15, min_dist=0.1)
3. Cluster with HDBSCAN (min_cluster_size=50)
4. Label clusters by inspecting sample responses
MangaAssist Discovered Clusters:
Cluster 0: "Product Description" — detailed manga descriptions (34% of responses)
Cluster 1: "Order Status" — structured order info (22%)
Cluster 2: "Recommendation" — personalized suggestions (18%)
Cluster 3: "Clarifying Question" — asks for more info (12%)
Cluster 4: "Apology/Limitation" — cannot help responses (8%)
Cluster 5: "Off-Topic" — responses outside intended scope (4%)
Cluster 6: "Price/Availability" — specific product data (2%)
Alert: If Cluster 5 ("Off-Topic") grows > 6% → investigate prompt drift
Alert: If Cluster 4 ("Apology") grows > 12% → knowledge gap expanding
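The two cluster alerts above reduce to simple share thresholds; a sketch where the cluster keys and fractional inputs are illustrative assumptions:

```python
def cluster_alerts(distribution: dict[str, float]) -> list[str]:
    """Flag the two drift conditions called out above.
    distribution maps cluster label → share of responses (0.0–1.0)."""
    alerts = []
    if distribution.get("off_topic", 0.0) > 0.06:
        alerts.append("off_topic_growth: investigate prompt drift")
    if distribution.get("apology", 0.0) > 0.12:
        alerts.append("apology_growth: knowledge gap expanding")
    return alerts
```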
Behavioral Drift Detection
Method: Track cosine similarity of response embeddings over time
Baseline: Rolling 7-day window of mean response embeddings per intent
Current: Today's mean response embedding per intent
Drift Score: 1 - cosine_similarity(baseline, current)
Thresholds:
drift < 0.05 → Normal variation
0.05 ≤ drift < 0.15 → Monitor (log but don't alert)
drift ≥ 0.15 → Alert: Significant behavioral shift detected
Common Drift Causes in MangaAssist:
- Knowledge base update changed retrieval distribution
- Prompt template change altered response style
- Model version update (Claude 3 minor version)
- Seasonal product catalog changes
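The drift score and thresholds above can be sketched directly — pure-Python cosine similarity over the baseline and current mean embeddings (vector inputs are illustrative):

```python
import math

def drift_score(baseline: list[float], current: list[float]) -> float:
    """Drift = 1 - cosine_similarity(baseline, current), per the method above."""
    dot = sum(b * c for b, c in zip(baseline, current))
    norm = math.sqrt(sum(b * b for b in baseline)) * math.sqrt(sum(c * c for c in current))
    return 1.0 - dot / norm

def classify_drift(drift: float) -> str:
    """Apply the three-band thresholds from the spec above."""
    if drift < 0.05:
        return "normal"
    if drift < 0.15:
        return "monitor"  # log but don't alert
    return "alert"  # significant behavioral shift
```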
Persona Consistency Monitoring
MangaAssist FM Persona: "Friendly manga expert, knowledgeable about series,
volumes, genres. Professional but enthusiastic. Never uses slang.
Always provides specific volume/chapter references."
Monitoring Dimensions:
1. Tone Score: sentiment analysis should show 70-85% positive
2. Domain Vocabulary: manga-specific terms should appear in 60%+ of responses
3. Specificity: named entities (series, volumes) should appear in 50%+ of product responses
4. Length Consistency: std dev of response length should be < 40% of mean
Alert if: Tone drops below 60% positive OR domain vocabulary drops below 50%
→ Likely cause: prompt template change or context window overflow
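A sketch of the persona checks above; the metric keys are illustrative, and length variance uses the 40%-of-mean rule as a third check alongside the two alert conditions:

```python
def persona_check(metrics: dict) -> list[str]:
    """Evaluate persona-consistency dimensions against the thresholds above."""
    issues = []
    if metrics["tone_positive_ratio"] < 0.60:
        issues.append("tone below 60% positive")
    if metrics["domain_vocabulary_ratio"] < 0.50:
        issues.append("domain vocabulary below 50%")
    if metrics["length_std"] > 0.40 * metrics["length_mean"]:
        issues.append("response length variance above 40% of mean")
    return issues
```

Any non-empty result would point first at a prompt template change or context window overflow, per the note above.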
Knowledge Boundary Mapping
| Topic Area | Confidence | Evidence | Action |
|---|---|---|---|
| Popular manga series (top 100) | High (0.92) | Accurate details, correct volumes | Maintain |
| New releases (< 30 days) | Medium (0.71) | Sometimes missing latest volume | Increase reindex frequency |
| Pricing & availability | High (0.88) | Real-time tool calling works well | Maintain |
| Shipping & logistics | Medium (0.65) | Sometimes gives outdated policies | Update knowledge base |
| Mature/adult content policy | Low (0.52) | Inconsistent guardrail application | Tighten guardrails + prompt |
| Competitor comparisons | Low (0.41) | FM hallucinates competitor data | Add explicit refusal in prompt |
HLD: Integrated Platform Data Model
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class StakeholderType(Enum):
EXECUTIVE = "executive"
OPERATIONS = "operations"
ML_TEAM = "ml_team"
COMPLIANCE = "compliance"
class DashboardWidget(Enum):
SINGLE_VALUE = "single_value"
TIME_SERIES = "time_series"
BAR_CHART = "bar_chart"
PIE_CHART = "pie_chart"
HEATMAP = "heatmap"
FUNNEL = "funnel"
TABLE = "table"
class ComplianceCategory(Enum):
PII_DETECTED = "pii_detected"
CONTENT_VIOLATION = "content_violation"
ACCESS_AUDIT = "access_audit"
DATA_SUBJECT_REQUEST = "data_subject_request"
GUARDRAIL_BLOCK = "guardrail_block"
class Severity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class AuditLogEntry:
"""Immutable audit log entry with hash-chain integrity.
Each entry links to the previous via previous_hash, forming
a tamper-evident chain. Any modification breaks the chain.
"""
entry_id: str
timestamp: datetime
previous_hash: str
request_id: str
session_id: str
user_id_hash: str # NEVER store raw user ID
action: str # "fm_invocation", "guardrail_block", "tool_call"
model_id: str # "anthropic.claude-3-sonnet-v1"
intent: str # "product_search", "order_status", etc.
prompt_hash: str # SHA256 of sanitized prompt (never raw text)
response_hash: str # SHA256 of response (never raw text in audit)
quality_score: float # 0.0 - 1.0 composite score
guardrail_action: str # "pass", "block", "redact"
latency_ms: int = 0
token_input: int = 0
token_output: int = 0
compliance_flags: List[str] = field(default_factory=list)
entry_hash: str = "" # Computed: SHA256(all fields + previous_hash)
def compute_hash(self) -> str:
import hashlib
content = (
f"{self.entry_id}|{self.timestamp.isoformat()}|"
f"{self.previous_hash}|{self.request_id}|"
f"{self.session_id}|{self.action}|"
f"{self.model_id}|{self.intent}|"
f"{self.prompt_hash}|{self.response_hash}|"
f"{self.quality_score}|{self.guardrail_action}"
)
self.entry_hash = hashlib.sha256(content.encode()).hexdigest()
return self.entry_hash
@dataclass
class ComplianceEvent:
"""Compliance monitoring event for regulatory tracking."""
event_id: str
timestamp: datetime
category: ComplianceCategory
severity: Severity
request_id: str
description: str
details: Dict[str, str] = field(default_factory=dict)
auto_remediated: bool = False
remediation_action: Optional[str] = None
evidence_s3_key: Optional[str] = None
reviewer: Optional[str] = None
reviewed_at: Optional[datetime] = None
@dataclass
class UserInteractionMetrics:
"""Session-level user interaction tracking."""
session_id: str
user_id_hash: str
start_time: datetime
end_time: Optional[datetime] = None
turn_count: int = 0
intents_used: List[str] = field(default_factory=list)
satisfaction_signal: Optional[str] = None # "thumbs_up", "thumbs_down"
abandoned: bool = False
abandonment_turn: Optional[int] = None
escalated: bool = False
engagement_score: float = 0.0
avg_quality_score: float = 0.0
total_latency_ms: int = 0
@dataclass
class ModelBehaviorSnapshot:
"""Periodic snapshot of model behavior patterns."""
snapshot_id: str
timestamp: datetime
intent: str
period_hours: int = 24
response_count: int = 0
mean_embedding: Optional[List[float]] = None # Centroid of response embeddings
drift_score: float = 0.0 # vs previous snapshot
cluster_distribution: Dict[str, float] = field(default_factory=dict)
avg_response_length: float = 0.0
tone_score: float = 0.0
domain_vocabulary_ratio: float = 0.0
LLD: Integrated Observability Platform
import boto3
import json
import hashlib
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional
from decimal import Decimal
logger = logging.getLogger(__name__)
class IntegratedObservabilityPlatform:
"""Unified platform connecting all observability signals to stakeholder dashboards.
Processes every FM interaction through four of the six integration points:
1. Operational metrics → CloudWatch → Dashboards + Alarms
2. Compliance checks → Comprehend PII + Guardrail events → Evidence
3. Immutable audit log → DynamoDB hash-chain → Forensic queries
4. User interaction tracking → Session metrics → Engagement scoring
"""
def __init__(self, config: dict):
self.cloudwatch = boto3.client("cloudwatch")
self.dynamodb = boto3.resource("dynamodb")
self.s3 = boto3.client("s3")
self.comprehend = boto3.client("comprehend")
self.audit_table = self.dynamodb.Table(config["audit_table_name"])
self.compliance_table = self.dynamodb.Table(config["compliance_table_name"])
self.evidence_bucket = config["evidence_bucket"]
self._last_audit_hash = "GENESIS"
# ------------------------------------------------------------------ #
# Main Entry Point
# ------------------------------------------------------------------ #
def process_interaction(self, event: dict) -> dict:
"""Process a complete user interaction through all integration points.
Args:
event: Dict with keys: request_id, session_id, model_id, intent,
response_text, quality_score, latency_ms, turn_number,
guardrail_action, satisfaction_signal, token_input, token_output
Returns:
Dict with results from each integration point.
"""
results = {}
# 1. Publish operational metrics (real-time dashboards)
results["operational"] = self._publish_operational_metrics(event)
# 2. Check compliance (PII detection, content policy)
results["compliance"] = self._check_compliance(event)
# 3. Write immutable audit log (hash-chain linked)
results["audit"] = self._write_audit_log(event, results["compliance"])
# 4. Track user interaction (session-level metrics)
results["interaction"] = self._track_user_interaction(event)
return results
# ------------------------------------------------------------------ #
# Integration Point 1: Operational Metrics
# ------------------------------------------------------------------ #
def _publish_operational_metrics(self, event: dict) -> dict:
"""Publish real-time metrics to CloudWatch for operational dashboards."""
dimensions = [
{"Name": "Intent", "Value": event.get("intent", "unknown")},
{"Name": "ModelId", "Value": event.get("model_id", "unknown")},
]
metric_data = [
{
"MetricName": "ResponseLatency",
"Value": event.get("latency_ms", 0),
"Unit": "Milliseconds",
"Dimensions": dimensions,
"Timestamp": datetime.now(timezone.utc),
},
{
"MetricName": "QualityScore",
"Value": event.get("quality_score", 0),
"Unit": "None",
"Dimensions": dimensions,
"Timestamp": datetime.now(timezone.utc),
},
{
"MetricName": "TokensInput",
"Value": event.get("token_input", 0),
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "TokensOutput",
"Value": event.get("token_output", 0),
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "ActiveSessions",
"Value": 1,
"Unit": "Count",
"Dimensions": [{"Name": "Intent", "Value": event.get("intent", "unknown")}],
},
]
# Guardrail block metric
if event.get("guardrail_action") == "block":
metric_data.append({
"MetricName": "GuardrailBlocks",
"Value": 1,
"Unit": "Count",
"Dimensions": dimensions,
})
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/Operations",
MetricData=metric_data,
)
return {"published": True, "metric_count": len(metric_data)}
# ------------------------------------------------------------------ #
# Integration Point 2: Compliance Monitoring
# ------------------------------------------------------------------ #
def _check_compliance(self, event: dict) -> dict:
"""Run compliance checks: PII detection + content policy."""
response_text = event.get("response_text", "")
compliance_result = {
"pii_detected": False,
"pii_types": [],
"guardrail_action": event.get("guardrail_action", "pass"),
"events": [],
}
# PII detection using Comprehend (truncate to 5000 char limit)
if response_text:
text_to_check = response_text[:5000]
pii_result = self.comprehend.detect_pii_entities(
Text=text_to_check, LanguageCode="en"
)
pii_entities = pii_result.get("Entities", [])
if pii_entities:
compliance_result["pii_detected"] = True
compliance_result["pii_types"] = list(
set(e["Type"] for e in pii_entities)
)
# Publish PII metric
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/Compliance",
MetricData=[{
"MetricName": "PIIDetected",
"Value": len(pii_entities),
"Unit": "Count",
"Dimensions": [
{"Name": "Intent", "Value": event.get("intent", "unknown")}
],
}],
)
# Create compliance event
comp_event = {
"event_id": f"pii-{event['request_id']}",
"timestamp": datetime.now(timezone.utc).isoformat(),
"category": "pii_detected",
"severity": "critical" if "SSN" in compliance_result["pii_types"] else "warning",
"request_id": event["request_id"],
"pii_types": compliance_result["pii_types"],
"auto_remediated": True,
"remediation_action": "response_redacted",
}
compliance_result["events"].append(comp_event)
self._store_compliance_event(comp_event)
# Track guardrail blocks as compliance events
if event.get("guardrail_action") == "block":
block_event = {
"event_id": f"block-{event['request_id']}",
"timestamp": datetime.now(timezone.utc).isoformat(),
"category": "guardrail_block",
"severity": "info",
"request_id": event["request_id"],
"auto_remediated": True,
}
compliance_result["events"].append(block_event)
self._store_compliance_event(block_event)
return compliance_result
def _store_compliance_event(self, event: dict):
"""Persist compliance event to DynamoDB + S3 evidence bucket."""
self.compliance_table.put_item(Item=event)
# Archive to S3 for long-term evidence retention
s3_key = (
f"compliance/{event['category']}/"
f"{datetime.now(timezone.utc).strftime('%Y/%m/%d')}/"
f"{event['event_id']}.json"
)
self.s3.put_object(
Bucket=self.evidence_bucket,
Key=s3_key,
Body=json.dumps(event, default=str),
ServerSideEncryption="aws:kms",
)
# ------------------------------------------------------------------ #
# Integration Point 3: Immutable Audit Log
# ------------------------------------------------------------------ #
def _write_audit_log(self, event: dict, compliance: dict) -> dict:
"""Write an immutable, hash-chain-linked audit log entry."""
entry_id = f"audit-{event['request_id']}"
timestamp = datetime.now(timezone.utc).isoformat()
# Hash the prompt and response (never store raw text in audit)
prompt_hash = hashlib.sha256(
event.get("prompt_text", "").encode()
).hexdigest()
response_hash = hashlib.sha256(
event.get("response_text", "").encode()
).hexdigest()
# Build hash-chain content
content = (
f"{entry_id}|{timestamp}|{self._last_audit_hash}|"
f"{event['request_id']}|{event.get('session_id', '')}|"
f"{event.get('model_id', '')}|{event.get('intent', 'unknown')}|"
f"{prompt_hash}|{response_hash}|"
f"{event.get('quality_score', 0)}|{event.get('guardrail_action', 'pass')}"
)
entry_hash = hashlib.sha256(content.encode()).hexdigest()
item = {
"entry_id": entry_id,
"timestamp": timestamp,
"previous_hash": self._last_audit_hash,
"entry_hash": entry_hash,
"request_id": event["request_id"],
"session_id": event.get("session_id", ""),
"model_id": event.get("model_id", ""),
"intent": event.get("intent", "unknown"),
"prompt_hash": prompt_hash,
"response_hash": response_hash,
"quality_score": Decimal(str(event.get("quality_score", 0))),
"latency_ms": event.get("latency_ms", 0),
"token_input": event.get("token_input", 0),
"token_output": event.get("token_output", 0),
"compliance_flags": compliance.get("pii_types", []),
"guardrail_action": event.get("guardrail_action", "pass"),
}
self.audit_table.put_item(Item=item)
self._last_audit_hash = entry_hash
return {"entry_id": entry_id, "hash": entry_hash}
# ------------------------------------------------------------------ #
# Integration Point 4: User Interaction Tracking
# ------------------------------------------------------------------ #
def _track_user_interaction(self, event: dict) -> dict:
"""Track session-level user interaction metrics and engagement score."""
session_id = event.get("session_id", "")
turn = event.get("turn_number", 1)
satisfaction = event.get("satisfaction_signal")
quality = event.get("quality_score", 0)
latency = event.get("latency_ms", 0)
        # Engagement scoring formula (weighted sum):
        #   0.4 * quality + 0.3 * interaction depth + 0.3 * satisfaction
quality_component = quality * 0.4
depth_component = min(turn / 5.0, 1.0) * 0.3
satisfaction_component = (
1.0 if satisfaction == "thumbs_up"
else 0.0 if satisfaction == "thumbs_down"
else 0.5 # neutral/no signal
) * 0.3
engagement = min(1.0, quality_component + depth_component + satisfaction_component)
# Publish engagement metric
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/UserInteraction",
MetricData=[
{
"MetricName": "EngagementScore",
"Value": engagement,
"Unit": "None",
"Dimensions": [
{"Name": "Intent", "Value": event.get("intent", "unknown")}
],
},
                {
                    "MetricName": "TurnCount",
                    "Value": turn,
                    "Unit": "Count",
                    # Note: a per-session dimension is high-cardinality — every
                    # session ID creates a distinct custom metric, which drives
                    # CloudWatch cost; consider sampling or intent-level rollups.
                    "Dimensions": [
                        {"Name": "SessionId", "Value": session_id}
                    ],
                },
],
)
return {
"session_id": session_id,
"turn": turn,
"engagement_score": round(engagement, 3),
"satisfaction": satisfaction,
}
# ------------------------------------------------------------------ #
# Forensic Query Utilities
# ------------------------------------------------------------------ #
    def verify_audit_chain(self, start_entry_id: str, count: int = 100) -> dict:
        """Verify integrity of the audit hash chain starting from a given entry.

        entry_id is the table's partition key, so a Query key condition with
        ``>=`` is not valid; use a filtered Scan and sort the results back into
        chain order. For production scale, add a GSI with a sortable
        chain-sequence key instead of scanning.
        """
        response = self.audit_table.scan(
            FilterExpression="entry_id >= :start",
            ExpressionAttributeValues={":start": start_entry_id},
            Limit=count,  # Limit caps items *evaluated*, not items matched
        )
        # Chain order follows write order; sort by timestamp to restore it
        entries = sorted(response.get("Items", []), key=lambda e: e["timestamp"])
        if not entries:
            return {"valid": False, "error": "No entries found"}
        chain_valid = True
        broken_at = None
        for i in range(1, len(entries)):
            expected_prev = entries[i - 1]["entry_hash"]
            actual_prev = entries[i]["previous_hash"]
            if expected_prev != actual_prev:
                chain_valid = False
                broken_at = entries[i]["entry_id"]
                break
        return {
            "valid": chain_valid,
            "entries_checked": len(entries),
            "broken_at": broken_at,
        }
def replay_request(self, request_id: str) -> dict:
"""Retrieve full context of a past request for forensic investigation."""
# Get audit entry
audit_response = self.audit_table.get_item(
Key={"entry_id": f"audit-{request_id}"}
)
audit_entry = audit_response.get("Item", {})
# Get compliance events
comp_response = self.compliance_table.query(
IndexName="request-id-index",
KeyConditionExpression="request_id = :rid",
ExpressionAttributeValues={":rid": request_id},
)
compliance_events = comp_response.get("Items", [])
# Get full request context from S3
s3_key = f"interactions/{request_id}.json"
try:
s3_obj = self.s3.get_object(Bucket=self.evidence_bucket, Key=s3_key)
full_context = json.loads(s3_obj["Body"].read())
except self.s3.exceptions.NoSuchKey:
full_context = None
return {
"audit_entry": audit_entry,
"compliance_events": compliance_events,
"full_context": full_context,
}
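The hash-chain mechanism above can be exercised independently of DynamoDB. A minimal self-contained sketch (pure `hashlib`, with a simplified hypothetical entry shape rather than the full audit item) shows why tampering is detectable: each entry commits to its predecessor's hash, so editing any historical entry invalidates every verification from that point forward.

```python
import hashlib


def chain_entries(payloads: list[str]) -> list[dict]:
    """Build a hash-chained log: each entry commits to its predecessor's hash."""
    entries, prev_hash = [], "genesis"
    for payload in payloads:
        entry_hash = hashlib.sha256(f"{prev_hash}|{payload}".encode()).hexdigest()
        entries.append(
            {"payload": payload, "previous_hash": prev_hash, "entry_hash": entry_hash}
        )
        prev_hash = entry_hash
    return entries


def verify(entries: list[dict]) -> bool:
    """Recompute every hash; any edited payload or relinked entry fails."""
    prev_hash = "genesis"
    for e in entries:
        expected = hashlib.sha256(f"{prev_hash}|{e['payload']}".encode()).hexdigest()
        if e["previous_hash"] != prev_hash or e["entry_hash"] != expected:
            return False
        prev_hash = e["entry_hash"]
    return True


log = chain_entries(["req-1|pass", "req-2|blocked", "req-3|pass"])
assert verify(log)
log[1]["payload"] = "req-2|pass"  # tamper with history
assert not verify(log)            # chain breaks at the edited entry
```

This is the same property `verify_audit_chain` relies on: an attacker who rewrites one DynamoDB item would also have to rewrite every subsequent `previous_hash`, which the periodic verification sweep would surface.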
Integration with Existing Content
| This File's Section | Existing File | Relationship |
|---|---|---|
| Operational Dashboards (§1) | 08-reporting-visualization-systems.md | Extends dashboard patterns with FM-specific real-time widgets and alert correlation |
| Business Impact Visualizations (§2) | 13-metrics.md | Adds visualization layer and revenue attribution on top of metric definitions |
| Compliance Monitoring (§3) | 12-security-privacy.md | Operationalizes security requirements into real-time compliance monitoring pipeline |
| Forensic Audit Logging (§4) | 02-application-logging.md | Builds tamper-proof audit trail beyond standard application logging |
| User Interaction Tracking (§5) | 03-use-cases.md | Instruments user journeys defined in use cases with measurable tracking |
| Model Behavior Patterns (§6) | llmops-user-stories.md | Adds continuous behavior monitoring to the LLMOps lifecycle |
Key Design Decisions
| Decision | Choice | Rationale | Trade-Off |
|---|---|---|---|
| Dashboard refresh rate | 10s ops, 5min exec, 15min compliance | Ops needs real-time; executives need stable trends; compliance needs accuracy over speed | Higher CloudWatch costs for real-time (~$0.30 per custom metric per month, plus PutMetricData/GetMetricData API charges) |
| Audit log immutability | Hash-chain (SHA256) in DynamoDB | Software-level tamper evidence without blockchain overhead | Not cryptographically provable like a real blockchain; sufficient for SOC2 |
| PII detection approach | Real-time Comprehend per response | Sub-second detection enables auto-redaction before the user sees PII | Comprehend PII detection bills per 100-character unit (~$0.0001/unit); on the order of $100/month at 1M single-unit requests |
| Compliance evidence retention | 7 years in S3 Glacier Deep Archive | Regulatory requirement (SOC2, GDPR); Glacier DA costs ~$1/TB/month | Retrieval takes 12–48 hours; keep 90-day hot copy in S3 Standard |
| User tracking privacy | Hash user IDs, never store raw PII | GDPR-compliant by design; anonymized analytics still useful | Cannot identify individual users from analytics; requires separate identity mapping for DSAR |
| Stakeholder access control | IAM groups per dashboard tier + SSO | Least-privilege by stakeholder role | Requires IAM group management overhead; mitigate with IaC |
| Alert correlation window | 5-minute grouping window | Balances noise reduction with detection speed | May delay alerting for fast-moving cascading failures by up to 5 min |
| Behavioral drift threshold | drift ≥ 0.15 triggers alert | Empirically tuned: 0.10 was too noisy, 0.20 missed real drift events | May need seasonal adjustment for product catalog changes |
| Engagement scoring weights | 40% quality, 30% depth, 30% satisfaction | Quality is most controllable; depth and satisfaction provide user signal | Low satisfaction signal rate (27%) makes that component noisy |
Cross-References
Within Skill 4.3.3 (Integrated Observability)
- → 02-operational-dashboards.md — CloudWatch Dashboard JSON specs, Grafana alert rules
- → 03-business-impact-visualizations.md — QuickSight calculated fields, Plotly dashboards
- → 04-compliance-monitoring.md — Full ComplianceMonitor class, GDPR DSAR automation
- → 05-forensic-traceability-audit.md — AuditLogger production code, request replay
- → 06-user-interaction-model-behavior.md — UserInteractionTracker, ModelBehaviorAnalyzer
- → 07-scenarios-and-runbooks.md — 5 production scenarios with decision trees
- → 08-3d-visualizations.md — Plotly compliance heatmap, Three.js user journey 3D
Other Skills in Task 4.3
- → Skill-4.3.1 — Foundational metrics/traces/logs that feed this integrated platform
- → Skill-4.3.2 — GenAI-specific monitoring signals consumed here
- → Skill-4.3.4 — Tool call metrics flow into operational dashboards
- → Skill-4.3.5 — Vector store health feeds infrastructure widget
- → Skill-4.3.6 — Troubleshooting workflows consume audit trail + forensic replay
Existing Content
- → Evaluation-Systems-GenAI/08-reporting-visualization-systems.md — Dashboard patterns foundation
- → 13-metrics.md — Metric definitions referenced by dashboards
- → Debugging/02-application-logging.md — Application logging foundation
- → 12-security-privacy.md — Security requirements operationalized here
- → LLMOps/llmops-user-stories.md — Lifecycle context for continuous monitoring
Key Takeaways
- **Unified > Siloed:** Integrated observability connects operational metrics, business impact, compliance, and user behavior into a single platform — siloed dashboards hide the correlations that matter most (e.g., quality drop → revenue impact → user abandonment).
- **Compliance Is a First-Class Signal:** PII detection, content policy enforcement, and regulatory evidence collection must be real-time, automated, and auditable — not an afterthought bolted on after launch.
- **Tamper-Evident Audit Trail:** Hash-chain linking of audit entries provides software-level integrity verification sufficient for SOC2 — any modification to a historical entry breaks the chain and triggers immediate alerting.
- **Multi-Stakeholder Access:** Executives need 5 KPIs on revenue impact; SREs need sub-minute operational metrics; ML teams need quality-drift trends; compliance needs 7-year evidence archives — one platform, four views, strict access control.
- **Business Attribution Is the Ultimate Justification:** Connecting FM quality scores to conversion rates and revenue provides the business case for continued FM investment — without this attribution, FM quality improvements are invisible to executives.
- **Behavioral Drift Catches What Metrics Miss:** Traditional metrics (latency, errors) won't detect when an FM subtly changes its response style, loses domain vocabulary, or drifts out of persona — embedding-based behavioral drift detection fills this gap.
- **User Abandonment Is the Hidden Cost:** 55% of MangaAssist sessions end without resolution — tracking exactly where and why users abandon (Turn 2 clarifying questions being the #1 cause) provides the most actionable improvement signal.
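One way to make the behavioral-drift takeaway concrete: compare a rolling centroid of recent response embeddings against a frozen baseline centroid using cosine distance, alerting at the 0.15 threshold from the design-decision table. The sketch below is a simplified illustration — the 2-D vectors are toy values, and in production the embeddings would come from an embedding model rather than hand-written lists:

```python
import math

DRIFT_THRESHOLD = 0.15  # from the design-decision table


def centroid(vectors: list[list[float]]) -> list[float]:
    """Component-wise mean of a set of embedding vectors."""
    n = len(vectors)
    return [sum(v[i] for v in vectors) / n for i in range(len(vectors[0]))]


def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm


def drift_alert(baseline: list[list[float]], recent: list[list[float]]) -> bool:
    """Alert when the recent response centroid drifts past the threshold."""
    return cosine_distance(centroid(baseline), centroid(recent)) >= DRIFT_THRESHOLD


baseline = [[1.0, 0.0], [0.9, 0.1]]     # stable "in persona" responses
similar = [[0.95, 0.05], [1.0, 0.05]]   # same style → no alert
shifted = [[0.2, 1.0], [0.1, 0.9]]      # new style → alert
print(drift_alert(baseline, similar), drift_alert(baseline, shifted))  # → False True
```

Latency and error-rate metrics stay flat in both cases above; only the embedding comparison distinguishes the stylistic shift — which is exactly the gap this takeaway describes.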