
LLMOps User Stories — Implementation Details & Code Snippets

Practical implementation summaries and code snippets for every LLMOps user story in MangaAssist. Each section maps to a user story, describes the implementation approach, lists the AWS services and libraries involved, and includes working code.


Epic 1: Model Lifecycle Management

US-1.1 — Model Registry and Versioning

Implementation: Use MLflow Model Registry hosted on ECS Fargate with PostgreSQL RDS for metadata and S3 for artifacts. Every model (DistilBERT, reranker, embedding config, Claude prompt version) is registered with training metadata and evaluation metrics before deployment.

Services: MLflow, S3, PostgreSQL RDS, ECS Fargate

import mlflow
from mlflow.tracking import MlflowClient

TRACKING_URI = "http://mlflow.internal.manga-assist.amazon.com"
mlflow.set_tracking_uri(TRACKING_URI)  # Point the fluent logging API at the same server
client = MlflowClient(tracking_uri=TRACKING_URI)

# Register a new intent classifier version after training
with mlflow.start_run(run_name="intent-classifier-v3.2") as run:
    mlflow.log_params({
        "model_type": "distilbert-base-uncased",
        "training_samples": 55000,
        "epochs": 5,
        "learning_rate": 2e-5,
        "dataset_hash": "sha256:a3f8c1...",
    })
    mlflow.log_metrics({
        "accuracy": 0.923,
        "f1_macro": 0.891,
        "f1_recommendation": 0.912,
        "f1_order_tracking": 0.956,
        "p99_latency_ms": 48,
    })
    mlflow.pytorch.log_model(
        model,
        artifact_path="intent-classifier",
        registered_model_name="manga-intent-classifier",
    )

# Promote to staging after offline eval passes
client.transition_model_version_stage(
    name="manga-intent-classifier",
    version="12",
    stage="Staging",
    archive_existing_versions=False,
)

US-1.2 — Automated Model Deployment Pipeline

Implementation: AWS CDK defines SageMaker endpoints as infrastructure. CodePipeline orchestrates: artifact build → golden dataset eval → deploy to staging → integration test → promote to production. Model promotion in MLflow Registry triggers the pipeline via EventBridge.

Services: AWS CDK, CodePipeline, CodeBuild, SageMaker, EventBridge

# cdk_stack.py — SageMaker endpoint definition (AWS CDK)
from aws_cdk import aws_sagemaker as sagemaker, Stack
from constructs import Construct

class IntentClassifierStack(Stack):
    def __init__(self, scope: Construct, id: str, model_data_url: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        model = sagemaker.CfnModel(self, "IntentModel",
            model_name="manga-intent-classifier",
            execution_role_arn="arn:aws:iam::ACCOUNT:role/SageMakerExecRole",
            primary_container=sagemaker.CfnModel.ContainerDefinitionProperty(
                image="763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.1-gpu",
                model_data_url=model_data_url,
                environment={
                    "SAGEMAKER_PROGRAM": "inference.py",
                    "MODEL_NAME": "distilbert-manga-intent",
                },
            ),
        )

        endpoint_config = sagemaker.CfnEndpointConfig(self, "EndpointConfig",
            production_variants=[sagemaker.CfnEndpointConfig.ProductionVariantProperty(
                variant_name="primary",
                model_name=model.attr_model_name,  # GetAtt reference creates the dependency on the model
                instance_type="ml.g4dn.xlarge",  # Matches the GPU inference image; see US-1.5 for the Inferentia variant
                initial_instance_count=2,
            )],
        )

        sagemaker.CfnEndpoint(self, "Endpoint",
            endpoint_name="manga-intent-classifier-prod",
            endpoint_config_name=endpoint_config.attr_endpoint_config_name,
        )

# buildspec-eval.yml — CodeBuild step that runs golden dataset evaluation
version: 0.2
phases:
  install:
    commands:
      - pip install boto3 mlflow bert-score rouge-score
  build:
    commands:
      - python evaluate_golden_dataset.py
        --endpoint manga-intent-classifier-staging
        --dataset s3://manga-assist-eval/golden-dataset-v4.json
        --output-dir eval-results/
      - python check_eval_gates.py --results-dir eval-results/
  post_build:
    commands:
      - |
        if [ $CODEBUILD_BUILD_SUCCEEDING -eq 1 ]; then
          echo "All evaluation gates passed — promoting to production"
          python promote_model.py --stage Production
        fi
artifacts:
  files:
    - eval-results/**/*
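
The EventBridge trigger is sketched below as a hypothetical CDK rule, assuming a small webhook Lambda publishes a custom ModelVersionStageTransition event when MLflow promotes a model (MLflow has no native EventBridge integration; the event source, detail fields, and deployment_pipeline variable are illustrative):

# cdk_trigger.py: hypothetical sketch, inside the deployment stack's __init__
from aws_cdk import aws_events as events, aws_events_targets as targets

promotion_rule = events.Rule(self, "ModelPromotedRule",
    event_pattern=events.EventPattern(
        source=["manga-assist.mlflow"],                 # custom event source (assumption)
        detail_type=["ModelVersionStageTransition"],
        detail={"name": ["manga-intent-classifier"], "to_stage": ["Staging"]},
    ),
)
promotion_rule.add_target(targets.CodePipeline(deployment_pipeline))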

US-1.3 — One-Click Model Rollback

Implementation: SageMaker blue-green deployment using production variants. Rollback shifts 100% of traffic back to the previous variant. A Lambda function, triggered by a CloudWatch alarm or manual invocation, performs the switch and re-runs the golden dataset evaluation.

Services: SageMaker, Lambda, CloudWatch Alarms, SNS

import boto3

sagemaker_client = boto3.client("sagemaker")
sns_client = boto3.client("sns")

def rollback_endpoint(endpoint_name: str, previous_variant: str):
    """Shift 100% traffic to the previous model variant."""

    sagemaker_client.update_endpoint_weights_and_capacities(
        EndpointName=endpoint_name,
        DesiredWeightsAndCapacities=[
            {"VariantName": "current", "DesiredWeight": 0},
            {"VariantName": previous_variant, "DesiredWeight": 1},
        ],
    )

    sns_client.publish(
        TopicArn="arn:aws:sns:us-east-1:ACCOUNT:manga-assist-alerts",
        Subject=f"ROLLBACK: {endpoint_name}",
        Message=(
            f"Endpoint {endpoint_name} rolled back to variant {previous_variant}. "
            "Golden dataset re-evaluation triggered automatically."
        ),
    )

    # Trigger golden dataset re-evaluation on the restored variant
    invoke_evaluation_pipeline(endpoint_name, variant=previous_variant)

US-1.4 — Multi-Model Endpoint Consolidation

Implementation: SageMaker Multi-Model Endpoint (MME) hosts both DistilBERT and the cross-encoder reranker on shared ml.g4dn.xlarge instances. The inference container routes requests based on the TargetModel header. Each model has independent CloudWatch metrics via custom dimensions.

Services: SageMaker MME, CloudWatch Custom Metrics

import json

import boto3

runtime = boto3.client("sagemaker-runtime")

def invoke_intent_classifier(message: str) -> dict:
    response = runtime.invoke_endpoint(
        EndpointName="manga-shared-ml-endpoint",
        TargetModel="intent-classifier/model.tar.gz",  # Routes to DistilBERT
        ContentType="application/json",
        Body=json.dumps({"text": message}),
    )
    return json.loads(response["Body"].read())

def invoke_reranker(query: str, documents: list[str]) -> list[float]:
    response = runtime.invoke_endpoint(
        EndpointName="manga-shared-ml-endpoint",
        TargetModel="reranker/model.tar.gz",  # Routes to cross-encoder
        ContentType="application/json",
        Body=json.dumps({"query": query, "documents": documents}),
    )
    return json.loads(response["Body"].read())["scores"]
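
Per-model visibility on the shared endpoint comes from a custom TargetModel dimension emitted by the caller; a minimal sketch (the namespace and dimension names here are assumptions):

import time
import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_mme_latency(target_model: str, latency_ms: float):
    """Emit per-model latency for the shared endpoint using a TargetModel dimension."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/SharedEndpoint",
        MetricData=[{
            "MetricName": "ModelLatencyMs",
            "Value": latency_ms,
            "Unit": "Milliseconds",
            "Dimensions": [{"Name": "TargetModel", "Value": target_model}],
        }],
    )

# Usage: time each invocation and attribute it to the target model
start = time.perf_counter()
scores = invoke_reranker("dark fantasy like Berserk", ["doc-1", "doc-2"])
emit_mme_latency("reranker/model.tar.gz", (time.perf_counter() - start) * 1000)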

US-1.5 — Inferentia Migration

Implementation: Compile DistilBERT to Neuron SDK format using torch-neuronx. Deploy on ml.inf1.xlarge SageMaker endpoints. Validate by running golden dataset on the compiled model and comparing accuracy/latency against the GPU baseline.

Services: AWS Neuron SDK, SageMaker (Inferentia), S3

import torch
import torch_neuronx
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer

# Load trained model
model = DistilBertForSequenceClassification.from_pretrained("./manga-intent-v3.2")
tokenizer = DistilBertTokenizer.from_pretrained("./manga-intent-v3.2")
model.eval()

# Create sample input for tracing
sample = tokenizer("recommend dark fantasy manga", return_tensors="pt",
                    max_length=128, padding="max_length", truncation=True)
sample_inputs = (sample["input_ids"], sample["attention_mask"])

# Compile for Inferentia
neuron_model = torch_neuronx.trace(model, sample_inputs)
neuron_model.save("manga_intent_neuron.pt")

# Benchmark: verify latency against the GPU baseline
import time

start = time.perf_counter()
for _ in range(1000):
    neuron_model(*sample_inputs)
elapsed_s = time.perf_counter() - start
avg_ms = elapsed_s / 1000 * 1000  # (total seconds / 1000 runs) * 1000 ms/s; ~8ms vs ~15ms on GPU
print(f"Avg inference: {avg_ms:.1f}ms")
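
Latency alone does not validate the migration; a sketch of the accuracy-parity check against the GPU baseline, assuming pre-tokenized validation pairs and allowing for the traced forward returning a tuple:

import torch

def check_accuracy_parity(neuron_model, baseline_model, val_samples, tolerance: float = 0.005):
    """Require near-identical predictions between compiled and baseline models."""
    matches = 0
    for input_ids, attention_mask in val_samples:  # pre-tokenized validation pairs (assumption)
        with torch.no_grad():
            baseline_pred = baseline_model(input_ids, attention_mask).logits.argmax(-1)
        neuron_out = neuron_model(input_ids, attention_mask)
        if isinstance(neuron_out, (tuple, list)):  # traced forward may return a tuple
            neuron_out = neuron_out[0]
        matches += int((baseline_pred == neuron_out.argmax(-1)).all())
    agreement = matches / len(val_samples)
    assert agreement >= 1.0 - tolerance, f"Parity check failed: {agreement:.3%} agreement"
    return agreement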

Epic 2: Prompt Lifecycle Management

US-2.1 — Prompt Versioning and Source Control

Implementation: Prompts are stored in a prompts/ directory in the MangaAssist repo. Each prompt file uses YAML frontmatter (version, author, description) with the template body in Jinja2. Changes go through PR review. A CHANGELOG.md tracks version history. AWS AppConfig serves the active prompt version to the orchestrator at runtime.

Services: Git/GitHub, AWS AppConfig

# prompts/recommendation/v2.3.yaml
---
version: "2.3"
author: "sr-prompt-eng"
date: "2026-03-01"
description: "Added explicit length constraint to prevent response inflation after Claude 3.5 upgrade"
eval_run_id: "mlflow-run-abc123"
intent: "recommendation"
---

system: |
  You are MangaAssist, a helpful shopping assistant for the JP Manga store on Amazon.com.
  RULES:
  1. Only recommend products from the PRODUCT DATA section below.
  2. Never invent prices, ASINs, or availability.
  3. Keep recommendations to 2-4 titles with one sentence each explaining why.
  4. Do not use emoji.
  5. Response must be under 200 words.

context_template: |
  USER PROFILE:
  - Prime: {{ user.is_prime }}
  - Locale: {{ user.locale }}
  - Recent browsing: {{ user.recent_asins | join(', ') }}

  PRODUCT DATA:
  {{ product_data_json }}

  RETRIEVED KNOWLEDGE:
  {% for chunk in rag_chunks %}
  [{{ chunk.source_type }}] {{ chunk.content }}
  {% endfor %}

  CONVERSATION HISTORY:
  {% for turn in history[-5:] %}
  {{ turn.role }}: {{ turn.content }}
  {% endfor %}

# Load active prompt version from AppConfig at runtime
import boto3
import yaml

appconfig = boto3.client("appconfig")

def get_active_prompt(intent: str) -> str:
    response = appconfig.get_configuration(
        Application="manga-assist",
        Environment="production",
        Configuration=f"prompt-{intent}",
        ClientId="orchestrator-node-1",
    )
    return yaml.safe_load(response["Content"].read())

US-2.2 — Automated Prompt Regression Testing

Implementation: A CodeBuild pipeline triggers on any change to prompts/** files. It loads the golden dataset from S3, runs all 500 queries through the full inference pipeline using the candidate prompt, computes evaluation metrics, and posts results as a PR comment. PR merge is blocked on gate failures.

Services: CodeBuild, CodePipeline, S3, Bedrock, GitHub API

# evaluate_golden_dataset.py
import json
import boto3
from bert_score import score as bert_score
from rouge_score import rouge_scorer

def evaluate(endpoint: str, dataset_path: str) -> dict:
    with open(dataset_path) as f:
        golden = json.load(f)

    results = {"intent_correct": 0, "bertscore_sum": 0, "guardrail_pass": 0,
               "prohibited_violations": 0, "total": len(golden)}
    scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)

    for entry in golden:
        # Run through full pipeline
        response = invoke_pipeline(endpoint, entry["query"], entry["context"])

        # Check intent accuracy
        if response["classified_intent"] == entry["expected_intent"]:
            results["intent_correct"] += 1

        # BERTScore and ROUGE-L against reference
        P, R, F1 = bert_score(
            [response["text"]], [entry["reference_response"]], lang="en"
        )
        results["bertscore_sum"] += F1.item()
        results["rouge_l_sum"] += scorer.score(
            entry["reference_response"], response["text"]
        )["rougeL"].fmeasure

        # Prohibited element check
        for prohibited in entry.get("prohibited_elements", []):
            if prohibited.lower() in response["text"].lower():
                results["prohibited_violations"] += 1

        # Guardrail pass
        if response["guardrail_passed"]:
            results["guardrail_pass"] += 1

    # Compute final metrics
    n = results["total"]
    return {
        "intent_accuracy": results["intent_correct"] / n,
        "avg_bertscore": results["bertscore_sum"] / n,
        "avg_rouge_l": results["rouge_l_sum"] / n,
        "guardrail_pass_rate": results["guardrail_pass"] / n,
        "prohibited_violations": results["prohibited_violations"],
    }


def check_gates(metrics: dict) -> bool:
    """Return True if all quality gates pass."""
    gates = {
        "intent_accuracy >= 0.90": metrics["intent_accuracy"] >= 0.90,
        "avg_bertscore >= 0.80": metrics["avg_bertscore"] >= 0.80,
        "guardrail_pass_rate >= 0.95": metrics["guardrail_pass_rate"] >= 0.95,
        "prohibited_violations == 0": metrics["prohibited_violations"] == 0,
    }
    for gate, passed in gates.items():
        status = "PASS" if passed else "FAIL"
        print(f"  [{status}] {gate}")
    return all(gates.values())
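
A sketch of the PR comment step via the GitHub REST issues-comments endpoint, assuming a GITHUB_TOKEN environment variable and the repo slug shown here (both placeholders):

import os
import requests

def post_pr_comment(pr_number: int, metrics: dict, gates_passed: bool):
    """Post the evaluation report as a PR comment via the GitHub REST API."""
    status = "All gates passed" if gates_passed else "Gate failure - merge blocked"
    lines = [f"- {name}: {value:.3f}" if isinstance(value, float) else f"- {name}: {value}"
             for name, value in metrics.items()]
    requests.post(
        f"https://api.github.com/repos/manga-assist/manga-assist/issues/{pr_number}/comments",
        headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
        json={"body": "## Prompt Regression Report\n" + status + "\n" + "\n".join(lines)},
        timeout=10,
    )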

US-2.3 — Prompt A/B Testing Framework

Implementation: The orchestrator reads an experiment config from AppConfig that defines traffic splits and variant mappings. Each session is assigned a variant at creation (deterministic hash on session_id). Metrics per variant are emitted to Kinesis and aggregated in Redshift. A Bayesian significance calculator runs nightly.

Services: AWS AppConfig, Kinesis, Redshift, Lambda

import hashlib
import json
from datetime import datetime

import boto3

kinesis = boto3.client("kinesis")

# Experiment configuration (loaded from AppConfig)
EXPERIMENT = {
    "id": "exp-prompt-rec-v24",
    "variants": {
        "control": {"prompt_version": "v2.3", "weight": 50},
        "treatment": {"prompt_version": "v2.4", "weight": 50},
    },
    "metrics": ["thumbs_up_rate", "escalation_rate", "conversion_rate"],
}

def assign_variant(session_id: str, experiment: dict) -> str:
    """Deterministic variant assignment based on session_id hash."""
    hash_val = int(hashlib.sha256(session_id.encode()).hexdigest(), 16) % 100
    cumulative = 0
    for variant_name, config in experiment["variants"].items():
        cumulative += config["weight"]
        if hash_val < cumulative:
            return variant_name
    return list(experiment["variants"].keys())[-1]


def get_prompt_for_session(session_id: str, intent: str) -> dict:
    variant = assign_variant(session_id, EXPERIMENT)
    prompt_version = EXPERIMENT["variants"][variant]["prompt_version"]
    return {
        "variant": variant,
        "experiment_id": EXPERIMENT["id"],
        "prompt": load_prompt(intent, prompt_version),
    }


def emit_experiment_metric(session_id: str, variant: str, metric: str, value: float):
    """Send to Kinesis for Redshift aggregation."""
    kinesis.put_record(
        StreamName="manga-assist-experiments",
        Data=json.dumps({
            "experiment_id": EXPERIMENT["id"],
            "variant": variant,
            "session_id": session_id,
            "metric": metric,
            "value": value,
            "timestamp": datetime.utcnow().isoformat(),
        }),
        PartitionKey=session_id,
    )
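
The nightly significance job can be a Beta-Binomial Monte Carlo comparison for binary metrics such as thumbs_up_rate; a minimal sketch (success/trial counts would come from the Redshift aggregates):

import numpy as np

def prob_treatment_beats_control(control_successes: int, control_trials: int,
                                 treatment_successes: int, treatment_trials: int,
                                 n_samples: int = 100_000) -> float:
    """Monte Carlo estimate of P(treatment rate > control rate) under Beta(1, 1) priors."""
    rng = np.random.default_rng(42)
    control = rng.beta(1 + control_successes, 1 + control_trials - control_successes, n_samples)
    treatment = rng.beta(1 + treatment_successes, 1 + treatment_trials - treatment_successes, n_samples)
    return float((treatment > control).mean())

# Example: ship the treatment prompt if the probability exceeds 0.95
p_better = prob_treatment_beats_control(1240, 10_000, 1390, 10_000)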

US-2.4 — Prompt Template Management for Multi-Intent Routing

Implementation: A base system prompt is shared across all intents. Each intent has an override template that adds intent-specific context assembly rules and few-shot examples. The PromptBuilder class merges the base + intent-specific template + runtime context at request time.

Services: AppConfig, S3 (prompt storage)

from jinja2 import Environment, FileSystemLoader

class PromptBuilder:
    def __init__(self, prompts_dir: str = "prompts/"):
        self.env = Environment(loader=FileSystemLoader(prompts_dir))
        self._base = self.env.get_template("base_system.jinja2")

    def build(self, intent: str, context: dict) -> str:
        base_prompt = self._base.render(context)
        intent_template = self.env.get_template(f"{intent}/template.jinja2")
        intent_section = intent_template.render(context)
        return f"{base_prompt}\n\n{intent_section}\n\nUSER: {context['user_message']}"


# Usage in orchestrator
builder = PromptBuilder()
prompt = builder.build(
    intent="recommendation",
    context={
        "user": {"is_prime": True, "locale": "en-US"},
        "product_data_json": catalog_results,
        "rag_chunks": retrieved_chunks,
        "history": conversation_turns[-5:],
        "user_message": "Something like Demon Slayer but darker",
    },
)

Epic 3: Evaluation and Quality Gates

US-3.1 — Golden Dataset Curation and Maintenance

Implementation: The golden dataset is stored as a versioned JSON file in S3. A quarterly curation script identifies stale entries (products delisted, policies changed) and generates candidate replacements from production error logs. A human reviewer approves additions/removals via a simple review UI (Streamlit app).

Services: S3, DynamoDB (error logs), Streamlit, Lambda

import json

import boto3
from datetime import datetime, timedelta

s3 = boto3.client("s3")

def quarterly_refresh(dataset_bucket: str, dataset_key: str):
    """Identify stale golden dataset entries and suggest replacements."""

    dataset = json.loads(
        s3.get_object(Bucket=dataset_bucket, Key=dataset_key)["Body"].read()
    )

    stale_entries = []
    for entry in dataset:
        # Check if referenced products still exist in catalog
        if entry.get("context", {}).get("asin"):
            if not catalog_exists(entry["context"]["asin"]):
                stale_entries.append(entry["query_id"])

    # Pull candidates from production error logs (thumbs-down, escalations)
    candidates = query_error_logs(
        start_date=datetime.utcnow() - timedelta(days=90),
        limit=100,
        min_severity="thumbs_down",
    )

    return {
        "stale_count": len(stale_entries),
        "stale_ids": stale_entries,
        "candidate_replacements": len(candidates),
        "candidates": candidates[:50],  # Top 50 by frequency
    }

// Golden dataset entry schema
{
  "query_id": "GD-042",
  "query": "What dark fantasy manga would you recommend for someone who loved Berserk?",
  "intent": "recommendation",
  "context": {
    "user_profile": {"prime": true, "locale": "en-US"},
    "browsing_history": ["B00GX...", "B01HN..."]
  },
  "expected_intent": "recommendation",
  "reference_response": "Based on your love for Berserk, I'd recommend Vinland Saga and Claymore.",
  "required_elements": ["at least 2 product recommendations", "genre reasoning"],
  "prohibited_elements": ["competitor mentions", "fabricated prices"],
  "tags": ["recommendation", "dark-fantasy", "medium-complexity"]
}
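
A sketch of the Streamlit review loop (the candidate loader and approve/reject helpers are placeholders):

import streamlit as st

st.title("Golden Dataset Quarterly Review")

for candidate in load_pending_candidates():        # placeholder helper
    st.subheader(candidate["query_id"])
    st.write(candidate["query"])
    st.json(candidate.get("context", {}))
    col_a, col_b = st.columns(2)
    if col_a.button("Approve", key=f"a-{candidate['query_id']}"):
        approve_candidate(candidate)               # placeholder helper
    if col_b.button("Reject", key=f"r-{candidate['query_id']}"):
        reject_candidate(candidate)                # placeholder helper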

US-3.2 — Offline Evaluation Pipeline (Layer 1)

Implementation: A CodeBuild job runs on every PR that modifies model or prompt artifacts. It loads the golden dataset, invokes the candidate pipeline (staging endpoint), scores each response with automated metrics, and posts a structured evaluation report to the PR. Thresholds are checked programmatically — any failure blocks merge.

Services: CodeBuild, SageMaker (staging endpoints), Bedrock, S3

from bert_score import score as bert_score
from sklearn.metrics import accuracy_score, classification_report

# Evaluation gate definitions — all must pass to merge
EVALUATION_GATES = {
    "intent_accuracy":       {"threshold": 0.90, "comparator": ">="},
    "avg_bertscore_f1":      {"threshold": 0.80, "comparator": ">="},
    "rouge_l_delta":         {"threshold": 0.10, "comparator": "<="},  # Max 10% drop
    "format_compliance":     {"threshold": 0.95, "comparator": ">="},
    "guardrail_pass_rate":   {"threshold": 0.95, "comparator": ">="},
    "response_length_delta": {"threshold": 0.30, "comparator": "<="},  # Max 30% drift
    "prohibited_violations": {"threshold": 0,    "comparator": "=="},
    "min_class_f1":          {"threshold": 0.85, "comparator": ">="},
}

def run_offline_evaluation(staging_endpoint: str, golden_dataset: list[dict]) -> dict:
    """Run all golden queries and compute metrics."""
    predictions, references, intents_pred, intents_true, guardrail_flags = [], [], [], [], []

    for entry in golden_dataset:
        result = invoke_staging_pipeline(staging_endpoint, entry)
        predictions.append(result["response_text"])
        references.append(entry["reference_response"])
        intents_pred.append(result["classified_intent"])
        intents_true.append(entry["expected_intent"])
        guardrail_flags.append(result["guardrail_passed"])

    # Compute aggregate metrics
    P, R, F1 = bert_score(predictions, references, lang="en")

    return {
        "intent_accuracy": accuracy_score(intents_true, intents_pred),
        "avg_bertscore_f1": F1.mean().item(),
        "per_class_f1": classification_report(intents_true, intents_pred, output_dict=True),
        "format_compliance": sum(1 for p in predictions if is_valid_format(p)) / len(predictions),
        "guardrail_pass_rate": sum(guardrail_flags) / len(guardrail_flags),
    }
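
A small generic checker that applies the comparators declared in EVALUATION_GATES (metrics not produced by a given run are skipped):

import operator

_COMPARATORS = {">=": operator.ge, "<=": operator.le, "==": operator.eq}

def check_evaluation_gates(metrics: dict) -> bool:
    """Apply every declared gate; any failure blocks the merge."""
    all_passed = True
    for name, gate in EVALUATION_GATES.items():
        value = metrics.get(name)
        if value is None:
            continue  # Metric not computed for this run
        passed = _COMPARATORS[gate["comparator"]](value, gate["threshold"])
        print(f"  [{'PASS' if passed else 'FAIL'}] {name}: {value} {gate['comparator']} {gate['threshold']}")
        all_passed = all_passed and passed
    return all_passed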

US-3.3 — Shadow Mode Evaluation (Layer 2)

Implementation: The orchestrator is configured (via AppConfig feature flag) to fork every request to both the production and candidate pipelines. The candidate's response is logged asynchronously to Kinesis but never served. A nightly Lambda aggregates comparison metrics and generates a shadow report stored in S3. The report is sent to Slack for review.

Services: AppConfig, Kinesis, Lambda, S3, Slack (webhook)

import asyncio
import json
import logging
from datetime import datetime

import boto3
from bert_score import score as bert_score

kinesis = boto3.client("kinesis")
logger = logging.getLogger(__name__)

async def handle_message_with_shadow(request: ChatRequest, shadow_enabled: bool):
    """Process request through both prod and candidate when shadow mode is on."""

    # Always run production pipeline (serves the user)
    prod_response = await run_pipeline(request, model_version="production")

    if shadow_enabled:
        # Run candidate in parallel — fire-and-forget, does not block response
        asyncio.create_task(
            run_shadow_pipeline(request, prod_response)
        )

    return prod_response


async def run_shadow_pipeline(request: ChatRequest, prod_response: ChatResponse):
    """Run candidate pipeline and log comparison — never serves to user."""
    try:
        candidate_response = await run_pipeline(request, model_version="candidate")

        # Compute comparison metrics
        P, R, F1 = bert_score(
            [candidate_response.text], [prod_response.text], lang="en"
        )

        comparison = {
            "request_id": request.request_id,
            "intent": request.classified_intent,
            "prod_length": len(prod_response.text.split()),
            "candidate_length": len(candidate_response.text.split()),
            "bertscore_f1": F1.item(),
            "prod_guardrail_passed": prod_response.guardrail_passed,
            "candidate_guardrail_passed": candidate_response.guardrail_passed,
            "timestamp": datetime.utcnow().isoformat(),
        }

        # Log to Kinesis for aggregation
        kinesis.put_record(
            StreamName="manga-assist-shadow",
            Data=json.dumps(comparison),
            PartitionKey=request.request_id,
        )
    except Exception as e:
        logger.warning(f"Shadow pipeline failed (non-blocking): {e}")
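
A minimal sketch of the nightly aggregation, assuming the Lambda has already loaded the day's shadow records from the Kinesis-fed store and holds a Slack webhook URL:

import requests

def nightly_shadow_report(records: list[dict], webhook_url: str):
    """Aggregate one day of shadow comparisons and post a summary to Slack."""
    if not records:
        return
    avg_f1 = sum(r["bertscore_f1"] for r in records) / len(records)
    regressions = sum(1 for r in records
                      if r["prod_guardrail_passed"] and not r["candidate_guardrail_passed"])
    requests.post(webhook_url, timeout=10, json={"text": (
        f"Shadow report: {len(records)} requests | "
        f"avg BERTScore vs prod {avg_f1:.3f} | {regressions} guardrail regressions"
    )})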

US-3.4 — Canary Deployment (Layer 3)

Implementation: SageMaker endpoint with two production variants: baseline (99% traffic) and canary (1%). CloudWatch alarms on canary-specific metrics trigger an auto-rollback Lambda if thresholds are breached. Promotion from 1% → 10% → 50% → 100% is done via update_endpoint_weights_and_capacities with manual approval.

Services: SageMaker, CloudWatch Alarms, Lambda, SNS

import boto3

sm = boto3.client("sagemaker")

def deploy_canary(endpoint_name: str):
    """Shift 1% of traffic to the canary variant.

    Assumes the endpoint was already updated to an endpoint config that
    defines both the 'baseline' and 'canary' variants.
    """
    sm.update_endpoint_weights_and_capacities(
        EndpointName=endpoint_name,
        DesiredWeightsAndCapacities=[
            {"VariantName": "baseline", "DesiredWeight": 99},
            {"VariantName": "canary", "DesiredWeight": 1},
        ],
    )

def promote_canary(endpoint_name: str, target_weight: int):
    """Promote canary: 1% -> 10% -> 50% -> 100%."""
    sm.update_endpoint_weights_and_capacities(
        EndpointName=endpoint_name,
        DesiredWeightsAndCapacities=[
            {"VariantName": "baseline", "DesiredWeight": 100 - target_weight},
            {"VariantName": "canary", "DesiredWeight": target_weight},
        ],
    )

def auto_rollback_handler(event, context):
    """Lambda triggered by an EventBridge-forwarded CloudWatch alarm state change."""
    sns = boto3.client("sns")
    alarm_name = event["detail"]["alarmName"]
    # Alarm State Change events nest dimensions under metricStat.metric
    metric = event["detail"]["configuration"]["metrics"][0]["metricStat"]["metric"]
    endpoint = metric["dimensions"]["EndpointName"]

    sm.update_endpoint_weights_and_capacities(
        EndpointName=endpoint,
        DesiredWeightsAndCapacities=[
            {"VariantName": "baseline", "DesiredWeight": 100},
            {"VariantName": "canary", "DesiredWeight": 0},
        ],
    )

    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:ACCOUNT:manga-assist-alerts",
        Subject=f"AUTO-ROLLBACK: Canary on {endpoint}",
        Message=f"Alarm {alarm_name} triggered. Canary traffic set to 0%.",
    )

US-3.5 — Continuous Monitoring and Auto-Rollback (Layer 4)

Implementation: CloudWatch custom metrics are emitted per response. Composite alarms combine multiple metrics (hallucination rate, escalation rate, error rate) into a single health signal. Hard threshold breach triggers auto-rollback via Lambda. Soft thresholds page the on-call via PagerDuty. A weekly Redshift query generates trend reports.

Services: CloudWatch, Lambda, PagerDuty, Redshift, Grafana

import boto3

cw = boto3.client("cloudwatch")

def emit_response_metrics(response: ChatResponse, trace: dict):
    """Emit per-response metrics to CloudWatch for continuous monitoring."""

    cw.put_metric_data(
        Namespace="MangaAssist/AIQuality",
        MetricData=[
            {
                "MetricName": "HallucinationDetected",
                "Value": 1 if response.hallucination_flag else 0,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Intent", "Value": response.intent},
                    {"Name": "ModelVersion", "Value": trace["model_version"]},
                ],
            },
            {
                "MetricName": "EscalationTriggered",
                "Value": 1 if response.escalated else 0,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Intent", "Value": response.intent},
                ],
            },
            {
                "MetricName": "GuardrailBlocked",
                "Value": 1 if not response.guardrail_passed else 0,
                "Unit": "Count",
            },
            {
                "MetricName": "ResponseLatencyMs",
                "Value": trace["total_latency_ms"],
                "Unit": "Milliseconds",
            },
        ],
    )

// CloudWatch composite alarm — auto-rollback trigger
{
  "AlarmName": "MangaAssist-CriticalQuality",
  "AlarmRule": "ALARM(HallucinationRateHigh) OR ALARM(EscalationRateHigh) OR ALARM(ErrorRateHigh)",
  "AlarmActions": ["arn:aws:sns:us-east-1:ACCOUNT:auto-rollback-trigger"],
  "AlarmDescription": "Triggers auto-rollback if any critical quality metric breaches threshold"
}

US-3.6 — Human Evaluation Pipeline

Implementation: A Lambda runs weekly to sample 200 production responses from DynamoDB (stratified by intent). These are loaded into a Streamlit annotation app where human annotators score each response. Scores are written to Redshift and a weekly summary is posted to Slack with trend comparisons.

Services: Lambda, DynamoDB, Streamlit, Redshift, Slack

import random

def sample_for_human_eval(responses_table: str, sample_size: int = 200) -> list[dict]:
    """Stratified sampling of production responses for human evaluation."""

    # Target distribution matches golden dataset
    intent_quotas = {
        "recommendation": 48,   # 24%
        "product_question": 40, # 20%
        "faq": 32,              # 16%
        "order_tracking": 24,   # 12%
        "multi_turn": 20,       # 10%
        "edge_case": 16,        # 8%
        "return_request": 12,   # 6%
        "chitchat": 8,          # 4%
    }

    samples = []
    for intent, count in intent_quotas.items():
        intent_responses = query_responses_by_intent(responses_table, intent, limit=count * 3)
        samples.extend(random.sample(intent_responses, min(count, len(intent_responses))))

    return samples


# Annotation schema per response
ANNOTATION_SCHEMA = {
    "factual_correctness":  {"type": "likert", "scale": [1, 2, 3, 4, 5]},
    "helpfulness":          {"type": "likert", "scale": [1, 2, 3, 4, 5]},
    "tone_appropriateness": {"type": "likert", "scale": [1, 2, 3, 4, 5]},
    "format_quality":       {"type": "likert", "scale": [1, 2, 3, 4, 5]},
    "purchase_likelihood":  {"type": "binary", "options": ["yes", "no"]},
    "failure_category":     {"type": "optional_select", "options": [
        "none", "wrong_intent", "bad_retrieval", "hallucination",
        "unhelpful_tone", "missing_info", "format_issue"
    ]},
}

Epic 4: Observability and Monitoring

US-4.1 — End-to-End LLM Pipeline Tracing

Implementation: The MLflow Tracing server runs on ECS Fargate; services instrument with the lightweight mlflow-tracing SDK. Auto-tracing is enabled for Bedrock calls. Custom @mlflow.trace decorators instrument internal components (intent classifier, guardrails, caching). Sampling is 10% in production and 100% in staging. Traces are stored in S3 with PostgreSQL metadata.

Services: MLflow Tracing, ECS Fargate, PostgreSQL RDS, S3, OpenTelemetry

import os

import mlflow
from mlflow.entities import SpanType

# Enable auto-tracing for Bedrock runtime calls
mlflow.bedrock.autolog()

# Async logging keeps tracing off the hot path; the 10% production sampling
# ratio is configured via the MLFLOW_TRACE_SAMPLING_RATIO environment variable
mlflow.config.enable_async_logging(True)
os.environ["MLFLOW_TRACE_SAMPLING_RATIO"] = "0.10"

@mlflow.trace(span_type=SpanType.CHAIN, name="orchestrator.handle_message")
async def handle_message(request: ChatRequest) -> ChatResponse:
    """Full pipeline with automatic tracing for every span."""

    # Each function below is also decorated with @mlflow.trace
    context = await load_conversation_context(request.session_id)
    intent = classify_intent(request.message, request.page_context)
    service_data = await fetch_service_data(intent, context)
    prompt = build_prompt(intent, service_data, context)

    # Bedrock call is auto-traced by mlflow.anthropic.autolog()
    raw_response = await call_bedrock(prompt)

    validated = apply_guardrails(raw_response, service_data)
    await save_turn(request.session_id, request.message, validated.text)

    # Tag trace with searchable metadata
    mlflow.update_current_trace(tags={
        "session_id": request.session_id,
        "intent": intent.type.value,
        "model_version": "claude-3.5-sonnet-v2",
        "cache_hit": str(service_data.get("cache_hit", False)),
    })

    return validated

US-4.2 — Model Drift Detection

Implementation: A scheduled Lambda runs weekly, computing intent distribution from the past 7 days (Kinesis → Redshift) and comparing against the baseline distribution stored at training time. For embedding drift, cosine similarity of sampled production queries against training centroid is computed. Alerts fire via SNS → PagerDuty/Slack.

Services: Lambda, Redshift, CloudWatch, SNS, SageMaker

import numpy as np
from scipy.stats import ks_2samp

# Baseline intent distribution (set at training time)
BASELINE_DISTRIBUTION = {
    "recommendation": 0.35, "product_question": 0.22, "faq": 0.18,
    "order_tracking": 0.12, "chitchat": 0.08, "escalation": 0.05,
}

def check_intent_drift(current_distribution: dict, threshold_pp: float = 5.0) -> list[dict]:
    """Alert if any intent shifts > threshold percentage points from baseline."""
    alerts = []
    for intent, baseline_share in BASELINE_DISTRIBUTION.items():
        current_share = current_distribution.get(intent, 0)
        drift_pp = abs(current_share - baseline_share) * 100
        if drift_pp > threshold_pp:
            alerts.append({
                "intent": intent,
                "baseline": f"{baseline_share:.1%}",
                "current": f"{current_share:.1%}",
                "drift_pp": f"{drift_pp:.1f}pp",
                "action": "Investigate query patterns; may need classifier retraining",
            })
    return alerts


def check_embedding_drift(recent_embeddings: np.ndarray, training_embeddings: np.ndarray) -> dict:
    """KS test on cosine similarity distributions to detect embedding space shift."""
    from sklearn.metrics.pairwise import cosine_similarity

    training_centroid = training_embeddings.mean(axis=0, keepdims=True)
    recent_sims = cosine_similarity(recent_embeddings, training_centroid).flatten()
    training_sims = cosine_similarity(training_embeddings, training_centroid).flatten()

    ks_stat, p_value = ks_2samp(recent_sims, training_sims)
    return {
        "ks_statistic": ks_stat,
        "p_value": p_value,
        "drift_detected": p_value < 0.01,
        "action": "Re-index knowledge base with latest queries" if p_value < 0.01 else "No action needed",
    }

US-4.3 — Real-Time Operational Dashboard

Implementation: Grafana dashboards backed by Prometheus (scraped from MLflow + ECS metrics) and CloudWatch. Dashboard panels auto-refresh every 10 seconds. Drill-down links connect metrics to MLflow trace search.

Services: Grafana, Prometheus, CloudWatch, MLflow

// Grafana dashboard panel definition (JSON model excerpt)
{
  "title": "MangaAssist LLM Pipeline — Real-Time",
  "panels": [
    {
      "title": "Active Sessions",
      "type": "stat",
      "datasource": "CloudWatch",
      "targets": [{"namespace": "MangaAssist/Sessions", "metricName": "ActiveSessions"}]
    },
    {
      "title": "P99 Latency (ms)",
      "type": "timeseries",
      "datasource": "Prometheus",
      "targets": [{"expr": "histogram_quantile(0.99, rate(manga_response_latency_bucket[5m]))"}]
    },
    {
      "title": "Intent Distribution",
      "type": "piechart",
      "datasource": "Prometheus",
      "targets": [{"expr": "sum by (intent) (rate(manga_intent_total[1h]))"}]
    },
    {
      "title": "LLM Cost / Hour",
      "type": "stat",
      "datasource": "CloudWatch",
      "targets": [{"namespace": "MangaAssist/Cost", "metricName": "LLMCostPerHour"}]
    },
    {
      "title": "Guardrail Block Rate",
      "type": "gauge",
      "datasource": "Prometheus",
      "targets": [{"expr": "rate(manga_guardrail_blocked_total[1h]) / rate(manga_requests_total[1h])"}]
    }
  ],
  "refresh": "10s"
}

US-4.4 — LLM-Specific Metrics Collection

Implementation: A metrics middleware in the orchestrator extracts LLM-specific data from the Bedrock response headers and body (token counts, TTFT, stop reason) and emits them as CloudWatch custom metrics with intent and model dimensions.

Services: CloudWatch, Bedrock

import json
import time

import boto3

cw = boto3.client("cloudwatch")

async def call_bedrock_with_metrics(prompt: str, intent: str) -> dict:
    """Invoke Bedrock and capture LLM-specific metrics."""

    start = time.perf_counter()
    first_token_time = None

    bedrock = boto3.client("bedrock-runtime")
    response = bedrock.invoke_model_with_response_stream(
        modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 800,
            "temperature": 0.3,
        }),
    )

    text_parts = []
    input_tokens = output_tokens = 0
    for event in response["body"]:
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk.get("type") == "content_block_delta":
            if first_token_time is None:
                first_token_time = time.perf_counter()
            text_parts.append(chunk["delta"]["text"])
        # The final stream chunk carries Bedrock invocation metrics with token counts
        if "amazon-bedrock-invocationMetrics" in chunk:
            metrics = chunk["amazon-bedrock-invocationMetrics"]
            input_tokens = metrics["inputTokenCount"]
            output_tokens = metrics["outputTokenCount"]

    total_time = time.perf_counter() - start
    ttft = (first_token_time - start) if first_token_time else total_time
    output_text = "".join(text_parts)

    # Emit LLM-specific metrics
    cw.put_metric_data(
        Namespace="MangaAssist/LLM",
        MetricData=[
            {"MetricName": "TTFT_ms", "Value": ttft * 1000, "Unit": "Milliseconds",
             "Dimensions": [{"Name": "Intent", "Value": intent}, {"Name": "Model", "Value": "claude-3.5-sonnet"}]},
            {"MetricName": "InputTokens", "Value": input_tokens, "Unit": "Count",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "OutputTokens", "Value": output_tokens, "Unit": "Count",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "TotalLatency_ms", "Value": total_time * 1000, "Unit": "Milliseconds"},
        ],
    )

    return {"text": output_text, "input_tokens": input_tokens, "output_tokens": output_tokens,
            "ttft_ms": ttft * 1000, "total_ms": total_time * 1000}

US-4.5 — Alerting on Quality Degradation

Implementation: CloudWatch alarms with tiered severity → SNS fan-out → PagerDuty (critical), Slack webhook (warning), email (informational). Each alarm message includes the metric name, threshold, current value, and a deep-link to the relevant Grafana panel and MLflow trace search.

Services: CloudWatch Alarms, SNS, Lambda (formatter), PagerDuty, Slack

# CDK alarm definitions
from aws_cdk import Duration, aws_cloudwatch as cw, aws_cloudwatch_actions as cw_actions, aws_sns as sns

# Critical: pages on-call, triggers auto-rollback
hallucination_alarm = cw.Alarm(self, "HallucinationRateHigh",
    metric=cw.Metric(
        namespace="MangaAssist/AIQuality",
        metric_name="HallucinationDetected",
        statistic="Average",
        period=Duration.minutes(5),
    ),
    threshold=0.05,  # 5%
    evaluation_periods=3,
    comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
    alarm_description="Hallucination rate >5% for 15 min — auto-rollback triggered",
)
hallucination_alarm.add_alarm_action(cw_actions.SnsAction(pagerduty_topic))
hallucination_alarm.add_alarm_action(cw_actions.SnsAction(auto_rollback_topic))

# Warning: Slack notification, no auto-rollback
escalation_warning = cw.Alarm(self, "EscalationRateWarning",
    metric=cw.Metric(
        namespace="MangaAssist/AIQuality",
        metric_name="EscalationTriggered",
        statistic="Average",
        period=Duration.minutes(15),
    ),
    threshold=0.17,  # 17%
    evaluation_periods=2,
    alarm_description="Escalation rate >17% for 30 min — investigate",
)
escalation_warning.add_alarm_action(cw_actions.SnsAction(slack_topic))
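
A sketch of the formatter Lambda between SNS and Slack; the SNS payload fields are standard CloudWatch alarm notifications, while the dashboard URLs are placeholders:

import json
import os
import urllib.request

def format_alarm_handler(event, context):
    """Turn a CloudWatch alarm SNS notification into a deep-linked Slack alert."""
    alarm = json.loads(event["Records"][0]["Sns"]["Message"])
    metric = alarm["Trigger"]["MetricName"]
    text = (
        f"*{alarm['AlarmName']}* is {alarm['NewStateValue']}\n"
        f"Metric: {metric} | Threshold: {alarm['Trigger']['Threshold']}\n"
        f"Reason: {alarm['NewStateReason']}\n"
        f"Grafana: https://grafana.internal/d/manga-assist?viewPanel={metric}\n"  # placeholder URL
        f"Traces: http://mlflow.internal.manga-assist.amazon.com/#/traces"        # placeholder URL
    )
    req = urllib.request.Request(
        os.environ["SLACK_WEBHOOK_URL"],
        data=json.dumps({"text": text}).encode(),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)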

Epic 5: RAG and Data Pipeline Operations

US-5.1 — Automated Knowledge Base Refresh Pipeline

Implementation: An event-driven pipeline using EventBridge + Step Functions. Catalog changes trigger an EventBridge rule that invokes a Step Function workflow: fetch updated documents → chunk → embed via Bedrock Titan → upsert into OpenSearch. Stale chunk detection runs as a daily scheduled Lambda.

Services: EventBridge, Step Functions, Lambda, Bedrock (Titan Embeddings), OpenSearch Serverless

import hashlib
import json
from datetime import datetime

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection

bedrock = boto3.client("bedrock-runtime")
opensearch_client = OpenSearch(
    hosts=[{"host": "opensearch.manga-assist.internal", "port": 443}],  # Placeholder host
    use_ssl=True,
    connection_class=RequestsHttpConnection,
)

def chunk_and_index(document: dict, chunk_size: int = 512, overlap: int = 50,
                    embedding_model: str = "amazon.titan-embed-text-v2:0",
                    target_index: str = "manga-embeddings"):
    """Chunk a document, embed each chunk, and upsert to OpenSearch."""

    chunks = sliding_window_chunk(document["content"], chunk_size, overlap)

    for i, chunk_text in enumerate(chunks):
        # Embed each chunk with the configured model (Titan Embeddings V2 by default)
        embed_response = bedrock.invoke_model(
            modelId=embedding_model,
            body=json.dumps({"inputText": chunk_text}),
        )
        embedding = json.loads(embed_response["body"].read())["embedding"]

        # Upsert to OpenSearch
        chunk_doc = {
            "chunk_id": f"{document['doc_id']}-{i:03d}",
            "content": chunk_text,
            "embedding": embedding,
            "source_type": document["source_type"],
            "asin": document.get("asin"),
            "category": document.get("category"),
            "last_updated": datetime.utcnow().isoformat(),
            "source_doc_hash": hashlib.sha256(document["content"].encode()).hexdigest(),
        }

        opensearch_client.index(
            index=target_index,
            id=chunk_doc["chunk_id"],
            body=chunk_doc,
        )


def detect_stale_chunks():
    """Find chunks whose source document no longer exists or has been updated."""
    all_chunks = scroll_all_chunks("manga-embeddings")
    stale = []
    for chunk in all_chunks:
        source_doc = fetch_source_document(chunk["source_type"], chunk["asin"])
        if source_doc is None:
            stale.append({"chunk_id": chunk["chunk_id"], "reason": "source_deleted"})
        elif hashlib.sha256(source_doc["content"].encode()).hexdigest() != chunk["source_doc_hash"]:
            stale.append({"chunk_id": chunk["chunk_id"], "reason": "source_updated"})
    return stale
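
sliding_window_chunk is left undefined above; a minimal whitespace-token sketch (production chunking would respect sentence boundaries):

def sliding_window_chunk(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Split text into overlapping windows of roughly chunk_size tokens."""
    tokens = text.split()  # whitespace tokenization is an approximation
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        if not window:
            break
        chunks.append(" ".join(window))
        if start + chunk_size >= len(tokens):
            break
    return chunks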

US-5.2 — Embedding Model Version Management

Implementation: When upgrading the embedding model, a full re-index is triggered into a new OpenSearch index (manga-embeddings-v3). The retrieval evaluation suite runs against the new index. Once validation passes, an alias swap atomically switches production traffic from the old to new index. The old index is retained for 7 days as rollback.

Services: OpenSearch Serverless, Bedrock, Lambda, S3

class EvalFailure(Exception):
    """Raised when the new index fails the retrieval evaluation gates."""


def upgrade_embedding_model(new_model_id: str, new_index: str, old_index: str):
    """Full re-index and atomic cutover for embedding model upgrades."""

    # Step 1: Re-index all documents with new embedding model
    all_source_docs = fetch_all_source_documents()
    for doc in all_source_docs:
        chunk_and_index(doc, embedding_model=new_model_id, target_index=new_index)

    # Step 2: Run retrieval evaluation
    eval_results = evaluate_retrieval(
        index=new_index,
        eval_dataset="s3://manga-assist-eval/retrieval-eval-100.json",
    )

    if eval_results["recall_at_3"] < 0.80 or eval_results["mrr_at_3"] < 0.60:
        raise EvalFailure(f"New embeddings failed: {eval_results}")

    # Step 3: Atomic alias swap
    opensearch_client.indices.update_aliases(body={
        "actions": [
            {"remove": {"index": old_index, "alias": "manga-embeddings-live"}},
            {"add": {"index": new_index, "alias": "manga-embeddings-live"}},
        ]
    })

    # Step 4: Schedule old index deletion after 7 days
    schedule_index_deletion(old_index, delay_days=7)

US-5.3 — RAG Retrieval Quality Monitoring

Implementation: A weekly Lambda loads 100 curated query-document pairs from S3, runs each query against the live OpenSearch index, and computes Recall@3, MRR@3, and average reranker score. Results are sent to Redshift for trending. Alerts fire if metrics drop below thresholds.

Services: Lambda, OpenSearch, Redshift, CloudWatch, SNS

def weekly_retrieval_evaluation(eval_dataset_key: str):
    """Evaluate RAG retrieval quality against curated test set."""

    eval_data = load_from_s3(eval_dataset_key)
    recall_hits, mrr_sum, reranker_scores = 0, 0.0, []

    for entry in eval_data:
        query = entry["query"]
        expected_doc_ids = set(entry["relevant_chunk_ids"])

        # Retrieve top 3 via full RAG pipeline
        results = retrieve_and_rerank(query, top_k=3)
        retrieved_ids = [r["chunk_id"] for r in results]

        # Recall@3: did we find the relevant document?
        if expected_doc_ids & set(retrieved_ids):
            recall_hits += 1

        # MRR@3: reciprocal rank of first relevant result
        for rank, rid in enumerate(retrieved_ids, 1):
            if rid in expected_doc_ids:
                mrr_sum += 1.0 / rank
                break

        reranker_scores.append(results[0]["reranker_score"] if results else 0)

    n = len(eval_data)
    metrics = {
        "recall_at_3": recall_hits / n,
        "mrr_at_3": mrr_sum / n,
        "avg_reranker_score": sum(reranker_scores) / n,
        "evaluated_queries": n,
    }

    # Alert if below threshold
    if metrics["recall_at_3"] < 0.75:
        send_alert(f"RAG Recall@3 dropped to {metrics['recall_at_3']:.2%} (threshold: 75%)")

    return metrics

US-5.4 — Chunk Strategy Tuning Pipeline

Implementation: An MLflow experiment tracks different chunk configurations. Each experiment creates a test index with the candidate chunking strategy, evaluates retrieval quality, and logs results. The experiment comparison view shows which configuration performs best.

Services: MLflow Experiments, OpenSearch, Bedrock, Lambda

import mlflow

CHUNK_EXPERIMENTS = [
    {"chunk_size": 256, "overlap": 25, "name": "small-chunks"},
    {"chunk_size": 512, "overlap": 50, "name": "medium-chunks"},
    {"chunk_size": 1024, "overlap": 100, "name": "large-chunks"},
    {"chunk_size": 512, "overlap": 0, "name": "no-overlap"},
]

def run_chunk_experiment(config: dict, test_corpus: list[dict], eval_dataset: list[dict]):
    with mlflow.start_run(run_name=config["name"]):
        mlflow.log_params(config)

        # Create test index with candidate chunk strategy
        test_index = f"chunk-experiment-{config['name']}"
        for doc in test_corpus:
            chunk_and_index(doc, chunk_size=config["chunk_size"],
                           overlap=config["overlap"], target_index=test_index)

        # Evaluate retrieval quality
        retrieval_metrics = evaluate_retrieval(test_index, eval_dataset)
        mlflow.log_metrics(retrieval_metrics)

        # Also evaluate downstream response quality
        response_metrics = evaluate_golden_dataset_with_index(test_index)
        mlflow.log_metrics({"downstream_bertscore": response_metrics["avg_bertscore"]})

        # Cleanup test index
        delete_index(test_index)

Epic 6: Cost Governance and Token Budget Management

US-6.1 — Per-Session Token Budget Enforcement

Implementation: The orchestrator tracks cumulative token usage per session in DynamoDB. Before each LLM call, it checks remaining budget. If budget is exceeded, it gracefully degrades: shorter context, skip RAG, switch to Haiku. Token counts come from Bedrock response metadata.

Services: DynamoDB, Bedrock, ElastiCache

SESSION_TOKEN_BUDGET = 5000  # Configurable via AppConfig

class TokenBudgetManager:
    def __init__(self, session_id: str, dynamodb_table):
        self.session_id = session_id
        self.table = dynamodb_table

    def get_remaining(self) -> int:
        item = self.table.get_item(Key={"session_id": self.session_id}).get("Item", {})
        used = item.get("tokens_used", 0)
        return max(0, SESSION_TOKEN_BUDGET - used)

    def record_usage(self, input_tokens: int, output_tokens: int):
        self.table.update_item(
            Key={"session_id": self.session_id},
            UpdateExpression="ADD tokens_used :t",
            ExpressionAttributeValues={":t": input_tokens + output_tokens},
        )

    def get_degradation_strategy(self) -> str:
        remaining = self.get_remaining()
        if remaining > 2000:
            return "full"        # Full pipeline: RAG + Sonnet + full context
        elif remaining > 500:
            return "reduced"     # Skip RAG, use shorter context, switch to Haiku
        else:
            return "minimal"     # Template responses only, no LLM


# Usage in orchestrator
budget = TokenBudgetManager(request.session_id, sessions_table)
strategy = budget.get_degradation_strategy()

if strategy == "full":
    response = await full_pipeline(request)
elif strategy == "reduced":
    response = await reduced_pipeline(request)  # Haiku, no RAG
else:
    response = template_response(request.intent)

budget.record_usage(response.input_tokens, response.output_tokens)

US-6.2 — LLM Cost Attribution and Reporting

Implementation: Every LLM call is tagged with intent, model, user type, and region. Token counts and cost are computed per request and emitted to Kinesis. A nightly Redshift SQL job aggregates costs into a reporting table. Grafana dashboards visualize cost by dimension.

Services: Kinesis, Redshift, Grafana, Lambda

import json
from datetime import datetime

import boto3

kinesis = boto3.client("kinesis")

# Cost calculation per request
MODEL_PRICING = {
    "claude-3.5-sonnet": {"input_per_1k": 0.003, "output_per_1k": 0.015},
    "claude-3-haiku":    {"input_per_1k": 0.00025, "output_per_1k": 0.00125},
    "titan-embed-v2":    {"input_per_1k": 0.0001, "output_per_1k": 0.0},
}

def compute_request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING[model]
    return (input_tokens / 1000 * pricing["input_per_1k"] +
            output_tokens / 1000 * pricing["output_per_1k"])


def emit_cost_event(request_id: str, intent: str, model: str,
                    input_tokens: int, output_tokens: int, user_type: str):
    cost = compute_request_cost(model, input_tokens, output_tokens)
    kinesis.put_record(
        StreamName="manga-assist-cost",
        Data=json.dumps({
            "request_id": request_id,
            "intent": intent,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
            "user_type": user_type,    # "prime" | "guest"
            "region": "us-east-1",
            "timestamp": datetime.utcnow().isoformat(),
        }),
        PartitionKey=intent,
    )

-- Redshift daily cost attribution query
SELECT
    DATE(timestamp) AS day,
    intent,
    model,
    user_type,
    COUNT(*) AS request_count,
    SUM(cost_usd) AS total_cost,
    AVG(cost_usd) AS avg_cost_per_request,
    SUM(input_tokens + output_tokens) AS total_tokens
FROM manga_assist.cost_events
WHERE timestamp >= DATEADD(day, -7, CURRENT_DATE)
GROUP BY day, intent, model, user_type
ORDER BY total_cost DESC;

US-6.3 — LLM Bypass for Low-Complexity Intents

Implementation: A routing table (stored in AppConfig) maps intents to handler types: template, api_template, or llm. Template and API handlers skip the LLM entirely. The bypass rate is tracked as a real-time metric. New bypass routes can be added by updating AppConfig without a deployment.

Services: AppConfig, Lambda, CloudWatch

# Intent routing configuration (AppConfig)
INTENT_ROUTES = {
    "chitchat":       {"handler": "template", "template_id": "greeting"},
    "order_tracking": {"handler": "api_template", "api": "order-service", "template_id": "order_status"},
    "faq_simple":     {"handler": "cached", "cache_key_pattern": "faq:{query_hash}"},
    "recommendation": {"handler": "llm", "model": "claude-3.5-sonnet"},
    "product_question":{"handler": "llm", "model": "claude-3.5-sonnet"},
    "faq_complex":    {"handler": "llm", "model": "claude-3-haiku"},
}

TEMPLATES = {
    "greeting": "Hi! Welcome to the JP Manga store. How can I help you today?",
    "order_status": "Your order {order_id} shipped on {ship_date} and is expected to arrive by {delivery_date}.",
}

async def route_request(intent: str, request: ChatRequest) -> ChatResponse:
    route = INTENT_ROUTES[intent]
    handler = route["handler"]

    # Track bypass rate
    is_llm = handler == "llm"
    cw.put_metric_data(
        Namespace="MangaAssist/Cost",
        MetricData=[{"MetricName": "LLMBypassed", "Value": 0 if is_llm else 1, "Unit": "Count"}],
    )

    if handler == "template":
        return ChatResponse(text=TEMPLATES[route["template_id"]])

    elif handler == "api_template":
        api_data = await call_api(route["api"], request)
        return ChatResponse(text=TEMPLATES[route["template_id"]].format(**api_data))

    elif handler == "cached":
        cached = redis_client.get(route["cache_key_pattern"].format(query_hash=hash_query(request.message)))
        if cached:
            return ChatResponse(text=cached)
        # Cache miss: fall through to LLM
        return await llm_pipeline(request)

    else:
        return await llm_pipeline(request, model=route.get("model"))

US-6.4 — Prompt Caching for Repeated Context

Implementation: Bedrock prompt caching is enabled by structuring prompts with a stable system block as a cacheable prefix. The system prompt (~500 tokens) and frequently used policy chunks are placed in the first content block. Bedrock automatically caches matching prefixes across requests.

Services: Bedrock (Prompt Caching), CloudWatch

import json

def build_cacheable_prompt(intent: str, context: dict) -> dict:
    """Structure the prompt so the system block is cacheable by Bedrock."""

    # Static system block — identical across requests, highly cacheable
    system_blocks = [
        {
            "type": "text",
            "text": SYSTEM_PROMPT,  # ~500 tokens, stable across requests
            "cache_control": {"type": "ephemeral"},  # Mark the prefix as cacheable
        }
    ]

    # Dynamic user block — varies per request
    user_content = (
        f"CONTEXT:\n{json.dumps(context['page_context'])}\n\n"
        f"RETRIEVED KNOWLEDGE:\n{context['rag_chunks']}\n\n"
        f"CONVERSATION:\n{format_history(context['history'])}\n\n"
        f"USER: {context['user_message']}"
    )

    return {
        "anthropic_version": "bedrock-2023-05-31",
        "system": system_block["content"],
        "messages": [{"role": "user", "content": user_content}],
        "max_tokens": 800,
        "temperature": 0.3,
    }
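
Cache effectiveness can be measured from the usage block in the parsed response body; a sketch assuming the Anthropic cache_read_input_tokens / cache_creation_input_tokens usage fields are present when caching is active:

import boto3

cw = boto3.client("cloudwatch")

def emit_cache_metrics(response_body: dict, intent: str):
    """Emit prompt-cache hit metrics from the Anthropic usage block."""
    usage = response_body.get("usage", {})
    cw.put_metric_data(
        Namespace="MangaAssist/Cost",
        MetricData=[
            {"MetricName": "CacheReadTokens", "Value": usage.get("cache_read_input_tokens", 0),
             "Unit": "Count", "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "CacheWriteTokens", "Value": usage.get("cache_creation_input_tokens", 0),
             "Unit": "Count", "Dimensions": [{"Name": "Intent", "Value": intent}]},
        ],
    )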

Epic 7: Guardrails and Safety

US-7.1 — Multi-Layer Guardrail Pipeline

Implementation: A nine-stage guardrail pipeline (three pre-generation, six post-generation) runs on every request. Each stage is a pluggable function with pass/fail/modify semantics. Stages run sequentially — a failure at any stage short-circuits the rest. The full pipeline is instrumented with MLflow Tracing.

Services: Bedrock Guardrails, Lambda, Comprehend (PII), OpenSearch (ASIN validation)

from dataclasses import dataclass
from typing import Optional

import mlflow
from mlflow.entities import SpanType

@dataclass
class GuardrailResult:
    passed: bool
    stage: str
    reason: Optional[str] = None
    modified_text: Optional[str] = None

# Pre-generation guardrails (run on user input)
PRE_GUARDRAILS = [
    ("input_length", lambda text, _: GuardrailResult(
        passed=len(text) <= 2000, stage="input_length")),
    ("prompt_injection", detect_prompt_injection),
    ("input_toxicity", check_input_toxicity),
]

# Post-generation guardrails (run on LLM output)
POST_GUARDRAILS = [
    ("asin_validation", validate_asins_exist),
    ("price_validation", validate_prices_match_catalog),
    ("pii_detection", detect_and_redact_pii),
    ("competitor_filter", check_competitor_mentions),
    ("url_validation", validate_amazon_urls_only),
    ("format_check", validate_response_format),
]

@mlflow.trace(span_type=SpanType.CHAIN, name="guardrails_pipeline")
def apply_post_guardrails(response_text: str, context: dict) -> GuardrailResult:
    current_text = response_text

    for stage_name, check_fn in POST_GUARDRAILS:
        with mlflow.start_span(name=f"guardrail.{stage_name}") as span:
            result = check_fn(current_text, context)
            span.set_attributes({"passed": result.passed, "reason": result.reason or ""})

            if not result.passed:
                return GuardrailResult(passed=False, stage=stage_name, reason=result.reason)
            if result.modified_text:
                current_text = result.modified_text  # e.g., PII redacted

    return GuardrailResult(passed=True, stage="all", modified_text=current_text)


def validate_asins_exist(text: str, context: dict) -> GuardrailResult:
    """Ensure every ASIN mentioned in the response exists in the catalog."""
    import re
    mentioned_asins = re.findall(r"B[0-9A-Z]{9}", text)
    provided_asins = {p["asin"] for p in context.get("products", [])}

    invalid = [a for a in mentioned_asins if a not in provided_asins]
    if invalid:
        return GuardrailResult(passed=False, stage="asin_validation",
                               reason=f"Hallucinated ASINs: {invalid}")
    return GuardrailResult(passed=True, stage="asin_validation")

US-7.2 — Prompt Injection Defense

Implementation: Two layers: (1) regex-based detection catches known injection patterns, (2) a lightweight classifier (fine-tuned DistilBERT) catches novel attempts. Both run pre-generation with a combined latency budget of 20ms. All blocked attempts are logged for security review and classifier retraining.

Services: SageMaker (classifier), CloudWatch, DynamoDB (security logs)

import re

# Known prompt injection patterns
INJECTION_PATTERNS = [
    r"ignore\s+(all\s+)?previous\s+instructions",
    r"you\s+are\s+now\s+(?:a|an)\s+",
    r"forget\s+(?:everything|all|your)\s+",
    r"system\s*prompt\s*:",
    r"<\s*system\s*>",
    r"IMPORTANT:\s*(?:new|override|ignore)",
    r"(?:base64|rot13|hex)\s*:",                  # Encoded payloads
    r"\[INST\]|\[\/INST\]|<<SYS>>|<\|im_start\|>",  # Model-specific tokens
]

def detect_prompt_injection(user_input: str, context: dict) -> GuardrailResult:
    """Two-layer injection detection: regex + ML classifier."""

    # Layer 1: Regex fast path
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, user_input, re.IGNORECASE):
            log_security_event("prompt_injection_blocked", user_input, "regex", pattern)
            return GuardrailResult(passed=False, stage="prompt_injection",
                                   reason="Suspected prompt injection detected")

    # Layer 2: ML classifier for novel attacks
    injection_score = invoke_injection_classifier(user_input)
    if injection_score > 0.85:
        log_security_event("prompt_injection_blocked", user_input, "ml_classifier", injection_score)
        return GuardrailResult(passed=False, stage="prompt_injection",
                               reason="ML classifier flagged as injection attempt")

    return GuardrailResult(passed=True, stage="prompt_injection")
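
The `invoke_injection_classifier` call above is left to a helper. A minimal sketch, assuming the DistilBERT classifier is served on a SageMaker endpoint named `manga-injection-classifier` that returns a JSON body with an `injection_probability` field (both the endpoint name and response schema are assumptions):

import json

import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")

def invoke_injection_classifier(user_input: str) -> float:
    """Return the classifier's injection probability for the given input."""
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName="manga-injection-classifier",   # endpoint name assumed
        ContentType="application/json",
        Body=json.dumps({"inputs": user_input}),
    )
    payload = json.loads(response["Body"].read())
    return float(payload["injection_probability"])   # response schema assumed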

US-7.3 — Anti-Hallucination Validation

Implementation: Post-generation, a validation pipeline checks: (1) all ASINs against the catalog, (2) all prices against live catalog data, (3) all URLs against Amazon URL patterns. Additionally, a faithfulness score is computed by asking a lightweight LLM (Haiku) whether the response is grounded in the provided context.

Services: Bedrock (Haiku for faithfulness), Product Catalog API, Lambda

import json

async def check_hallucination(response_text: str, context: dict) -> GuardrailResult:
    """Multi-check anti-hallucination validation."""

    # Check 1: ASIN existence
    asin_result = validate_asins_exist(response_text, context)
    if not asin_result.passed:
        return asin_result

    # Check 2: Price accuracy
    price_result = validate_prices(response_text, context)
    if not price_result.passed:
        return price_result

    # Check 3: Faithfulness score via Haiku
    faithfulness = await compute_faithfulness_score(response_text, context)
    if faithfulness < 0.55:
        return GuardrailResult(
            passed=False, stage="hallucination_check",
            reason=f"Faithfulness score {faithfulness:.2f} below threshold 0.55",
        )

    return GuardrailResult(passed=True, stage="hallucination_check")


async def compute_faithfulness_score(response: str, context: dict) -> float:
    """Use Haiku to score whether the response is grounded in provided context."""
    prompt = f"""Score how faithfully this RESPONSE uses only information from the CONTEXT.
Return a single float between 0.0 (completely fabricated) and 1.0 (fully grounded).

CONTEXT:
{context.get('rag_chunks', '')}
{json.dumps(context.get('products', []))}

RESPONSE:
{response}

Score (0.0-1.0):"""

    result = await call_bedrock(prompt, model="anthropic.claude-3-haiku-20240307-v1:0", max_tokens=10)
    try:
        return max(0.0, min(1.0, float(result["text"].strip())))
    except ValueError:
        return 0.0  # Unparseable score: fail closed and treat as ungrounded
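
The `validate_prices` check referenced above is not shown; a minimal sketch, assuming each product dict in the context carries a numeric `price` field:

import re

def validate_prices(text: str, context: dict) -> GuardrailResult:
    """Flag any dollar amount in the response that does not match a catalog price."""
    catalog_prices = {f"{p['price']:.2f}" for p in context.get("products", [])}
    mentioned = re.findall(r"\$(\d+(?:\.\d{2})?)", text)

    invalid = [m for m in mentioned if f"{float(m):.2f}" not in catalog_prices]
    if invalid:
        return GuardrailResult(passed=False, stage="price_validation",
                               reason=f"Prices not in catalog: {invalid}")
    return GuardrailResult(passed=True, stage="price_validation")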

US-7.4 — Streaming Response Guardrails

Implementation: A lightweight token-level filter runs on each streaming chunk, checking for PII patterns, competitor names, and profanity via regex. If a violation is detected mid-stream, streaming is halted, the partial response is replaced with a safe fallback, and the incident is logged. The full assembled response also runs through the complete post-generation guardrails.

Services: Bedrock (streaming), Lambda, CloudWatch

import json
import re

import boto3

bedrock = boto3.client("bedrock-runtime")

# Lightweight patterns for streaming check (must be fast: <5ms per chunk)
STREAMING_BLOCKLIST = {
    "competitor": re.compile(r"\b(Barnes\s*&?\s*Noble|RightStuf|Crunchyroll\s*Store)\b", re.I),
    "pii_email": re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"),
    "pii_phone": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
    "pii_ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

async def stream_with_guardrails(prompt: str, context: dict, websocket):
    """Stream tokens to the user with real-time guardrail checks."""

    accumulated = []
    response_stream = bedrock.invoke_model_with_response_stream(
        modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
        body=json.dumps({"messages": [{"role": "user", "content": prompt}],
                         "max_tokens": 800, "temperature": 0.3,
                         "anthropic_version": "bedrock-2023-05-31"}),
    )

    for event in response_stream["body"]:
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk["type"] != "content_block_delta":
            continue

        token_text = chunk["delta"]["text"]
        accumulated.append(token_text)

        # Real-time streaming check (<5ms)
        recent_window = "".join(accumulated[-20:])  # Window over the last ~20 chunks: violations can span chunk boundaries
        for check_name, pattern in STREAMING_BLOCKLIST.items():
            if pattern.search(recent_window):
                # HALT: replace response and stop streaming
                await websocket.send(json.dumps({
                    "type": "replace",
                    "text": "I can only help with manga-related questions on the JP Manga store.",
                }))
                log_streaming_violation(check_name, recent_window)
                return

        # Safe: forward token to user
        await websocket.send(json.dumps({"type": "token", "text": token_text}))

    # Full response post-generation guardrails
    full_response = "".join(accumulated)
    final_result = apply_post_guardrails(full_response, context)
    if not final_result.passed:
        await websocket.send(json.dumps({
            "type": "replace",
            "text": "Let me rephrase that. " + generate_safe_fallback(context),
        }))

Epic 8: Feedback Loop and Continuous Improvement

US-8.1 — User Feedback Collection Pipeline

Implementation: The chat widget sends feedback events (thumbs-up/down + optional text) via the WebSocket connection. The orchestrator writes feedback to DynamoDB (linked to session/message/trace IDs) and emits an event to Kinesis for real-time analytics. Redshift stores aggregated feedback data for trending.

Services: DynamoDB, Kinesis, Redshift, CloudWatch

import json
import uuid
from datetime import datetime, timedelta

import boto3

dynamodb = boto3.resource("dynamodb")
kinesis = boto3.client("kinesis")
cw = boto3.client("cloudwatch")
feedback_table = dynamodb.Table("manga-assist-feedback")

def record_feedback(feedback: dict):
    """Store feedback linked to the full trace context."""

    feedback_table.put_item(Item={
        "feedback_id": str(uuid.uuid4()),
        "session_id": feedback["session_id"],
        "message_id": feedback["message_id"],
        "trace_id": feedback["trace_id"],
        "value": feedback["value"],           # "thumbs_up" | "thumbs_down"
        "free_text": feedback.get("text", ""),
        "intent": feedback["intent"],
        "model_version": feedback["model_version"],
        "prompt_version": feedback["prompt_version"],
        "timestamp": datetime.utcnow().isoformat(),
        "ttl": int((datetime.utcnow() + timedelta(days=365)).timestamp()),
    })

    # Emit to Kinesis for real-time dashboards
    kinesis.put_record(
        StreamName="manga-assist-feedback",
        Data=json.dumps({
            "value": feedback["value"],
            "intent": feedback["intent"],
            "model_version": feedback["model_version"],
            "timestamp": datetime.utcnow().isoformat(),
        }),
        PartitionKey=feedback["session_id"],
    )

    # Update real-time thumbs-up/down rates in CloudWatch
    cw.put_metric_data(
        Namespace="MangaAssist/Feedback",
        MetricData=[{
            "MetricName": "ThumbsUp" if feedback["value"] == "thumbs_up" else "ThumbsDown",
            "Value": 1,
            "Unit": "Count",
            "Dimensions": [{"Name": "Intent", "Value": feedback["intent"]}],
        }],
    )
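
An illustrative feedback event as it might arrive from the chat widget (all field values here are made up):

record_feedback({
    "session_id": "sess-a1b2c3d4",
    "message_id": "msg-0042",
    "trace_id": "trace-9f8e7d6c",
    "value": "thumbs_down",
    "text": "Recommended a volume I already own",
    "intent": "recommendation",
    "model_version": "claude-3.5-sonnet-20241022-v2",
    "prompt_version": "recommendation-v2.3",
})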

US-8.2 — Automated Error Analysis and Categorization

Implementation: A nightly Lambda collects all negative signals (thumbs-down, escalations, guardrail blocks) from the past 24 hours. It batches them through Claude Haiku with a categorization prompt. Categories are stored in Redshift. A weekly summary is posted to Slack and added to the sprint backlog.

Services: Lambda, Bedrock (Haiku), Redshift, Slack

FAILURE_CATEGORIES = [
    "wrong_intent", "bad_retrieval", "hallucination", "unhelpful_tone",
    "missing_info", "format_issue", "out_of_scope", "slow_response", "other",
]

async def categorize_failures(failures: list[dict]) -> list[dict]:
    """Use Haiku to categorize negative feedback into root cause buckets."""

    categorized = []
    for failure in failures:
        prompt = f"""Categorize this chatbot failure into exactly one category.
Categories: {', '.join(FAILURE_CATEGORIES)}

User query: {failure['user_query']}
Bot response: {failure['bot_response']}
Intent classified as: {failure['intent']}
Feedback: {failure['feedback_value']}
User comment: {failure.get('free_text', 'none')}

Return ONLY the category name, nothing else."""

        result = await call_bedrock(prompt, model="anthropic.claude-3-haiku-20240307-v1:0", max_tokens=20)
        category = result["text"].strip().lower().replace(" ", "_")

        if category not in FAILURE_CATEGORIES:
            category = "other"

        categorized.append({**failure, "failure_category": category})

    return categorized


def generate_weekly_report(categorized_failures: list[dict]) -> str:
    """Generate a summary report for the sprint review."""
    from collections import Counter

    counts = Counter(f["failure_category"] for f in categorized_failures)
    total = len(categorized_failures)

    report = "## Weekly Error Analysis Report\n\n"
    report += f"Total negative signals: {total}\n\n"
    report += "| Category | Count | % | Example |\n|---|---|---|---|\n"

    for category, count in counts.most_common():
        example = next(f for f in categorized_failures if f["failure_category"] == category)
        report += f"| {category} | {count} | {count/total:.0%} | {example['user_query'][:60]}... |\n"

    return report
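
The Slack delivery step is not shown above; a minimal sketch using a Slack incoming webhook (the webhook URL would live in Secrets Manager or similar — the function below is an assumption, not project code):

import json
import urllib.request

def post_report_to_slack(report: str, webhook_url: str):
    """Post the weekly error analysis report to a Slack channel via incoming webhook."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": report}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)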

US-8.3 — Retraining Trigger Automation

Implementation: A weekly Lambda checks drift metrics (from US-4.2) and quality metrics (from US-3.5). If thresholds are breached, it triggers a SageMaker Training Job with the latest labeled data. The trained model is registered in MLflow and enters the standard evaluation pipeline — it is never auto-deployed.

Services: Lambda, SageMaker Training, MLflow, S3, EventBridge

from datetime import datetime

import boto3

def check_and_trigger_retraining():
    """Check drift metrics and trigger retraining if needed."""

    # Check intent classifier accuracy (rolling 1K eval)
    accuracy = get_rolling_accuracy(window_days=7, sample_size=1000)

    # Check intent distribution drift
    drift_alerts = check_intent_drift(get_current_distribution())

    should_retrain = (
        accuracy < 0.88 or
        len(drift_alerts) > 0 or
        days_since_last_training() > 90  # Quarterly retrain at minimum
    )

    if should_retrain:
        # Assemble training dataset
        training_data = assemble_training_data(
            base_dataset="s3://manga-assist-data/intent-training-v3.json",
            augment_with_production=True,
            production_window_days=90,
            max_augmented_samples=2000,
        )

        # Trigger SageMaker Training Job
        sagemaker = boto3.client("sagemaker")
        sagemaker.create_training_job(
            TrainingJobName=f"intent-classifier-{datetime.utcnow().strftime('%Y%m%d')}",
            AlgorithmSpecification={
                "TrainingImage": "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.1-gpu",
                "TrainingInputMode": "File",
            },
            InputDataConfig=[{
                "ChannelName": "training",
                "DataSource": {"S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": training_data["s3_uri"],
                }},
            }],
            OutputDataConfig={"S3OutputPath": "s3://manga-assist-models/intent-classifier/"},
            ResourceConfig={
                "InstanceType": "ml.g4dn.xlarge",
                "InstanceCount": 1,
                "VolumeSizeInGB": 50,
            },
            StoppingCondition={"MaxRuntimeInSeconds": 3600},
            RoleArn="arn:aws:iam::role/SageMakerTrainingRole",
        )

        return {"triggered": True, "reason": f"accuracy={accuracy:.3f}, drift_alerts={len(drift_alerts)}"}

    return {"triggered": False, "accuracy": accuracy}

US-8.4 — Production Data Flywheel

Implementation: High-confidence production conversations (thumbs-up, resolved, no escalation, guardrail pass) are candidates for training augmentation. A weekly pipeline anonymizes (strips PII via Comprehend), deduplicates, and adds them to the training pool. Low-confidence conversations become golden dataset candidates.

Services: Lambda, Comprehend (PII stripping), S3, DynamoDB

import boto3

comprehend = boto3.client("comprehend")

def production_data_flywheel():
    """Extract production conversations for training and golden dataset augmentation."""

    # High-confidence: candidates for training data
    high_conf = query_conversations(
        filters={
            "feedback": "thumbs_up",
            "guardrail_passed": True,
            "escalated": False,
            "intent_confidence": {"gte": 0.90},
        },
        window_days=90,
        limit=5000,
    )

    # Low-confidence: candidates for golden dataset
    low_conf = query_conversations(
        filters={
            "feedback": "thumbs_down",
            "escalated": True,
        },
        window_days=90,
        limit=500,
    )

    # Anonymize (strip PII)
    anonymized_training = [anonymize_conversation(c) for c in high_conf]
    anonymized_golden = [anonymize_conversation(c) for c in low_conf]

    # Deduplicate by semantic similarity
    unique_training = deduplicate_by_embedding(anonymized_training, threshold=0.95)

    # Store for human review before inclusion
    upload_to_s3(unique_training, "s3://manga-assist-data/flywheel/training-candidates/")
    upload_to_s3(anonymized_golden, "s3://manga-assist-data/flywheel/golden-candidates/")

    return {"training_candidates": len(unique_training), "golden_candidates": len(anonymized_golden)}


def anonymize_conversation(conversation: dict) -> dict:
    """Strip PII using Amazon Comprehend."""
    pii_result = comprehend.detect_pii_entities(
        Text=conversation["user_message"],
        LanguageCode="en",
    )
    redacted = conversation["user_message"]
    for entity in sorted(pii_result["Entities"], key=lambda e: e["BeginOffset"], reverse=True):
        redacted = redacted[:entity["BeginOffset"]] + f"[{entity['Type']}]" + redacted[entity["EndOffset"]:]

    return {**conversation, "user_message": redacted, "anonymized": True}
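
The `deduplicate_by_embedding` step above is left to a helper. A greedy sketch, assuming an `embed_text` helper (not shown in this document) that returns unit-normalized numpy vectors:

import numpy as np

def deduplicate_by_embedding(conversations: list[dict], threshold: float = 0.95) -> list[dict]:
    """Greedy semantic dedup: keep an item only if nothing kept so far is >= threshold similar."""
    kept, kept_vectors = [], []
    for conv in conversations:
        vector = embed_text(conv["user_message"])  # assumed helper; unit-normalized np.ndarray
        if all(float(np.dot(vector, k)) < threshold for k in kept_vectors):
            kept.append(conv)
            kept_vectors.append(vector)
    return kept

Pairwise comparison is O(n²), which is acceptable at this volume (at most 5,000 candidates per weekly run).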

US-8.5 — A/B Testing for Model and Prompt Variants

Implementation: Reuses the experiment framework from US-2.3, extended to support model variant splits (not just prompt variants). A significance calculator runs nightly on Redshift data (two-sample t-tests per metric). Sequential checks allow early stopping if one variant is significantly worse.

Services: AppConfig, Kinesis, Redshift, Lambda

from scipy import stats

def check_experiment_significance(experiment_id: str) -> dict:
    """Nightly significance check for an A/B experiment (two-sample t-test per metric)."""

    # Pull metrics from Redshift
    variants = query_experiment_data(experiment_id)
    control = variants["control"]
    treatment = variants["treatment"]

    results = {}
    for metric in ["conversion_rate", "thumbs_up_rate", "escalation_rate"]:
        c_vals = control[metric]
        t_vals = treatment[metric]

        # Two-sample t-test
        t_stat, p_value = stats.ttest_ind(c_vals, t_vals)
        lift = (t_vals.mean() - c_vals.mean()) / c_vals.mean()

        results[metric] = {
            "control_mean": c_vals.mean(),
            "treatment_mean": t_vals.mean(),
            "lift": f"{lift:.1%}",
            "p_value": p_value,
            "significant": p_value < 0.05,
        }

    # Early stopping: if treatment is significantly worse on any critical metric
    should_stop = any(
        r["significant"] and r["treatment_mean"] < r["control_mean"]
        for m, r in results.items()
        if m in ["conversion_rate", "thumbs_up_rate"]
    )

    return {"results": results, "early_stop_recommended": should_stop}

Epic 9: Infrastructure and Scaling Operations

US-9.1 — Predictive Auto-Scaling

Implementation: SageMaker Application Auto Scaling with a combination of target-tracking policies (steady-state) and step-scaling policies (spike response). Scheduled actions pre-scale for known events. CloudWatch alarms monitor InvocationsPerInstance.

Services: SageMaker Auto Scaling, CloudWatch, EventBridge (scheduled scaling)

import boto3

autoscaling = boto3.client("application-autoscaling")

# Register scalable target
autoscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId="endpoint/manga-intent-classifier-prod/variant/primary",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=2,
    MaxCapacity=20,
)

# Step scaling: aggressive scale-up on spikes
autoscaling.put_scaling_policy(
    PolicyName="intent-classifier-step-up",
    ServiceNamespace="sagemaker",
    ResourceId="endpoint/manga-intent-classifier-prod/variant/primary",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "StepAdjustments": [
            {"MetricIntervalLowerBound": 0, "MetricIntervalUpperBound": 300, "ScalingAdjustment": 2},
            {"MetricIntervalLowerBound": 300, "ScalingAdjustment": 4},
        ],
        "Cooldown": 120,
    },
)

# Scheduled pre-scaling for known events (e.g., One Piece chapter release)
autoscaling.put_scheduled_action(
    ServiceNamespace="sagemaker",
    ScheduledActionName="one-piece-chapter-release",
    ResourceId="endpoint/manga-intent-classifier-prod/variant/primary",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    Schedule="cron(0 8 ? * SUN *)",  # Every Sunday 8 AM UTC
    ScalableTargetAction={"MinCapacity": 8, "MaxCapacity": 20},
)
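
The step policy above only executes when a CloudWatch alarm fires, and the implementation note also mentions a target-tracking policy for steady state. A sketch of both pieces, continuing from the snippet above (target value and thresholds are illustrative):

# Target tracking: steady-state scaling on invocations per instance
autoscaling.put_scaling_policy(
    PolicyName="intent-classifier-target-tracking",
    ServiceNamespace="sagemaker",
    ResourceId="endpoint/manga-intent-classifier-prod/variant/primary",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 400.0,  # invocations per instance per minute (illustrative)
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
        },
        "ScaleInCooldown": 300,
        "ScaleOutCooldown": 60,
    },
)

# Alarm that drives the step-scaling policy; put_scaling_policy returns its PolicyARN
step_policy_arn = "..."  # PolicyARN from the step-scaling put_scaling_policy response
boto3.client("cloudwatch").put_metric_alarm(
    AlarmName="intent-classifier-invocation-spike",
    Namespace="AWS/SageMaker",
    MetricName="InvocationsPerInstance",
    Dimensions=[{"Name": "EndpointName", "Value": "manga-intent-classifier-prod"},
                {"Name": "VariantName", "Value": "primary"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=600,  # illustrative; the step bounds above are relative to this value
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[step_policy_arn],
)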

US-9.2 — Bedrock Provisioned Throughput Management

Implementation: Bedrock provisioned throughput is committed at the P95 traffic level; on-demand absorbs bursts above that level (roughly the top 5% of traffic). A monthly Lambda reviews the provisioned vs. on-demand cost split and recommends adjustments. Alerts fire if on-demand exceeds 20% of total invocations.

Services: Bedrock (Provisioned Throughput), CloudWatch, Lambda

def review_provisioned_throughput():
    """Monthly review of Bedrock provisioned vs on-demand usage."""

    # Query CloudWatch for the last 30 days (see the get_metric_sum sketch below)

    provisioned_invocations = get_metric_sum("AWS/Bedrock", "Invocations",
        dimensions={"ModelId": "anthropic.claude-3-5-sonnet", "InferenceType": "Provisioned"},
        period_days=30)

    on_demand_invocations = get_metric_sum("AWS/Bedrock", "Invocations",
        dimensions={"ModelId": "anthropic.claude-3-5-sonnet", "InferenceType": "OnDemand"},
        period_days=30)

    total = provisioned_invocations + on_demand_invocations
    on_demand_pct = on_demand_invocations / total if total > 0 else 0

    recommendation = {
        "provisioned_pct": f"{1 - on_demand_pct:.1%}",
        "on_demand_pct": f"{on_demand_pct:.1%}",
        "action": "increase_provisioned" if on_demand_pct > 0.20 else "maintain",
    }

    if on_demand_pct > 0.20:
        send_alert(f"Bedrock on-demand usage at {on_demand_pct:.0%} — consider increasing provisioned throughput")

    return recommendation
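
The `get_metric_sum` helper the snippet assumes is a thin wrapper over CloudWatch; one way to write it:

from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client("cloudwatch")

def get_metric_sum(namespace: str, metric_name: str, dimensions: dict, period_days: int) -> float:
    """Sum a CloudWatch metric over the trailing window, using daily datapoints."""
    response = cloudwatch.get_metric_statistics(
        Namespace=namespace,
        MetricName=metric_name,
        Dimensions=[{"Name": k, "Value": v} for k, v in dimensions.items()],
        StartTime=datetime.utcnow() - timedelta(days=period_days),
        EndTime=datetime.utcnow(),
        Period=86400,  # one datapoint per day
        Statistics=["Sum"],
    )
    return sum(dp["Sum"] for dp in response["Datapoints"])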

US-9.3 — Multi-Region Inference Deployment

Implementation: Inference endpoints are deployed to both us-east-1 and ap-northeast-1 via CDK stacks. Route 53 latency-based routing directs requests to the nearest region. Health checks enable automatic failover. Model versions are synchronized via a cross-region deployment pipeline.

Services: Route 53, SageMaker (multi-region), CDK, CodePipeline

# CDK: Multi-region endpoint deployment
from aws_cdk import Duration, Stack, aws_route53 as r53

class MultiRegionInferenceStack(Stack):
    def __init__(self, scope, id, regions: list[str], **kwargs):
        super().__init__(scope, id, **kwargs)

        for region in regions:
            # deploy_sagemaker_endpoint is a project helper; in practice each target
            # region gets its own stack instance, since a CDK stack deploys to one region
            deploy_sagemaker_endpoint(
                self, f"intent-classifier-{region}",
                region=region,
                model_data="s3://manga-assist-models/intent-classifier/latest/model.tar.gz",
                instance_type="ml.inf1.xlarge",
                min_instances=2,
            )

        # Route 53 latency-based routing
        hosted_zone = r53.HostedZone.from_lookup(self, "Zone", domain_name="manga-assist.internal")

        for region in regions:
            record = r53.CnameRecord(self, f"Latency-{region}",
                zone=hosted_zone,
                record_name="inference",
                domain_name=f"inference-{region}.manga-assist.internal",
                ttl=Duration.seconds(60),
            )
            # Region + SetIdentifier on the underlying CfnRecordSet enable latency routing
            cfn_record = record.node.default_child
            cfn_record.add_property_override("Region", region)
            cfn_record.add_property_override("SetIdentifier", region)
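
The automatic failover mentioned above relies on Route 53 health checks attached to each regional record. A minimal sketch continuing inside the same stack (the /health path and thresholds are assumptions):

        # Health checks for automatic failover
        for region in regions:
            r53.CfnHealthCheck(self, f"HealthCheck-{region}",
                health_check_config=r53.CfnHealthCheck.HealthCheckConfigProperty(
                    type="HTTPS",
                    fully_qualified_domain_name=f"inference-{region}.manga-assist.internal",
                    resource_path="/health",
                    request_interval=30,
                    failure_threshold=3,
                ),
            )
            # Attach each check to its regional record via HealthCheckId on the
            # CfnRecordSet, so unhealthy regions drop out of latency routing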

Epic 10: Compliance, Audit, and Reproducibility

US-10.1 — Full Request Lineage and Reproducibility

Implementation: Every response is logged to S3 (via Kinesis Firehose) with the complete input context: user message, assembled prompt hash, model version, prompt version, retrieved chunk IDs, raw LLM output, and post-guardrail output. A replay tool can re-run any historical request.

Services: Kinesis Firehose, S3, Lambda, DynamoDB

import hashlib
import json
from datetime import datetime

import boto3

firehose = boto3.client("firehose")

def log_request_lineage(request: ChatRequest, prompt: str, response: ChatResponse, trace_id: str):
    """Log complete lineage for reproducibility and audit."""

    lineage = {
        "trace_id": trace_id,
        "timestamp": datetime.utcnow().isoformat(),
        "user_message": request.message,
        "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
        "model_version": "claude-3.5-sonnet-20241022-v2",
        "prompt_version": "recommendation-v2.3",
        "intent": response.intent,
        "retrieved_chunk_ids": [c["chunk_id"] for c in response.rag_chunks],
        "raw_llm_output": response.raw_text,
        "post_guardrail_output": response.text,
        "guardrail_passed": response.guardrail_passed,
        "input_tokens": response.input_tokens,
        "output_tokens": response.output_tokens,
    }

    # Write to Kinesis Firehose → S3 (Parquet)
    firehose.put_record(
        DeliveryStreamName="manga-assist-lineage",
        Record={"Data": json.dumps(lineage)},
    )


def replay_request(trace_id: str) -> dict:
    """Replay a historical request to verify reproducibility."""

    # Fetch original lineage
    original = fetch_lineage_from_s3(trace_id)

    # Re-run with same inputs
    replayed = invoke_pipeline_with_fixed_context(
        message=original["user_message"],
        model_version=original["model_version"],
        prompt_version=original["prompt_version"],
        chunk_ids=original["retrieved_chunk_ids"],
    )

    return {
        "original_output": original["post_guardrail_output"],
        "replayed_output": replayed.text,
        "match": original["post_guardrail_output"] == replayed.text,
        "bertscore_similarity": compute_bertscore(original["post_guardrail_output"], replayed.text),
    }

US-10.2 — Model and Prompt Change Audit Trail

Implementation: CloudTrail logs all AWS API calls. MLflow tracks all model and prompt version transitions. A custom audit Lambda enriches these events with commit hashes, ticket IDs, and eval results, and writes them to a DynamoDB audit table (append-only, no delete permissions).

Services: CloudTrail, MLflow, DynamoDB, Lambda

import uuid
from datetime import datetime

import boto3

audit_table = boto3.resource("dynamodb").Table("manga-assist-audit")  # table name assumed

def write_audit_entry(change: dict):
    """Write an immutable audit entry for a model or prompt change."""

    audit_table.put_item(Item={
        "audit_id": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat(),
        "change_type": change["type"],        # "model_deploy" | "prompt_update" | "guardrail_change"
        "component": change["component"],      # "intent-classifier" | "recommendation-prompt"
        "old_version": change["old_version"],
        "new_version": change["new_version"],
        "changed_by": change["author"],
        "commit_hash": change.get("commit_hash"),
        "ticket_id": change.get("ticket_id"),  # JIRA / issue link
        "eval_run_id": change.get("eval_run_id"),
        "eval_passed": change.get("eval_passed"),
        "reason": change["reason"],
    })

    # IAM policy: no dynamodb:DeleteItem on this table
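
The append-only property is enforced in IAM rather than application code. A sketch of the policy document (table ARN assumed; explicit Deny wins over any Allow):

AUDIT_TABLE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {   # Writers may append and read audit entries...
            "Effect": "Allow",
            "Action": ["dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:Query"],
            "Resource": "arn:aws:dynamodb:us-east-1:*:table/manga-assist-audit",
        },
        {   # ...but nothing may mutate or remove existing entries
            "Effect": "Deny",
            "Action": ["dynamodb:DeleteItem", "dynamodb:UpdateItem"],
            "Resource": "arn:aws:dynamodb:us-east-1:*:table/manga-assist-audit",
        },
    ],
}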

US-10.3 — PII Handling and Data Retention Compliance

Implementation: Amazon Comprehend detects PII in all stored conversation data. PII is redacted before writing to S3 or Redshift. Raw conversation data in DynamoDB has a 90-day TTL. Training data passes through a PII-stripping Lambda. User deletion requests trigger a targeted purge across DynamoDB, S3, and Redshift.

Services: Comprehend, DynamoDB (TTL), S3 Lifecycle, Lambda, Redshift

import boto3

comprehend = boto3.client("comprehend")

PII_TYPES_TO_REDACT = [
    "NAME", "ADDRESS", "EMAIL", "PHONE", "SSN",
    "CREDIT_DEBIT_NUMBER", "BANK_ACCOUNT_NUMBER", "DATE_TIME",
]

def redact_pii(text: str) -> str:
    """Detect and redact PII using Amazon Comprehend."""
    result = comprehend.detect_pii_entities(Text=text, LanguageCode="en")

    redacted = text
    for entity in sorted(result["Entities"], key=lambda e: e["BeginOffset"], reverse=True):
        if entity["Type"] in PII_TYPES_TO_REDACT and entity["Score"] > 0.80:
            redacted = (redacted[:entity["BeginOffset"]] +
                        f"[REDACTED_{entity['Type']}]" +
                        redacted[entity["EndOffset"]:])
    return redacted


def handle_deletion_request(customer_id: str):
    """GDPR/CCPA: purge all data for a customer within 30 days."""

    # 1. DynamoDB: delete all sessions
    sessions = query_sessions_by_customer(customer_id)
    for session in sessions:
        sessions_table.delete_item(Key={"session_id": session["session_id"]})
        # Feedback items are keyed by feedback_id, so look them up per session first
        for fb in query_feedback_by_session(session["session_id"]):  # assumed helper
            feedback_table.delete_item(Key={"feedback_id": fb["feedback_id"]})

    # 2. S3: mark lineage records for deletion
    mark_s3_objects_for_deletion(customer_id, bucket="manga-assist-lineage")

    # 3. Redshift: delete analytics rows
    redshift_query("DELETE FROM manga_assist.conversations WHERE customer_id = %s", [customer_id])
    redshift_query("DELETE FROM manga_assist.feedback WHERE customer_id = %s", [customer_id])

    # 4. Log deletion for compliance audit
    write_audit_entry({
        "type": "data_deletion",
        "component": "customer_data",
        "old_version": customer_id,
        "new_version": "DELETED",
        "author": "automated_deletion_pipeline",
        "reason": "Customer data deletion request (GDPR/CCPA)",
    })
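
The 90-day retention on raw conversations is enforced with DynamoDB TTL on an epoch-seconds attribute, as in the feedback table from US-8.1. Enabling it is a one-time call (table and attribute names assumed):

boto3.client("dynamodb").update_time_to_live(
    TableName="manga-assist-sessions",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "ttl"},
)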