PO-02: Intent Classifier Latency Optimization

User Story

As a senior backend engineer, I want to minimize intent classification latency to under 50ms for rule-matched intents and under 150ms for ML-classified intents, so that intent routing does not become a bottleneck in the critical path and users perceive near-instant understanding of their requests.

Acceptance Criteria

  • Rule-based Stage 1 classifies 60-70% of messages in under 10ms.
  • SageMaker ML inference (Stage 2) completes in under 150ms at p95.
  • Entity extraction runs in parallel with classification, not sequentially.
  • Model warm-up eliminates cold-start latency spikes on SageMaker endpoints.
  • A session-level intent cache avoids redundant SageMaker calls for identical messages within a session.

High-Level Design

Latency Profile of Current Two-Stage System

graph LR
    A[User Message] --> B{Stage 1<br>Rule-Based<br>~5ms}
    B -->|High confidence<br>60-70% of traffic| C[Return Intent<br>Total: ~5ms]
    B -->|Low confidence<br>30-40% of traffic| D[Stage 2<br>SageMaker BERT<br>~120ms]
    D --> E{Confidence<br>Check}
    E -->|>= 0.6| F[Return Intent<br>Total: ~125ms]
    E -->|< 0.6| G[Fallback: general_query<br>Total: ~125ms]

    style C fill:#2d8,stroke:#333
    style F fill:#fd2,stroke:#333
    style G fill:#f66,stroke:#333
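
As a back-of-envelope check of this profile (a sketch assuming a 65/35 split between the two stages, consistent with the 60-70% Stage 1 hit rate above):

# Expected mean latency of the current system, assuming a 65/35 split
stage1_share, stage1_ms = 0.65, 5    # rule-based fast path
stage2_share, stage2_ms = 0.35, 125  # rules (~5ms) + SageMaker BERT (~120ms)
mean_ms = stage1_share * stage1_ms + stage2_share * stage2_ms
print(f"mean latency ~= {mean_ms:.1f} ms")  # ~= 47 ms before optimization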

Optimization Strategy

graph TD
    subgraph "Stage 1 Optimizations"
        A1[Compiled Regex<br>Patterns] --> A2[Aho-Corasick<br>Multi-Pattern Matching]
        A2 --> A3[Intent Pattern<br>Hot-Reload]
    end

    subgraph "Stage 2 Optimizations"
        B1[SageMaker Real-Time<br>Inference] --> B2[Model Compilation<br>with Neo]
        B2 --> B3[Endpoint Warm Pool]
        B3 --> B4[Request Batching<br>for burst traffic]
    end

    subgraph "Cross-Stage"
        C1[Session-Level<br>Intent Cache] --> C2[Parallel Entity<br>Extraction]
        C2 --> C3[Async Metrics<br>Emission]
    end

    style A2 fill:#2d8,stroke:#333
    style B2 fill:#2d8,stroke:#333
    style C1 fill:#2d8,stroke:#333

Low-Level Design

1. High-Performance Rule Engine with Aho-Corasick

Standard regex matching checks patterns one at a time, so cost grows linearly with the number of patterns. Aho-Corasick builds a finite state machine over all patterns and finds every match in a single pass over the input, so matching cost stays O(m + matches) no matter how many patterns are registered.

graph LR
    subgraph "Regex Approach (Before)"
        A[Message] --> B[Pattern 1<br>O(m)]
        A --> C[Pattern 2<br>O(m)]
        A --> D[Pattern 3<br>O(m)]
        A --> E[...<br>Pattern N]
        B --> F[Total: O(N × m)]
    end

    subgraph "Aho-Corasick (After)"
        G[Message] --> H[Single Pass<br>State Machine]
        H --> I[All matches<br>O(m + matches)]
    end

    style F fill:#f66,stroke:#333
    style I fill:#2d8,stroke:#333

Code Example: Aho-Corasick Intent Matcher

import time
from dataclasses import dataclass
from typing import Optional

import ahocorasick


@dataclass
class IntentMatch:
    intent: str
    confidence: float
    entities: dict
    matched_patterns: list[str]
    latency_ms: float


# Pattern definitions: each pattern maps to an intent with a base confidence
INTENT_PATTERNS = {
    # Order tracking patterns
    "where is my order": ("order_tracking", 0.95),
    "track my order": ("order_tracking", 0.95),
    "order status": ("order_tracking", 0.90),
    "when will it arrive": ("order_tracking", 0.85),
    "delivery date": ("order_tracking", 0.85),
    "shipping status": ("order_tracking", 0.90),
    # Return patterns
    "return this": ("return_request", 0.90),
    "want to return": ("return_request", 0.92),
    "return policy": ("faq", 0.90),
    "damaged item": ("return_request", 0.85),
    "wrong item": ("return_request", 0.88),
    "refund": ("return_request", 0.85),
    # Product discovery
    "show me": ("product_discovery", 0.80),
    "what's popular": ("product_discovery", 0.85),
    "best selling": ("product_discovery", 0.85),
    "new releases": ("product_discovery", 0.85),
    "trending manga": ("product_discovery", 0.90),
    # Recommendation
    "something like": ("recommendation", 0.90),
    "similar to": ("recommendation", 0.90),
    "recommend": ("recommendation", 0.85),
    "suggest": ("recommendation", 0.82),
    # FAQ
    "how do i": ("faq", 0.80),
    "what is the policy": ("faq", 0.85),
    "help with": ("faq", 0.75),
    # Promotions
    "any deals": ("promotion", 0.90),
    "discount": ("promotion", 0.85),
    "coupon": ("promotion", 0.90),
    "sale": ("promotion", 0.80),
    # Chitchat
    "hello": ("chitchat", 0.95),
    "hi there": ("chitchat", 0.95),
    "thanks": ("chitchat", 0.95),
    "thank you": ("chitchat", 0.95),
    "goodbye": ("chitchat", 0.95),
    "bye": ("chitchat", 0.95),
    # Escalation
    "talk to a human": ("escalation", 0.98),
    "speak to agent": ("escalation", 0.98),
    "real person": ("escalation", 0.95),
    "customer service": ("escalation", 0.90),
}


class AhoCorasickIntentMatcher:
    """Ultra-fast multi-pattern intent matching using Aho-Corasick automaton."""

    def __init__(self):
        self.automaton = ahocorasick.Automaton()
        self._build_automaton()

    def _build_automaton(self) -> None:
        """Build the Aho-Corasick automaton from intent patterns."""
        for pattern, (intent, confidence) in INTENT_PATTERNS.items():
            self.automaton.add_word(
                pattern.lower(), (pattern, intent, confidence)
            )
        self.automaton.make_automaton()

    def match(self, message: str) -> Optional[IntentMatch]:
        """Match message against all patterns in a single pass."""
        start = time.monotonic()
        normalized = message.lower().strip()

        matches = []
        for end_index, (pattern, intent, confidence) in self.automaton.iter(normalized):
            matches.append((intent, confidence, pattern))

        if not matches:
            return None

        # Group by intent and pick the highest confidence
        intent_scores: dict[str, tuple[float, list[str]]] = {}
        for intent, confidence, pattern in matches:
            if intent not in intent_scores:
                intent_scores[intent] = (confidence, [pattern])
            else:
                existing_conf, existing_patterns = intent_scores[intent]
                # Multiple pattern matches for same intent boost confidence
                boosted = min(existing_conf + 0.03, 0.99)
                intent_scores[intent] = (
                    max(boosted, confidence),
                    existing_patterns + [pattern],
                )

        # Select the intent with the highest confidence
        best_intent = max(intent_scores.items(), key=lambda x: x[1][0])
        intent_name = best_intent[0]
        best_confidence, matched_patterns = best_intent[1]

        # Measure latency after aggregation so the reported figure covers
        # the full matching cost, not just the automaton pass
        latency_ms = (time.monotonic() - start) * 1000

        return IntentMatch(
            intent=intent_name,
            confidence=best_confidence,
            entities={},  # Entity extraction runs separately (section 3)
            matched_patterns=matched_patterns,
            latency_ms=latency_ms,
        )

    def reload_patterns(self, new_patterns: dict) -> None:
        """Hot-reload patterns without a service restart."""
        # Build the replacement automaton fully before swapping it in, so a
        # concurrent match() never observes a half-built automaton
        automaton = ahocorasick.Automaton()
        for pattern, (intent, confidence) in new_patterns.items():
            automaton.add_word(pattern.lower(), (pattern, intent, confidence))
        automaton.make_automaton()
        self.automaton = automaton
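
Usage sketch (the message is illustrative; the confidence value comes from INTENT_PATTERNS above):

matcher = AhoCorasickIntentMatcher()
result = matcher.match("Where is my order? It shipped last week.")
if result is not None:
    print(result.intent, result.confidence, result.matched_patterns)
    # order_tracking 0.95 ['where is my order']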

2. SageMaker Endpoint Optimization

graph TD
    subgraph "Model Compilation"
        A[BERT Model<br>PyTorch] --> B[SageMaker Neo<br>Compilation]
        B --> C[Optimized Model<br>TensorRT / ONNX]
        C --> D[2-3x faster<br>inference]
    end

    subgraph "Endpoint Configuration"
        E[Real-Time Endpoint] --> F[Instance: ml.g5.xlarge<br>GPU inference]
        F --> G[Min Instances: 2<br>Always warm]
        G --> H[Auto-scaling:<br>Target p95 < 100ms]
    end

    subgraph "Warm Pool"
        I[Warm Pool Size: 2] --> J[Instances stay<br>initialized]
        J --> K[Scale-up time:<br>seconds vs minutes]
    end

    style D fill:#2d8,stroke:#333
    style H fill:#2d8,stroke:#333
    style K fill:#2d8,stroke:#333
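
The endpoint settings in the diagram map onto SageMaker's standard auto-scaling APIs. A minimal sketch, assuming illustrative endpoint and variant names; note that target tracking here scales on invocations per instance (a predefined metric), while the p95 target is enforced by the latency alarms in the Metrics section:

import boto3

autoscaling = boto3.client("application-autoscaling")

# Illustrative endpoint/variant names
resource_id = "endpoint/intent-classifier-prod/variant/AllTraffic"

# Keep a minimum of two always-warm instances
autoscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=2,
    MaxCapacity=6,
)

# Scale out before instances saturate
autoscaling.put_scaling_policy(
    PolicyName="intent-classifier-target-tracking",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
        },
        "ScaleInCooldown": 300,
        "ScaleOutCooldown": 60,
    },
)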

Code Example: Optimized SageMaker Inference Client

import asyncio
import hashlib
import json
import time
from collections import OrderedDict
from dataclasses import dataclass, replace
from typing import Optional

import boto3


@dataclass
class ClassificationResult:
    intent: str
    confidence: float
    entities: dict
    latency_ms: float
    source: str  # "cache", "sagemaker"


class LRUCache:
    """Simple LRU cache for session-level intent caching."""

    def __init__(self, max_size: int = 100):
        self._cache: OrderedDict[str, ClassificationResult] = OrderedDict()
        self._max_size = max_size

    def get(self, key: str) -> Optional[ClassificationResult]:
        if key in self._cache:
            self._cache.move_to_end(key)
            return self._cache[key]
        return None

    def put(self, key: str, value: ClassificationResult) -> None:
        if key in self._cache:
            self._cache.move_to_end(key)
        elif len(self._cache) >= self._max_size:
            self._cache.popitem(last=False)
        self._cache[key] = value


class OptimizedSageMakerClassifier:
    """Low-latency SageMaker inference with caching and connection reuse."""

    def __init__(
        self,
        endpoint_name: str,
        region: str = "us-east-1",
        cache_size: int = 200,
    ):
        self.endpoint_name = endpoint_name
        # Reuse a single boto3 client to avoid connection overhead
        self.sagemaker = boto3.client(
            "sagemaker-runtime",
            region_name=region,
        )
        self._cache = LRUCache(max_size=cache_size)

    def _make_cache_key(self, message: str, session_id: str) -> str:
        """Create a cache key from normalized message."""
        normalized = message.lower().strip()
        raw = f"{session_id}:{normalized}"
        return hashlib.sha256(raw.encode()).hexdigest()[:24]

    async def classify(
        self,
        message: str,
        session_id: str,
    ) -> ClassificationResult:
        """Classify with cache-first strategy."""
        start = time.monotonic()

        # Check session-level cache
        cache_key = self._make_cache_key(message, session_id)
        cached = self._cache.get(cache_key)
        if cached is not None:
            # Return a copy so the stored entry is never mutated in place
            return replace(
                cached,
                source="cache",
                latency_ms=(time.monotonic() - start) * 1000,
            )

        # Invoke SageMaker endpoint
        result = await self._invoke_endpoint(message, start)

        # Cache the result
        self._cache.put(cache_key, result)
        return result

    async def _invoke_endpoint(
        self, message: str, start: float
    ) -> ClassificationResult:
        """Call SageMaker endpoint with connection reuse."""
        payload = json.dumps({"text": message, "return_entities": True})

        response = await asyncio.to_thread(
            self.sagemaker.invoke_endpoint,
            EndpointName=self.endpoint_name,
            ContentType="application/json",
            Body=payload,
            # Route the request to the inference component serving this model
            InferenceComponentName="intent-classifier-v2",
        )

        body = json.loads(response["Body"].read().decode("utf-8"))
        latency_ms = (time.monotonic() - start) * 1000

        return ClassificationResult(
            intent=body["intent"],
            confidence=body["confidence"],
            entities=body.get("entities", {}),
            latency_ms=latency_ms,
            source="sagemaker",
        )
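
The warm-pool configuration keeps instances initialized, but an idle endpoint can still hit cold model state. A minimal keep-warm sketch, assuming the model tolerates a cheap "ping" payload (endpoint name and interval are illustrative):

import asyncio
import json

import boto3


async def keep_endpoint_warm(
    endpoint_name: str,
    interval_seconds: float = 60.0,
) -> None:
    """Periodically send a tiny request so model containers stay hot."""
    runtime = boto3.client("sagemaker-runtime")
    payload = json.dumps({"text": "ping", "return_entities": False})
    while True:
        try:
            await asyncio.to_thread(
                runtime.invoke_endpoint,
                EndpointName=endpoint_name,
                ContentType="application/json",
                Body=payload,
            )
        except Exception:
            pass  # A failed warm-up ping must never affect live traffic
        await asyncio.sleep(interval_seconds)


# Launched once at service startup:
# asyncio.create_task(keep_endpoint_warm("intent-classifier-prod"))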

3. Parallel Entity Extraction

Entity extraction (ASIN, series name, volume number) runs in parallel with intent classification rather than after it.

sequenceDiagram
    participant Orchestrator
    participant RuleEngine as Rule Engine
    participant SageMaker
    participant EntityExtractor

    Orchestrator->>RuleEngine: Classify intent
    Orchestrator->>EntityExtractor: Extract entities (parallel)

    alt Rule engine confident
        RuleEngine-->>Orchestrator: Intent (5ms)
        EntityExtractor-->>Orchestrator: Entities (15ms)
        Note over Orchestrator: Total: ~15ms (parallel)
    else Need ML model
        RuleEngine-->>Orchestrator: Low confidence (5ms)
        Orchestrator->>SageMaker: Classify + extract (combined)
        EntityExtractor-->>Orchestrator: Entities (15ms)
        SageMaker-->>Orchestrator: Intent + entities (120ms)
        Note over Orchestrator: Merge entity results
    end

Code Example: Parallel Entity Extractor

import asyncio
import re
import time
from dataclasses import dataclass


@dataclass
class ExtractedEntities:
    asin: str | None = None
    series_name: str | None = None
    volume_number: str | None = None
    order_id: str | None = None
    genre: str | None = None
    attribute: str | None = None
    latency_ms: float = 0.0


# Pre-compiled regex patterns for entity extraction
ASIN_PATTERN = re.compile(r"\b(B[0-9A-Z]{9})\b")
ORDER_ID_PATTERN = re.compile(r"\b(\d{3}-\d{7}-\d{7})\b")
VOLUME_PATTERN = re.compile(
    r"(?:vol(?:ume)?\.?\s*|#\s*)(\d{1,3})", re.IGNORECASE
)

MANGA_GENRES = {
    "shonen", "shounen", "shojo", "shoujo", "seinen", "josei",
    "isekai", "horror", "romance", "action", "comedy", "drama",
    "fantasy", "sci-fi", "slice of life", "sports", "thriller",
    "mystery", "supernatural", "mecha",
}

PRODUCT_ATTRIBUTES = {
    "language", "pages", "format", "author", "publisher", "price",
    "availability", "edition", "isbn", "release date", "rating",
    "dimensions", "weight", "paperback", "hardcover", "kindle",
}


class FastEntityExtractor:
    """Regex-based entity extraction that runs in parallel with intent classification."""

    def __init__(self, known_series: set[str] | None = None):
        self.known_series = known_series or set()

    async def extract(self, message: str) -> ExtractedEntities:
        """Extract all entities concurrently using pre-compiled patterns."""
        start = time.monotonic()

        # Run all extractions concurrently
        results = await asyncio.gather(
            asyncio.to_thread(self._extract_asin, message),
            asyncio.to_thread(self._extract_order_id, message),
            asyncio.to_thread(self._extract_volume, message),
            asyncio.to_thread(self._extract_genre, message),
            asyncio.to_thread(self._extract_attribute, message),
            asyncio.to_thread(self._extract_series_name, message),
        )

        latency_ms = (time.monotonic() - start) * 1000

        return ExtractedEntities(
            asin=results[0],
            order_id=results[1],
            volume_number=results[2],
            genre=results[3],
            attribute=results[4],
            series_name=results[5],
            latency_ms=latency_ms,
        )

    def _extract_asin(self, message: str) -> str | None:
        match = ASIN_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_order_id(self, message: str) -> str | None:
        match = ORDER_ID_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_volume(self, message: str) -> str | None:
        match = VOLUME_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_genre(self, message: str) -> str | None:
        lower = message.lower()
        for genre in MANGA_GENRES:
            if genre in lower:
                return genre
        return None

    def _extract_attribute(self, message: str) -> str | None:
        lower = message.lower()
        for attr in PRODUCT_ATTRIBUTES:
            if attr in lower:
                return attr
        return None

    def _extract_series_name(self, message: str) -> str | None:
        """Match against known manga series names."""
        lower = message.lower()
        best_match = None
        best_len = 0
        for series in self.known_series:
            if series.lower() in lower and len(series) > best_len:
                best_match = series
                best_len = len(series)
        return best_match
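
Usage sketch (the known-series set is illustrative; in production it would be loaded from the catalog):

import asyncio

extractor = FastEntityExtractor(known_series={"Berserk", "One Piece"})
entities = asyncio.run(
    extractor.extract("Is vol. 12 of Berserk in stock? Order 123-4567890-1234567")
)
print(entities.series_name, entities.volume_number, entities.order_id)
# Berserk 12 123-4567890-1234567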

4. Combined Two-Stage Classifier with All Optimizations

graph TD
    A[User Message] --> B[Parallel Start]

    B --> C[Aho-Corasick<br>Rule Matcher<br>~2ms]
    B --> D[Entity Extractor<br>~10ms]

    C --> E{Confidence >= 0.8?}
    E -->|Yes 60-70%| F[Merge: Intent + Entities]
    E -->|No 30-40%| G{Session Cache<br>Hit?}
    G -->|Yes ~10%| F
    G -->|No| H[SageMaker<br>BERT Classifier<br>~100ms]
    H --> I{Confidence >= 0.6?}
    I -->|Yes| F
    I -->|No| J[Fallback: general_query]
    J --> F
    D --> F

    F --> K[Return ClassificationResult]

    style C fill:#2d8,stroke:#333
    style D fill:#2d8,stroke:#333
    style F fill:#2d8,stroke:#333

Code Example: Unified Intent Classification Service

import asyncio
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class FullClassificationResult:
    intent: str
    confidence: float
    entities: dict
    source: str  # "rules", "cache", "sagemaker", "fallback"
    latency_ms: float


RULE_CONFIDENCE_THRESHOLD = 0.80
ML_CONFIDENCE_THRESHOLD = 0.60


class IntentClassificationService:
    """Two-stage intent classifier with all performance optimizations."""

    def __init__(
        self,
        rule_matcher: "AhoCorasickIntentMatcher",
        ml_classifier: "OptimizedSageMakerClassifier",
        entity_extractor: "FastEntityExtractor",
    ):
        self.rules = rule_matcher
        self.ml = ml_classifier
        self.entity_extractor = entity_extractor

    async def classify(
        self,
        message: str,
        session_id: str,
    ) -> FullClassificationResult:
        """Classify with parallel entity extraction and two-stage intent detection."""
        start = time.monotonic()

        # Run rule matching and entity extraction in parallel
        rule_task = asyncio.to_thread(self.rules.match, message)
        entity_task = self.entity_extractor.extract(message)

        rule_result, entities = await asyncio.gather(rule_task, entity_task)

        # Stage 1: Check rule-based match
        if rule_result and rule_result.confidence >= RULE_CONFIDENCE_THRESHOLD:
            latency = (time.monotonic() - start) * 1000
            return FullClassificationResult(
                intent=rule_result.intent,
                confidence=rule_result.confidence,
                entities=self._merge_entities(rule_result.entities, entities),
                source="rules",
                latency_ms=latency,
            )

        # Stage 2: ML model (with session cache)
        ml_result = await self.ml.classify(message, session_id)

        if ml_result.confidence >= ML_CONFIDENCE_THRESHOLD:
            latency = (time.monotonic() - start) * 1000
            return FullClassificationResult(
                intent=ml_result.intent,
                confidence=ml_result.confidence,
                entities=self._merge_entities(ml_result.entities, entities),
                source=ml_result.source,
                latency_ms=latency,
            )

        # Fallback
        latency = (time.monotonic() - start) * 1000
        return FullClassificationResult(
            intent="general_query",
            confidence=0.0,
            entities=self._merge_entities({}, entities),
            source="fallback",
            latency_ms=latency,
        )

    def _merge_entities(
        self,
        intent_entities: dict,
        extracted: "ExtractedEntities",
    ) -> dict:
        """Merge entities from intent classification and regex extraction."""
        merged = {}

        # Regex-extracted entities take precedence for structured fields
        if extracted.asin:
            merged["asin"] = extracted.asin
        if extracted.series_name:
            merged["series_name"] = extracted.series_name
        if extracted.volume_number:
            merged["volume_number"] = extracted.volume_number
        if extracted.order_id:
            merged["order_id"] = extracted.order_id
        if extracted.genre:
            merged["genre"] = extracted.genre
        if extracted.attribute:
            merged["attribute"] = extracted.attribute

        # ML-extracted entities fill gaps
        for key, value in intent_entities.items():
            if key not in merged and value is not None:
                merged[key] = value

        return merged
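
Wiring it together at startup (endpoint name and series set are illustrative; this example message resolves on the rule path, so no AWS call is made):

import asyncio

rule_matcher = AhoCorasickIntentMatcher()
ml_classifier = OptimizedSageMakerClassifier(endpoint_name="intent-classifier-prod")
entity_extractor = FastEntityExtractor(known_series={"Berserk", "One Piece"})

service = IntentClassificationService(rule_matcher, ml_classifier, entity_extractor)

result = asyncio.run(service.classify("track my order please", session_id="sess-42"))
print(result.intent, result.source)  # order_tracking rules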

Metrics and Monitoring

Metric                           Target                  Alarm
intent.rule_match_latency_ms     p95 < 10ms              p95 > 20ms
intent.ml_inference_latency_ms   p95 < 150ms             p95 > 200ms
intent.total_latency_ms          p95 < 50ms (weighted)   p95 > 100ms
intent.rule_hit_rate             > 60%                   < 50%
intent.cache_hit_rate            > 10%                   < 5%
intent.fallback_rate             < 5%                    > 10%
entity.extraction_latency_ms     p95 < 15ms              p95 > 30ms
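
The alarms above assume metric emission stays off the hot path ("Async Metrics Emission" in the strategy diagram). A minimal fire-and-forget sketch using CloudWatch; the namespace and dimension names are illustrative:

import asyncio

import boto3

cloudwatch = boto3.client("cloudwatch")


def _put_metric(name: str, value_ms: float, source: str) -> None:
    cloudwatch.put_metric_data(
        Namespace="IntentClassifier",  # illustrative namespace
        MetricData=[
            {
                "MetricName": name,
                "Value": value_ms,
                "Unit": "Milliseconds",
                "Dimensions": [{"Name": "Source", "Value": source}],
            }
        ],
    )


async def emit_latency(name: str, value_ms: float, source: str) -> None:
    """Emit a latency metric without blocking the request path."""
    try:
        await asyncio.to_thread(_put_metric, name, value_ms, source)
    except Exception:
        pass  # Losing a datapoint is preferable to adding request latency


# Fire and forget after classification:
# asyncio.create_task(emit_latency("intent.total_latency_ms", result.latency_ms, result.source))
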
graph LR
    subgraph "Latency Distribution Target"
        A[Rules: 60-70%<br>~5ms] --> D[Weighted Average<br>~35ms]
        B[Cache: ~10%<br>~1ms] --> D
        C[SageMaker: 20-30%<br>~120ms] --> D
    end
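
The ~35ms weighted average in the diagram follows from the per-path figures; a quick check, assuming a 65/10/25 split:

# Weighted mean using the diagram's per-path latencies (assumed 65/10/25 split)
paths = {"rules": (0.65, 5), "cache": (0.10, 1), "sagemaker": (0.25, 120)}
mean_ms = sum(share * ms for share, ms in paths.values())
print(f"~= {mean_ms:.1f} ms")  # ~= 33 ms, in line with the ~35 ms target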