
09. Training Infrastructure and MLOps — End-to-End Fine-Tuning Pipeline

Problem Statement and MangaAssist Context

MangaAssist fine-tunes 5 models across the inference pipeline: intent classifier, embedding adapter, cross-encoder reranker, sentiment classifier, and (optionally) a LoRA-adapted LLM. Each model has a different training cadence — the intent classifier retrains weekly with new labeled data, the embedding adapter retrains monthly, and the LLM adapter retrains quarterly. This document covers the distributed training infrastructure, mixed-precision optimization, experiment tracking, CI/CD for model deployment, and the end-to-end MLOps pipeline on AWS SageMaker.

The Operational Challenge

| Model | Size | Training Frequency | GPU Hours/Train | Monthly GPU Cost |
| --- | --- | --- | --- | --- |
| DistilBERT intent | 66M | Weekly | 0.5h (T4) | $8 |
| Embedding adapter | 12M | Monthly | 2h (T4) | $3 |
| ms-marco reranker | 22M | Monthly | 1.5h (T4) | $2.50 |
| Sentiment classifier | 66M | Weekly | 0.5h (T4) | $8 |
| Llama 3 LoRA (70B base) | 168M trainable | Quarterly | 48h (4×A100) | $192 |
| Total | | | | ~$214/month |

Without proper infrastructure, each training run is a manual notebook execution with inconsistent environments, no experiment tracking, and error-prone deployment. The goal: fully automated, reproducible, gated training pipelines.


Mathematical Foundations

Distributed Stochastic Gradient Descent

When a model exceeds single-GPU memory or training time is too long, we distribute across $P$ GPUs (workers). Data-parallel SGD is the most common approach for our model sizes:

Standard SGD update:

$$\boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t - \eta \nabla_{\boldsymbol{\theta}} \mathcal{L}(\boldsymbol{\theta}_t; \mathcal{B}_t)$$

Data-parallel SGD with $P$ workers:

Each worker $p$ computes a gradient on its local mini-batch $\mathcal{B}_t^{(p)}$:

$$\mathbf{g}_t^{(p)} = \nabla_{\boldsymbol{\theta}} \mathcal{L}(\boldsymbol{\theta}_t; \mathcal{B}_t^{(p)})$$

The gradients are averaged across all $P$ workers (all-reduce):

$$\bar{\mathbf{g}}_t = \frac{1}{P} \sum_{p=1}^{P} \mathbf{g}_t^{(p)}$$

Then all workers apply the same update:

$$\boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t - \eta \bar{\mathbf{g}}_t$$

Effective batch size: If each worker processes a local batch of size $B$, the effective batch size is $B_{\text{eff}} = P \times B$.

Convergence rate of distributed SGD:

For convex objectives with $L$-Lipschitz gradients and gradient variance $\sigma^2$:

$$\mathbb{E}\left[\|\nabla \mathcal{L}(\boldsymbol{\theta}_T)\|^2\right] \leq \mathcal{O}\left(\frac{L}{T} + \frac{\sigma}{\sqrt{PT}}\right)$$

The $\sqrt{PT}$ in the variance term means that doubling the number of workers shrinks the stochastic noise by $\sqrt{2}$, not by 2. In the noise-dominated regime, however, reaching a target error requires a fixed budget of $P \times T$ gradient computations, so the number of iterations $T$ per worker falls roughly as $1/P$. This is the "linear speedup" regime: $P$ workers achieve close to a $P\times$ wall-clock speedup as long as communication overhead stays negligible.

Linear Scaling Rule

When scaling from 1 GPU to $P$ GPUs, the effective batch size increases $P\times$. Goyal et al. (2017) showed that learning rate should scale proportionally:

$$\eta_P = \eta_1 \times P$$

Why? With batch size $B_{\text{eff}} = PB$, each gradient step averages $P\times$ more samples, reducing noise. A larger learning rate compensates by taking larger steps per update.

Warmup is essential: Jumping to a large LR at initialization causes divergence. The warmup linearly ramps the LR over the first 5% of training:

$$\eta_t = \eta_P \times \frac{t}{T_{\text{warmup}}}$$

| Workers ($P$) | Batch Size | LR ($\eta$) | Warmup Steps | Convergence |
| --- | --- | --- | --- | --- |
| 1 | 32 | 3e-5 | 100 | Baseline |
| 2 | 64 | 6e-5 | 100 | ~1.9× faster |
| 4 | 128 | 1.2e-4 | 200 | ~3.7× faster |
| 8 | 256 | 2.4e-4 | 400 | ~7.1× faster |

Speedup is sub-linear (3.7× with 4 GPUs, not 4×) due to communication overhead.
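
A minimal sketch of the rule in PyTorch, assuming a linear-warmup LambdaLR schedule (the model, optimizer, and step counts are illustrative):

import torch
from torch.optim import AdamW
from torch.optim.lr_scheduler import LambdaLR


def warmup_schedule(optimizer, warmup_steps):
    """LR multiplier ramps linearly from ~0 to 1 over warmup_steps, then stays at 1."""
    return LambdaLR(optimizer, lambda step: min(1.0, (step + 1) / warmup_steps))


# Linear scaling rule: 4 workers -> 4x the single-GPU LR, warmed up over 200 steps
base_lr, num_workers = 3e-5, 4
model = torch.nn.Linear(768, 27)                    # stand-in for a classifier head
optimizer = AdamW(model.parameters(), lr=base_lr * num_workers)
scheduler = warmup_schedule(optimizer, warmup_steps=200)
# In the training loop: loss.backward(); optimizer.step(); scheduler.step()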

Mixed Precision Training

Mixed precision (Micikevicius et al., 2017) uses FP16 for most computations and FP32 for critical accumulations, halving memory and doubling throughput.

FP16 representation:

- Range: $\pm 65,504$ (vs FP32: $\pm 3.4 \times 10^{38}$)
- Smallest positive: $6 \times 10^{-8}$ (vs FP32: $1.2 \times 10^{-38}$)
- Precision: 3-4 significant digits (vs FP32: 7-8)

The three components:

  1. FP16 forward and backward pass: Activations and gradients computed in FP16 (2× less memory, 2× faster on Tensor Cores)

  2. FP32 master weights: Maintain FP32 copy of weights for the optimizer update. Small gradients (< $6 \times 10^{-8}$) would underflow in FP16 — the FP32 copy preserves them:

$$\boldsymbol{\theta}_{\text{FP32}} \leftarrow \boldsymbol{\theta}_{\text{FP32}} - \eta \cdot \mathbf{g}_{\text{FP32}}$$

  3. Loss scaling: Scale the loss by a factor $S$ before backprop to shift gradient magnitudes into FP16's representable range:

$$\mathcal{L}_{\text{scaled}} = S \cdot \mathcal{L} \implies \mathbf{g}_{\text{scaled}} = S \cdot \mathbf{g}$$

After backprop, unscale: $\mathbf{g} = \mathbf{g}_{\text{scaled}} / S$.

Dynamic loss scaling: Start with $S = 2^{15}$. If no overflow, double $S$ every 2000 steps. If overflow (gradient contains Inf/NaN), halve $S$ and skip the update.
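
A minimal PyTorch AMP sketch of this flow (FP16 forward/backward under autocast, FP32 master weights kept by the optimizer, dynamic loss scaling via GradScaler); the model and data are stand-ins and a CUDA device is assumed:

import torch
from torch.cuda.amp import autocast, GradScaler

model = torch.nn.Linear(768, 27).cuda()                        # stand-in model
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-5)
scaler = GradScaler(init_scale=2**15, growth_interval=2000)    # dynamic loss scaling
data = [(torch.randn(32, 768), torch.randint(0, 27, (32,))) for _ in range(10)]

for features, labels in data:
    optimizer.zero_grad()
    with autocast():                                           # FP16 forward pass
        loss = torch.nn.functional.cross_entropy(model(features.cuda()), labels.cuda())
    scaler.scale(loss).backward()                              # backprop on S * loss
    scaler.step(optimizer)                                     # unscale; skip step on Inf/NaN
    scaler.update()                                            # grow/shrink S dynamically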

Memory reduction (DistilBERT example):

| Component | FP32 | Mixed Precision | Savings |
| --- | --- | --- | --- |
| Model parameters | 264MB | 132MB (FP16) + 264MB (FP32 master) | +132MB overhead |
| Activations (batch=32) | 1.2GB | 600MB (FP16) | 50% |
| Gradients | 264MB | 132MB (FP16) | 50% |
| Optimizer states (Adam) | 528MB | 528MB (FP32) | 0% |
| Total | 2.26GB | 1.66GB | 27% |

The activation memory savings (50%) dominate. For larger models (Llama 3 70B), activation savings enable fitting 2× longer sequences or 2× larger batches.

ZeRO — Zero Redundancy Optimizer

ZeRO (Rajbhandari et al., 2019) partitions optimizer states, gradients, and parameters across GPUs, eliminating redundant copies.

Standard data-parallel: Each GPU stores a complete copy of:

  1. Model parameters: $\Psi$ bytes
  2. Gradients: $\Psi$ bytes
  3. Optimizer states (Adam): $2\Psi$ bytes (momentum + variance)

Total per GPU: $4\Psi$ — redundant across all $P$ GPUs.

ZeRO stages:

| Stage | Partitions | Per-GPU Memory | Communication |
| --- | --- | --- | --- |
| ZeRO-1 | Optimizer states | $\Psi + \Psi + 2\Psi/P$ | Same as data-parallel |
| ZeRO-2 | + Gradients | $\Psi + \Psi/P + 2\Psi/P$ | Same as data-parallel |
| ZeRO-3 | + Parameters | $\Psi/P + \Psi/P + 2\Psi/P = 4\Psi/P$ | 1.5× data-parallel |

Llama 3 70B example (FP16 parameters = 140GB):

| Config | Per-GPU Memory | GPUs Needed | Communication |
| --- | --- | --- | --- |
| Standard data-parallel | 560GB | Impossible (A100 = 80GB) | N/A |
| ZeRO-1 (4 GPUs) | 280GB + 70GB = 350GB | Impossible | N/A |
| ZeRO-3 (8 GPUs) | 70GB | 8 × A100 ✅ | 1.5× standard |
| ZeRO-3 (16 GPUs) | 35GB | 16 × A100 ✅✅ | 1.5× standard |

ZeRO-3 makes Llama 3 70B training feasible on 8 A100s by partitioning all state across devices.
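
A quick back-of-the-envelope check of these numbers, using the simplified $\Psi/\Psi/2\Psi$ accounting from above (values in GB; the helper is illustrative):

def zero_per_gpu_gb(psi_gb, num_gpus, stage):
    """Per-GPU memory (GB) under the params / grads / Adam = psi / psi / 2*psi accounting."""
    params, grads, optim = psi_gb, psi_gb, 2 * psi_gb
    if stage >= 1:
        optim /= num_gpus      # ZeRO-1 partitions optimizer states
    if stage >= 2:
        grads /= num_gpus      # ZeRO-2 also partitions gradients
    if stage >= 3:
        params /= num_gpus     # ZeRO-3 also partitions parameters
    return params + grads + optim

psi = 140  # Llama 3 70B parameters in FP16, roughly 140GB
print(zero_per_gpu_gb(psi, 1, 0))    # 560.0  (standard data-parallel)
print(zero_per_gpu_gb(psi, 4, 1))    # 350.0  (ZeRO-1 on 4 GPUs)
print(zero_per_gpu_gb(psi, 8, 3))    # 70.0   (ZeRO-3 on 8 GPUs)
print(zero_per_gpu_gb(psi, 16, 3))   # 35.0   (ZeRO-3 on 16 GPUs)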

Gradient Accumulation — Simulating Large Batches

When GPU memory limits the per-GPU batch size, gradient accumulation simulates a larger effective batch:

$$\bar{\mathbf{g}} = \frac{1}{A} \sum_{a=1}^{A} \nabla_{\boldsymbol{\theta}} \mathcal{L}(\boldsymbol{\theta}; \mathcal{B}_a)$$

With accumulation steps $A$, effective batch = $A \times B_{\text{local}}$. The optimizer step occurs every $A$ forward/backward passes.

Important: Normalize by $A$, not just sum. Many frameworks divide each mini-batch loss by $B_{\text{local}}$ but not by $A$, so the effective learning rate scales with $A$ unless you manually correct.
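
A minimal sketch of the accumulation loop, with the $1/A$ normalization applied to the loss before backprop (model and data are stand-ins):

import torch

model = torch.nn.Linear(768, 27)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-5)
accum_steps = 8                                     # A: effective batch = A x local batch
batches = [(torch.randn(4, 768), torch.randint(0, 27, (4,))) for _ in range(32)]

optimizer.zero_grad()
for step, (features, labels) in enumerate(batches):
    loss = torch.nn.functional.cross_entropy(model(features), labels)
    (loss / accum_steps).backward()                 # divide by A so gradients average, not sum
    if (step + 1) % accum_steps == 0:
        optimizer.step()                            # one optimizer update per A micro-batches
        optimizer.zero_grad()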


Infrastructure Diagrams

All-Reduce Communication Pattern

graph TB
    subgraph "Data-Parallel Training (P=4 GPUs)"
        subgraph "GPU 0"
            D0["Data shard 0<br>(batch 0-31)"]
            G0["Compute g₀"]
        end
        subgraph "GPU 1"
            D1["Data shard 1<br>(batch 32-63)"]
            G1["Compute g₁"]
        end
        subgraph "GPU 2"
            D2["Data shard 2<br>(batch 64-95)"]
            G2["Compute g₂"]
        end
        subgraph "GPU 3"
            D3["Data shard 3<br>(batch 96-127)"]
            G3["Compute g₃"]
        end
    end

    G0 & G1 & G2 & G3 --> AR["Ring All-Reduce<br>ḡ = (g₀ + g₁ + g₂ + g₃) / 4<br><br>Communication: 2(P-1)/P × Ψ bytes<br>= 1.5Ψ for P=4"]

    AR --> U0["GPU 0: θ ← θ - η·ḡ"]
    AR --> U1["GPU 1: θ ← θ - η·ḡ"]
    AR --> U2["GPU 2: θ ← θ - η·ḡ"]
    AR --> U3["GPU 3: θ ← θ - η·ḡ"]

    style AR fill:#fff9c4

ZeRO-3 Memory Partitioning

graph TB
    subgraph "ZeRO-3: Each GPU holds 1/P of everything"
        subgraph "GPU 0 (Shard 0: layers 0-1)"
            P0["Params: layers 0-1<br>17.5GB"]
            G0z["Grads: layers 0-1<br>17.5GB"]
            O0["Adam states: layers 0-1<br>35GB"]
            M0["Total: 70GB ✅<br>(fits A100 80GB)"]
        end
        subgraph "GPU 1 (Shard 1: layers 2-3)"
            P1["Params: layers 2-3<br>17.5GB"]
            G1z["Grads: layers 2-3<br>17.5GB"]
            O1["Adam states: layers 2-3<br>35GB"]
            M1["Total: 70GB ✅"]
        end
        subgraph "GPU 2 (Shard 2: layers 4-5)"
            P2["Params: layers 4-5<br>17.5GB"]
            G2z["Grads: layers 4-5<br>17.5GB"]
            O2["Adam states: layers 4-5<br>35GB"]
            M2["Total: 70GB ✅"]
        end
        subgraph "GPU 3 (Shard 3: head + embed)"
            P3["Params: embed+head<br>17.5GB"]
            G3z["Grads: embed+head<br>17.5GB"]
            O3["Adam states: embed+head<br>35GB"]
            M3["Total: 70GB ✅"]
        end
    end

    subgraph "Communication During Forward Pass"
        FWD["GPU 0 needs layer 2 params<br>→ All-gather from GPU 1<br>→ Use for computation<br>→ Discard after use"]
    end

    style M0 fill:#c8e6c9
    style M1 fill:#c8e6c9
    style M2 fill:#c8e6c9
    style M3 fill:#c8e6c9
    style FWD fill:#fff9c4

Mixed Precision Training Flow

sequenceDiagram
    participant FP32 as FP32 Master Weights
    participant FP16W as FP16 Weights (copy)
    participant FWD as Forward Pass (FP16)
    participant LOSS as Loss + Scaling
    participant BWD as Backward Pass (FP16)
    participant OPT as Optimizer (FP32)

    FP32->>FP16W: Cast θ_FP32 → θ_FP16
    FP16W->>FWD: Forward pass in FP16<br>(2× faster on Tensor Cores)
    FWD->>LOSS: Compute L (FP16)
    LOSS->>LOSS: Scale: L_scaled = S × L<br>(S = 2^15 initially)
    LOSS->>BWD: Backprop L_scaled → g_scaled (FP16)

    alt No overflow in gradients
        BWD->>OPT: Unscale: g = g_scaled / S (→ FP32)
        OPT->>FP32: θ_FP32 ← θ_FP32 - η·g<br>(Adam update in FP32)
        Note over LOSS: Every 2000 steps:<br>S ← S × 2 (grow scale)
    else Overflow detected (Inf/NaN)
        BWD-->>LOSS: Skip optimizer step
        Note over LOSS: S ← S / 2 (shrink scale)
    end

End-to-End MLOps Pipeline

graph TB
    subgraph "Data Pipeline"
        S3["S3: Raw customer messages"]
        LABEL["SageMaker Ground Truth<br>Human labeling"]
        S3 --> LABEL
        LABEL --> SPLIT["Train/Val/Test split<br>(80/10/10, stratified)"]
    end

    subgraph "Experiment Tracking"
        SPLIT --> TRAIN["SageMaker Training Job"]
        TRAIN --> MLFLOW["MLflow on ECS Fargate<br>- Hyperparams logged<br>- Metrics tracked<br>- Artifacts stored"]
    end

    subgraph "Training Configuration"
        CONFIG["training_config.yaml<br>- model_name<br>- hyperparameters<br>- instance_type<br>- data paths"]
        CONFIG --> TRAIN
    end

    subgraph "Evaluation Gate"
        MLFLOW --> GATE{"Quality Gate<br>F1 ≥ threshold?<br>Latency ≤ SLA?<br>No regression?"}
    end

    subgraph "Model Registry"
        GATE -->|Pass| REG["SageMaker Model Registry<br>Version: v1.2.3<br>Stage: Staging"]
        GATE -->|Fail| ALERT["SNS Alert → Slack<br>Training failed quality gate"]
    end

    subgraph "Deployment"
        REG --> SHADOW["Shadow deployment<br>1% traffic, compare<br>with production"]
        SHADOW --> CANARY{"Canary metrics<br>OK for 1 hour?"}
        CANARY -->|Yes| PROD["Production endpoint<br>100% traffic"]
        CANARY -->|No| ROLLBACK["Rollback to previous<br>version"]
    end

    style GATE fill:#fff9c4
    style CANARY fill:#fff9c4
    style PROD fill:#c8e6c9

Training Cadence Calendar

gantt
    title MangaAssist Model Training Schedule
    dateFormat  YYYY-MM-DD
    section Intent Classifier
    Weekly retrain W1     :w1, 2024-01-01, 1d
    Weekly retrain W2     :w2, 2024-01-08, 1d
    Weekly retrain W3     :w3, 2024-01-15, 1d
    Weekly retrain W4     :w4, 2024-01-22, 1d
    section Sentiment
    Weekly retrain W1     :s1, 2024-01-02, 1d
    Weekly retrain W2     :s2, 2024-01-09, 1d
    Weekly retrain W3     :s3, 2024-01-16, 1d
    Weekly retrain W4     :s4, 2024-01-23, 1d
    section Embedding Adapter
    Monthly retrain       :e1, 2024-01-15, 2d
    section Reranker
    Monthly retrain       :r1, 2024-01-17, 2d
    section LLM LoRA
    Quarterly retrain     :l1, 2024-01-20, 5d

Implementation Deep-Dive

SageMaker Training Pipeline

import sagemaker
from sagemaker.huggingface import HuggingFace
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.fail_step import FailStep


def build_training_pipeline(
    model_type: str,   # "intent", "sentiment", "embedding", "reranker", "llm_lora"
    role: str = None,
):
    """
    Build SageMaker Pipeline for model training with quality gates.
    """
    role = role or sagemaker.get_execution_role()

    # Pipeline parameters
    train_data = ParameterString(name="TrainDataUri")
    val_data = ParameterString(name="ValidationDataUri")
    min_f1 = ParameterFloat(name="MinF1Threshold", default_value=0.85)

    # Model-specific configuration
    configs = {
        "intent": {
            "instance_type": "ml.g4dn.xlarge",
            "instance_count": 1,
            "hyperparameters": {
                "model_name": "distilbert-base-uncased",
                "num_epochs": 10,
                "batch_size": 32,
                "lr": 3e-5,
                "focal_gamma": 2.0,
                "use_ewc": True,
                "ewc_lambda": 200,
            },
            "max_runtime": 3600,  # 1 hour
        },
        "sentiment": {
            "instance_type": "ml.g4dn.xlarge",
            "instance_count": 1,
            "hyperparameters": {
                "model_name": "distilbert-base-uncased",
                "num_epochs": 8,
                "batch_size": 32,
                "lr": 3e-5,
                "gradual_unfreezing": True,
                "focal_gamma": 2.0,
            },
            "max_runtime": 3600,
        },
        "llm_lora": {
            "instance_type": "ml.p4d.24xlarge",  # 8 × A100
            "instance_count": 1,
            "hyperparameters": {
                "model_name": "meta-llama/Llama-3-70b-hf",
                "lora_r": 16,
                "lora_alpha": 32,
                "num_epochs": 3,
                "batch_size": 4,
                "gradient_accumulation": 8,
                "use_qlora": True,
                "use_fp16": True,
            },
            "max_runtime": 86400,  # 24 hours
        },
    }

    config = configs[model_type]

    # Training step
    estimator = HuggingFace(
        entry_point=f"train_{model_type}.py",
        source_dir="./src/training/",
        instance_type=config["instance_type"],
        instance_count=config["instance_count"],
        role=role,
        transformers_version="4.37",
        pytorch_version="2.1",
        py_version="py310",
        hyperparameters=config["hyperparameters"],
        max_run=config["max_runtime"],
        metric_definitions=[
            {"Name": "val:f1_macro", "Regex": "val_f1_macro=([0-9.]+)"},
            {"Name": "val:loss", "Regex": "val_loss=([0-9.]+)"},
            {"Name": "val:latency_p50", "Regex": "latency_p50=([0-9.]+)"},
        ],
    )

    training_step = TrainingStep(
        name=f"Train-{model_type}",
        estimator=estimator,
        inputs={
            "train": sagemaker.inputs.TrainingInput(train_data),
            "validation": sagemaker.inputs.TrainingInput(val_data),
        },
    )

    # Quality gate: only register if F1 >= threshold
    condition = ConditionGreaterThanOrEqualTo(
        left=training_step.properties.FinalMetricDataList[0].Value,
        right=min_f1,
    )

    # Register the model in the SageMaker Model Registry when the gate passes
    # (package group name and inference instance types are illustrative)
    register_step = RegisterModel(
        name=f"Register-{model_type}",
        estimator=estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.g4dn.xlarge"],
        transform_instances=["ml.g4dn.xlarge"],
        model_package_group_name=f"mangaassist-{model_type}",
    )

    fail_step = FailStep(
        name=f"FailGate-{model_type}",
        error_message=f"{model_type} model failed the F1 quality gate",
    )

    quality_gate = ConditionStep(
        name=f"QualityGate-{model_type}",
        conditions=[condition],
        if_steps=[register_step],
        else_steps=[fail_step],  # fail the pipeline run explicitly
    )

    pipeline = Pipeline(
        name=f"MangaAssist-{model_type}-training",
        parameters=[train_data, val_data, min_f1],
        steps=[training_step, quality_gate],
    )

    return pipeline
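
A typical invocation from the weekly retraining job might look like the following; the S3 URIs are placeholders, and upsert/start are the standard SageMaker Pipeline calls to register the definition and launch a run:

pipeline = build_training_pipeline("intent")
pipeline.upsert(role_arn=sagemaker.get_execution_role())
execution = pipeline.start(parameters={
    "TrainDataUri": "s3://mangaassist-training/intent/train/",       # placeholder URI
    "ValidationDataUri": "s3://mangaassist-training/intent/val/",    # placeholder URI
    "MinF1Threshold": 0.90,
})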

MLflow Experiment Tracking Integration

import mlflow
from mlflow.tracking import MlflowClient
import json


class ExperimentTracker:
    """MLflow integration for SageMaker training jobs."""

    def __init__(self, experiment_name: str, tracking_uri: str):
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def log_training_run(
        self,
        model_type: str,
        hyperparams: dict,
        metrics: dict,
        artifacts_path: str,
        tags: dict = None,
    ):
        with mlflow.start_run(run_name=f"{model_type}-{self._timestamp()}"):
            # Log hyperparameters
            mlflow.log_params(hyperparams)

            # Log scalar metrics (the nested per-label dict is handled below)
            for key, value in metrics.items():
                if isinstance(value, (int, float)):
                    mlflow.log_metric(key, value)

            # Log per-label metrics (for multi-label models)
            if "per_label" in metrics:
                for label, label_metrics in metrics["per_label"].items():
                    for metric_name, value in label_metrics.items():
                        mlflow.log_metric(f"{label}_{metric_name}", value)

            # Log model artifacts
            mlflow.log_artifacts(artifacts_path)

            # Log training config as artifact
            mlflow.log_dict(hyperparams, "training_config.json")

            # Tags for filtering
            default_tags = {
                "model_type": model_type,
                "framework": "pytorch",
                "project": "mangaassist",
            }
            if tags:
                default_tags.update(tags)
            mlflow.set_tags(default_tags)

    def compare_with_production(
        self,
        model_type: str,
        new_metrics: dict,
    ) -> dict:
        """Compare new model metrics with current production model."""
        # Get current production run
        runs = self.client.search_runs(
            experiment_ids=[mlflow.get_experiment_by_name(
                f"mangaassist-{model_type}"
            ).experiment_id],
            filter_string="tags.stage = 'production'",
            order_by=["start_time DESC"],
            max_results=1,
        )

        if not runs:
            return {"decision": "deploy", "reason": "no production model"}

        prod_metrics = runs[0].data.metrics
        comparison = {}

        for key in new_metrics:
            if key in prod_metrics:
                delta = new_metrics[key] - prod_metrics[key]
                comparison[key] = {
                    "production": prod_metrics[key],
                    "new": new_metrics[key],
                    "delta": delta,
                    "improved": delta > 0,
                }

        # Decision logic
        f1_improved = comparison.get("f1_macro", {}).get("delta", 0) >= 0.01
        no_regression = all(
            v.get("delta", 0) >= -0.02
            for k, v in comparison.items()
            if k.startswith("frustration")
        )

        decision = "deploy" if (f1_improved and no_regression) else "reject"
        return {
            "decision": decision,
            "comparison": comparison,
        }

    def _timestamp(self):
        from datetime import datetime
        return datetime.now().strftime("%Y%m%d-%H%M%S")
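
A sketch of how a training script would use the tracker at the end of a run (the tracking URI and metric values are placeholders):

tracker = ExperimentTracker(
    experiment_name="mangaassist-intent",
    tracking_uri="http://mlflow.internal:5000",   # placeholder for the Fargate MLflow endpoint
)
tracker.log_training_run(
    model_type="intent",
    hyperparams={"lr": 3e-5, "num_epochs": 10, "batch_size": 32},
    metrics={"f1_macro": 0.923, "val_loss": 0.31},
    artifacts_path="./output/model",
    tags={"stage": "staging"},
)
verdict = tracker.compare_with_production("intent", {"f1_macro": 0.923})
print(verdict["decision"])   # "deploy" or "reject"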

Distributed Training Configuration

from dataclasses import dataclass
from transformers import TrainingArguments


@dataclass
class DistributedTrainingConfig:
    """Configuration for distributed training with mixed precision."""

    # Model
    model_name: str = "distilbert-base-uncased"

    # Data
    max_seq_length: int = 128
    train_batch_size: int = 32
    eval_batch_size: int = 64

    # Optimization
    learning_rate: float = 3e-5
    weight_decay: float = 0.01
    num_epochs: int = 10
    warmup_ratio: float = 0.05
    gradient_accumulation_steps: int = 1

    # Mixed precision
    fp16: bool = True
    bf16: bool = False  # Use on A100s

    # Distributed
    ddp: bool = False  # Data-parallel
    deepspeed_config: str = None  # Path to DeepSpeed ZeRO config

    # Evaluation
    eval_strategy: str = "epoch"
    save_strategy: str = "epoch"
    load_best_model_at_end: bool = True
    metric_for_best_model: str = "f1_macro"

    def to_hf_training_args(self, output_dir: str) -> TrainingArguments:
        """Convert to HuggingFace TrainingArguments."""
        return TrainingArguments(
            output_dir=output_dir,
            per_device_train_batch_size=self.train_batch_size,
            per_device_eval_batch_size=self.eval_batch_size,
            learning_rate=self.learning_rate,
            weight_decay=self.weight_decay,
            num_train_epochs=self.num_epochs,
            warmup_ratio=self.warmup_ratio,
            gradient_accumulation_steps=self.gradient_accumulation_steps,
            fp16=self.fp16,
            bf16=self.bf16,
            evaluation_strategy=self.eval_strategy,  # keyword name in transformers 4.37
            save_strategy=self.save_strategy,
            load_best_model_at_end=self.load_best_model_at_end,
            metric_for_best_model=self.metric_for_best_model,
            greater_is_better=True,
            deepspeed=self.deepspeed_config,
            ddp_find_unused_parameters=False,
            dataloader_num_workers=4,
            report_to=["mlflow"],
        )


# DeepSpeed ZeRO-2 config for multi-GPU training
DEEPSPEED_ZERO2_CONFIG = {
    "zero_optimization": {
        "stage": 2,
        "allgather_partitions": True,
        "allgather_bucket_size": 2e8,
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": True,
    },
    "fp16": {
        "enabled": True,
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1,
    },
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 3e-5,
            "betas": [0.9, 0.999],
            "eps": 1e-8,
            "weight_decay": 0.01,
        },
    },
    "scheduler": {
        "type": "WarmupDecayLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 3e-5,
            "warmup_num_steps": 100,
            "total_num_steps": 10000,
        },
    },
    "gradient_clipping": 1.0,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
}
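
The config dict is consumed by the HF Trainer via a JSON file path; a sketch of wiring it into the DistributedTrainingConfig above (file and output paths are illustrative):

import json

with open("ds_zero2.json", "w") as f:
    json.dump(DEEPSPEED_ZERO2_CONFIG, f, indent=2)

config = DistributedTrainingConfig(deepspeed_config="ds_zero2.json")
training_args = config.to_hf_training_args(output_dir="./output/intent")
# Launched with the DeepSpeed launcher, e.g.: deepspeed --num_gpus=4 train_intent.py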

CI/CD Model Deployment

import boto3
import sagemaker


class ModelDeployer:
    """
    Automated deployment pipeline with shadow testing and canary rollout.
    """

    def __init__(self, region: str = "us-east-1"):
        self.sm_client = boto3.client("sagemaker", region_name=region)
        self.cw_client = boto3.client("cloudwatch", region_name=region)

    def deploy_shadow(
        self,
        model_name: str,
        model_data_uri: str,
        endpoint_name: str,
    ):
        """
        Deploy new model as shadow variant receiving 0% traffic.
        Production variant still serves 100%.
        """
        # Create model
        self.sm_client.create_model(
            ModelName=f"{model_name}-shadow",
            PrimaryContainer={
                "Image": self._get_inference_image(),
                "ModelDataUrl": model_data_uri,
            },
            ExecutionRoleArn=self._get_role(),
        )

        # Point the endpoint at a config that adds the new model as a shadow variant.
        # The "<endpoint>-shadow-config" is assumed to be created beforehand with
        # ShadowProductionVariants referencing f"{model_name}-shadow".
        self.sm_client.update_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=f"{endpoint_name}-shadow-config",
        )

    def promote_canary(
        self,
        endpoint_name: str,
        canary_percent: int = 5,
        observation_minutes: int = 60,
    ) -> bool:
        """
        Shift canary_percent traffic to new model.
        Monitor for observation_minutes, rollback if metrics degrade.
        """
        # Update traffic split
        self.sm_client.update_endpoint_weights_and_capacities(
            EndpointName=endpoint_name,
            DesiredWeightsAndCapacities=[
                {"VariantName": "production", "DesiredWeight": 100 - canary_percent},
                {"VariantName": "canary", "DesiredWeight": canary_percent},
            ],
        )

        # Monitor metrics
        import time
        start = time.time()
        while (time.time() - start) < observation_minutes * 60:
            metrics = self._get_canary_metrics(endpoint_name)

            if metrics["error_rate"] > 0.05:      # >5% errors
                self._rollback(endpoint_name)
                return False
            if metrics["latency_p99"] > 0.015:     # >15ms
                self._rollback(endpoint_name)
                return False

            time.sleep(60)  # Check every minute

        return True

    def promote_full(self, endpoint_name: str):
        """Promote canary to 100% traffic."""
        self.sm_client.update_endpoint_weights_and_capacities(
            EndpointName=endpoint_name,
            DesiredWeightsAndCapacities=[
                {"VariantName": "canary", "DesiredWeight": 100},
            ],
        )

    def _rollback(self, endpoint_name: str):
        """Rollback to production variant."""
        self.sm_client.update_endpoint_weights_and_capacities(
            EndpointName=endpoint_name,
            DesiredWeightsAndCapacities=[
                {"VariantName": "production", "DesiredWeight": 100},
                {"VariantName": "canary", "DesiredWeight": 0},
            ],
        )

    def _get_canary_metrics(self, endpoint_name: str) -> dict:
        """Fetch recent error-rate and latency metrics for the canary variant."""
        dimensions = [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": "canary"},
        ]
        response = self.cw_client.get_metric_data(
            MetricDataQueries=[
                {
                    "Id": "error_rate",
                    "MetricStat": {
                        "Metric": {
                            "Namespace": "AWS/SageMaker",
                            "MetricName": "Invocation5XXErrors",
                            "Dimensions": dimensions,
                        },
                        "Period": 300,
                        "Stat": "Average",
                    },
                },
                {
                    "Id": "latency_p99",
                    "MetricStat": {
                        "Metric": {
                            "Namespace": "AWS/SageMaker",
                            "MetricName": "ModelLatency",
                            "Dimensions": dimensions,
                        },
                        "Period": 300,
                        "Stat": "p99",
                    },
                },
            ],
            StartTime=self._minutes_ago(5),
            EndTime=self._now(),
        )
        results = {
            r["Id"]: (r["Values"][0] if r["Values"] else 0.0)
            for r in response["MetricDataResults"]
        }
        # ModelLatency is reported in microseconds; convert to seconds to match thresholds above
        return {
            "error_rate": results.get("error_rate", 0.0),
            "latency_p99": results.get("latency_p99", 0.0) / 1e6,
        }

    def _get_inference_image(self):
        # Keyword arguments: positional args after region would be misassigned by retrieve()
        return sagemaker.image_uris.retrieve(
            framework="huggingface",
            region="us-east-1",
            version="4.37",
            base_framework_version="pytorch2.1",
            image_scope="inference",
            instance_type="ml.g4dn.xlarge",
        )

    def _get_role(self):
        return sagemaker.get_execution_role()

    def _minutes_ago(self, n):
        from datetime import datetime, timedelta
        return datetime.utcnow() - timedelta(minutes=n)

    def _now(self):
        from datetime import datetime
        return datetime.utcnow()
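
A sketch of the CI-driven flow through the deployer (endpoint and artifact names are placeholders):

deployer = ModelDeployer(region="us-east-1")
deployer.deploy_shadow(
    model_name="intent-v1-2-3",                                            # placeholder
    model_data_uri="s3://mangaassist-models/intent/v1.2.3/model.tar.gz",   # placeholder
    endpoint_name="mangaassist-intent",
)
if deployer.promote_canary("mangaassist-intent", canary_percent=5, observation_minutes=60):
    deployer.promote_full("mangaassist-intent")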

Group Discussion: Key Decision Points

Decision Point 1: SageMaker vs Self-Managed ECS Training

Jordan (MLOps): We need to choose between SageMaker managed training and running training jobs on our own ECS Fargate/EC2 cluster.

| Factor | SageMaker | Self-Managed (ECS/EC2) |
| --- | --- | --- |
| Setup complexity | Low (managed) | High (Docker, GPU drivers, NCCL) |
| Cost (intent model, weekly) | $0.74/run | $0.45/run |
| Cost (LLM LoRA, quarterly) | $390/run | $310/run |
| Scale flexibility | Instant | Need to maintain idle GPUs or wait for allocation |
| Experiment tracking | Built-in + MLflow | MLflow only |
| Spot instance support | Native (70% savings) | Manual setup |
| Multi-GPU | Native NCCL | Manual config |

Marcus (Architect): SageMaker costs 40-60% more per run, but consider the hidden costs of self-managed: maintaining GPU driver versions, CUDA compatibility, NCCL configuration, container images. Plus our team of 4 engineers does not have the bandwidth for GPU cluster management.

Sam (PM): At our scale ($214/month total GPU cost), the overhead delta is ~$60/month for SageMaker. That is less than 1 hour of an engineer's time per month.

Priya (ML Engineer): The killer feature is SageMaker spot instances. For the LLM LoRA training, spot reduces $390 to ~$120. With automatic checkpointing, spot interruptions are handled transparently.

Resolution: SageMaker for all training. The managed overhead cost (<$60/month) is trivially offset by engineering time savings and spot instance support. Self-managed GPU cluster only makes sense at >$5K/month training spend.

Decision Point 2: Training Cadence

Jordan (MLOps): How often should each model retrain?

Priya (ML Engineer): I measured accuracy decay vs training freshness:

| Model | 1 week stale | 2 weeks stale | 1 month stale | 3 months stale |
| --- | --- | --- | --- | --- |
| Intent classifier | -0.3% | -0.8% | -2.1% | -4.7% |
| Sentiment | -0.2% | -0.5% | -1.4% | -3.2% |
| Embedding adapter | -0.1% | -0.2% | -0.4% | -1.1% |
| Reranker | -0.1% | -0.2% | -0.5% | -1.3% |
| LLM LoRA | <0.1% | -0.1% | -0.2% | -0.6% |

Aiko (Data Scientist): Intent and sentiment decay fastest because customer language patterns evolve rapidly — new manga titles, seasonal vocabulary, trending slang. The embedding adapter and reranker are more stable because they operate on semantic similarity (which changes slowly).

Sam (PM): Our quality SLA allows ≤1% accuracy drop before intervention. That maps to:
- Intent: weekly (stays within 0.3% drop)
- Sentiment: weekly (0.2% drop)
- Embedding/reranker: monthly (0.4-0.5% drop)
- LLM LoRA: quarterly (0.2% drop)

Jordan (MLOps): Weekly retrains for intent+sentiment mean we need robust automation — human-in-the-loop is not feasible at that cadence. The quality gate must be trustworthy.

Resolution: Weekly: intent + sentiment. Monthly: embedding + reranker. Quarterly: LLM LoRA. All automated with quality gates. Human review only when the gate rejects a model.

Decision Point 3: Quality Gate Design

Jordan (MLOps): What metrics gate model promotion?

Priya (ML Engineer): Multi-criteria gate:

| Criterion | Intent Threshold | Sentiment Threshold | Action on Fail |
| --- | --- | --- | --- |
| F1 macro (absolute) | ≥ 0.90 | ≥ 0.82 | Reject |
| F1 macro (vs production) | ≥ -0.01 | ≥ -0.01 | Reject |
| Per-class F1 regression | No class drops > 3% | No frustration drop > 2% | Reject |
| Latency P50 | ≤ 15ms | ≤ 12ms | Reject |
| Latency P99 | ≤ 25ms | ≤ 20ms | Reject |

Aiko (Data Scientist): The per-class regression check is critical. A model might improve macro F1 by 0.5% while catastrophically failing on "product_recall" intent (from 88% to 60%). The per-class check catches this.

Marcus (Architect): We also need a "no worse than random" sanity check. If any class drops below 50% accuracy, the run is automatically rejected regardless of other metrics.

Sam (PM): What about false positive escalations? The sentiment model directly triggers human escalation. If a new model false-positives at 2× the rate, our support team drowns.

Jordan (MLOps): We add a "false escalation rate" metric to the sentiment gate. The new model's rate must not exceed production's false-positive rate by more than 15% (relative), measured during the shadow deployment phase.

Resolution: Multi-criteria quality gate with absolute thresholds, regression checks, per-class monitoring, and operational metrics (false escalation rate). If any criterion fails, the model is rejected and the pipeline alerts via SNS → Slack.
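
A sketch of how this multi-criteria gate could be expressed in the evaluation step; the thresholds mirror the table above, and the function and field names are illustrative rather than the actual pipeline code:

def passes_quality_gate(new, prod, thresholds):
    """Return (passed, reasons) for the multi-criteria promotion gate."""
    reasons = []
    if new["f1_macro"] < thresholds["min_f1"]:
        reasons.append("absolute F1 below threshold")
    if new["f1_macro"] - prod["f1_macro"] < -0.01:
        reasons.append("macro F1 regression vs production exceeds 1 point")
    for label, f1 in new["per_class_f1"].items():
        if prod["per_class_f1"].get(label, 0.0) - f1 > thresholds["max_class_drop"]:
            reasons.append(f"per-class regression on '{label}'")
        if f1 < 0.50:
            reasons.append(f"'{label}' below the 50% sanity floor")
    if new["latency_p99_ms"] > thresholds["max_p99_ms"]:
        reasons.append("P99 latency above SLA")
    return len(reasons) == 0, reasons


# Intent gate: F1 >= 0.90, no class drops > 3 points, P99 <= 25ms
ok, reasons = passes_quality_gate(
    new={"f1_macro": 0.92, "per_class_f1": {"product_recall": 0.88}, "latency_p99_ms": 21},
    prod={"f1_macro": 0.915, "per_class_f1": {"product_recall": 0.87}},
    thresholds={"min_f1": 0.90, "max_class_drop": 0.03, "max_p99_ms": 25},
)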

Decision Point 4: Mixed Precision Strategy

Priya (ML Engineer): Mixed precision results:

| Config | Memory | Throughput | Final F1 | Notes |
| --- | --- | --- | --- | --- |
| FP32 | 2.26GB | 850 samples/s | 0.921 | Baseline |
| FP16 (AMP) | 1.66GB | 1,580 samples/s | 0.920 | Negligible quality loss |
| BF16 (A100) | 1.66GB | 1,650 samples/s | 0.921 | Best on A100 |
| Pure FP16 (no master weights) | 1.44GB | 1,620 samples/s | 0.908 | Quality degradation |

Aiko (Data Scientist): Mixed precision (AMP) is essentially free: 1.9× throughput, 27% less memory, 0.001 F1 loss. Pure FP16 without master weights degrades quality because very small gradient updates accumulate rounding errors.

Marcus (Architect): BF16 on A100s avoids the loss scaling dance entirely (BF16 has the same exponent range as FP32, so no overflow). For our T4 training (intent/sentiment), we use FP16 AMP. For A100 training (LLM LoRA), BF16.

Resolution: FP16 AMP for T4 instances (intent/sentiment/reranker). BF16 for A100 instances (LLM LoRA). Always maintain FP32 master weights. No pure FP16 training.
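
In the training config above, this resolution reduces to flipping two flags per instance type; a minimal sketch assuming PyTorch's BF16 capability check:

import torch

# BF16 on Ampere+ (A100); FP16 AMP with loss scaling otherwise (T4)
use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
config = DistributedTrainingConfig(bf16=use_bf16, fp16=not use_bf16)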


Research Paper References

1. ZeRO: Memory Optimizations Toward Training Trillion Parameter Models (Rajbhandari et al., 2019)

Key contribution: Introduced the three-stage ZeRO optimization that partitions optimizer states (Stage 1), gradients (Stage 2), and parameters (Stage 3) across data-parallel processes. ZeRO-3 achieves perfect memory scaling: memory per GPU is proportional to $1/P$, not constant. This enables training models 8× larger than possible with standard data parallelism.

Relevance to MangaAssist: ZeRO-3 makes our Llama 3 70B LoRA training feasible on 8 A100s. Without ZeRO, we would need model parallelism (splitting the model across GPUs), which adds significant code complexity. ZeRO-2 is sufficient for our smaller models and provides gradient partitioning with minimal communication overhead.

2. Mixed Precision Training (Micikevicius et al., 2017)

Key contribution: Showed that neural networks can be trained in half-precision (FP16) with minimal quality loss by maintaining FP32 master weights and using dynamic loss scaling. The paper demonstrated 2-3× speedup on NVIDIA Volta GPUs with Tensor Cores. The three-component framework (FP16 compute, FP32 master weights, loss scaling) became the standard approach adopted by all major frameworks.

Relevance to MangaAssist: All our training jobs use mixed precision (AMP). For small models (DistilBERT), the primary benefit is 1.9× throughput (training completes in 30 min vs 55 min). For LLM LoRA training, memory savings enable larger batch sizes or longer sequences, directly improving training quality.

3. Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour (Goyal et al., 2017)

Key contribution: Proved the linear scaling rule for learning rates in distributed training: when scaling batch size by $k\times$, scale learning rate by $k\times$ with a gradual warmup. This enables near-linear speedup from 1 to 256 GPUs. The warmup prevents early training instability caused by the large effective learning rate.

Relevance to MangaAssist: When scaling our LLM LoRA training from 4 to 8 A100s, we apply the linear scaling rule with warmup. Without it, the doubled learning rate causes divergence. With it, we achieve 1.85× speedup (92% efficiency).

4. Experiment Tracking and Model Registry: MLflow (Zaharia et al., 2018)

Key contribution: Open-source platform for ML lifecycle management: experiment tracking, model packaging, model registry, and model serving. MLflow's design philosophy of "lightweight, API-first, and framework-agnostic" made it the de facto standard for experiment tracking in industry.

Relevance to MangaAssist: MLflow runs on ECS Fargate alongside our inference services. Every training run logs hyperparameters, metrics, and artifacts. The model registry manages version transitions (staging → production). The comparison API powers our quality gate's regression detection.


Production Evaluation

Training Infrastructure Cost Breakdown

| Component | Monthly Cost | Notes |
| --- | --- | --- |
| SageMaker (intent, weekly) | $32 | 4 × ml.g4dn.xlarge spot @ $0.53/hr × 0.5hr |
| SageMaker (sentiment, weekly) | $32 | Same as intent |
| SageMaker (embedding, monthly) | $3 | ml.g4dn.xlarge spot × 2hr |
| SageMaker (reranker, monthly) | $2.50 | ml.g4dn.xlarge spot × 1.5hr |
| SageMaker (LLM LoRA, quarterly) | $40/month avg | ml.p4d.24xlarge spot × 48hr / 3 months |
| MLflow on ECS Fargate | $15 | 0.5 vCPU, 1GB RAM |
| S3 (training data/artifacts) | $5 | ~50GB total |
| CloudWatch (monitoring) | $3 | Custom metrics + logs |
| Total | ~$133/month | |

Pipeline Reliability (6 months)

| Metric | Value |
| --- | --- |
| Total training runs | 62 |
| Successful runs | 58 (93.5%) |
| Quality gate passes | 51 (87.9%) |
| Failed runs (infra) | 4 (spot interruption recovery: 3, OOM: 1) |
| Average training time (intent) | 28 min |
| Average deployment time (end-to-end) | 2.1 hours |
| Rollbacks triggered | 2 (canary metric regression) |
| Mean time to rollback | 8 minutes |