US-07: Analytics Pipeline Cost Optimization
User Story
As a data engineer, I want to optimize Kinesis stream provisioning and Redshift query costs for the chatbot analytics pipeline, so that analytics infrastructure costs decrease by 40-60% while preserving all operational and business insights.
Acceptance Criteria
- Kinesis Data Stream uses on-demand mode to avoid shard over-provisioning.
- Analytics events are batched and compressed before ingestion.
- Redshift uses RA3 nodes with managed storage for cost-effective scaling.
- Cold data (older than 30 days) is offloaded to S3 via Redshift Spectrum.
- Dashboard queries use materialized views to reduce compute.
- Total analytics pipeline costs decrease by 40-60%.
High-Level Design
Cost Problem
Analytics pipeline (HLD + LLD-8):
- Kinesis Data Stream: provisioned mode with N shards. Each shard costs $0.015/hr plus $0.014 per million PUT payload units; provisioning for peak traffic means waste during off-peak.
- Redshift: dc2.large cluster, 2 nodes at $0.25/hr each × 730 hrs ≈ $365/month.
- Data growth: ~9.4M events/day × ~1 KB/event ≈ 9.4 GB/day raw ≈ 282 GB/month.
Baseline: ~$500-700/month
Optimization Architecture
graph TD
subgraph "Event Production"
A[Orchestrator] --> B[Event Batcher<br>Aggregate + compress]
B --> C[Kinesis Data Stream<br>On-Demand Mode]
end
subgraph "Event Processing"
C --> D[Kinesis Firehose<br>Batched delivery]
D --> E[S3 Data Lake<br>Parquet format]
D --> F[Redshift<br>Hot data: last 30 days]
end
subgraph "Cold Storage"
F --> G{Data age > 30 days?}
G -->|Yes| H[Unload to S3<br>Compressed Parquet]
H --> I[Redshift Spectrum<br>Query cold data via S3]
end
subgraph "Query Optimization"
F --> J[Materialized Views<br>Pre-computed dashboards]
J --> K[CloudWatch Dashboards]
I --> L[Ad-hoc Analysis<br>Redshift Spectrum]
end
style B fill:#2d8,stroke:#333
style E fill:#2d8,stroke:#333
style H fill:#2d8,stroke:#333
style J fill:#2d8,stroke:#333
Savings Breakdown
| Technique | Reduction | Monthly Savings |
|---|---|---|
| Kinesis on-demand (vs. over-provisioned) | 30% of stream cost | ~$30 |
| Event batching + compression | 50% fewer PUT records | ~$20 |
| Redshift RA3 + S3 offload | 40% storage savings | ~$80 |
| Materialized views (fewer full scans) | 30% fewer compute-hours | ~$50 |
| Firehose + Parquet (vs. raw JSON) | 70% storage compression | ~$40 |
| Total | – | ~$220/month |
Low-Level Design
1. Event Batching and Compression
Instead of sending one Kinesis record per event, batch multiple events and compress.
sequenceDiagram
participant Orchestrator
participant Batcher as Event Batcher<br>(In-Process)
participant Kinesis
Orchestrator->>Batcher: Add event (response completed)
Orchestrator->>Batcher: Add event (feedback received)
Orchestrator->>Batcher: Add event (message received)
Note over Batcher: Buffer: 3 events<br>Flush at 50 events or 5 seconds
Batcher->>Batcher: Compress batch (gzip)
Batcher->>Kinesis: PutRecords (1 record with 3 events)
Note over Kinesis: 1 PUT unit vs. 3 PUT units<br>66% cost reduction
Code Example: Event Batcher
import gzip
import json
import logging
import threading
import time
from collections import deque

import boto3

logger = logging.getLogger(__name__)


class AnalyticsEventBatcher:
    """Batch and compress analytics events before sending to Kinesis."""

    MAX_BATCH_SIZE = 50
    FLUSH_INTERVAL_SECONDS = 5
    MAX_RECORD_SIZE = 1_000_000  # stay under the 1 MB Kinesis record limit

    def __init__(self, stream_name: str, region: str = "ap-northeast-1"):
        self._stream_name = stream_name
        self._kinesis = boto3.client("kinesis", region_name=region)
        self._buffer: deque[dict] = deque()
        # RLock, not Lock: _flush() re-acquires the lock on its retry path,
        # and add_event()/shutdown() call it while already holding the lock.
        self._lock = threading.RLock()
        self._running = True
        self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._flush_thread.start()

    def add_event(self, event: dict) -> None:
        with self._lock:
            self._buffer.append(event)
            if len(self._buffer) >= self.MAX_BATCH_SIZE:
                self._flush()

    def _flush_loop(self) -> None:
        while self._running:
            time.sleep(self.FLUSH_INTERVAL_SECONDS)
            with self._lock:
                if self._buffer:
                    self._flush()

    def _flush(self) -> None:
        events = []
        while self._buffer and len(events) < self.MAX_BATCH_SIZE:
            events.append(self._buffer.popleft())
        if not events:
            return

        # Compress the batch
        batch_json = json.dumps(events).encode("utf-8")
        compressed = gzip.compress(batch_json)
        if len(compressed) > self.MAX_RECORD_SIZE:
            # Rare with ~1 KB events; halve the batch and re-queue the rest.
            half = max(1, len(events) // 2)
            with self._lock:
                self._buffer.extendleft(reversed(events[half:]))
            events = events[:half]
            batch_json = json.dumps(events).encode("utf-8")
            compressed = gzip.compress(batch_json)

        # Partition key: distribute across shards by session
        partition_key = events[0].get("session_id", "default")
        try:
            self._kinesis.put_record(
                StreamName=self._stream_name,
                Data=compressed,
                PartitionKey=partition_key,
            )
            logger.debug(
                "Flushed %d events (%d -> %d bytes, %.0f%% of original)",
                len(events),
                len(batch_json),
                len(compressed),
                len(compressed) / len(batch_json) * 100,
            )
        except Exception:
            logger.exception("Failed to flush events to Kinesis")
            # Re-queue events for retry on the next flush
            with self._lock:
                self._buffer.extendleft(reversed(events))

    def shutdown(self) -> None:
        self._running = False
        with self._lock:
            while self._buffer:
                self._flush()
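The PUT-unit savings in the sequence diagram above come mostly from request count, but gzip also shrinks the batch well because JSON keys repeat across events. A quick local sanity check (the event fields below are illustrative, not the production schema):

```python
import gzip
import json

# 50 typical ~chat-sized events with repetitive keys, as the batcher would buffer.
events = [
    {
        "event_id": f"evt-{i:04d}",
        "session_id": "sess-123",
        "event_type": "response",
        "intent": "recommend_manga",
        "latency_ms": 850 + i,
        "model_id": "anthropic.claude-3-haiku",
    }
    for i in range(50)
]
raw = json.dumps(events).encode("utf-8")
compressed = gzip.compress(raw)
ratio = len(compressed) / len(raw)
print(f"{len(raw)} B raw -> {len(compressed)} B gzip ({ratio:.0%} of original)")
```

Repetitive event-shaped JSON typically compresses far below the 60% ratio targeted in the monitoring table later in this story.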
2. Kinesis On-Demand Mode
graph TD
A{Traffic Pattern?} --> B{Predictable<br>and steady?}
B -->|No — chatbot traffic<br>is spiky| C[On-Demand Mode<br>Auto-scales shards<br>Pay per GB ingested]
B -->|Yes| D[Provisioned Mode<br>Fixed shard count]
C --> E[No shard<br>management needed]
C --> F[Scales 0 to<br>200 MB/s automatically]
style C fill:#2d8,stroke:#333
Code Example: Kinesis Stream Configuration
import boto3


def create_on_demand_stream(
    stream_name: str = "manga-chatbot-events",
) -> dict:
    """Create Kinesis stream in on-demand mode for cost optimization."""
    kinesis = boto3.client("kinesis")
    kinesis.create_stream(
        StreamName=stream_name,
        StreamModeDetails={"StreamMode": "ON_DEMAND"},
    )
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)
    return {"stream_name": stream_name, "mode": "ON_DEMAND"}


def migrate_to_on_demand(
    stream_name: str = "manga-chatbot-events",
) -> dict:
    """Migrate existing provisioned stream to on-demand mode."""
    kinesis = boto3.client("kinesis")
    kinesis.update_stream_mode(
        StreamARN=_get_stream_arn(stream_name),
        StreamModeDetails={"StreamMode": "ON_DEMAND"},
    )
    return {"stream_name": stream_name, "new_mode": "ON_DEMAND"}


def _get_stream_arn(stream_name: str) -> str:
    kinesis = boto3.client("kinesis")
    desc = kinesis.describe_stream(StreamName=stream_name)
    return desc["StreamDescription"]["StreamARN"]
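Whether on-demand actually beats provisioned depends on traffic shape. A rough break-even sketch using the rates quoted in this document; treat the constants as assumptions (verify against current AWS pricing, and note that on-demand also carries a per-stream-hour charge not modeled here):

```python
# Hedged cost model: provisioned vs on-demand Kinesis, rates from this doc.
HOURS_PER_MONTH = 730


def provisioned_monthly(shards: int, put_units_per_day: float) -> float:
    """$0.015/shard-hour plus $0.014 per million PUT payload units."""
    shard_cost = shards * 0.015 * HOURS_PER_MONTH
    put_cost = put_units_per_day * 30 / 1e6 * 0.014
    return shard_cost + put_cost


def on_demand_monthly(gb_ingested_per_day: float) -> float:
    """$0.04 per GB ingested (retrieval cost omitted for brevity)."""
    return gb_ingested_per_day * 30 * 0.04


# This story's post-batching workload: ~5 GB/day, ~250K batched PUTs/day,
# vs 2 shards provisioned for peak.
print(f"provisioned: ${provisioned_monthly(2, 250_000):.2f}/month")
print(f"on-demand:   ${on_demand_monthly(5):.2f}/month")
```

At this volume both numbers are small; the point of the sketch is that shard-hours dominate provisioned cost, which is what on-demand eliminates.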
3. Firehose to S3 with Parquet Conversion
Store analytics data in Parquet format on S3 for 70% compression vs. raw JSON.
graph LR
A[Kinesis Data Stream] --> B[Firehose Delivery Stream]
B --> C[Decompress gzip batches]
C --> D[Convert to Parquet<br>via Glue Data Catalog]
D --> E[S3: s3://manga-analytics/<br>year=YYYY/month=MM/day=DD/]
D --> F[Redshift COPY<br>from S3 Parquet]
style D fill:#2d8,stroke:#333
Code Example: Firehose Configuration
import boto3


def create_firehose_with_parquet(
    stream_name: str = "manga-chatbot-events",
    s3_bucket: str = "manga-analytics-data",
    redshift_cluster: str = "manga-analytics",
) -> dict:
    """Create Firehose delivery stream with Parquet conversion.

    Note: because the batcher gzip-compresses multi-event records, a
    ProcessingConfiguration (transformation Lambda) is still needed to
    decompress batches into newline-delimited JSON before the Glue/Parquet
    conversion runs; see the "Decompress gzip batches" step in the diagram.
    """
    firehose = boto3.client("firehose")
    firehose.create_delivery_stream(
        DeliveryStreamName="manga-events-to-s3",
        DeliveryStreamType="KinesisStreamAsSource",
        KinesisStreamSourceConfiguration={
            "KinesisStreamARN": (
                f"arn:aws:kinesis:ap-northeast-1:123456789012:stream/{stream_name}"
            ),
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-kinesis-role",
        },
        ExtendedS3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-s3-role",
            "BucketARN": f"arn:aws:s3:::{s3_bucket}",
            "Prefix": (
                "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/"
                "day=!{timestamp:dd}/"
            ),
            "ErrorOutputPrefix": "errors/",
            "BufferingHints": {
                "SizeInMBs": 128,
                "IntervalInSeconds": 300,  # 5 min buffer
            },
            "CompressionFormat": "UNCOMPRESSED",  # Parquet handles compression
            "DataFormatConversionConfiguration": {
                "Enabled": True,
                "SchemaConfiguration": {
                    "DatabaseName": "manga_analytics",
                    "TableName": "chatbot_events",
                    "Region": "ap-northeast-1",
                    "RoleARN": "arn:aws:iam::123456789012:role/firehose-glue-role",
                },
                "InputFormatConfiguration": {
                    "Deserializer": {"OpenXJsonSerDe": {}}
                },
                "OutputFormatConfiguration": {
                    "Serializer": {"ParquetSerDe": {"Compression": "SNAPPY"}}
                },
            },
        },
    )
    return {"delivery_stream": "manga-events-to-s3"}
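The diagram's "Decompress gzip batches" step needs a Firehose transformation Lambda, which the configuration above does not show. A minimal sketch following the Firehose record-transformation contract (recordId / result / data); splitting into newline-delimited JSON is what lets the Glue/Parquet conversion see one event per row:

```python
import base64
import gzip
import json


def handler(event, context):
    """Firehose transformation Lambda: expand gzip batches into NDJSON."""
    output = []
    for record in event["records"]:
        try:
            # Each Firehose record is one gzip-compressed batch of JSON events.
            batch = json.loads(gzip.decompress(base64.b64decode(record["data"])))
            ndjson = "\n".join(json.dumps(e) for e in batch) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(ndjson.encode("utf-8")).decode("ascii"),
            })
        except Exception:
            # Malformed record: hand it back unchanged so Firehose routes it
            # to the ErrorOutputPrefix.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Wiring it up would add a `ProcessingConfiguration` block to the delivery-stream configuration; that detail is omitted above and should be checked against the Firehose API reference.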
4. Redshift Cold Data Offload
Move data older than 30 days to S3 and query via Redshift Spectrum.
graph TD
subgraph "Redshift (Hot Data)"
A[chatbot_events<br>Last 30 days<br>~9 GB compressed]
end
subgraph "S3 via Spectrum (Cold Data)"
B[chatbot_events_archive<br>31+ days old<br>Parquet on S3]
end
subgraph "Unified Query"
C[CREATE VIEW all_events AS<br>SELECT * FROM chatbot_events<br>UNION ALL<br>SELECT * FROM spectrum.chatbot_events_archive]
end
A --> C
B --> C
D[Nightly Job] --> E[UNLOAD last month's data<br>to S3 as Parquet]
E --> F[DELETE from Redshift<br>WHERE created_at < 30 days]
style B fill:#2d8,stroke:#333
Code Example: Data Offload Script
-- 1. Create external schema for Spectrum
CREATE EXTERNAL SCHEMA spectrum
FROM DATA CATALOG
DATABASE 'manga_analytics'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-spectrum-role'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

-- 2. Create external table pointing to S3 Parquet data
CREATE EXTERNAL TABLE spectrum.chatbot_events_archive (
    event_id VARCHAR(64),
    session_id VARCHAR(64),
    customer_id VARCHAR(64),
    event_type VARCHAR(32),
    intent VARCHAR(32),
    message_text VARCHAR(2000),
    response_text VARCHAR(4000),
    products_shown VARCHAR(500),
    latency_ms INTEGER,
    model_id VARCHAR(64),
    feedback VARCHAR(16),
    created_at TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION 's3://manga-analytics-data/archive/';

-- 3. Nightly offload: move data older than 30 days to S3
UNLOAD (
    'SELECT *, EXTRACT(YEAR FROM created_at) AS year,
            EXTRACT(MONTH FROM created_at) AS month
     FROM chatbot_events
     WHERE created_at < DATEADD(day, -30, GETDATE())'
)
TO 's3://manga-analytics-data/archive/'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-unload-role'
FORMAT PARQUET
PARTITION BY (year, month)
ALLOWOVERWRITE;

-- 4. Delete offloaded data from Redshift
DELETE FROM chatbot_events
WHERE created_at < DATEADD(day, -30, GETDATE());
VACUUM chatbot_events;

-- 5. Create unified view over hot and cold data.
-- Views referencing an external (Spectrum) table must be late-binding,
-- hence WITH NO SCHEMA BINDING.
CREATE OR REPLACE VIEW all_chatbot_events AS
SELECT event_id, session_id, customer_id, event_type, intent,
       message_text, response_text, products_shown, latency_ms,
       model_id, feedback, created_at
FROM chatbot_events
UNION ALL
SELECT event_id, session_id, customer_id, event_type, intent,
       message_text, response_text, products_shown, latency_ms,
       model_id, feedback, created_at
FROM spectrum.chatbot_events_archive
WITH NO SCHEMA BINDING;
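The nightly job in the diagram needs an orchestrator that runs these statements strictly in order. A hedged sketch using the Redshift Data API (`execute_statement` / `describe_statement`); cluster, database, and user names are placeholders, and the statement texts abbreviate the SQL above:

```python
import time

# Abbreviated stand-ins for the full statements shown above.
OFFLOAD_STATEMENTS = [
    "UNLOAD (...) TO 's3://manga-analytics-data/archive/' ...",  # step 3
    "DELETE FROM chatbot_events WHERE created_at < DATEADD(day, -30, GETDATE());",
    "VACUUM chatbot_events;",
]


def run_nightly_offload(client) -> None:
    """client: a boto3 'redshift-data' client, passed in for testability."""
    for sql in OFFLOAD_STATEMENTS:
        resp = client.execute_statement(
            ClusterIdentifier="manga-analytics",
            Database="analytics",
            DbUser="offload_job",
            Sql=sql,
        )
        # Each step must finish before the next starts: DELETE must not run
        # until UNLOAD has safely written the archive to S3.
        while True:
            status = client.describe_statement(Id=resp["Id"])["Status"]
            if status == "FINISHED":
                break
            if status in ("FAILED", "ABORTED"):
                raise RuntimeError(f"Offload step failed ({status}): {sql[:40]}")
            time.sleep(5)
```

In production this would run from a scheduled job (for example EventBridge triggering a Lambda) with alerting on the raised error.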
5. Materialized Views for Dashboards
Pre-compute common dashboard queries to avoid repeated full-table scans.
-- Intent distribution by hour (refreshed every 15 minutes).
-- Note: Redshift materialized view definitions cannot reference mutable
-- functions such as GETDATE(); the hot table already holds only the last
-- 30 days, so time-window filters belong in the dashboard query instead.
CREATE MATERIALIZED VIEW mv_daily_intent_summary AS
SELECT
    DATE_TRUNC('hour', created_at) AS hour,
    intent,
    COUNT(*) AS message_count,
    AVG(latency_ms) AS avg_latency_ms,
    COUNT(CASE WHEN feedback = 'thumbs_up' THEN 1 END) AS positive_feedback,
    COUNT(CASE WHEN feedback = 'thumbs_down' THEN 1 END) AS negative_feedback
FROM chatbot_events
WHERE event_type = 'response'
GROUP BY 1, 2;

-- Hourly cost tracking
CREATE MATERIALIZED VIEW mv_hourly_cost_tracking AS
SELECT
    DATE_TRUNC('hour', created_at) AS hour,
    model_id,
    COUNT(*) AS llm_calls,
    SUM(latency_ms) AS total_latency_ms
FROM chatbot_events
WHERE event_type = 'response'
  AND model_id IS NOT NULL
GROUP BY 1, 2;

-- Escalation rate tracking (rate is per distinct session, not per event)
CREATE MATERIALIZED VIEW mv_escalation_rate AS
SELECT
    DATE_TRUNC('day', created_at) AS day,
    COUNT(DISTINCT session_id) AS total_sessions,
    COUNT(CASE WHEN event_type = 'escalation' THEN 1 END) AS escalations,
    ROUND(
        COUNT(CASE WHEN event_type = 'escalation' THEN 1 END)::FLOAT
        / NULLIF(COUNT(DISTINCT session_id), 0) * 100, 2
    ) AS escalation_rate_pct
FROM chatbot_events
GROUP BY 1;

-- Refresh materialized views (scheduled via Redshift query scheduler)
REFRESH MATERIALIZED VIEW mv_daily_intent_summary;
REFRESH MATERIALIZED VIEW mv_hourly_cost_tracking;
REFRESH MATERIALIZED VIEW mv_escalation_rate;
Monitoring and Metrics
| Metric | Target | Alert |
|---|---|---|
| Kinesis ingestion lag | < 5 seconds | > 30 seconds |
| Event batch compression ratio | ≥ 60% | < 40% |
| Redshift hot table size | < 10 GB | > 20 GB |
| S3 archive cost | < $5/month | > $15/month |
| Materialized view refresh time | < 2 min | > 5 min |
| Monthly analytics pipeline cost | ≤ $300 | > $500 |
Risks and Mitigations
| Risk | Impact | Mitigation |
|---|---|---|
| Event batching delays real-time dashboards | 5-second delay in metrics | Acceptable for analytics; operational alerts use CloudWatch directly |
| Kinesis on-demand pricing exceeds provisioned at high steady traffic | Higher costs during sustained peak | Monitor and switch to provisioned if traffic stabilizes |
| Redshift Spectrum query performance on cold data | Slower ad-hoc queries | S3 data is Parquet + partitioned by date; performance is adequate for historical analysis |
| Materialized view staleness | Dashboard shows lagging data | 15-minute refresh interval; add "last updated" timestamp to dashboards |
Deep Dive: Why This Works on a Manga Chatbot Workload
The analytics pipeline is the cost optimization that pays for itself by making the other cost optimizations possible. Without observable per-request cost attribution, US-08's cost circuit breaker cannot trip, US-01's tier routing cannot be tuned, and US-04's auto-scaling cannot be evaluated. Treating analytics as "background telemetry" misses this — the pipeline is part of the cost-control feedback loop, and its own cost shape (Kinesis PUT units + Redshift compute) is highly tunable. The 40–60% saving target comes from three independent mechanical wins.
Property 1: Chatbot events are tiny, frequent, and compressible. A single chat-event JSON payload is ~500 bytes uncompressed (intent label, model used, token counts, latency, cost, timestamp, session ID). At 1M messages/day with multiple events per message (request, classification, retrieval, LLM call, response), event volume is ~10M events/day. Provisioned-mode Kinesis bills PUT payload units in 25 KB increments with a one-unit minimum per request, so events sent individually consume 10M PUT units/day; sent as 50-event gzip-compressed batches, the same data fits in ~200K PUT units/day, a roughly 50× reduction in PUT-unit cost. The saving comes not from fewer bytes (at these payload sizes Kinesis effectively bills per request, not per byte) but from fewer requests. The architectural assumption is that the 5-second batching delay is acceptable for analytics use cases (it is); operational alerting must use a different signal path (CloudWatch metrics, not Kinesis events) for sub-second response.
Property 2: Hot/cold tiering matches query patterns. Analytics queries decompose into two shapes: operational (last 24-72 hours, frequent, low-latency required for dashboards) and retrospective (weeks or months, infrequent, latency-tolerant for ad-hoc analysis). Redshift RA3 hot storage is fast but expensive (~$24/TB/month for managed storage on top of compute); S3 + Spectrum is cheap (~$23/TB/month for S3 + ad-hoc query cost) but slower. The 30-day hot/cold boundary captures this — anything queried daily lives in Redshift, anything queried monthly lives in S3 Parquet. The architectural assumption is that the boundary is correct; if dashboards start querying > 30 days regularly, either move the boundary or pre-aggregate via materialized views (the next property).
Property 3: Materialized views amortize aggregation cost across many dashboard reads. A dashboard query like "daily intent summary for the last 7 days" scans potentially millions of events to compute a few hundred summary rows. Running this scan once per dashboard load wastes Redshift compute. Materializing the result table (refreshed every 15 minutes) makes each dashboard load cheap (read 700 rows from a materialized table) at the cost of one scheduled refresh per 15 minutes. The trade-off is staleness vs cost; for the cost dashboards described in this story, 15-minute staleness is acceptable. The failure mode is materialized-view-staleness lag during refresh contention — flag if refresh time consistently > 5 minutes (story line 468).
Property 4: Parquet compression compounds over years of retention. Raw JSON event archives at 500 B/event × 10M/day × 365 days = ~1.8 TB/year of cold data. Parquet (columnar, with dictionary encoding and snappy compression) typically achieves 5–10× compression on event-shaped data → ~200 GB/year in S3. At $0.023/GB/month, that is $4.60/month per year of retention vs $41/month uncompressed — a 90% storage cost reduction that compounds with retention period. The architectural assumption is that columnar query patterns dominate (which they do for analytics: GROUP BY intent, COUNT, AVG); row-oriented queries (give me this exact event) would be slower on Parquet but are not the analytics use case.
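The retention arithmetic in Property 4 can be checked directly. The sketch below assumes 9× compression, within the 5-10× range quoted, and the S3 standard storage rate from the math-validation section:

```python
# Hedged arithmetic check of the Parquet retention math above.
BYTES_PER_EVENT = 500
EVENTS_PER_DAY = 10_000_000
S3_PRICE_PER_GB_MONTH = 0.023
COMPRESSION = 9  # assumed; Parquet on event-shaped data is typically 5-10x

raw_tb_per_year = BYTES_PER_EVENT * EVENTS_PER_DAY * 365 / 1e12
parquet_gb_per_year = raw_tb_per_year * 1000 / COMPRESSION

raw_cost = raw_tb_per_year * 1000 * S3_PRICE_PER_GB_MONTH
parquet_cost = parquet_gb_per_year * S3_PRICE_PER_GB_MONTH
print(f"raw: {raw_tb_per_year:.2f} TB/yr -> ${raw_cost:.2f}/month per year retained")
print(f"parquet: {parquet_gb_per_year:.0f} GB/yr -> ${parquet_cost:.2f}/month per year retained")
```

The output reproduces the ~1.8 TB/year raw, ~200 GB/year compressed, and roughly $41 vs $4.60 per month figures in the paragraph above.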
Bottom line: event batching (Kinesis PUT savings) is the single largest lever and a one-time engineering cost. RA3 hot/cold tiering is moderate ongoing savings but requires the offload pipeline to be reliable. Materialized views are a small Redshift compute saving but a large dashboard-latency improvement. Parquet compression is "free" once the offload pipeline is in place.
Real-World Validation
Industry Benchmarks & Case Studies
- AWS Kinesis Data Streams pricing (current): on-demand mode is $0.04/GB ingested + $0.014/GB retrieved; provisioned mode is $0.015/shard-hour + $0.014 per million PUT payload units (1 unit = 25 KB). The story's batching savings follow from this: a 50-event batch fits in one ~25 KB PUT and consumes 1 PUT payload unit instead of 50.
- AWS re:Invent ANT308 (Redshift performance) — Documents 3–5× compute improvement for RA3 vs DC2 on analytics workloads; pricing per node ~$1.086/hr for ra3.xlplus vs $0.25/hr for dc2.large but with managed storage included. Crossover point: ~10–20 GB of data + low compute utilization favors RA3.
- Apache Parquet documentation + Cloudera benchmarks — Parquet vs JSON compression ratios of 5–10× on event-shaped data are widely published. Snappy is the default compression; ZSTD gives ~15% better ratio with ~2× CPU.
- AWS Redshift materialized views documentation — Auto-refresh (CDC-driven) vs scheduled refresh trade-off; for cost dashboards, scheduled refresh at 15-minute intervals is the documented best practice.
- Netflix engineering blog: "Iceberg at Netflix" — Documents the open-table-format pattern for hot/cold analytics tiering at scale; analogous to the Spectrum offload pattern in this story.
- Internal cross-reference (Monitoring-GenAI-Systems/): the metrics surfaced by this pipeline feed the broader GenAI observability framework; see the dashboard catalog there.
- Internal cross-reference (POC-to-Production-War-Story/02-seven-production-catastrophes.md): the "cost explosion" catastrophe went undetected partly because cost telemetry lagged 24+ hours; this informs the requirement that this pipeline's lag stay < 5 minutes (story line 464).
Math Validation
- Kinesis on-demand: 10M events/day × 500 B = 5 GB/day × $0.04/GB = $0.20/day ingested = $6/month — surprisingly cheap on volume alone. The PUT-request cost is the dominant component pre-batching: 10M PUTs/day × $0.014/M = $0.14/day = $4.20/month. With batching to 250K PUTs/day → $0.10/month — saves $4/month. Flag: the absolute Kinesis savings are small ($4–10/month); the story's "50% Kinesis savings" is a percentage of a small base. The bigger absolute savings come from Redshift, not Kinesis.
- Redshift dc2.large cluster: $0.25/hr × 730 = $182.50/month per node; a 2-node cluster is $365/month. ✅ matches the story's baseline.
- Redshift RA3.xlplus: $1.086/hr × 730 = $792.78/month per node — more expensive than DC2 at small scale. RA3 wins on managed-storage savings only above ~10 GB hot data; at 10 GB scale, the wins are marginal. Flag: the story's "Redshift RA3 storage savings 40%" claim is workload-dependent — verify hot-data volume before migration.
- S3 Spectrum: $5/TB scanned + $0.023/GB/month storage. 200 GB/year archived × $0.023 = $4.60/month — cheap.
- Materialized views: at a 15-minute interval each view refreshes 4 times/hour × 730 hours = 2,920 times/month; at ~10 s per refresh that is ~8 hours of Redshift compute per view per month. A view pays for itself once it serves more than roughly 50 dashboard loads/day, since each load then reads a few hundred precomputed rows instead of scanning millions of events.
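The Kinesis PUT-unit reconciliation above reduces to a few lines (rates as quoted in this document; verify against current pricing):

```python
# Hedged reproduction of the Kinesis math-validation numbers above.
PUT_PRICE_PER_MILLION = 0.014
EVENTS_PER_DAY = 10_000_000
BATCH_SIZE = 50

put_cost_unbatched = EVENTS_PER_DAY / 1e6 * PUT_PRICE_PER_MILLION * 30  # $/month
put_cost_batched = EVENTS_PER_DAY / BATCH_SIZE / 1e6 * PUT_PRICE_PER_MILLION * 30
print(f"unbatched: ${put_cost_unbatched:.2f}/month, batched: ${put_cost_batched:.2f}/month")
```

This confirms the flag above: the absolute Kinesis PUT saving is on the order of $4/month, small compared with the Redshift levers.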
Conservative vs Aggressive Savings Bounds
| Bound | Source | Total monthly savings |
|---|---|---|
| Conservative | Batching only, keep DC2, no S3 archive | ~25% (~$130/month) |
| Aggressive | Batching + RA3 (only if > 50 GB hot) + S3 archive + 5 materialized views | ~55% (~$300/month) |
| Story (projected) | 40–60% of baseline | ~$200–360/month; realistic, provided hot-data volume justifies RA3 |
Cross-Story Interactions & Conflicts
- US-08 (Traffic-Based): authoritative side: this story owns the cost telemetry pipeline. Conflict mode: US-08's cost circuit breaker reads cumulative daily Bedrock spend from this pipeline. If event-batching latency exceeds 5 minutes, the breaker decides on stale data and could trip too late (after spending has overshot) or fail to trip during a runaway. Resolution: cost-tracking events (specifically bedrock_call_completed events) bypass the 50-event batching window and use a smaller batch (5 events / 1-second flush) on a dedicated stream. This trades some Kinesis savings for breaker freshness on the most cost-critical event class.
- US-05 (DynamoDB): see US-05. This pipeline reads the DynamoDB Stream for TURN items and archives them to S3 before TTL deletes them. Resolution: the TURN-archive flow runs on a 5-minute schedule (well under the 24h DDB TTL) and is monitored with a "TURN items in DDB but not in S3" reconciliation query.
- US-04 (Compute) — Indirect interaction. The event-batcher buffer (50 events / 5 sec) lives in-process on Fargate workers. Spot interruption or scale-in can drop in-flight batches. Resolution: SIGTERM handler flushes the batch synchronously before exit; combined with US-04's graceful drain, batch loss should be near zero.
- US-01, US-02, US-03, US-06 — All emit events into this pipeline. Conflict mode: event schema drift across stories breaks the materialized views and dashboards. Resolution: centralized event schema in a shared schemas module; CI lints any analytics event for schema compliance before merge.
Rollback & Experimentation
Shadow-Mode Plan
- Event batching: deploy in observe mode for 1 week — events are emitted both batched (to a shadow Kinesis stream) and unbatched (to the existing stream). Compare event counts and dashboard outputs daily.
- RA3 migration: provision an RA3 cluster alongside DC2; replicate data via DBLINK or scheduled COPY for 2 weeks; run dashboards on both; promote queries to RA3 only after performance + cost analysis confirms the win.
- Materialized views: deploy each view in shadow (build the table but don't yet point dashboards at it); compare query results between source-table and materialized-view queries; promote dashboards one at a time.
Canary Thresholds
- Batching ramp: 10% → 50% → 100% of event types over 2 weeks; abort if event-count discrepancy between batched and unbatched paths > 0.5%.
- RA3 ramp: 10% of dashboards over 1 week; full after 1 month if cost + perf hold.
- Abort criteria (any one trips): event loss > 0.1%, dashboard data divergence > 1%, materialized-view refresh time > 5 minutes, Spectrum query failure rate > 1%.
Kill Switch
- Three flags: analytics_batching_enabled (reverts to per-event Kinesis writes), analytics_redshift_ra3_active (routes queries back to the DC2 cluster), and analytics_materialized_views_active (routes dashboards to source tables). Dashboard-level rollback is per-view via Redshift's view-renaming mechanism.
Quality Regression Criteria (story-specific)
- Event loss rate: ≤ 0.01% (10 events lost per 100K — well below dashboard-noise threshold).
- Cost-tracking event lag (time from Bedrock call → US-08 reading event): ≤ 5 minutes P95.
- Materialized view refresh time: ≤ 2 minutes (story metric line 468).
- Spectrum cold-query latency: ≤ 30 seconds for typical historical-analysis queries.
Multi-Reviewer Validation Findings & Resolutions
The cross-reviewer pass identified the following story-specific findings. README's "Multi-Reviewer Validation & Cross-Cutting Hardening" section covers concerns that span all stories.
S1 (must-fix before production)
Event schema unversioned; US-08 consumer breaks silently on producer drift. Cost events feed US-08's circuit breaker decisions. Adding a field, renaming input_tokens → input_token_count, or changing a unit (cents → dollars) silently breaks the breaker. Resolution: centralized events/cost_event_v1.schema.json with a required schema_version field; producers (US-01, US-04, US-06) validate before emit; consumer (US-08) validates on read and skip-with-alert on unknown version. CI lint blocks any PR touching the event-emitting code without bumping the schema version.
S3 archive unencrypted by default. UNLOAD does not enforce SSE; if the bucket policy is permissive, archived events (containing intent labels, customer_id hashes, and potentially PII fragments) sit unencrypted at rest. Resolution: S3 bucket policy denies any PutObject without x-amz-server-side-encryption: aws:kms (KMS-managed key); also deny public read; require aws:SecureTransport. KMS key policy restricts decryption to the Redshift role only.
GDPR right-to-deletion not propagated to S3 archive. Per US-05 fix, customer deletion happens in DDB; the S3 archive in Parquet is never re-written. Resolution: monthly Glue job rebuilds the S3 archive partitioned by month, excluding customer_ids from the US-05 deletion-audit table. Document 30-day SLA on deletion completion. Alternatively, use Apache Iceberg / row-level delete on the archive — out of scope for v1.
Buffered events in task memory are unencrypted PII. Up to 5 seconds of events sit buffered in AnalyticsEventBatcher; a memory dump from a crashed task leaks queued events. Resolution: (a) reduce the buffer flush interval from 5 s to 1 s for events containing message_text (cost-tracking events without message_text remain at 5 s); (b) the SIGTERM handler flushes and clears the buffer synchronously before exit (per the US-04 fix); (c) consider not buffering message_text at all, emitting those events one at a time at the cost of higher Kinesis PUT volume.
S2 (fix before scale-up)
ML observability fields missing from event schema. The schema captures latency_ms and feedback, but not tokens_per_response, cost_per_intent, retrieval_recall, embedding_cached, or intent_confidence. Without these, the cost optimizations in US-01/US-02/US-06 cannot be evaluated for quality regression. Resolution: the v1 event schema includes:
- request_id (UUID, threaded per README cross-cutting concerns)
- customer_id_hash (SHA-256 of customer_id + monthly-rotated salt)
- intent_label, intent_confidence, intent_classifier_version
- model_id, model_version, input_tokens, output_tokens, cached_input_tokens
- embedding_cached: bool, rag_invoked: bool, chunks_retrieved: int, reranker_invoked: bool
- tier_used (which model tier the request was served by)
- degradation_level_at_request_time
- language (en | ja | mixed) — drives stratified metrics
- cost_estimate_usd (per-request, validates against US-08 breaker math)
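As a concreteness check, the v1 schema bullets above can be expressed as a typed structure. Field names mirror the list; the exact types are assumptions, and runtime validation would still come from the JSON Schema file named in finding S1:

```python
from typing import Literal, TypedDict


class ChatbotEventV1(TypedDict):
    """Typed sketch of the v1 analytics event; types are assumptions."""
    schema_version: str                     # required per finding S1, e.g. "1.0"
    request_id: str                         # UUID threaded across services
    customer_id_hash: str                   # SHA-256(customer_id + monthly salt)
    intent_label: str
    intent_confidence: float
    intent_classifier_version: str
    model_id: str
    model_version: str
    input_tokens: int
    output_tokens: int
    cached_input_tokens: int
    embedding_cached: bool
    rag_invoked: bool
    chunks_retrieved: int
    reranker_invoked: bool
    tier_used: str                          # model tier that served the request
    degradation_level_at_request_time: int
    language: Literal["en", "ja", "mixed"]  # drives stratified metrics
    cost_estimate_usd: float                # validates against US-08 breaker math
```

A TypedDict gives static (mypy-level) checking only; producers would still validate each event against the versioned JSON Schema before emitting.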
Materialized-view staleness invisible on dashboards. 15-min refresh interval but no surfaced timestamp. Resolution: every materialized view exposes last_refreshed_at; dashboards display data-age alongside every metric; alert if any view age > 30 minutes.
RA3 vs DC2 migration recommendation is workload-dependent. Math validation flagged that RA3 wins only above ~15–20 GB hot data. Resolution: measure hot-data size for 30 days post-launch; only migrate to RA3 if size > 15 GB. Default to keeping DC2 + Spectrum for cold data.
Kinesis cost claim doesn't reconcile. Story's "$30/month Kinesis savings" math doesn't match the 10M-events/day baseline. Resolution: restate Kinesis savings at the actual reconciled number (~$4–10/month); the dominant cost lever in this story is Redshift compute via materialized views, not Kinesis.
Kill switch absent. Resolution: add analytics_optimization_enabled flag; when false, stop batching (per-event Kinesis writes), bypass materialized views (read from source tables).
S3 (acknowledged / future work)
- Glue Crawler weekly to detect schema drift on S3 Parquet (auto-update Glue catalog).
- Cost reconciliation: monthly compare Kinesis-derived spend vs AWS Cost Explorer; alert on > 5% divergence.
- Apache Iceberg migration for row-level deletes in archive — major effort, out of v1 scope.