PO-02: Intent Classifier Latency Optimization
User Story
As a senior backend engineer, I want to keep intent classification latency under 50ms for rule-matched intents and under 150ms for ML-classified intents, so that intent routing does not become a bottleneck in the critical path and the user perceives near-instant understanding of their request.
Acceptance Criteria
- Rule-based Stage 1 classifies 60-70% of messages in under 10ms.
- SageMaker ML inference (Stage 2) completes in under 150ms at p95.
- Entity extraction runs in parallel with classification, not sequentially.
- Model warm-up eliminates cold-start latency spikes on SageMaker endpoints.
- A local model cache avoids redundant SageMaker calls for identical messages within a session.
High-Level Design
Latency Profile of Current Two-Stage System
graph LR
A[User Message] --> B{Stage 1<br>Rule-Based<br>~5ms}
B -->|High confidence<br>60-70% of traffic| C[Return Intent<br>Total: ~5ms]
B -->|Low confidence<br>30-40% of traffic| D[Stage 2<br>SageMaker BERT<br>~120ms]
D --> E{Confidence<br>Check}
E -->|>= 0.6| F[Return Intent<br>Total: ~125ms]
E -->|< 0.6| G[Fallback: general_query<br>Total: ~125ms]
style C fill:#2d8,stroke:#333
style F fill:#fd2,stroke:#333
style G fill:#f66,stroke:#333
Optimization Strategy
graph TD
subgraph "Stage 1 Optimizations"
A1[Compiled Regex<br>Patterns] --> A2[Aho-Corasick<br>Multi-Pattern Matching]
A2 --> A3[Intent Pattern<br>Hot-Reload]
end
subgraph "Stage 2 Optimizations"
B1[SageMaker Real-Time<br>Inference] --> B2[Model Compilation<br>with Neo]
B2 --> B3[Endpoint Warm Pool]
B3 --> B4[Request Batching<br>for burst traffic]
end
subgraph "Cross-Stage"
C1[Session-Level<br>Intent Cache] --> C2[Parallel Entity<br>Extraction]
C2 --> C3[Async Metrics<br>Emission]
end
style A2 fill:#2d8,stroke:#333
style B2 fill:#2d8,stroke:#333
style C1 fill:#2d8,stroke:#333
Low-Level Design
1. High-Performance Rule Engine with Aho-Corasick
Standard regex matching checks patterns one at a time. Aho-Corasick builds a finite state machine over all patterns and matches them in a single pass over the input text.
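To make the single-pass behavior concrete, here is a minimal pure-Python sketch of the automaton (trie plus breadth-first failure links). It is illustrative only; the production matcher in the code example below uses the `ahocorasick` (pyahocorasick) C extension instead.

```python
from collections import deque


def build_automaton(patterns: list[str]):
    """Build goto/fail/output tables for an Aho-Corasick automaton."""
    goto = [{}]     # goto[state][char] -> next state (state 0 is the root)
    fail = [0]      # failure link per state
    out = [set()]   # patterns that end at each state
    for pat in patterns:
        state = 0
        for ch in pat:
            if ch not in goto[state]:
                goto.append({})
                fail.append(0)
                out.append(set())
                goto[state][ch] = len(goto) - 1
            state = goto[state][ch]
        out[state].add(pat)
    # Breadth-first pass to wire up failure links
    queue = deque(goto[0].values())
    while queue:
        r = queue.popleft()
        for ch, s in goto[r].items():
            queue.append(s)
            f = fail[r]
            while f and ch not in goto[f]:
                f = fail[f]
            fail[s] = goto[f].get(ch, 0)
            out[s] |= out[fail[s]]  # inherit matches reachable via the fail link
    return goto, fail, out


def search(text: str, goto, fail, out) -> list[tuple[int, str]]:
    """Report (start_index, pattern) for every occurrence in one pass over text."""
    state = 0
    matches = []
    for i, ch in enumerate(text):
        while state and ch not in goto[state]:
            state = fail[state]
        state = goto[state].get(ch, 0)
        for pat in out[state]:
            matches.append((i - len(pat) + 1, pat))
    return matches
```

The classic example: searching `"ushers"` against `["he", "she", "his", "hers"]` finds `she`, `he`, and `hers` in a single left-to-right scan, which is exactly why match time is O(m + matches) rather than O(N × m).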
graph LR
subgraph "Regex Approach (Before)"
A[Message] --> B[Pattern 1<br>O(m)]
A --> C[Pattern 2<br>O(m)]
A --> D[Pattern 3<br>O(m)]
A --> E[...<br>Pattern N]
B --> F[Total: O(N × m)]
end
subgraph "Aho-Corasick (After)"
G[Message] --> H[Single Pass<br>State Machine]
H --> I[All matches<br>O(m + matches)]
end
style F fill:#f66,stroke:#333
style I fill:#2d8,stroke:#333
Code Example: Aho-Corasick Intent Matcher
import time
from dataclasses import dataclass
from typing import Optional

import ahocorasick


@dataclass
class IntentMatch:
    intent: str
    confidence: float
    entities: dict
    matched_patterns: list[str]
    latency_ms: float
# Pattern definitions: each pattern maps to an intent with a base confidence
INTENT_PATTERNS = {
    # Order tracking patterns
    "where is my order": ("order_tracking", 0.95),
    "track my order": ("order_tracking", 0.95),
    "order status": ("order_tracking", 0.90),
    "when will it arrive": ("order_tracking", 0.85),
    "delivery date": ("order_tracking", 0.85),
    "shipping status": ("order_tracking", 0.90),
    # Return patterns
    "return this": ("return_request", 0.90),
    "want to return": ("return_request", 0.92),
    "return policy": ("faq", 0.90),
    "damaged item": ("return_request", 0.85),
    "wrong item": ("return_request", 0.88),
    "refund": ("return_request", 0.85),
    # Product discovery
    "show me": ("product_discovery", 0.80),
    "what's popular": ("product_discovery", 0.85),
    "best selling": ("product_discovery", 0.85),
    "new releases": ("product_discovery", 0.85),
    "trending manga": ("product_discovery", 0.90),
    # Recommendation
    "something like": ("recommendation", 0.90),
    "similar to": ("recommendation", 0.90),
    "recommend": ("recommendation", 0.85),
    "suggest": ("recommendation", 0.82),
    # FAQ
    "how do i": ("faq", 0.80),
    "what is the policy": ("faq", 0.85),
    "help with": ("faq", 0.75),
    # Promotions
    "any deals": ("promotion", 0.90),
    "discount": ("promotion", 0.85),
    "coupon": ("promotion", 0.90),
    "sale": ("promotion", 0.80),
    # Chitchat
    "hello": ("chitchat", 0.95),
    "hi there": ("chitchat", 0.95),
    "thanks": ("chitchat", 0.95),
    "thank you": ("chitchat", 0.95),
    "goodbye": ("chitchat", 0.95),
    "bye": ("chitchat", 0.95),
    # Escalation
    "talk to a human": ("escalation", 0.98),
    "speak to agent": ("escalation", 0.98),
    "real person": ("escalation", 0.95),
    "customer service": ("escalation", 0.90),
}
class AhoCorasickIntentMatcher:
    """Ultra-fast multi-pattern intent matching using an Aho-Corasick automaton."""

    def __init__(self):
        self.automaton = ahocorasick.Automaton()
        self._build_automaton()

    def _build_automaton(self) -> None:
        """Build the Aho-Corasick automaton from intent patterns."""
        for pattern, (intent, confidence) in INTENT_PATTERNS.items():
            self.automaton.add_word(
                pattern.lower(), (pattern, intent, confidence)
            )
        self.automaton.make_automaton()

    def match(self, message: str) -> Optional[IntentMatch]:
        """Match message against all patterns in a single pass."""
        start = time.monotonic()
        normalized = message.lower().strip()
        matches = []
        for _end_index, (pattern, intent, confidence) in self.automaton.iter(normalized):
            matches.append((intent, confidence, pattern))
        latency_ms = (time.monotonic() - start) * 1000
        if not matches:
            return None
        # Group by intent and pick the highest confidence
        intent_scores: dict[str, tuple[float, list[str]]] = {}
        for intent, confidence, pattern in matches:
            if intent not in intent_scores:
                intent_scores[intent] = (confidence, [pattern])
            else:
                existing_conf, existing_patterns = intent_scores[intent]
                # Multiple pattern matches for the same intent boost confidence
                boosted = min(existing_conf + 0.03, 0.99)
                intent_scores[intent] = (
                    max(boosted, confidence),
                    existing_patterns + [pattern],
                )
        # Select the intent with the highest confidence
        intent_name, (best_confidence, matched_patterns) = max(
            intent_scores.items(), key=lambda item: item[1][0]
        )
        return IntentMatch(
            intent=intent_name,
            confidence=best_confidence,
            entities={},  # Entity extraction runs separately
            matched_patterns=matched_patterns,
            latency_ms=latency_ms,
        )

    def reload_patterns(self, new_patterns: dict) -> None:
        """Hot-reload patterns without a service restart."""
        # Build the new automaton off to the side, then swap it in atomically,
        # so concurrent match() calls never see a half-built automaton.
        automaton = ahocorasick.Automaton()
        for pattern, (intent, confidence) in new_patterns.items():
            automaton.add_word(pattern.lower(), (pattern, intent, confidence))
        automaton.make_automaton()
        self.automaton = automaton
2. SageMaker Endpoint Optimization
graph TD
subgraph "Model Compilation"
A[BERT Model<br>PyTorch] --> B[SageMaker Neo<br>Compilation]
B --> C[Optimized Model<br>TensorRT / ONNX]
C --> D[2-3x faster<br>inference]
end
subgraph "Endpoint Configuration"
E[Real-Time Endpoint] --> F[Instance: ml.g5.xlarge<br>GPU inference]
F --> G[Min Instances: 2<br>Always warm]
G --> H[Auto-scaling:<br>Target p95 < 100ms]
end
subgraph "Warm Pool"
I[Warm Pool Size: 2] --> J[Instances stay<br>initialized]
J --> K[Scale-up time:<br>seconds vs minutes]
end
style D fill:#2d8,stroke:#333
style H fill:#2d8,stroke:#333
style K fill:#2d8,stroke:#333
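The warm pool keeps instances initialized, but a freshly deployed model can still serve a slow first request while the framework lazy-loads weights, and low-traffic periods can let server-side caches go cold. A client-side keep-warm pinger is a cheap complement to the warm pool. The sketch below is a hedged illustration: the `invoke` callable (which would wrap the SageMaker client's endpoint call), the 60-second interval, and the canary text are assumptions, not part of the original design.

```python
import asyncio
from typing import Awaitable, Callable


class EndpointWarmer:
    """Periodically sends a canary request so the endpoint never sits fully idle."""

    def __init__(
        self,
        invoke: Callable[[str], Awaitable[None]],
        interval_s: float = 60.0,
        canary_text: str = "warmup ping",
    ):
        self._invoke = invoke
        self._interval_s = interval_s
        self._canary_text = canary_text
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        """Launch the background keep-warm loop."""
        self._task = asyncio.ensure_future(self._run())

    async def stop(self) -> None:
        """Cancel the loop and wait for it to finish."""
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def _run(self) -> None:
        while True:
            try:
                await self._invoke(self._canary_text)
            except Exception:
                pass  # Warm-up failures must never affect user traffic
            await asyncio.sleep(self._interval_s)
```

Canary traffic should be tagged (e.g., a reserved session ID) so it is excluded from the latency and hit-rate metrics below.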
Code Example: Optimized SageMaker Inference Client
import asyncio
import hashlib
import json
import time
from collections import OrderedDict
from dataclasses import dataclass
from typing import Optional

import boto3


@dataclass
class ClassificationResult:
    intent: str
    confidence: float
    entities: dict
    latency_ms: float
    source: str  # "cache" or "sagemaker"
class LRUCache:
    """Simple LRU cache for session-level intent caching."""

    def __init__(self, max_size: int = 100):
        self._cache: OrderedDict[str, ClassificationResult] = OrderedDict()
        self._max_size = max_size

    def get(self, key: str) -> Optional[ClassificationResult]:
        if key in self._cache:
            self._cache.move_to_end(key)
            return self._cache[key]
        return None

    def put(self, key: str, value: ClassificationResult) -> None:
        if key in self._cache:
            self._cache.move_to_end(key)
        elif len(self._cache) >= self._max_size:
            self._cache.popitem(last=False)
        self._cache[key] = value
class OptimizedSageMakerClassifier:
    """Low-latency SageMaker inference with caching and connection reuse."""

    def __init__(
        self,
        endpoint_name: str,
        region: str = "us-east-1",
        cache_size: int = 200,
    ):
        self.endpoint_name = endpoint_name
        # Reuse a single boto3 client to avoid per-request connection overhead
        self.sagemaker = boto3.client(
            "sagemaker-runtime",
            region_name=region,
        )
        self._cache = LRUCache(max_size=cache_size)

    def _make_cache_key(self, message: str, session_id: str) -> str:
        """Create a cache key from the session ID and normalized message."""
        normalized = message.lower().strip()
        raw = f"{session_id}:{normalized}"
        return hashlib.sha256(raw.encode()).hexdigest()[:24]

    async def classify(
        self,
        message: str,
        session_id: str,
    ) -> ClassificationResult:
        """Classify with a cache-first strategy."""
        start = time.monotonic()
        # Check the session-level cache
        cache_key = self._make_cache_key(message, session_id)
        cached = self._cache.get(cache_key)
        if cached is not None:
            # Return a copy instead of mutating the cached entry in place,
            # so the stored source/latency fields stay intact
            return ClassificationResult(
                intent=cached.intent,
                confidence=cached.confidence,
                entities=cached.entities,
                latency_ms=(time.monotonic() - start) * 1000,
                source="cache",
            )
        # Invoke the SageMaker endpoint
        result = await self._invoke_endpoint(message, start)
        # Cache the result
        self._cache.put(cache_key, result)
        return result
    async def _invoke_endpoint(
        self, message: str, start: float
    ) -> ClassificationResult:
        """Call the SageMaker endpoint on the reused client."""
        payload = json.dumps({"text": message, "return_entities": True})
        response = await asyncio.to_thread(
            self.sagemaker.invoke_endpoint,
            EndpointName=self.endpoint_name,
            ContentType="application/json",
            Body=payload,
            # Route the request to the named inference component on the endpoint
            InferenceComponentName="intent-classifier-v2",
        )
        body = json.loads(response["Body"].read().decode("utf-8"))
        latency_ms = (time.monotonic() - start) * 1000
        return ClassificationResult(
            intent=body["intent"],
            confidence=body["confidence"],
            entities=body.get("entities", {}),
            latency_ms=latency_ms,
            source="sagemaker",
        )
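The optimization diagram above lists request batching for burst traffic, but the client shows only single-request invocation. A minimal asyncio micro-batcher is sketched below under stated assumptions: the `batch_classify` callable (which would wrap a multi-record endpoint invocation), the 16-message batch cap, and the 5ms window are all hypothetical tuning points, not part of the original design.

```python
import asyncio
from typing import Awaitable, Callable


class MicroBatcher:
    """Coalesces concurrent classify calls into one batched inference request."""

    def __init__(
        self,
        batch_classify: Callable[[list[str]], Awaitable[list[dict]]],
        max_batch: int = 16,
        window_ms: float = 5.0,
    ):
        self._batch_classify = batch_classify
        self._max_batch = max_batch
        self._window_s = window_ms / 1000
        self._pending: list[tuple[str, asyncio.Future]] = []
        self._flush_task: asyncio.Task | None = None

    async def classify(self, message: str) -> dict:
        """Enqueue a message and wait for its result from the next batch."""
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending.append((message, fut))
        if len(self._pending) >= self._max_batch:
            await self._flush()           # full batch: send immediately
        elif self._flush_task is None:
            # first message of a batch: schedule a flush after the window
            self._flush_task = asyncio.ensure_future(self._delayed_flush())
        return await fut

    async def _delayed_flush(self) -> None:
        await asyncio.sleep(self._window_s)
        await self._flush()

    async def _flush(self) -> None:
        self._flush_task = None
        batch, self._pending = self._pending, []
        if not batch:
            return
        try:
            results = await self._batch_classify([m for m, _ in batch])
            for (_, fut), result in zip(batch, results):
                if not fut.done():
                    fut.set_result(result)
        except Exception as exc:
            for _, fut in batch:
                if not fut.done():
                    fut.set_exception(exc)
```

The trade-off is deliberate: batching adds up to one window of latency per request, so it only pays off during bursts; at low traffic the window should be small or batching disabled.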
3. Parallel Entity Extraction
Entity extraction (ASIN, series name, volume number) runs in parallel with intent classification rather than after it.
sequenceDiagram
participant Orchestrator
participant RuleEngine as Rule Engine
participant SageMaker
participant EntityExtractor
Orchestrator->>RuleEngine: Classify intent
Orchestrator->>EntityExtractor: Extract entities (parallel)
alt Rule engine confident
RuleEngine-->>Orchestrator: Intent (5ms)
EntityExtractor-->>Orchestrator: Entities (15ms)
Note over Orchestrator: Total: ~15ms (parallel)
else Need ML model
RuleEngine-->>Orchestrator: Low confidence (5ms)
Orchestrator->>SageMaker: Classify + extract (combined)
EntityExtractor-->>Orchestrator: Entities (15ms)
SageMaker-->>Orchestrator: Intent + entities (120ms)
Note over Orchestrator: Merge entity results
end
Code Example: Parallel Entity Extractor
import re
import time
from dataclasses import dataclass


@dataclass
class ExtractedEntities:
    asin: str | None = None
    series_name: str | None = None
    volume_number: str | None = None
    order_id: str | None = None
    genre: str | None = None
    attribute: str | None = None
    latency_ms: float = 0.0


# Pre-compiled regex patterns for entity extraction
ASIN_PATTERN = re.compile(r"\b(B[0-9A-Z]{9})\b")
ORDER_ID_PATTERN = re.compile(r"\b(\d{3}-\d{7}-\d{7})\b")
VOLUME_PATTERN = re.compile(
    r"(?:vol(?:ume)?\.?\s*|#\s*)(\d{1,3})", re.IGNORECASE
)

MANGA_GENRES = {
    "shonen", "shounen", "shojo", "shoujo", "seinen", "josei",
    "isekai", "horror", "romance", "action", "comedy", "drama",
    "fantasy", "sci-fi", "slice of life", "sports", "thriller",
    "mystery", "supernatural", "mecha",
}

PRODUCT_ATTRIBUTES = {
    "language", "pages", "format", "author", "publisher", "price",
    "availability", "edition", "isbn", "release date", "rating",
    "dimensions", "weight", "paperback", "hardcover", "kindle",
}


class FastEntityExtractor:
    """Regex-based entity extraction that runs in parallel with intent classification."""

    def __init__(self, known_series: set[str] | None = None):
        self.known_series = known_series or set()

    async def extract(self, message: str) -> ExtractedEntities:
        """Extract all entities using the pre-compiled patterns.

        Each individual extraction is sub-millisecond, so offloading them to
        worker threads would cost more than it saves; the coroutine as a
        whole still runs concurrently with intent classification.
        """
        start = time.monotonic()
        entities = ExtractedEntities(
            asin=self._extract_asin(message),
            order_id=self._extract_order_id(message),
            volume_number=self._extract_volume(message),
            genre=self._extract_genre(message),
            attribute=self._extract_attribute(message),
            series_name=self._extract_series_name(message),
        )
        entities.latency_ms = (time.monotonic() - start) * 1000
        return entities

    def _extract_asin(self, message: str) -> str | None:
        match = ASIN_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_order_id(self, message: str) -> str | None:
        match = ORDER_ID_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_volume(self, message: str) -> str | None:
        match = VOLUME_PATTERN.search(message)
        return match.group(1) if match else None

    def _extract_genre(self, message: str) -> str | None:
        lower = message.lower()
        for genre in MANGA_GENRES:
            if genre in lower:
                return genre
        return None

    def _extract_attribute(self, message: str) -> str | None:
        lower = message.lower()
        for attr in PRODUCT_ATTRIBUTES:
            if attr in lower:
                return attr
        return None

    def _extract_series_name(self, message: str) -> str | None:
        """Match against known manga series names, preferring the longest hit."""
        lower = message.lower()
        best_match = None
        best_len = 0
        for series in self.known_series:
            if series.lower() in lower and len(series) > best_len:
                best_match = series
                best_len = len(series)
        return best_match
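A quick smoke test of the three compiled patterns helps catch regressions when they are edited; the sample ASIN and order-ID values below are made up for illustration.

```python
import re

# Same patterns as in the extractor above
ASIN_PATTERN = re.compile(r"\b(B[0-9A-Z]{9})\b")
ORDER_ID_PATTERN = re.compile(r"\b(\d{3}-\d{7}-\d{7})\b")
VOLUME_PATTERN = re.compile(r"(?:vol(?:ume)?\.?\s*|#\s*)(\d{1,3})", re.IGNORECASE)

# Each sample message pairs a pattern with the capture we expect from it
samples = [
    ("Is B07XG2V5QN still in stock?", ASIN_PATTERN, "B07XG2V5QN"),
    ("Where is order 123-4567890-1234567?", ORDER_ID_PATTERN, "123-4567890-1234567"),
    ("Do you have Volume 12 of this series?", VOLUME_PATTERN, "12"),
]
for text, pattern, expected in samples:
    match = pattern.search(text)
    assert match is not None and match.group(1) == expected
```

Note that the ASIN pattern requires an uppercase `B` followed by exactly nine uppercase alphanumerics, so it should run against the raw message, not the lowercased copy used for genre and attribute lookups.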
4. Combined Two-Stage Classifier with All Optimizations
graph TD
A[User Message] --> B[Parallel Start]
B --> C[Aho-Corasick<br>Rule Matcher<br>~2ms]
B --> D[Entity Extractor<br>~10ms]
C --> E{Confidence >= 0.8?}
E -->|Yes 60-70%| F[Merge: Intent + Entities]
E -->|No 30-40%| G{Session Cache<br>Hit?}
G -->|Yes ~10%| F
G -->|No| H[SageMaker<br>BERT Classifier<br>~100ms]
H --> I{Confidence >= 0.6?}
I -->|Yes| F
I -->|No| J[Fallback: general_query]
J --> F
D --> F
F --> K[Return ClassificationResult]
style C fill:#2d8,stroke:#333
style D fill:#2d8,stroke:#333
style F fill:#2d8,stroke:#333
Code Example: Unified Intent Classification Service
import asyncio
import time
from dataclasses import dataclass


@dataclass
class FullClassificationResult:
    intent: str
    confidence: float
    entities: dict
    source: str  # "rules", "cache", "sagemaker", "fallback"
    latency_ms: float


RULE_CONFIDENCE_THRESHOLD = 0.80
ML_CONFIDENCE_THRESHOLD = 0.60
class IntentClassificationService:
    """Two-stage intent classifier with all performance optimizations."""

    def __init__(
        self,
        rule_matcher: "AhoCorasickIntentMatcher",
        ml_classifier: "OptimizedSageMakerClassifier",
        entity_extractor: "FastEntityExtractor",
    ):
        self.rules = rule_matcher
        self.ml = ml_classifier
        self.entity_extractor = entity_extractor

    async def classify(
        self,
        message: str,
        session_id: str,
    ) -> FullClassificationResult:
        """Classify with parallel entity extraction and two-stage intent detection."""
        start = time.monotonic()
        # Run rule matching and entity extraction in parallel
        rule_task = asyncio.to_thread(self.rules.match, message)
        entity_task = self.entity_extractor.extract(message)
        rule_result, entities = await asyncio.gather(rule_task, entity_task)
        # Stage 1: check the rule-based match
        if rule_result and rule_result.confidence >= RULE_CONFIDENCE_THRESHOLD:
            latency = (time.monotonic() - start) * 1000
            return FullClassificationResult(
                intent=rule_result.intent,
                confidence=rule_result.confidence,
                entities=self._merge_entities(rule_result.entities, entities),
                source="rules",
                latency_ms=latency,
            )
        # Stage 2: ML model (with session cache)
        ml_result = await self.ml.classify(message, session_id)
        if ml_result.confidence >= ML_CONFIDENCE_THRESHOLD:
            latency = (time.monotonic() - start) * 1000
            return FullClassificationResult(
                intent=ml_result.intent,
                confidence=ml_result.confidence,
                entities=self._merge_entities(ml_result.entities, entities),
                source=ml_result.source,
                latency_ms=latency,
            )
        # Fallback
        latency = (time.monotonic() - start) * 1000
        return FullClassificationResult(
            intent="general_query",
            confidence=0.0,
            entities=self._merge_entities({}, entities),
            source="fallback",
            latency_ms=latency,
        )

    def _merge_entities(
        self,
        intent_entities: dict,
        extracted: "ExtractedEntities",
    ) -> dict:
        """Merge entities from intent classification and regex extraction."""
        merged = {}
        # Regex-extracted entities take precedence for structured fields
        for field in (
            "asin", "series_name", "volume_number",
            "order_id", "genre", "attribute",
        ):
            value = getattr(extracted, field)
            if value:
                merged[field] = value
        # ML-extracted entities fill the gaps
        for key, value in intent_entities.items():
            if key not in merged and value is not None:
                merged[key] = value
        return merged
Metrics and Monitoring
| Metric | Target | Alarm |
|---|---|---|
| intent.rule_match_latency_ms | p95 < 10ms | p95 > 20ms |
| intent.ml_inference_latency_ms | p95 < 150ms | p95 > 200ms |
| intent.total_latency_ms | p95 < 50ms (weighted) | p95 > 100ms |
| intent.rule_hit_rate | > 60% | < 50% |
| intent.cache_hit_rate | > 10% | < 5% |
| intent.fallback_rate | < 5% | > 10% |
| entity.extraction_latency_ms | p95 < 15ms | p95 > 30ms |
graph LR
subgraph "Latency Distribution Target"
A[Rules: 60-70%<br>~5ms] --> D[Weighted Average<br>~35ms]
B[Cache: ~10%<br>~1ms] --> D
C[SageMaker: 20-30%<br>~120ms] --> D
end
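The weighted-average figure in the diagram can be sanity-checked directly; the split below uses the midpoints of the traffic ranges shown above (65% rules, 10% cache, 25% SageMaker), which is an assumption about steady-state traffic, not a measurement.

```python
# (share of traffic, typical latency in ms) per classification path
paths = {
    "rules": (0.65, 5.0),       # Stage 1 hit
    "cache": (0.10, 1.0),       # session-cache hit
    "sagemaker": (0.25, 120.0), # Stage 2 ML inference
}

# Sanity check: shares must cover all traffic
assert abs(sum(share for share, _ in paths.values()) - 1.0) < 1e-9

weighted_ms = sum(share * latency for share, latency in paths.values())
print(f"weighted average: {weighted_ms:.1f}ms")  # roughly 33ms, consistent with the ~35ms target
```

This also shows why the rule hit rate alarm matters: if the rule share drops from 65% to 50% and that traffic shifts to SageMaker, the weighted average climbs by roughly 17ms.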