
Scenarios and Runbooks — Data Validation Workflows

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Dimension | Detail
Certification | AWS AIF-C01 — AI Practitioner
Domain | 1 — Foundation Model Integration, Data Management, and Compliance
Task | 1.3 — Implement data validation and processing pipelines for FM consumption
Skill | 1.3.1 — Create comprehensive data validation workflows to ensure data meets quality standards for FM consumption
This File | Five production scenarios with detection flowcharts, root cause analysis, resolution code, and prevention strategies

Skill Scope Statement

This file presents five real-world failure scenarios that MangaAssist has encountered (or would encounter) in production when ingesting data into FM-backed pipelines. Each scenario exposes a distinct data quality failure mode — duplicates, nulls, format inconsistency, silent pipeline failures, and unbounded token injection. Each runbook includes: a problem statement, a mermaid detection flowchart, root cause analysis, Python resolution code using boto3, and prevention measures. These runbooks are designed for on-call engineers responding to data quality alerts before or after FM consumption.


Mind Map — Data Validation Failure Modes

mindmap
  root((Data Validation<br/>Failure Scenarios))
    Duplicate ASINs
      Contradictory FM Answers
      Duplicate Embeddings
      Knowledge Base Drift
    Null Content Fields
      Schema Pass Silent Failure
      FM Context Starvation
      Hallucinated Answers
    Mixed Date Formats
      ISO 8601 vs MM/DD/YYYY
      Freetext Dates
      FM Misinterpretation
    Silent Glue DQ Failure
      DQ Score Below Threshold
      Pipeline Continues
      Invalid Data in OpenSearch
    Unbounded Review Tokens
      50K Token Injection
      Prompt Truncation
      System Instructions Dropped

Scenario Overview

# | Scenario | Severity | Blast Radius | Typical Detection Time
1 | Duplicate ASINs in product catalog CSV — FM gives contradictory price answers | P2 — High | All users querying affected products | 5-15 minutes via answer consistency audit
2 | Null/empty FAQ content fields pass schema check — FM hallucinates answers | P2 — High | Any user on topic covered by empty FAQ doc | Post-hoc via quality audit or user report
3 | Mixed date formats in manga metadata — FM misinterprets release dates | P3 — Medium | Recommendations for date-sensitive queries | Post-hoc via date comparison test suite
4 | AWS Glue Data Quality job fails silently — invalid data reaches OpenSearch | P1 — Critical | All users querying contaminated index segment | 10-30 minutes via DQ score CloudWatch alarm
5 | User review data injected without length validation — 50K tokens truncate system instructions | P1 — Critical | Any session with user-submitted context injection | Immediate via token count alarm or user report

Scenario 1: Duplicate ASINs in Product Catalog — FM Gives Contradictory Price Answers

Problem

MangaAssist ingests a weekly product catalog CSV from the vendor into a Bedrock Knowledge Base. The CSV contains duplicate ASIN rows with different prices (e.g., two rows for ASIN B08MANGA01 — one at $12.99 and one at $9.99 after a partial sale update). Both rows are chunked and embedded independently into OpenSearch Serverless. When a user asks "How much is Demon Slayer Vol. 3?", the RAG pipeline retrieves both chunks and Claude receives conflicting context, reporting either price depending on retrieval ranking. Users see inconsistent answers across sessions.

Detection

flowchart TD
    A[Product CSV arrives in S3] --> B[Lambda: pre-ingest validation triggered]
    B --> C{ASIN uniqueness check}
    C -->|Duplicates found| D[CloudWatch metric: catalog_asin_duplicates > 0]
    C -->|No duplicates| E[Proceed to Bedrock Knowledge Base sync]
    D --> F[SNS Alert: DataQuality-ProductCatalog]
    F --> G[On-call engineer notified]
    G --> H{Manual review}
    H -->|Deduplicate and re-upload| I[Re-trigger Knowledge Base sync]
    H -->|Escalate to vendor| J[Vendor corrects source feed]
    I --> E

Root Cause

The vendor's catalog export system performs incremental updates by appending new rows rather than updating existing rows in place. When a price change event fires, the old row is not deleted before the new row is appended. The Lambda pre-ingest validator checked for required columns (asin, title, price, availability) and correct data types, but did not enforce uniqueness constraints on the asin field. Once duplicate embeddings entered OpenSearch, the Knowledge Base sync had no deduplication mechanism — both chunks survived retrieval scoring equally.

Resolution

import boto3
import csv
import io
import json
import logging
from collections import defaultdict
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")

SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-ProductCatalog"
NAMESPACE = "MangaAssist/DataQuality"


def lambda_handler(event, context):
    """Validate product catalog CSV for duplicate ASINs before KB sync."""
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    response = s3.get_object(Bucket=bucket, Key=key)
    raw_csv = response["Body"].read().decode("utf-8")

    reader = csv.DictReader(io.StringIO(raw_csv))
    rows = list(reader)

    asin_index = defaultdict(list)
    for i, row in enumerate(rows):
        asin = row.get("asin", "").strip()
        if asin:
            asin_index[asin].append(i)

    duplicates = {asin: idxs for asin, idxs in asin_index.items() if len(idxs) > 1}
    dup_count = len(duplicates)

    _emit_metric("catalog_asin_duplicates", dup_count)

    if dup_count == 0:
        logger.info("No duplicate ASINs found. Proceeding to KB sync.")
        return {"status": "clean", "rows": len(rows)}

    logger.warning(f"Found {dup_count} duplicate ASINs: {list(duplicates.keys())[:10]}")

    # Deduplication: keep last occurrence (most recent price update).
    # Rows without an ASIN are kept untouched rather than collapsed together.
    seen = set()
    deduped_rows = []
    for row in reversed(rows):
        asin = row.get("asin", "").strip()
        if asin and asin in seen:
            continue  # Older duplicate of an ASIN already kept; drop it
        seen.add(asin)
        deduped_rows.append(row)
    deduped_rows.reverse()  # Restore original file order

    # Write deduplicated file back to S3 under a quarantine-safe prefix
    deduped_key = key.replace("raw/", "validated/")
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=reader.fieldnames)
    writer.writeheader()
    writer.writerows(deduped_rows)

    s3.put_object(
        Bucket=bucket,
        Key=deduped_key,
        Body=output.getvalue().encode("utf-8"),
        ContentType="text/csv",
    )

    _send_alert(duplicates, bucket, key, deduped_key)

    return {
        "status": "deduped",
        "original_rows": len(rows),
        "deduped_rows": len(deduped_rows),
        "duplicate_asins": dup_count,
        "clean_file": f"s3://{bucket}/{deduped_key}",
    }


def _emit_metric(metric_name: str, value: float) -> None:
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )


def _send_alert(duplicates: dict, bucket: str, raw_key: str, clean_key: str) -> None:
    message = {
        "alert": "Duplicate ASINs detected in product catalog",
        "duplicate_count": len(duplicates),
        "sample_duplicates": list(duplicates.keys())[:5],
        "raw_file": f"s3://{bucket}/{raw_key}",
        "clean_file": f"s3://{bucket}/{clean_key}",
        "action": "Deduplication applied (last-row-wins). Verify prices before KB sync.",
    }
    sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
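
For local verification, a smoke test can invoke the handler with a synthetic S3 event (bucket and key below are illustrative placeholders; the call still needs AWS credentials and a real object at that location):

if __name__ == "__main__":
    # Synthetic S3 event mimicking the put-object notification shape.
    fake_event = {"Records": [{"s3": {
        "bucket": {"name": "mangaassist-data"},
        "object": {"key": "raw/catalog-weekly.csv"},
    }}]}
    print(json.dumps(lambda_handler(fake_event, None), indent=2))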

Prevention

  • Enforce a UNIQUENESS rule in the AWS Glue Data Quality ruleset on the asin column (threshold = 1.0) before any S3-to-KB sync (see the sketch after this list).
  • Add a pre-sync Lambda gate: if catalog_asin_duplicates > 0 CloudWatch alarm is ALARM, block the Bedrock Knowledge Base sync Step Functions workflow.
  • Store one S3 object per ASIN (object key derived from the ASIN) so a Knowledge Base re-sync of the same ASIN overwrites the existing embedding rather than appending a new one.
  • Set up a weekly reconciliation job that compares the live OpenSearch document count against the deduplicated catalog row count and alerts on discrepancies.
  • Publish a vendor SLA requirement that catalog exports must pass ASIN uniqueness validation before acceptance; quarantine non-conforming files on arrival via an EventBridge-triggered validation rule.
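
A minimal sketch of the first bullet, assuming a Data Catalog table product_catalog in a mangaassist database (both names are placeholders). The DQDL rules enforce presence and uniqueness of asin:

import boto3

glue = boto3.client("glue")

# Placeholder names: substitute the real Data Catalog database and table.
glue.create_data_quality_ruleset(
    Name="mangaassist-catalog-asin-rules",
    Description="Block KB sync when ASINs are missing or duplicated",
    Ruleset='Rules = [ IsComplete "asin", IsUnique "asin" ]',
    TargetTable={
        "DatabaseName": "mangaassist",
        "TableName": "product_catalog",
    },
)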

Scenario 2: Null/Empty FAQ Content Fields Pass Schema Check — FM Hallucinates Answers

Problem

MangaAssist's FAQ knowledge base is maintained by the content team in a JSON file. The schema validator checks that each document has the fields id, question, and answer. After a CMS export bug, 23 FAQ documents were exported with answer: "" (empty string). The schema check passed because the field existed and was a string. These documents were chunked and embedded even though they carried no answer content. When users asked questions that matched those FAQ topics (e.g., "What is your return policy for damaged manga?"), the RAG retriever returned the empty-answer chunk as the top result with high cosine similarity to the question. Claude received context like Answer: and, lacking any substantive information, invented a plausible-sounding but incorrect return policy. Users received misinformation; one user incorrectly believed damaged items were eligible for a full refund.

Detection

flowchart TD
    A[FAQ JSON uploaded to S3] --> B[Lambda: content-depth validator triggered]
    B --> C{Answer field length check}
    C -->|answer.strip length == 0| D[Tag document: EMPTY_CONTENT]
    C -->|answer.strip length < 50 chars| E[Tag document: THIN_CONTENT warning]
    C -->|answer.strip length >= 50 chars| F[Document passes content depth]
    D --> G[CloudWatch metric: faq_empty_content_count]
    E --> H[CloudWatch metric: faq_thin_content_count]
    G --> I{Empty count > 0?}
    I -->|Yes| J[Block KB sync — SNS alert to content team]
    I -->|No| K{Thin count > threshold?}
    K -->|Yes| L[Soft alert — allow sync with warning tag]
    K -->|No| F
    F --> M[Proceed to Bedrock Knowledge Base sync]

Root Cause

The CMS export pipeline serialized FAQ entries directly from the database, where answer was stored as NULL in the RDS table. The Python export script used row.get("answer", ""), which silently converted NULL to empty string. The downstream schema validator used jsonschema with type: string, and an empty string is a valid string; no minimum-length validator was applied. Because each chunk combined the question text with the (empty) answer, the embedding model (amazon.titan-embed-text-v2) still produced vectors with high cosine similarity to on-topic queries: the question field alone supplied the semantic signal. This is a structural failure: schema validity is necessary but not sufficient for semantic content quality.
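
A minimal, runnable illustration of that failure mode (document contents are hypothetical): type: string accepts the empty string, while the minLength variant recommended under Prevention rejects it.

import jsonschema

schema_type_only = {"type": "object", "properties": {"answer": {"type": "string"}}}
schema_min_length = {
    "type": "object",
    "properties": {"answer": {"type": "string", "minLength": 50}},
}

doc = {"id": "faq-042", "question": "What is your return policy?", "answer": ""}

jsonschema.validate(doc, schema_type_only)  # Passes: "" is a valid string
try:
    jsonschema.validate(doc, schema_min_length)
except jsonschema.ValidationError as exc:
    print(exc.message)  # "'' is too short": caught before embedding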

Resolution

import boto3
import json
import logging
from datetime import datetime, timezone
from typing import Any

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")

SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-FAQ"
NAMESPACE = "MangaAssist/DataQuality"
MIN_ANSWER_LENGTH = 50  # characters — below this indicates thin/stub content
EMPTY_ANSWER_BLOCK = True  # hard block on empty answers


def lambda_handler(event: dict, context: Any) -> dict:
    """Validate FAQ JSON for empty/thin content before KB sync."""
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    response = s3.get_object(Bucket=bucket, Key=key)
    faq_docs = json.loads(response["Body"].read().decode("utf-8"))

    empty_ids, thin_ids, valid_docs = [], [], []

    for doc in faq_docs:
        doc_id = doc.get("id", "unknown")
        answer = (doc.get("answer") or "").strip()

        if len(answer) == 0:
            empty_ids.append(doc_id)
            logger.warning(f"Empty answer — doc_id={doc_id}")
        elif len(answer) < MIN_ANSWER_LENGTH:
            thin_ids.append(doc_id)
            logger.warning(f"Thin answer ({len(answer)} chars) — doc_id={doc_id}")
            valid_docs.append(doc)
        else:
            valid_docs.append(doc)

    _emit_metric("faq_empty_content_count", len(empty_ids))
    _emit_metric("faq_thin_content_count", len(thin_ids))
    _emit_metric("faq_valid_doc_count", len(valid_docs))

    if empty_ids and EMPTY_ANSWER_BLOCK:
        _send_alert(empty_ids, thin_ids, bucket, key)
        return {
            "status": "BLOCKED",
            "reason": "Empty answer fields detected",
            "empty_ids": empty_ids,
            "thin_ids": thin_ids,
            "valid_count": len(valid_docs),
        }

    # Write only valid + thin (warned) docs to validated prefix
    validated_key = key.replace("raw/", "validated/")
    s3.put_object(
        Bucket=bucket,
        Key=validated_key,
        Body=json.dumps(valid_docs, ensure_ascii=False, indent=2).encode("utf-8"),
        ContentType="application/json",
    )

    if thin_ids:
        _send_alert([], thin_ids, bucket, key)

    return {
        "status": "PARTIAL" if thin_ids else "CLEAN",
        "empty_blocked": len(empty_ids),
        "thin_warned": len(thin_ids),
        "valid_count": len(valid_docs),
        "clean_file": f"s3://{bucket}/{validated_key}",
    }


def _emit_metric(metric_name: str, value: float) -> None:
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )


def _send_alert(empty_ids: list, thin_ids: list, bucket: str, key: str) -> None:
    message = {
        "alert": "FAQ content quality failure",
        "empty_answer_ids": empty_ids,
        "thin_answer_ids": thin_ids,
        "source_file": f"s3://{bucket}/{key}",
        "action": (
            "KB sync BLOCKED due to empty answers. "
            "Content team must populate empty answers before re-upload."
        ),
    }
    sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))

Prevention

  • Add minLength: 50 to the JSON Schema definition for the answer field so schema validation itself catches empty and stub answers.
  • Configure an AWS Glue Data Quality ruleset with a ColumnLength rule: ColumnLength "answer" >= 50 with completeness threshold = 1.0.
  • In the CMS export pipeline, replace row.get("answer", "") with explicit NULL handling that tags the record as DRAFT and excludes it from export until the field is populated.
  • Create a CloudWatch alarm on faq_empty_content_count >= 1 that acts as a gate in the Step Functions KB sync workflow; any non-zero count stops the sync (see the sketch after this list).
  • Run a monthly content audit Lambda that scans the live Knowledge Base index for short-embedding documents and flags them for content team review.
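
A sketch of the alarm gate from the fourth bullet. The metric name and SNS topic ARN match the constants in the resolution code above, but treat them as assumptions for your stack:

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-faq-empty-content",
    Namespace="MangaAssist/DataQuality",
    MetricName="faq_empty_content_count",
    Statistic="Maximum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="notBreaching",  # No upload in a period is not a failure
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:DataQuality-FAQ"],
)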

Scenario 3: Mixed Date Formats in Manga Metadata — FM Misinterprets Release Dates

Problem

MangaAssist ingests manga metadata from three upstream sources: the publisher API (returns ISO 8601 dates: 2024-03-15), a legacy internal database export (returns MM/DD/YYYY: 03/15/2024), and a web-scraped supplemental feed (returns freetext: Released: March 2024, Mar '24, Spring 2024). All three are merged into a single metadata JSON and embedded into OpenSearch. When users ask "What manga came out this month?" or "Is there anything newer than Attack on Titan Vol. 35?", Claude receives context with heterogeneous date strings and interprets them inconsistently. In one observed case, 03/15/2024 was parsed by Claude as March 15 in one response and as the 3rd day of the 15th month (invalid) in another, producing contradictory recency rankings.

Detection

flowchart TD
    A[Manga metadata JSON arrives in S3] --> B[Lambda: date-format validator]
    B --> C[Extract all release_date field values]
    C --> D{Apply format detection regex}
    D -->|Matches ISO 8601 only| E[Mark: STANDARD]
    D -->|Matches MM/DD/YYYY| F[Mark: LEGACY_US — convert]
    D -->|Matches freetext pattern| G[Mark: FREETEXT — convert or flag]
    D -->|No pattern matches| H[Mark: UNPARSEABLE — quarantine]
    F --> I[Normalise to ISO 8601]
    G --> J{Parseable freetext?}
    J -->|Yes| I
    J -->|No| K[Mark: UNPARSEABLE]
    H --> L[CloudWatch metric: metadata_unparseable_dates]
    K --> L
    I --> M[Emit metric: metadata_normalised_dates]
    E --> N[Merge normalised records]
    M --> N
    N --> O[Write to validated/ prefix]

Root Cause

No canonical date format was defined when the three data sources were onboarded. Each source team documented their own format, and the merge Lambda simply concatenated records without normalization. The embedding pipeline treated release_date as a plain string field — there was no structured date type in the OpenSearch mapping (it was mapped as text). Claude's date interpretation was therefore entirely dependent on in-context string patterns, which are ambiguous for MM/DD/YYYY (month-first vs day-first is indistinguishable without locale context). Freetext like "Spring 2024" provides only approximate temporal information that different model invocations resolved differently.

Resolution

import boto3
import json
import logging
import re
from datetime import datetime, timezone
from typing import Optional

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")
cloudwatch = boto3.client("cloudwatch")

NAMESPACE = "MangaAssist/DataQuality"

# Regex patterns ordered by specificity
ISO_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
US_DATE_PATTERN = re.compile(r"^(\d{1,2})/(\d{1,2})/(\d{4})$")
FREETEXT_MONTH_YEAR = re.compile(
    r"(?:Released:\s*)?([A-Za-z]+\.?)\s*['\"]?(\d{2,4})", re.IGNORECASE
)
MONTH_MAP = {
    "jan": "01", "feb": "02", "mar": "03", "apr": "04",
    "may": "05", "jun": "06", "jul": "07", "aug": "08",
    "sep": "09", "oct": "10", "nov": "11", "dec": "12",
}


def normalise_date(raw: str) -> Optional[str]:
    """Normalise any supported date format to ISO 8601 (YYYY-MM-DD)."""
    if not raw:
        return None
    raw = raw.strip()

    if ISO_PATTERN.match(raw):
        return raw  # Already normalised

    m = US_DATE_PATTERN.match(raw)
    if m:
        month, day, year = int(m.group(1)), int(m.group(2)), int(m.group(3))
        if not (1 <= month <= 12 and 1 <= day <= 31):
            return None  # Out-of-range month or day: likely day-first; quarantine, don't guess
        return f"{year}-{month:02d}-{day:02d}"

    m = FREETEXT_MONTH_YEAR.search(raw)
    if m:
        month_str = m.group(1).lower().rstrip(".")[:3]
        year_str = m.group(2)
        if len(year_str) == 2:
            year_str = "20" + year_str
        month_num = MONTH_MAP.get(month_str)
        if month_num and len(year_str) == 4:
            return f"{year_str}-{month_num}-01"  # Day unknown: default to 1st

    return None  # Unparseable


def lambda_handler(event: dict, context) -> dict:
    """Normalise release_date fields in manga metadata to ISO 8601."""
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    response = s3.get_object(Bucket=bucket, Key=key)
    records = json.loads(response["Body"].read().decode("utf-8"))

    normalised_count = 0
    unparseable_ids = []

    for record in records:
        raw_date = record.get("release_date") or ""
        normalised = normalise_date(str(raw_date))
        if normalised:
            if normalised != raw_date:
                normalised_count += 1
            record["release_date"] = normalised
            record["release_date_source_format"] = raw_date  # Preserve original
        else:
            unparseable_ids.append(record.get("asin", "unknown"))
            record["release_date"] = None
            record["release_date_parse_error"] = raw_date

    _emit_metric("metadata_normalised_dates", normalised_count)
    _emit_metric("metadata_unparseable_dates", len(unparseable_ids))

    validated_key = key.replace("raw/", "validated/")
    s3.put_object(
        Bucket=bucket,
        Key=validated_key,
        Body=json.dumps(records, ensure_ascii=False, indent=2).encode("utf-8"),
        ContentType="application/json",
    )

    logger.info(f"Normalised: {normalised_count}, Unparseable: {len(unparseable_ids)}")
    return {
        "status": "normalised",
        "normalised_count": normalised_count,
        "unparseable_ids": unparseable_ids,
        "clean_file": f"s3://{bucket}/{validated_key}",
    }


def _emit_metric(metric_name: str, value: float) -> None:
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )
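
A few spot checks for normalise_date, which follow directly from the regexes and the out-of-range guard above (runnable as long as the module imports succeed):

assert normalise_date("2024-03-15") == "2024-03-15"            # ISO passthrough
assert normalise_date("03/15/2024") == "2024-03-15"            # legacy US format
assert normalise_date("Released: March 2024") == "2024-03-01"  # freetext, day defaults to the 1st
assert normalise_date("Mar '24") == "2024-03-01"               # two-digit year expanded
assert normalise_date("15/03/2024") is None                    # month > 12: quarantined, not guessed
assert normalise_date("Spring 2024") is None                   # season word, no month match
assert normalise_date("") is None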

Prevention

  • Define a canonical data contract: all upstream sources must deliver release_date as ISO 8601 (YYYY-MM-DD) or the feed is rejected at the API gateway layer.
  • Set the release_date field type in OpenSearch Serverless to date with format: "strict_date_optional_time" so the index itself rejects malformed strings at index time (see the sketch after this list).
  • Add a Glue Data Quality rule ColumnValues "release_date" matches "^\d{4}-\d{2}-\d{2}$" with completeness >= 0.95 to catch residual format violations.
  • Include a date normalization step in the Bedrock Knowledge Base custom transformation Lambda so that even if upstream formats slip through, embeddings always use the normalized value.
  • Emit metadata_unparseable_dates as a CloudWatch alarm that fires an SNS alert to the data-sources team when any record has an unparseable date after normalization.
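
A sketch of the strict date mapping from the second bullet, using opensearch-py with SigV4 auth against the Serverless collection (the endpoint and index name are placeholders; requires pip install opensearch-py):

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Placeholder endpoint: use the real AOSS collection endpoint (no https:// prefix).
HOST = "mangaassist-kb.us-east-1.aoss.amazonaws.com"
auth = AWSV4SignerAuth(boto3.Session().get_credentials(), "us-east-1", "aoss")

client = OpenSearch(
    hosts=[{"host": HOST, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    connection_class=RequestsHttpConnection,
)

# A strict date mapping makes the index reject malformed strings at write time.
client.indices.create(
    index="manga-metadata",
    body={"mappings": {"properties": {
        "release_date": {"type": "date", "format": "strict_date_optional_time"},
    }}},
)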

Scenario 4: AWS Glue Data Quality Job Fails Silently — Invalid Data Reaches OpenSearch Serverless

Problem

MangaAssist runs a weekly AWS Glue ETL job that ingests product and metadata updates into the OpenSearch Serverless knowledge base. A Glue Data Quality (DQ) ruleset was configured to check completeness (>= 0.95), uniqueness, and value ranges. However, the Glue job was configured with DQ_STOP_JOB_ON_FAILURE_OPTIONS = {} (empty — DQ alerts only, no stop). When the DQ score dropped to 0.72 due to a batch of records with missing price and title fields, the job logged a DataQualityEvaluationResult as WARNING but continued execution. The downstream write to the OpenSearch index proceeded with the contaminated batch. Users began receiving answers that cited products with null titles as "Unknown Item" or prices as "Price not available" — causing customer confusion and support ticket spikes.

Detection

flowchart TD
    A[Glue ETL job starts] --> B[DQ ruleset evaluation runs]
    B --> C{DQ score >= threshold?}
    C -->|Yes — score >= 0.95| D[Job continues normally]
    C -->|No — score < 0.95| E[DQ WARNING logged to CloudWatch Logs]
    E --> F{DQ_STOP_JOB_ON_FAILURE configured?}
    F -->|No — silent continue| G[Job writes invalid records to OpenSearch]
    F -->|Yes — job halted| H[Step Functions reports FAILED state]
    G --> I[CloudWatch metric: glue_dq_score below threshold]
    I --> J{Alarm triggered?}
    J -->|No alarm configured| K[Silent contamination — users affected]
    J -->|Alarm configured| L[SNS alert to on-call]
    H --> L
    L --> M[On-call quarantines affected index segment]

Root Cause

The Glue job was initially set up with DQ in WARN mode (not FAIL) to avoid blocking the pipeline during initial tuning. The team intended to switch to FAIL mode after one month but the Jira ticket was deprioritised and never completed. No CloudWatch alarm was configured on the GlueDataQuality.RulesetPassedCount or custom DQ score metric. The Step Functions workflow that orchestrated the Glue job checked only for the Glue job terminal state (SUCCEEDED vs FAILED) — since the Glue job itself succeeded (it ran to completion), the workflow marked the pipeline run as healthy. The DQ evaluation result log entry was written to CloudWatch Logs but no Metric Filter or alarm consumed it.

Resolution

import boto3
import json
import logging
from datetime import datetime, timezone
from typing import Any

logger = logging.getLogger()
logger.setLevel(logging.INFO)

glue = boto3.client("glue")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")
stepfunctions = boto3.client("stepfunctions")

SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-GlueETL"
NAMESPACE = "MangaAssist/DataQuality"
DQ_PASS_THRESHOLD = 0.95


def lambda_handler(event: dict, context: Any) -> dict:
    """
    Post-Glue-job gate: inspect DQ evaluation results and pass/fail the
    pipeline. Invoked directly by Step Functions (.waitForTaskToken supplies
    taskToken) or by an EventBridge rule on Glue job state change to
    SUCCEEDED (no taskToken: metric and alert only).
    """
    job_name = event.get("detail", {}).get("jobName")
    job_run_id = event.get("detail", {}).get("jobRunId")
    task_token = event.get("taskToken")  # From Step Functions .waitForTaskToken

    if not job_name or not job_run_id:
        raise ValueError("Missing jobName or jobRunId in event")

    dq_score = _get_dq_score(job_name, job_run_id)
    _emit_metric("glue_dq_score", dq_score, {"JobName": job_name})

    if dq_score < DQ_PASS_THRESHOLD:
        logger.error(f"DQ score {dq_score:.3f} below threshold {DQ_PASS_THRESHOLD}")
        _send_alert(job_name, job_run_id, dq_score)

        if task_token:
            stepfunctions.send_task_failure(
                taskToken=task_token,
                error="DataQualityThresholdNotMet",
                cause=json.dumps({
                    "job_name": job_name,
                    "job_run_id": job_run_id,
                    "dq_score": dq_score,
                    "threshold": DQ_PASS_THRESHOLD,
                }),
            )
        return {"status": "BLOCKED", "dq_score": dq_score}

    logger.info(f"DQ score {dq_score:.3f} passes threshold. Proceeding.")
    if task_token:
        stepfunctions.send_task_success(
            taskToken=task_token,
            output=json.dumps({"dq_score": dq_score, "status": "PASS"}),
        )
    return {"status": "PASS", "dq_score": dq_score}


def _get_dq_score(job_name: str, job_run_id: str) -> float:
    """Retrieve the worst DQ evaluation score for a Glue job run."""
    listing = glue.list_data_quality_results(
        Filter={"JobName": job_name, "JobRunId": job_run_id}
    )
    scores = [
        float(glue.get_data_quality_result(ResultId=r["ResultId"]).get("Score", 0.0))
        for r in listing.get("Results", [])
    ]
    return min(scores) if scores else 0.0  # Fail closed when no DQ result exists


def _emit_metric(metric_name: str, value: float, dimensions: dict = None) -> None:
    dim_list = [{"Name": k, "Value": v} for k, v in (dimensions or {}).items()]
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "None",
            "Timestamp": datetime.now(timezone.utc),
            "Dimensions": dim_list,
        }],
    )


def _send_alert(job_name: str, job_run_id: str, dq_score: float) -> None:
    message = {
        "alert": "Glue Data Quality threshold not met — OpenSearch sync BLOCKED",
        "job_name": job_name,
        "job_run_id": job_run_id,
        "dq_score": round(dq_score, 4),
        "threshold": DQ_PASS_THRESHOLD,
        "action": "Investigate Glue DQ result in AWS console. Re-run after source data remediation.",
    }
    sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))

Prevention

  • Set DQ_STOP_JOB_ON_FAILURE_OPTIONS = {"StopJobOnFailure": "true"} in the Glue Data Catalog table DQ configuration so the Glue job itself halts on score failure — do not rely solely on post-job Lambda gating.
  • Create a CloudWatch Metric Filter on the Glue job's CloudWatch Logs that extracts the Score field from DataQualityEvaluationResult log events and publishes it as a custom metric; alarm when the score drops below 0.95 (see the sketch after this list).
  • In the Step Functions workflow, use .waitForTaskToken with the DQ gate Lambda so that a DQ failure automatically surfaces as a workflow execution failure — visible in the console and triggerable by EventBridge.
  • Establish a data quarantine S3 prefix: on DQ failure, route the contaminated batch to s3://mangaassist-data/quarantine/<timestamp>/ for forensic review without deleting source data.
  • Schedule a quarterly Game Day drill where the team intentionally injects a low-quality batch to verify the full alarm-to-block-to-alert chain fires correctly end to end.
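
A sketch of the Metric Filter and alarm from the second bullet. The log group name and the exact shape of the DQ log event are assumptions; verify both against your Glue job's actual CloudWatch Logs output before relying on the pattern:

import boto3

logs = boto3.client("logs")
cloudwatch = boto3.client("cloudwatch")

# Assumes the DQ result is logged as a JSON event with a numeric "score" field.
logs.put_metric_filter(
    logGroupName="/aws-glue/jobs/output",  # Placeholder log group
    filterName="mangaassist-glue-dq-score",
    filterPattern="{ $.score >= 0 }",
    metricTransformations=[{
        "metricName": "glue_dq_score_from_logs",
        "metricNamespace": "MangaAssist/DataQuality",
        "metricValue": "$.score",
    }],
)

cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-glue-dq-score-low",
    Namespace="MangaAssist/DataQuality",
    MetricName="glue_dq_score_from_logs",
    Statistic="Minimum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0.95,
    ComparisonOperator="LessThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:DataQuality-GlueETL"],
)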

Scenario 5: User Review Data Injected Without Length Validation — 50K-Token Review Truncates System Instructions

Problem

MangaAssist allows users to submit long-form reviews for manga volumes. A new "personalized recommendation" feature was added that injects the user's most recent review into the system prompt context to help Claude personalize recommendations. A power user submitted a 50,000-token review (a detailed chapter-by-chapter analysis of a 50-volume boxset, copy-pasted from their personal blog). The orchestration Lambda appended the full review to the system prompt without any length check. Claude 3 Sonnet's 200K context window accommodated the text, but the combined prompt (system instructions + user review + conversation history + RAG chunks) exceeded the token budget designed for RAG retrieval. Token budget overflow caused the RAG chunks and — critically — the last portion of the system instructions (including the safety rails, response format directives, and refusal policy) to be silently truncated by the orchestrator's context-packing logic. The model responded without safety guidelines, producing off-topic and policy-violating content in one session.

Detection

flowchart TD
    A[User submits review data] --> B[Lambda: review ingestion handler]
    B --> C[Count tokens using Bedrock tokenizer]
    C --> D{Token count <= MAX_REVIEW_TOKENS?}
    D -->|Yes| E[Store review in DynamoDB user profile]
    D -->|No| F[Truncate review to MAX_REVIEW_TOKENS]
    F --> G[CloudWatch metric: review_token_overflow_count]
    G --> H[Log user_id and token count]
    E --> I[Recommendation orchestrator builds prompt]
    I --> J[Calculate total prompt token budget]
    J --> K{Total tokens <= PROMPT_BUDGET?}
    K -->|Yes| L[Invoke Bedrock Claude]
    K -->|No| M[Drop user review from context — use summary only]
    M --> N[CloudWatch metric: prompt_budget_exceeded]
    N --> L
    L --> O[Return recommendation to user]

Root Cause

The feature was developed on the assumption that user reviews would be short (a few hundred words); no token budget analysis was performed when the feature was designed. The orchestration code used f-string concatenation to build the prompt and passed the raw review_text field from DynamoDB directly into the prompt template. DynamoDB's 400KB item size limit was the only implicit cap: a 50K-token review at ~4 chars/token is approximately 200KB, which fits. The Lambda had no awareness of the effective context budget for the recommendation use case (system prompt: ~2K tokens, RAG chunks: ~4K tokens, conversation history: ~2K tokens, model response reservation: ~1K tokens; the feature ticket never set a review budget against the remaining ~191K nominally "available"). The orchestrator's own context-packing logic silently dropped trailing content when the assembled prompt exceeded its packing limit, and no exception was raised.
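
For contrast, a minimal sketch of the call shape recommended under Prevention below. With the Converse API, system instructions travel in a top-level field, structurally separate from the messages array, so an oversized review cannot displace them (prompt text is illustrative):

import boto3

bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")

response = bedrock_runtime.converse(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",
    # System instructions live in their own field, not in the message array.
    system=[{"text": "You are MangaAssist. Follow the refusal policy and response format."}],
    messages=[{
        "role": "user",
        "content": [{"text": "Recommend a manga based on my latest review: ..."}],
    }],
    inferenceConfig={"maxTokens": 1024},
)
print(response["output"]["message"]["content"][0]["text"])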

Resolution

import boto3
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb")
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")

NAMESPACE = "MangaAssist/DataQuality"
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:DataQuality-TokenBudget"

# Token budget allocation for recommendation prompt
SYSTEM_PROMPT_TOKENS = 2_000
RAG_CHUNK_TOKENS = 4_000
CONVERSATION_HISTORY_TOKENS = 2_000
RESPONSE_RESERVATION_TOKENS = 1_000
MAX_REVIEW_TOKENS = 3_000   # Hard cap for user-submitted review content
TOTAL_BUDGET = 200_000      # Claude 3 Sonnet context window

PROMPT_BUDGET = (
    TOTAL_BUDGET
    - SYSTEM_PROMPT_TOKENS
    - RAG_CHUNK_TOKENS
    - CONVERSATION_HISTORY_TOKENS
    - RESPONSE_RESERVATION_TOKENS
)


def estimate_tokens(text: str) -> int:
    """Conservative token estimate: 1 token per 3.5 characters (multilingual)."""
    return max(1, int(len(text) / 3.5))


def truncate_to_token_budget(text: str, max_tokens: int) -> tuple[str, bool]:
    """Truncate text to stay within token budget. Returns (text, was_truncated)."""
    if estimate_tokens(text) <= max_tokens:
        return text, False
    # Truncate by char count approximation
    max_chars = max_tokens * 3  # Conservative: 3 chars per token
    return text[:max_chars] + "\n\n[Review truncated for context budget.]", True


def lambda_handler(event: dict, context) -> dict:
    """Validate and budget user review before injection into FM recommendation prompt."""
    user_id = event.get("user_id")
    raw_review = event.get("review_text", "")
    action = event.get("action", "ingest")  # "ingest" or "inject"

    if action == "ingest":
        return _handle_review_ingest(user_id, raw_review)
    elif action == "inject":
        return _handle_review_inject(user_id, raw_review)
    else:
        raise ValueError(f"Unknown action: {action}")


def _handle_review_ingest(user_id: str, raw_review: str) -> dict:
    """Store review with token count metadata. Reject if exceeds absolute cap."""
    token_count = estimate_tokens(raw_review)
    ABSOLUTE_CAP = 20_000  # Reject outright above this — storage + abuse guard

    _emit_metric("review_submitted_tokens", token_count)

    if token_count > ABSOLUTE_CAP:
        _emit_metric("review_token_overflow_count", 1)
        _send_alert(user_id, token_count, ABSOLUTE_CAP, "REJECTED_AT_INGEST")
        logger.error(f"Review rejected: user_id={user_id}, tokens={token_count}")
        return {
            "status": "REJECTED",
            "reason": f"Review exceeds maximum allowed length ({ABSOLUTE_CAP} tokens equivalent). "
                      "Please shorten your review to under 15,000 words.",
            "token_estimate": token_count,
        }

    # Store with metadata
    table = dynamodb.Table("mangaassist-user-profiles")
    table.update_item(
        Key={"user_id": user_id},
        UpdateExpression="SET latest_review = :r, review_token_estimate = :t, review_updated_at = :ts",
        ExpressionAttributeValues={
            ":r": raw_review,
            ":t": token_count,
            ":ts": datetime.now(timezone.utc).isoformat(),
        },
    )
    return {"status": "STORED", "token_estimate": token_count}


def _handle_review_inject(user_id: str, raw_review: str) -> dict:
    """Prepare review snippet safe for prompt injection within token budget."""
    review_snippet, was_truncated = truncate_to_token_budget(raw_review, MAX_REVIEW_TOKENS)
    actual_tokens = estimate_tokens(review_snippet)

    if was_truncated:
        _emit_metric("review_injection_truncations", 1)
        logger.warning(f"Review truncated for injection: user_id={user_id}, original_tokens={estimate_tokens(raw_review)}, injected_tokens={actual_tokens}")

    return {
        "review_snippet": review_snippet,
        "was_truncated": was_truncated,
        "injected_token_estimate": actual_tokens,
        "remaining_budget": PROMPT_BUDGET - actual_tokens,
    }


def _emit_metric(metric_name: str, value: float) -> None:
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )


def _send_alert(user_id: str, token_count: int, cap: int, reason: str) -> None:
    message = {
        "alert": "User review token overflow",
        "user_id": user_id,
        "token_estimate": token_count,
        "cap": cap,
        "reason": reason,
        "action": "Review rejected/truncated before FM injection. Investigate for abuse pattern.",
    }
    sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(message, indent=2))
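
An illustrative invocation of the inject path (note that _emit_metric makes a real CloudWatch call, so credentials and a region are needed): a ~220K-character review is cut down to the 3K-token injection budget.

# Inject path: the oversized review is truncated before prompt assembly.
result = _handle_review_inject("u-123", "Great art. " * 20_000)
print(result["was_truncated"], result["injected_token_estimate"])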

Prevention

  • Enforce a hard character limit (e.g., 60,000 characters ≈ 15,000 tokens) at the API Gateway request validator level using a JSON Schema maxLength constraint on the review_text field, so oversized requests are rejected before Lambda is invoked (see the sketch after this list).
  • Define and document a token budget allocation table for every prompt template that injects user-controlled content; review tokens must be explicitly budgeted and the system prompt must always be the first injected component.
  • Use Amazon Bedrock's Converse API system parameter (separate from messages) so that system instructions are structurally separated from user content and cannot be displaced by token overflow in the message array, as illustrated in the Root Cause sketch above.
  • Set up a CloudWatch alarm on review_token_overflow_count >= 5 in 1 hour to detect potential prompt injection or abuse patterns from users attempting to overwhelm the context window.
  • Add a weekly canary test that submits a 4,000-token review via the recommendation API and asserts that the assembled prompt (as logged by the orchestrator) still contains the system instructions verbatim, alerting if safety instructions are missing from any invocation.
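
A sketch of the first bullet for the WebSocket API using apigatewayv2 (the ApiId is a placeholder; API Gateway evaluates models against JSON Schema draft-04):

import boto3
import json

apigw = boto3.client("apigatewayv2")

apigw.create_model(
    ApiId="a1b2c3d4e5",  # Placeholder WebSocket API id
    ContentType="application/json",
    Name="SubmitReviewModel",
    Schema=json.dumps({
        "$schema": "http://json-schema.org/draft-04/schema#",
        "type": "object",
        "required": ["user_id", "review_text"],
        "properties": {
            "user_id": {"type": "string", "maxLength": 128},
            # 60,000 chars ≈ 15,000 tokens at ~4 chars/token
            "review_text": {"type": "string", "maxLength": 60000},
        },
    }),
)

For validation to actually run, the model must then be attached to the review-submission route's RequestModels (route request validation is supported for WebSocket APIs).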