Scenarios and Runbooks — Data Validation Workflows
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Detail |
|---|---|
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 1 — Foundation Model Integration, Data Management, and Compliance |
| Task | 1.3 — Implement data validation and processing pipelines for FM consumption |
| Skill | 1.3.1 — Create comprehensive data validation workflows to ensure data meets quality standards for FM consumption |
| This File | Five production scenarios with detection flowcharts, root cause analysis, resolution code, and prevention strategies |
Skill Scope Statement
This file presents five real-world failure scenarios that MangaAssist has encountered (or would encounter) in production when ingesting data into FM-backed pipelines. Each scenario exposes a distinct data quality failure mode — duplicates, nulls, format inconsistency, silent pipeline failures, and unbounded token injection. Each runbook includes: a problem statement, a mermaid detection flowchart, root cause analysis, Python resolution code using boto3, and prevention measures. These runbooks are designed for on-call engineers responding to data quality alerts before or after FM consumption.
Mind Map — Data Validation Failure Modes
mindmap
root((Data Validation<br/>Failure Scenarios))
Duplicate ASINs
Contradictory FM Answers
Duplicate Embeddings
Knowledge Base Drift
Null Content Fields
Schema Pass Silent Failure
FM Context Starvation
Hallucinated Answers
Mixed Date Formats
ISO 8601 vs MM/DD/YYYY
Freetext Dates
FM Misinterpretation
Silent Glue DQ Failure
DQ Score Below Threshold
Pipeline Continues
Invalid Data in OpenSearch
Unbounded Review Tokens
50K Token Injection
Prompt Truncation
System Instructions Dropped
Scenario Overview
| # | Scenario | Severity | Blast Radius | Typical Detection Time |
|---|---|---|---|---|
| 1 | Duplicate ASINs in product catalog CSV — FM gives contradictory price answers | P2 — High | All users querying affected products | 5-15 minutes via answer consistency audit |
| 2 | Null/empty FAQ content fields pass schema check — FM hallucinates answers | P2 — High | Any user on topic covered by empty FAQ doc | Post-hoc via quality audit or user report |
| 3 | Mixed date formats in manga metadata — FM misinterprets release dates | P3 — Medium | Recommendations for date-sensitive queries | Post-hoc via date comparison test suite |
| 4 | AWS Glue Data Quality job fails silently — invalid data reaches OpenSearch | P1 — Critical | All users querying contaminated index segment | 10-30 minutes via DQ score CloudWatch alarm |
| 5 | User review data injected without length validation — 50K tokens truncate system instructions | P1 — Critical | Any session with user-submitted context injection | Immediate via token count alarm or user report |
Scenario 1: Duplicate ASINs in Product Catalog — FM Gives Contradictory Price Answers
Problem
MangaAssist ingests a weekly product catalog CSV from the vendor into a Bedrock Knowledge Base. The CSV contains duplicate ASIN rows with different prices (e.g., two rows for ASIN B08MANGA01 — one at $12.99 and one at $9.99 after a partial sale update). Both rows are chunked and embedded independently into OpenSearch Serverless. When a user asks "How much is Demon Slayer Vol. 3?", the RAG pipeline retrieves both chunks and Claude receives conflicting context, reporting either price depending on retrieval ranking. Users see inconsistent answers across sessions.
Detection
flowchart TD
A[Product CSV arrives in S3] --> B[Lambda: pre-ingest validation triggered]
B --> C{ASIN uniqueness check}
C -->|Duplicates found| D[CloudWatch metric: catalog_asin_duplicates > 0]
C -->|No duplicates| E[Proceed to Bedrock Knowledge Base sync]
D --> F[SNS Alert: DataQuality-ProductCatalog]
F --> G[On-call engineer notified]
G --> H{Manual review}
H -->|Deduplicate and re-upload| I[Re-trigger Knowledge Base sync]
H -->|Escalate to vendor| J[Vendor corrects source feed]
I --> E
Root Cause
The vendor's catalog export system performs incremental updates by appending new rows rather than updating existing rows in place. When a price change event fires, the old row is not deleted before the new row is appended. The Lambda pre-ingest validator checked for required columns (asin, title, price, availability) and correct data types, but did not enforce uniqueness constraints on the asin field. Once duplicate embeddings entered OpenSearch, the Knowledge Base sync had no deduplication mechanism — both chunks survived retrieval scoring equally.
Resolution
import boto3
import csv
import io
import json
import logging
from collections import defaultdict
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-ProductCatalog"
NAMESPACE = "MangaAssist/DataQuality"
def lambda_handler(event, context):
"""Validate product catalog CSV for duplicate ASINs before KB sync."""
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
response = s3.get_object(Bucket=bucket, Key=key)
raw_csv = response["Body"].read().decode("utf-8")
reader = csv.DictReader(io.StringIO(raw_csv))
rows = list(reader)
asin_index = defaultdict(list)
for i, row in enumerate(rows):
asin = row.get("asin", "").strip()
if asin:
asin_index[asin].append(i)
duplicates = {asin: idxs for asin, idxs in asin_index.items() if len(idxs) > 1}
dup_count = len(duplicates)
_emit_metric("catalog_asin_duplicates", dup_count)
if dup_count == 0:
logger.info("No duplicate ASINs found. Proceeding to KB sync.")
return {"status": "clean", "rows": len(rows)}
logger.warning(f"Found {dup_count} duplicate ASINs: {list(duplicates.keys())[:10]}")
    # Deduplication: keep last occurrence (most recent price update).
    # Walk the rows in reverse so the first ASIN seen is the newest row,
    # then restore original order — avoids O(n^2) list.insert(0) calls.
    seen = set()
    deduped_reversed = []
    for row in reversed(rows):
        asin = row.get("asin", "").strip()
        if asin not in seen:
            seen.add(asin)
            deduped_reversed.append(row)
    deduped_rows = list(reversed(deduped_reversed))
# Write deduplicated file back to S3 under a quarantine-safe prefix
deduped_key = key.replace("raw/", "validated/")
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=reader.fieldnames)
writer.writeheader()
writer.writerows(deduped_rows)
s3.put_object(
Bucket=bucket,
Key=deduped_key,
Body=output.getvalue().encode("utf-8"),
ContentType="text/csv",
)
_send_alert(duplicates, bucket, key, deduped_key)
return {
"status": "deduped",
"original_rows": len(rows),
"deduped_rows": len(deduped_rows),
"duplicate_asins": dup_count,
"clean_file": f"s3://{bucket}/{deduped_key}",
}
def _emit_metric(metric_name: str, value: float) -> None:
cloudwatch.put_metric_data(
Namespace=NAMESPACE,
MetricData=[{
"MetricName": metric_name,
"Value": value,
"Unit": "Count",
"Timestamp": datetime.now(timezone.utc),
}],
)
def _send_alert(duplicates: dict, bucket: str, raw_key: str, clean_key: str) -> None:
message = {
"alert": "Duplicate ASINs detected in product catalog",
"duplicate_count": len(duplicates),
"sample_duplicates": list(duplicates.keys())[:5],
"raw_file": f"s3://{bucket}/{raw_key}",
"clean_file": f"s3://{bucket}/{clean_key}",
"action": "Deduplication applied (last-row-wins). Verify prices before KB sync.",
}
sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
Prevention
- Enforce a `Uniqueness "asin" >= 1.0` rule in the AWS Glue Data Quality ruleset before any S3-to-KB sync.
- Add a pre-sync Lambda gate: if the `catalog_asin_duplicates > 0` CloudWatch alarm is in ALARM state, block the Bedrock Knowledge Base sync Step Functions workflow.
- Configure the Knowledge Base ingestion to use the ASIN as the document ID so a re-sync of the same ASIN overwrites rather than appends the embedding.
- Set up a weekly reconciliation job that compares the live OpenSearch document count against the deduplicated catalog row count and alerts on discrepancies.
- Publish a vendor SLA requirement that catalog exports must pass ASIN uniqueness validation before acceptance; quarantine non-conforming files via an EventBridge-triggered Lambda before they reach the ingestion prefix.
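The weekly reconciliation check described above can be sketched as plain comparison logic. A minimal sketch: `reconcile_counts` and its `tolerance` parameter are illustrative, with the actual counts supplied by OpenSearch and catalog queries elsewhere in the pipeline.

```python
def reconcile_counts(opensearch_doc_count: int, catalog_row_count: int,
                     tolerance: int = 0) -> dict:
    """Compare the live index size against the deduplicated catalog.

    Positive drift means the index holds more documents than the catalog,
    which is the signature of duplicate embeddings surviving a sync.
    """
    drift = opensearch_doc_count - catalog_row_count
    return {
        "drift": drift,
        "status": "OK" if abs(drift) <= tolerance else "DISCREPANCY",
    }

result = reconcile_counts(10_240, 10_192)  # 48 extra documents in the index
```

On `DISCREPANCY`, the job would emit a CloudWatch metric and SNS alert exactly as the resolution Lambda above does.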
Scenario 2: Null/Empty FAQ Content Fields Pass Schema Check — FM Hallucinates Answers
Problem
MangaAssist's FAQ knowledge base is maintained by the content team in a JSON file. The schema validator checks that each document has the fields `id`, `question`, and `answer`. After a CMS export bug, 23 FAQ documents were exported with `answer: ""` (empty string). The schema check passed because the field existed and was a string. These documents were chunked and embedded with no answer content behind them. When users asked questions that matched those FAQ topics (e.g., "What is your return policy for damaged manga?"), the RAG retriever returned the empty-answer chunk as the top result, because the question text alone gave the chunk high cosine similarity to the query. Claude received context like `Answer:` followed by nothing and, lacking any substantive information, invented a plausible-sounding but incorrect return policy. Users received misinformation; one user incorrectly believed damaged items were eligible for a full refund.
Detection
flowchart TD
A[FAQ JSON uploaded to S3] --> B[Lambda: content-depth validator triggered]
B --> C{Answer field length check}
C -->|answer.strip length == 0| D[Tag document: EMPTY_CONTENT]
C -->|answer.strip length < 50 chars| E[Tag document: THIN_CONTENT warning]
C -->|answer.strip length >= 50 chars| F[Document passes content depth]
D --> G[CloudWatch metric: faq_empty_content_count]
E --> H[CloudWatch metric: faq_thin_content_count]
G --> I{Empty count > 0?}
I -->|Yes| J[Block KB sync — SNS alert to content team]
I -->|No| K{Thin count > threshold?}
K -->|Yes| L[Soft alert — allow sync with warning tag]
K -->|No| F
F --> M[Proceed to Bedrock Knowledge Base sync]
Root Cause
The CMS export pipeline serialized FAQ entries directly from the database, where answer was stored as NULL in the RDS table. The Python export script used row.get("answer", ""), which silently converted NULL to an empty string. The downstream schema validator used jsonschema with type: string — an empty string is a valid string, and no minimum-length validator was applied. Because each chunk embedded the question text alongside the empty answer, the embedding (amazon.titan-embed-text-v2) still carried strong semantic signal for the topic, so the retriever ranked the hollow chunk highly for any query about it. This is a structural failure: schema validity is necessary but not sufficient for semantic content quality.
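The gap between schema validity and content validity is easy to reproduce in a few lines. A minimal stdlib sketch; the predicate names and the 50-character floor are illustrative:

```python
def passes_schema(answer) -> bool:
    """What the original validator effectively checked: the field is a string."""
    return isinstance(answer, str)

def passes_content_depth(answer, min_length: int = 50) -> bool:
    """What it should also check: the string carries substantive content."""
    return isinstance(answer, str) and len(answer.strip()) >= min_length

empty = ""  # NULL silently coerced by row.get("answer", "")
assert passes_schema(empty)             # schema-valid
assert not passes_content_depth(empty)  # semantically empty
```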
Resolution
import boto3
import json
import logging
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-FAQ"
NAMESPACE = "MangaAssist/DataQuality"
MIN_ANSWER_LENGTH = 50 # characters — below this indicates thin/stub content
EMPTY_ANSWER_BLOCK = True # hard block on empty answers
def lambda_handler(event: dict, context: Any) -> dict:
"""Validate FAQ JSON for empty/thin content before KB sync."""
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
response = s3.get_object(Bucket=bucket, Key=key)
faq_docs = json.loads(response["Body"].read().decode("utf-8"))
empty_ids, thin_ids, valid_docs = [], [], []
for doc in faq_docs:
doc_id = doc.get("id", "unknown")
answer = (doc.get("answer") or "").strip()
if len(answer) == 0:
empty_ids.append(doc_id)
logger.warning(f"Empty answer — doc_id={doc_id}")
elif len(answer) < MIN_ANSWER_LENGTH:
thin_ids.append(doc_id)
logger.warning(f"Thin answer ({len(answer)} chars) — doc_id={doc_id}")
valid_docs.append(doc)
else:
valid_docs.append(doc)
_emit_metric("faq_empty_content_count", len(empty_ids))
_emit_metric("faq_thin_content_count", len(thin_ids))
_emit_metric("faq_valid_doc_count", len(valid_docs))
if empty_ids and EMPTY_ANSWER_BLOCK:
_send_alert(empty_ids, thin_ids, bucket, key)
return {
"status": "BLOCKED",
"reason": "Empty answer fields detected",
"empty_ids": empty_ids,
"thin_ids": thin_ids,
"valid_count": len(valid_docs),
}
# Write only valid + thin (warned) docs to validated prefix
validated_key = key.replace("raw/", "validated/")
s3.put_object(
Bucket=bucket,
Key=validated_key,
Body=json.dumps(valid_docs, ensure_ascii=False, indent=2).encode("utf-8"),
ContentType="application/json",
)
if thin_ids:
_send_alert([], thin_ids, bucket, key)
return {
"status": "PARTIAL" if thin_ids else "CLEAN",
"empty_blocked": len(empty_ids),
"thin_warned": len(thin_ids),
"valid_count": len(valid_docs),
"clean_file": f"s3://{bucket}/{validated_key}",
}
def _emit_metric(metric_name: str, value: float) -> None:
cloudwatch.put_metric_data(
Namespace=NAMESPACE,
MetricData=[{
"MetricName": metric_name,
"Value": value,
"Unit": "Count",
"Timestamp": datetime.now(timezone.utc),
}],
)
def _send_alert(empty_ids: list, thin_ids: list, bucket: str, key: str) -> None:
message = {
"alert": "FAQ content quality failure",
"empty_answer_ids": empty_ids,
"thin_answer_ids": thin_ids,
"source_file": f"s3://{bucket}/{key}",
"action": (
"KB sync BLOCKED due to empty answers. "
"Content team must populate empty answers before re-upload."
),
}
sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
Prevention
- Add `minLength: 50` to the JSON Schema definition for the `answer` field so schema validation itself catches empty and stub answers.
- Configure an AWS Glue Data Quality ruleset with a column-length rule, `ColumnLength "answer" >= 50`, at a completeness threshold of `1.0`.
- In the CMS export pipeline, replace `row.get("answer", "")` with explicit `NULL` handling that tags the record as `DRAFT` and excludes it from export until the field is populated.
- Create a CloudWatch alarm on `faq_empty_content_count >= 1` that acts as a gate in the Step Functions KB sync workflow — any non-zero count stops the sync.
- Run a monthly content audit Lambda that scans the live Knowledge Base index for short-embedding documents and flags them for content team review.
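The `minLength` tightening described above might look like the following JSON Schema fragment. The field names match the FAQ documents in this runbook; the floor of 10 characters for `question` is an illustrative assumption.

```json
{
  "type": "object",
  "required": ["id", "question", "answer"],
  "properties": {
    "id": {"type": "string", "minLength": 1},
    "question": {"type": "string", "minLength": 10},
    "answer": {"type": "string", "minLength": 50}
  }
}
```

With this schema, the 23 empty-answer documents would have failed validation at export time rather than passing silently.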
Scenario 3: Mixed Date Formats in Manga Metadata — FM Misinterprets Release Dates
Problem
MangaAssist ingests manga metadata from three upstream sources: the publisher API (returns ISO 8601 dates: 2024-03-15), a legacy internal database export (returns MM/DD/YYYY: 03/15/2024), and a web-scraped supplemental feed (returns freetext: Released: March 2024, Mar '24, Spring 2024). All three are merged into a single metadata JSON and embedded into OpenSearch. When users ask "What manga came out this month?" or "Is there anything newer than Attack on Titan Vol. 35?", Claude receives context with heterogeneous date strings and interprets them inconsistently. In one observed case, 03/15/2024 was parsed by Claude as March 15 in one response and as the 3rd day of the 15th month (invalid) in another, producing contradictory recency rankings.
Detection
flowchart TD
A[Manga metadata JSON arrives in S3] --> B[Lambda: date-format validator]
B --> C[Extract all release_date field values]
C --> D{Apply format detection regex}
D -->|Matches ISO 8601 only| E[Mark: STANDARD]
D -->|Matches MM/DD/YYYY| F[Mark: LEGACY_US — convert]
D -->|Matches freetext pattern| G[Mark: FREETEXT — convert or flag]
D -->|No pattern matches| H[Mark: UNPARSEABLE — quarantine]
F --> I[Normalise to ISO 8601]
G --> J{Parseable freetext?}
J -->|Yes| I
J -->|No| K[Mark: UNPARSEABLE]
H --> L[CloudWatch metric: metadata_unparseable_dates]
K --> L
I --> M[Emit metric: metadata_normalised_dates]
E --> N[Merge normalised records]
M --> N
N --> O[Write to validated/ prefix]
Root Cause
No canonical date format was defined when the three data sources were onboarded. Each source team documented their own format, and the merge Lambda simply concatenated records without normalization. The embedding pipeline treated release_date as a plain string field — there was no structured date type in the OpenSearch mapping (it was mapped as text). Claude's date interpretation was therefore entirely dependent on in-context string patterns, which are ambiguous for MM/DD/YYYY (month-first vs day-first is indistinguishable without locale context). Freetext like "Spring 2024" provides only approximate temporal information that different model invocations resolved differently.
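The MM/DD vs DD/MM ambiguity described above can be demonstrated directly: the same string yields two different calendar dates depending on which convention the parser (or the model) assumes. A stdlib sketch:

```python
from datetime import datetime

raw = "03/04/2024"  # ambiguous without locale context

us_reading = datetime.strptime(raw, "%m/%d/%Y").date()  # March 4th
eu_reading = datetime.strptime(raw, "%d/%m/%Y").date()  # April 3rd

assert us_reading != eu_reading
canonical = us_reading.isoformat()  # unambiguous: '2024-03-04'
```

Only a canonical form like ISO 8601 removes the guesswork, which is what the normalization Lambda below enforces at ingest time.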
Resolution
import boto3
import json
import logging
import re
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")
NAMESPACE = "MangaAssist/DataQuality"
# Regex patterns ordered by specificity
ISO_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
US_DATE_PATTERN = re.compile(r"^(\d{1,2})/(\d{1,2})/(\d{4})$")
FREETEXT_MONTH_YEAR = re.compile(
r"(?:Released:\s*)?([A-Za-z]+\.?)\s*['\"]?(\d{2,4})", re.IGNORECASE
)
MONTH_MAP = {
"jan": "01", "feb": "02", "mar": "03", "apr": "04",
"may": "05", "jun": "06", "jul": "07", "aug": "08",
"sep": "09", "oct": "10", "nov": "11", "dec": "12",
}
def normalise_date(raw: str) -> Optional[str]:
"""Normalise any supported date format to ISO 8601 (YYYY-MM-DD)."""
if not raw:
return None
raw = raw.strip()
if ISO_PATTERN.match(raw):
return raw # Already normalised
m = US_DATE_PATTERN.match(raw)
if m:
month, day, year = m.group(1), m.group(2), m.group(3)
return f"{year}-{int(month):02d}-{int(day):02d}"
m = FREETEXT_MONTH_YEAR.search(raw)
if m:
month_str = m.group(1).lower().rstrip(".")[:3]
year_str = m.group(2)
if len(year_str) == 2:
year_str = "20" + year_str
month_num = MONTH_MAP.get(month_str)
if month_num and len(year_str) == 4:
return f"{year_str}-{month_num}-01" # Day unknown: default to 1st
return None # Unparseable
def lambda_handler(event: dict, context) -> dict:
"""Normalise release_date fields in manga metadata to ISO 8601."""
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
response = s3.get_object(Bucket=bucket, Key=key)
records = json.loads(response["Body"].read().decode("utf-8"))
normalised_count = 0
unparseable_ids = []
for record in records:
raw_date = record.get("release_date", "")
normalised = normalise_date(str(raw_date))
if normalised:
if normalised != raw_date:
normalised_count += 1
record["release_date"] = normalised
record["release_date_source_format"] = raw_date # Preserve original
else:
unparseable_ids.append(record.get("asin", "unknown"))
record["release_date"] = None
record["release_date_parse_error"] = raw_date
_emit_metric("metadata_normalised_dates", normalised_count)
_emit_metric("metadata_unparseable_dates", len(unparseable_ids))
validated_key = key.replace("raw/", "validated/")
s3.put_object(
Bucket=bucket,
Key=validated_key,
Body=json.dumps(records, ensure_ascii=False, indent=2).encode("utf-8"),
ContentType="application/json",
)
logger.info(f"Normalised: {normalised_count}, Unparseable: {len(unparseable_ids)}")
return {
"status": "normalised",
"normalised_count": normalised_count,
"unparseable_ids": unparseable_ids,
"clean_file": f"s3://{bucket}/{validated_key}",
}
def _emit_metric(metric_name: str, value: float) -> None:
cloudwatch.put_metric_data(
Namespace=NAMESPACE,
MetricData=[{
"MetricName": metric_name,
"Value": value,
"Unit": "Count",
"Timestamp": datetime.now(timezone.utc),
}],
)
Prevention
- Define a canonical data contract: all upstream sources must deliver `release_date` as ISO 8601 (`YYYY-MM-DD`) or the feed is rejected at the API gateway layer.
- Set the `release_date` field type in OpenSearch Serverless to `date` with `format: "strict_date_optional_time"` so the index itself rejects malformed strings at index time.
- Add a Glue Data Quality rule, `ColumnValues "release_date" matches "^\d{4}-\d{2}-\d{2}$"`, with a completeness threshold of `0.95` to catch residual format violations.
- Include a date normalization step in the Bedrock Knowledge Base custom transformation Lambda so that even if upstream formats slip through, embeddings always use the normalized value.
- Alarm on `metadata_unparseable_dates` in CloudWatch so an SNS alert reaches the data-sources team whenever any record still has an unparseable date after normalization.
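The index-level guard described above corresponds to a mapping along these lines; the field name follows this runbook, and applying it to the actual collection's index template is deployment-specific.

```json
{
  "mappings": {
    "properties": {
      "release_date": {
        "type": "date",
        "format": "strict_date_optional_time"
      }
    }
  }
}
```

With a `date`-typed field, an indexing request carrying `03/15/2024` fails with a mapper parsing error instead of being stored as an ambiguous string.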
Scenario 4: AWS Glue Data Quality Job Fails Silently — Invalid Data Reaches OpenSearch Serverless
Problem
MangaAssist runs a weekly AWS Glue ETL job that ingests product and metadata updates into the OpenSearch Serverless knowledge base. A Glue Data Quality (DQ) ruleset was configured to check completeness (>= 0.95), uniqueness, and value ranges. However, the Glue job was configured with DQ_STOP_JOB_ON_FAILURE_OPTIONS = {} (empty — DQ alerts only, no stop). When the DQ score dropped to 0.72 due to a batch of records with missing price and title fields, the job logged a DataQualityEvaluationResult as WARNING but continued execution. The downstream write to the OpenSearch index proceeded with the contaminated batch. Users began receiving answers that cited products with null titles as "Unknown Item" or prices as "Price not available" — causing customer confusion and support ticket spikes.
Detection
flowchart TD
A[Glue ETL job starts] --> B[DQ ruleset evaluation runs]
B --> C{DQ score >= threshold?}
C -->|Yes — score >= 0.95| D[Job continues normally]
C -->|No — score < 0.95| E[DQ WARNING logged to CloudWatch Logs]
E --> F{DQ_STOP_JOB_ON_FAILURE configured?}
F -->|No — silent continue| G[Job writes invalid records to OpenSearch]
F -->|Yes — job halted| H[Step Functions reports FAILED state]
G --> I[CloudWatch metric: glue_dq_score below threshold]
I --> J{Alarm triggered?}
J -->|No alarm configured| K[Silent contamination — users affected]
J -->|Alarm configured| L[SNS alert to on-call]
H --> L
L --> M[On-call quarantines affected index segment]
Root Cause
The Glue job was initially set up with DQ in WARN mode (not FAIL) to avoid blocking the pipeline during initial tuning. The team intended to switch to FAIL mode after one month but the Jira ticket was deprioritised and never completed. No CloudWatch alarm was configured on the GlueDataQuality.RulesetPassedCount or custom DQ score metric. The Step Functions workflow that orchestrated the Glue job checked only for the Glue job terminal state (SUCCEEDED vs FAILED) — since the Glue job itself succeeded (it ran to completion), the workflow marked the pipeline run as healthy. The DQ evaluation result log entry was written to CloudWatch Logs but no Metric Filter or alarm consumed it.
Resolution
import boto3
import json
import logging
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
glue = boto3.client("glue")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")
stepfunctions = boto3.client("stepfunctions")
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-GlueETL"
NAMESPACE = "MangaAssist/DataQuality"
DQ_PASS_THRESHOLD = 0.95
def lambda_handler(event: dict, context: Any) -> dict:
"""
Post-Glue-job gate: inspect DQ evaluation results and halt Step Functions
task token if score is below threshold. Invoked by EventBridge rule on
Glue job state change to SUCCEEDED.
"""
job_name = event.get("detail", {}).get("jobName")
job_run_id = event.get("detail", {}).get("jobRunId")
task_token = event.get("taskToken") # From Step Functions .waitForTaskToken
if not job_name or not job_run_id:
raise ValueError("Missing jobName or jobRunId in event")
dq_score = _get_dq_score(job_name, job_run_id)
_emit_metric("glue_dq_score", dq_score, {"JobName": job_name})
if dq_score < DQ_PASS_THRESHOLD:
logger.error(f"DQ score {dq_score:.3f} below threshold {DQ_PASS_THRESHOLD}")
_send_alert(job_name, job_run_id, dq_score)
if task_token:
stepfunctions.send_task_failure(
taskToken=task_token,
error="DataQualityThresholdNotMet",
cause=json.dumps({
"job_name": job_name,
"job_run_id": job_run_id,
"dq_score": dq_score,
"threshold": DQ_PASS_THRESHOLD,
}),
)
return {"status": "BLOCKED", "dq_score": dq_score}
logger.info(f"DQ score {dq_score:.3f} passes threshold. Proceeding.")
if task_token:
stepfunctions.send_task_success(
taskToken=task_token,
output=json.dumps({"dq_score": dq_score, "status": "PASS"}),
)
return {"status": "PASS", "dq_score": dq_score}
def _get_dq_score(job_name: str, job_run_id: str) -> float:
    """Retrieve the DQ evaluation score for a Glue job run.

    DQ results carry their own ResultId, so look the result up by job run
    first rather than passing the job run ID to get_data_quality_result.
    """
    listing = glue.list_data_quality_results(
        Filter={"JobName": job_name, "JobRunId": job_run_id}
    )
    results = listing.get("Results", [])
    if not results:
        logger.error(f"No DQ results found for {job_name}/{job_run_id}")
        return 0.0  # Fail closed: a missing result blocks the sync
    detail = glue.get_data_quality_result(ResultId=results[0]["ResultId"])
    return float(detail.get("Score", 0.0))
def _emit_metric(metric_name: str, value: float, dimensions: dict = None) -> None:
dim_list = [{"Name": k, "Value": v} for k, v in (dimensions or {}).items()]
cloudwatch.put_metric_data(
Namespace=NAMESPACE,
MetricData=[{
"MetricName": metric_name,
"Value": value,
"Unit": "None",
"Timestamp": datetime.now(timezone.utc),
"Dimensions": dim_list,
}],
)
def _send_alert(job_name: str, job_run_id: str, dq_score: float) -> None:
message = {
"alert": "Glue Data Quality threshold not met — OpenSearch sync BLOCKED",
"job_name": job_name,
"job_run_id": job_run_id,
"dq_score": round(dq_score, 4),
"threshold": DQ_PASS_THRESHOLD,
"action": "Investigate Glue DQ result in AWS console. Re-run after source data remediation.",
}
sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
Prevention
- Set `DQ_STOP_JOB_ON_FAILURE_OPTIONS = {"StopJobOnFailure": "true"}` in the Glue job's DQ configuration so the Glue job itself halts on a score failure — do not rely solely on post-job Lambda gating.
- Create a CloudWatch Metric Filter on the Glue job's CloudWatch Logs that extracts the `Score` field from `DataQualityEvaluationResult` log events and publishes it as a custom metric; set an alarm at `<= 0.94`.
- In the Step Functions workflow, use `.waitForTaskToken` with the DQ gate Lambda so that a DQ failure automatically surfaces as a workflow execution failure — visible in the console and actionable via EventBridge.
- Establish a data quarantine S3 prefix: on DQ failure, route the contaminated batch to `s3://mangaassist-data/quarantine/<timestamp>/` for forensic review without deleting source data.
- Schedule a quarterly Game Day drill in which the team intentionally injects a low-quality batch to verify the full alarm-to-block-to-alert chain fires end to end.
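The `.waitForTaskToken` gate described above might be expressed as a Step Functions state like the following. The state names, function name, and downstream states are illustrative; the error name matches the one the gate Lambda raises via `send_task_failure`.

```json
{
  "DataQualityGate": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
      "FunctionName": "mangaassist-dq-gate",
      "Payload": {
        "taskToken.$": "$$.Task.Token",
        "detail.$": "$.detail"
      }
    },
    "TimeoutSeconds": 900,
    "Catch": [
      {
        "ErrorEquals": ["DataQualityThresholdNotMet"],
        "Next": "QuarantineBatch"
      }
    ],
    "Next": "SyncToOpenSearch"
  }
}
```

Because the workflow blocks on the token, a below-threshold DQ score surfaces as an execution failure rather than a log line nobody reads.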
Scenario 5: User Review Data Injected Without Length Validation — 50K-Token Review Truncates System Instructions
Problem
MangaAssist allows users to submit long-form reviews for manga volumes. A new "personalized recommendation" feature was added that injects the user's most recent review into the system prompt context to help Claude personalize recommendations. A power user submitted a 50,000-token review (a detailed chapter-by-chapter analysis of a 50-volume boxset, copy-pasted from their personal blog). The orchestration Lambda appended the full review to the system prompt without any length check. Claude 3 Sonnet's 200K context window accommodated the text, but the combined prompt (system instructions + user review + conversation history + RAG chunks) exceeded the token budget designed for RAG retrieval. Token budget overflow caused the RAG chunks and — critically — the last portion of the system instructions (including the safety rails, response format directives, and refusal policy) to be silently truncated by the context window packing logic. The model responded without safety guidelines, producing off-topic and policy-violating content in one session.
Detection
flowchart TD
A[User submits review data] --> B[Lambda: review ingestion handler]
B --> C[Count tokens using Bedrock tokenizer]
C --> D{Token count <= MAX_REVIEW_TOKENS?}
D -->|Yes| E[Store review in DynamoDB user profile]
D -->|No| F[Truncate review to MAX_REVIEW_TOKENS]
F --> G[CloudWatch metric: review_token_overflow_count]
G --> H[Log user_id and token count]
E --> I[Recommendation orchestrator builds prompt]
I --> J[Calculate total prompt token budget]
J --> K{Total tokens <= PROMPT_BUDGET?}
K -->|Yes| L[Invoke Bedrock Claude]
K -->|No| M[Drop user review from context — use summary only]
M --> N[CloudWatch metric: prompt_budget_exceeded]
N --> L
L --> O[Return recommendation to user]
Root Cause
The feature was developed with the assumption that user reviews would be short (a few hundred words). No token budget analysis was performed when the feature was designed. The orchestration code used f-string concatenation to build the prompt and passed the raw review_text field from DynamoDB directly into the prompt template. DynamoDB's 400KB item size limit was the only implicit cap — a 50K-token review at ~4 chars/token is approximately 200KB, which fits within DynamoDB's limit. The Lambda had no awareness of Claude's effective context budget for the recommendation use case (system prompt: ~2K tokens, RAG chunks: ~4K tokens, conversation history: ~2K tokens, model response reservation: ~1K tokens — leaving only ~191K tokens nominally "available" but the feature ticket never set a review budget). The context window packing logic in the Converse API silently dropped trailing content when total tokens exceeded the model limit reservation, with no exception raised.
Resolution
import boto3
import json
import logging
import re
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")
dynamodb = boto3.resource("dynamodb")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")
NAMESPACE = "MangaAssist/DataQuality"
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-TokenBudget"
# Token budget allocation for recommendation prompt
SYSTEM_PROMPT_TOKENS = 2_000
RAG_CHUNK_TOKENS = 4_000
CONVERSATION_HISTORY_TOKENS = 2_000
RESPONSE_RESERVATION_TOKENS = 1_000
MAX_REVIEW_TOKENS = 3_000 # Hard cap for user-submitted review content
TOTAL_BUDGET = 200_000 # Claude 3 Sonnet context window
PROMPT_BUDGET = (
TOTAL_BUDGET
- SYSTEM_PROMPT_TOKENS
- RAG_CHUNK_TOKENS
- CONVERSATION_HISTORY_TOKENS
- RESPONSE_RESERVATION_TOKENS
)
def estimate_tokens(text: str) -> int:
"""Conservative token estimate: 1 token per 3.5 characters (multilingual)."""
return max(1, int(len(text) / 3.5))
def truncate_to_token_budget(text: str, max_tokens: int) -> tuple[str, bool]:
"""Truncate text to stay within token budget. Returns (text, was_truncated)."""
if estimate_tokens(text) <= max_tokens:
return text, False
# Truncate by char count approximation
max_chars = max_tokens * 3 # Conservative: 3 chars per token
return text[:max_chars] + "\n\n[Review truncated for context budget.]", True
def lambda_handler(event: dict, context) -> dict:
"""Validate and budget user review before injection into FM recommendation prompt."""
user_id = event.get("user_id")
raw_review = event.get("review_text", "")
action = event.get("action", "ingest") # "ingest" or "inject"
if action == "ingest":
return _handle_review_ingest(user_id, raw_review)
elif action == "inject":
return _handle_review_inject(user_id, raw_review)
else:
raise ValueError(f"Unknown action: {action}")
def _handle_review_ingest(user_id: str, raw_review: str) -> dict:
"""Store review with token count metadata. Reject if exceeds absolute cap."""
token_count = estimate_tokens(raw_review)
ABSOLUTE_CAP = 20_000 # Reject outright above this — storage + abuse guard
_emit_metric("review_submitted_tokens", token_count)
if token_count > ABSOLUTE_CAP:
_emit_metric("review_token_overflow_count", 1)
_send_alert(user_id, token_count, ABSOLUTE_CAP, "REJECTED_AT_INGEST")
logger.error(f"Review rejected: user_id={user_id}, tokens={token_count}")
return {
"status": "REJECTED",
"reason": f"Review exceeds maximum allowed length ({ABSOLUTE_CAP} tokens equivalent). "
"Please shorten your review to under 15,000 words.",
"token_estimate": token_count,
}
# Store with metadata
table = dynamodb.Table("mangaassist-user-profiles")
table.update_item(
Key={"user_id": user_id},
UpdateExpression="SET latest_review = :r, review_token_estimate = :t, review_updated_at = :ts",
ExpressionAttributeValues={
":r": raw_review,
":t": token_count,
":ts": datetime.now(timezone.utc).isoformat(),
},
)
return {"status": "STORED", "token_estimate": token_count}
def _handle_review_inject(user_id: str, raw_review: str) -> dict:
"""Prepare review snippet safe for prompt injection within token budget."""
review_snippet, was_truncated = truncate_to_token_budget(raw_review, MAX_REVIEW_TOKENS)
actual_tokens = estimate_tokens(review_snippet)
if was_truncated:
_emit_metric("review_injection_truncations", 1)
logger.warning(f"Review truncated for injection: user_id={user_id}, original_tokens={estimate_tokens(raw_review)}, injected_tokens={actual_tokens}")
return {
"review_snippet": review_snippet,
"was_truncated": was_truncated,
"injected_token_estimate": actual_tokens,
"remaining_budget": PROMPT_BUDGET - actual_tokens,
}
def _emit_metric(metric_name: str, value: float) -> None:
cloudwatch.put_metric_data(
Namespace=NAMESPACE,
MetricData=[{
"MetricName": metric_name,
"Value": value,
"Unit": "Count",
"Timestamp": datetime.now(timezone.utc),
}],
)
def _send_alert(user_id: str, token_count: int, cap: int, reason: str) -> None:
message = {
"alert": "User review token overflow",
"user_id": user_id,
"token_estimate": token_count,
"cap": cap,
"reason": reason,
"action": "Review rejected/truncated before FM injection. Investigate for abuse pattern.",
}
sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
Prevention
- Enforce a hard character limit (e.g., 60,000 characters ≈ 15,000 tokens) at the API Gateway request validator level using a JSON Schema `maxLength` constraint on the `review_text` field — oversized requests are rejected before Lambda is invoked.
- Define and document a token budget allocation table for every prompt template that injects user-controlled content; review tokens must be explicitly budgeted, and the system prompt must always be the first injected component.
- Use the Amazon Bedrock Converse API's `system` parameter (separate from `messages`) so that system instructions are structurally separated from user content and cannot be displaced by token overflow in the message array.
- Set up a CloudWatch alarm on `review_token_overflow_count >= 5` within one hour to detect prompt-injection or abuse patterns from users attempting to overwhelm the context window.
- Add a weekly canary test that submits a 4,000-token review via the recommendation API and asserts that the system prompt content reached the model intact — alerting if safety instructions are missing from any prompt invocation.
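The structural separation described above can be sketched as a request builder: with the Converse API, system instructions travel in the top-level `system` field rather than inside `messages`, so an oversized review cannot displace them. The builder function and prompt strings here are illustrative.

```python
def build_converse_request(system_prompt: str, review_snippet: str,
                           user_message: str, model_id: str) -> dict:
    """Assemble a Bedrock Converse request with system instructions kept
    structurally separate from user-controlled content."""
    return {
        "modelId": model_id,
        # Top-level system block: cannot be crowded out by message content
        "system": [{"text": system_prompt}],
        "messages": [{
            "role": "user",
            "content": [
                {"text": f"My latest review:\n{review_snippet}"},
                {"text": user_message},
            ],
        }],
        "inferenceConfig": {"maxTokens": 1000},
    }

req = build_converse_request(
    "You are MangaAssist. Follow the safety policy.",
    "Great pacing in volume 3...",
    "Recommend something similar.",
    "anthropic.claude-3-sonnet-20240229-v1:0",
)
# Pass as keyword arguments: bedrock_runtime.converse(**req)
```

Combined with the `MAX_REVIEW_TOKENS` cap in the resolution code, the worst a runaway review can do is crowd out other user-side content, never the safety rails.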