Feedback Loops & Human Augmentation Patterns for Collaborative AI
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
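Those price points translate into very different daily bills at the 1M-messages/day target. A back-of-envelope sketch, where the per-message token counts (500 input / 300 output) are illustrative assumptions rather than measured values:

```python
# Back-of-envelope daily Bedrock cost at 1M messages/day.
# Per-message token counts are illustrative assumptions, not measurements.
PRICES = {  # USD per 1M tokens: (input, output)
    "haiku": (0.25, 1.25),
    "sonnet": (3.00, 15.00),
}

def daily_cost(model: str, messages: int = 1_000_000,
               in_tokens: int = 500, out_tokens: int = 300) -> float:
    """Estimated USD/day for a model at the stated price points."""
    p_in, p_out = PRICES[model]
    return messages * (in_tokens * p_in + out_tokens * p_out) / 1_000_000

# Under these assumptions Haiku costs ~$500/day and Sonnet ~$6,000/day —
# a 12x gap, which is why routing most traffic to the cheaper model matters.
```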
Skill Mapping
| Certification | Domain | Task | Skill | This File |
|---|---|---|---|---|
| AWS AIP-C01 | Domain 2 — Implementation & Integration | Task 2.1 — Select and implement FM integration approaches | Skill 2.1.5 — Design collaborative AI systems with human-in-the-loop | Feedback loops, active learning, human knowledge injection, curator override mechanisms |
Skill scope: Deep-dive into feedback augmentation patterns — how MangaAssist collects, processes, and applies human feedback to continuously improve FM responses. Covers feedback loop architecture, active learning for reviewer prioritization, curator knowledge injection into prompts and RAG, and override mechanisms that let human experts correct AI behavior in real-time.
Mind Map: Feedback & Augmentation Patterns
mindmap
root((Feedback &<br/>Augmentation<br/>Patterns))
Feedback Loop Architecture
Collection Layer
Binary thumbs up/down
Multi-dimension ratings
Free-text corrections
Implicit signals (dwell time, CTR)
Processing Pipeline
Signal aggregation
Noise filtering
Batch vs real-time processing
Feedback-to-label conversion
Application Layer
Prompt refinement
RAG index updates
Cache invalidation
Weight adjustment
Active Learning
Uncertainty Sampling
Low-confidence responses
High-disagreement ensemble results
Near-threshold classifications
Query Strategy
Pool-based sampling
Stream-based selection
Priority queue for reviewers
Reviewer Routing
Expertise matching
Workload balancing
SLA-based escalation
Human Knowledge Injection
Curator Expertise
Genre taxonomy enrichment
Author relationship graphs
Cultural context annotations
Seasonal trend tagging
Prompt Enrichment
Dynamic system prompt updates
Few-shot example injection
Domain glossary maintenance
RAG Augmentation
Expert-curated documents
Verified ground truth entries
Override knowledge base
Override Mechanisms
Real-Time Overrides
Response replacement
Cache entry correction
Routing rule adjustment
Batch Overrides
Prompt template updates
Knowledge base refresh
Model weight recalibration
Safety Overrides
Content blocking rules
Emergency response templates
Kill switch for features
1. Feedback Loop Architecture
1.1 End-to-End Feedback Flow
graph TD
subgraph "Collection"
U[Customer] -->|thumbs up/down| FB1[Binary Feedback]
U -->|correction text| FB2[Text Correction]
U -->|click/no-click| FB3[Implicit Signal]
CUR[Curator] -->|quality rating| FB4[Expert Rating]
end
subgraph "Ingestion"
FB1 --> KIN[Kinesis Data Stream<br/>mangaassist-feedback]
FB2 --> KIN
FB3 --> KIN
FB4 --> KIN
end
subgraph "Processing"
KIN --> LAM[Lambda: Feedback Processor]
LAM --> AGG[Aggregate Signals<br/>per response_id]
AGG --> STORE[(DynamoDB<br/>feedback_signals)]
AGG --> CW[CloudWatch Metrics]
end
subgraph "Application"
STORE --> PROMPT[Prompt Refinement<br/>weekly batch]
STORE --> RAG_UP[RAG Index Update<br/>daily batch]
STORE --> CACHE_INV[Cache Invalidation<br/>real-time]
STORE --> WEIGHT[Model Weight Update<br/>EMA — real-time]
end
style KIN fill:#264653,stroke:#2a9d8f,color:#fff
style LAM fill:#2a9d8f,stroke:#264653,color:#fff
style STORE fill:#e9c46a,stroke:#f4a261,color:#000
1.2 Feedback Signal Types
| Signal | Source | Latency | Volume | Reliability | MangaAssist Use |
|---|---|---|---|---|---|
| Thumbs up/down | Customer UI | Real-time | ~5% of queries | Medium (noisy) | Quick quality pulse, cache invalidation |
| Star rating | Post-interaction survey | 5-30 min delay | ~1% of queries | High | Weekly quality reports |
| Text correction | Customer or curator | Real-time | ~0.5% of queries | Very high | Ground truth for active learning |
| Click-through | Frontend analytics | Real-time | 100% of recommendations | Medium (positional bias) | Recommendation quality signal |
| Dwell time | Frontend analytics | Real-time | 100% of queries | Low (confounded) | Engagement proxy |
| Curator rating | Internal review tool | 1-24 hr delay | ~2% of queries (sampled) | Very high | Model evaluation, prompt tuning |
1.3 Feedback Collection Implementation
"""
MangaAssist Feedback Collection and Processing Pipeline.
Collects multi-signal feedback, normalizes into a unified schema,
and routes to Kinesis for downstream processing.
"""
import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
class FeedbackType(str, Enum):
THUMBS_UP = "thumbs_up"
THUMBS_DOWN = "thumbs_down"
STAR_RATING = "star_rating"
TEXT_CORRECTION = "text_correction"
CLICK_THROUGH = "click_through"
CURATOR_RATING = "curator_rating"
@dataclass
class FeedbackSignal:
"""Unified feedback signal schema."""
feedback_id: str = field(default_factory=lambda: str(uuid.uuid4()))
session_id: str = ""
response_id: str = ""
query_text: str = ""
response_text: str = ""
feedback_type: str = ""
# Binary: 1.0 = positive, 0.0 = negative
sentiment_score: float = 0.5
# Optional detailed ratings (0.0 - 1.0)
accuracy_rating: Optional[float] = None
helpfulness_rating: Optional[float] = None
tone_rating: Optional[float] = None
# Text correction if provided
correction_text: Optional[str] = None
# Metadata
source: str = "customer" # "customer" or "curator:<curator_id>"
language: str = "en"
timestamp: int = field(default_factory=lambda: int(time.time()))
model_id: str = ""
intent: str = ""
class FeedbackCollector:
"""
Collects feedback from multiple sources and publishes
to Kinesis for downstream processing.
"""
def __init__(
self,
kinesis_client=None,
stream_name: str = "mangaassist-feedback",
dynamodb_resource=None,
table_name: str = "mangaassist_feedback",
):
self.kinesis = kinesis_client or boto3.client(
"kinesis", region_name="ap-northeast-1"
)
self.stream_name = stream_name
self.dynamodb = dynamodb_resource or boto3.resource(
"dynamodb", region_name="ap-northeast-1"
)
self.table = self.dynamodb.Table(table_name)
def collect_thumbs(
self,
session_id: str,
response_id: str,
is_positive: bool,
query_text: str = "",
response_text: str = "",
model_id: str = "",
intent: str = "",
language: str = "en",
) -> FeedbackSignal:
"""Collect binary thumbs up/down feedback."""
signal = FeedbackSignal(
session_id=session_id,
response_id=response_id,
query_text=query_text,
response_text=response_text,
feedback_type=FeedbackType.THUMBS_UP if is_positive else FeedbackType.THUMBS_DOWN,
sentiment_score=1.0 if is_positive else 0.0,
source="customer",
language=language,
model_id=model_id,
intent=intent,
)
self._publish(signal)
return signal
def collect_curator_rating(
self,
response_id: str,
curator_id: str,
accuracy: float,
helpfulness: float,
tone: float,
correction_text: Optional[str] = None,
query_text: str = "",
response_text: str = "",
) -> FeedbackSignal:
"""Collect detailed curator review."""
overall = (accuracy + helpfulness + tone) / 3.0
signal = FeedbackSignal(
response_id=response_id,
query_text=query_text,
response_text=response_text,
feedback_type=FeedbackType.CURATOR_RATING,
sentiment_score=overall,
accuracy_rating=accuracy,
helpfulness_rating=helpfulness,
tone_rating=tone,
correction_text=correction_text,
source=f"curator:{curator_id}",
)
self._publish(signal)
return signal
def collect_text_correction(
self,
response_id: str,
correction_text: str,
query_text: str = "",
original_response: str = "",
source: str = "customer",
) -> FeedbackSignal:
"""Collect a text correction (ground truth)."""
signal = FeedbackSignal(
response_id=response_id,
query_text=query_text,
response_text=original_response,
feedback_type=FeedbackType.TEXT_CORRECTION,
sentiment_score=0.0, # Correction implies original was wrong
correction_text=correction_text,
source=source,
)
self._publish(signal)
return signal
def _publish(self, signal: FeedbackSignal) -> None:
"""Publish feedback signal to Kinesis and DynamoDB."""
record = asdict(signal)
# Kinesis for real-time processing
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(record),
PartitionKey=signal.response_id or signal.feedback_id,
)
# DynamoDB for persistence and querying. boto3's DynamoDB resource
# rejects Python floats, so convert numeric scores to Decimal first.
from decimal import Decimal
self.table.put_item(Item={
k: (Decimal(str(v)) if isinstance(v, float) else v)
for k, v in record.items()
if v is not None
})
logger.info(
"Feedback collected: type=%s response_id=%s sentiment=%.1f",
signal.feedback_type, signal.response_id, signal.sentiment_score,
)
2. Feedback Processing Pipeline
2.1 Signal Aggregation and Noise Filtering
graph TD
subgraph "Raw Signals"
S1[Thumbs Down x 3]
S2[Thumbs Up x 12]
S3[Curator: accuracy=0.4]
S4[Click-through: 2/10]
end
subgraph "Aggregation"
S1 --> AGG[Signal Aggregator]
S2 --> AGG
S3 --> AGG
S4 --> AGG
AGG --> NOISE[Noise Filter<br/>Remove outliers,<br/>weight by source reliability]
end
subgraph "Composite Score"
NOISE --> CS[Composite Quality Score<br/>weighted average]
CS --> |score = 0.68| ACTION{Score < 0.70?}
ACTION -->|Yes| REVIEW[Flag for review<br/>+ cache invalidation]
ACTION -->|No| ARCHIVE[Archive for batch analysis]
end
style AGG fill:#264653,stroke:#2a9d8f,color:#fff
style CS fill:#e9c46a,stroke:#f4a261,color:#000
2.2 Source Reliability Weights
| Signal Source | Weight | Rationale |
|---|---|---|
| Curator rating | 0.40 | Highest expertise, most reliable |
| Text correction | 0.30 | Explicit ground truth |
| Thumbs up/down | 0.15 | High volume but noisy |
| Click-through | 0.10 | Positional bias affects reliability |
| Dwell time | 0.05 | Highly confounded, weak signal |
"""
MangaAssist Feedback Signal Aggregator.
Aggregates multi-source feedback signals into a composite
quality score per response, with noise filtering and
source-reliability weighting.
"""
import logging
import statistics
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
# Source reliability weights (sum to 1.0; the aggregator renormalizes over whichever sources are present)
SOURCE_WEIGHTS = {
"curator": 0.40,
"text_correction": 0.30,
"thumbs": 0.15,
"click_through": 0.10,
"dwell_time": 0.05,
}
@dataclass
class AggregatedFeedback:
"""Aggregated feedback for a single response."""
response_id: str
composite_score: float
signal_count: int
source_breakdown: dict = field(default_factory=dict)
needs_review: bool = False
needs_cache_invalidation: bool = False
confidence: float = 0.0 # How confident we are in the composite score
class FeedbackAggregator:
"""
Aggregates raw feedback signals into actionable quality scores.
Key behaviors:
- Weights signals by source reliability
- Filters outlier signals (z-score > 2)
- Requires minimum signal count for action
- Applies Bayesian prior for low-sample responses
"""
REVIEW_THRESHOLD = 0.70
CACHE_INVALIDATION_THRESHOLD = 0.50
MIN_SIGNALS_FOR_ACTION = 3
BAYESIAN_PRIOR = 0.75 # Assume responses are OK until proven otherwise
BAYESIAN_PRIOR_WEIGHT = 5 # Equivalent to 5 "average" signals
def aggregate(
self, response_id: str, signals: list[dict]
) -> AggregatedFeedback:
"""
Aggregate feedback signals for a single response.
signals: [{"type": str, "score": float, "source": str}, ...]
"""
if not signals:
return AggregatedFeedback(
response_id=response_id,
composite_score=self.BAYESIAN_PRIOR,
signal_count=0,
confidence=0.0,
)
# Group by source type
grouped: dict[str, list[float]] = {}
for sig in signals:
source_type = self._classify_source(sig)
grouped.setdefault(source_type, []).append(sig["score"])
# Filter outliers within each group
filtered: dict[str, list[float]] = {}
for source_type, scores in grouped.items():
filtered[source_type] = self._filter_outliers(scores)
# Compute per-source averages
source_averages: dict[str, float] = {}
for source_type, scores in filtered.items():
if scores:
source_averages[source_type] = statistics.mean(scores)
# Weighted composite score
total_weight = 0.0
weighted_sum = 0.0
for source_type, avg in source_averages.items():
weight = SOURCE_WEIGHTS.get(source_type, 0.05)
weighted_sum += avg * weight
total_weight += weight
if total_weight > 0:
raw_composite = weighted_sum / total_weight
else:
raw_composite = self.BAYESIAN_PRIOR
# Apply Bayesian prior for low-sample responses
n = len(signals)
composite = (
(self.BAYESIAN_PRIOR * self.BAYESIAN_PRIOR_WEIGHT + raw_composite * n)
/ (self.BAYESIAN_PRIOR_WEIGHT + n)
)
# Confidence increases with signal count
confidence = min(1.0, n / (n + self.BAYESIAN_PRIOR_WEIGHT))
result = AggregatedFeedback(
response_id=response_id,
composite_score=round(composite, 4),
signal_count=n,
source_breakdown=source_averages,
needs_review=(
composite < self.REVIEW_THRESHOLD
and n >= self.MIN_SIGNALS_FOR_ACTION
),
needs_cache_invalidation=(
composite < self.CACHE_INVALIDATION_THRESHOLD
and n >= self.MIN_SIGNALS_FOR_ACTION
),
confidence=round(confidence, 4),
)
if result.needs_review:
logger.warning(
"Response %s flagged for review: composite=%.3f signals=%d",
response_id, composite, n,
)
return result
def _classify_source(self, signal: dict) -> str:
"""Map signal to source type for weighting."""
sig_type = signal.get("type", "")
if "curator" in sig_type:
return "curator"
if "correction" in sig_type:
return "text_correction"
if "thumb" in sig_type:
return "thumbs"
if "click" in sig_type:
return "click_through"
if "dwell" in sig_type:
return "dwell_time"
return "thumbs" # Default
def _filter_outliers(
self, scores: list[float], z_threshold: float = 2.0
) -> list[float]:
"""Remove outlier scores using z-score filtering."""
if len(scores) < 3:
return scores
mean = statistics.mean(scores)
stdev = statistics.stdev(scores)
if stdev == 0:
return scores
return [
s for s in scores
if abs(s - mean) / stdev <= z_threshold
]
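The scoring math above can be exercised standalone. This sketch reuses the same weights and Bayesian prior but omits outlier filtering for brevity; it is an illustration of the formula, not a drop-in replacement for `FeedbackAggregator`:

```python
import statistics

# Constants mirrored from FeedbackAggregator above.
SOURCE_WEIGHTS = {"curator": 0.40, "text_correction": 0.30, "thumbs": 0.15,
                  "click_through": 0.10, "dwell_time": 0.05}
PRIOR, PRIOR_WEIGHT = 0.75, 5

def composite_score(signals):
    """signals: [(source_type, score), ...] -> Bayesian-smoothed composite."""
    grouped = {}
    for source, score in signals:
        grouped.setdefault(source, []).append(score)
    weighted_sum = total_weight = 0.0
    for source, scores in grouped.items():
        weight = SOURCE_WEIGHTS.get(source, 0.05)
        weighted_sum += statistics.mean(scores) * weight
        total_weight += weight
    raw = weighted_sum / total_weight if total_weight else PRIOR
    n = len(signals)
    # Low-sample responses are pulled toward the optimistic prior.
    return round((PRIOR * PRIOR_WEIGHT + raw * n) / (PRIOR_WEIGHT + n), 4)

# One curator rating of 0.4 plus three thumbs (two up, one down):
# raw = (0.4*0.40 + (2/3)*0.15) / 0.55 ≈ 0.4727
# smoothed = (0.75*5 + 0.4727*4) / 9 ≈ 0.6268 -> below the 0.70 review threshold
```

Note how the prior dampens low-sample swings: a raw score of ~0.47 from only four signals lands at ~0.63 rather than triggering cache invalidation outright.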
3. Active Learning for Reviewer Prioritization
3.1 Active Learning Strategy
MangaAssist generates 1M responses per day, while the reviewer pool can assess only about 500 of them. Active learning selects the most informative responses for review, maximizing learning per human hour.
graph TD
subgraph "Response Pool (1M/day)"
R1[High confidence<br/>No feedback issues]
R2[Low confidence<br/>Ensemble disagreement]
R3[Near threshold<br/>Borderline quality]
R4[Negative feedback<br/>Customer reported issue]
R5[New intent detected<br/>No training data]
end
subgraph "Active Learning Selector"
R2 --> PQ[Priority Queue]
R3 --> PQ
R4 --> PQ
R5 --> PQ
R1 --> |sampled at 0.1%| PQ
end
subgraph "Reviewer Assignment"
PQ --> RM[Reviewer Matcher<br/>Expertise + Workload]
RM --> JP[JP Specialist<br/>Japanese content]
RM --> GEN[General Reviewer<br/>English content]
RM --> SR[Senior Reviewer<br/>Edge cases]
end
style PQ fill:#264653,stroke:#2a9d8f,color:#fff
style RM fill:#2a9d8f,stroke:#264653,color:#fff
3.2 Priority Scoring
| Selection Criterion | Priority Score | Volume/Day | Rationale |
|---|---|---|---|
| Customer thumbs-down | 10.0 | ~15,000 | Direct negative signal |
| Ensemble high disagreement | 8.0 | ~25,000 | Models uncertain |
| Low model confidence (< 0.5) | 7.0 | ~40,000 | Model knows it is guessing |
| Near quality threshold (0.65-0.75) | 5.0 | ~80,000 | Borderline — review improves calibration |
| New/unknown intent | 9.0 | ~2,000 | Critical for coverage expansion |
| Random sample (high confidence) | 1.0 | ~1,000 | Baseline quality monitoring |
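Summing the table's volumes shows why strict prioritization matters; a quick check (candidate figures taken from the table above, reviewer capacity from the paragraph in Section 3.1):

```python
# Candidate responses per day, from the priority-scoring table.
candidates = {"negative_feedback": 15_000, "high_disagreement": 25_000,
              "low_confidence": 40_000, "near_threshold": 80_000,
              "new_intent": 2_000, "random_sample": 1_000}
REVIEW_CAPACITY = 500  # responses the reviewer pool can assess per day

total = sum(candidates.values())
review_fraction = REVIEW_CAPACITY / total
# ~163,000 candidates compete for 500 review slots: humans see well
# under 1% of flagged traffic, so priority ordering decides what
# ever reaches a reviewer at all.
```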
"""
MangaAssist Active Learning Selector.
Prioritizes responses for human review based on
informativeness, uncertainty, and business impact.
"""
import heapq
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class SelectionReason(str, Enum):
NEGATIVE_FEEDBACK = "negative_feedback"
HIGH_DISAGREEMENT = "high_disagreement"
LOW_CONFIDENCE = "low_confidence"
NEAR_THRESHOLD = "near_threshold"
NEW_INTENT = "new_intent"
RANDOM_SAMPLE = "random_sample"
@dataclass(order=True)
class ReviewCandidate:
"""A response prioritized for human review."""
priority: float = field(compare=True) # Higher = more important
response_id: str = field(compare=False)
query_text: str = field(compare=False)
response_text: str = field(compare=False)
selection_reason: str = field(compare=False)
model_confidence: float = field(compare=False, default=0.0)
ensemble_agreement: float = field(compare=False, default=0.0)
language: str = field(compare=False, default="en")
intent: str = field(compare=False, default="unknown")
timestamp: int = field(compare=False, default_factory=lambda: int(time.time()))
class ActiveLearningSelector:
"""
Selects the most informative responses for human review
using uncertainty sampling and business-impact scoring.
Maintains a bounded priority queue (max 2000 items)
that reviewers consume throughout the day.
"""
MAX_QUEUE_SIZE = 2000
PRIORITY_SCORES = {
SelectionReason.NEGATIVE_FEEDBACK: 10.0,
SelectionReason.NEW_INTENT: 9.0,
SelectionReason.HIGH_DISAGREEMENT: 8.0,
SelectionReason.LOW_CONFIDENCE: 7.0,
SelectionReason.NEAR_THRESHOLD: 5.0,
SelectionReason.RANDOM_SAMPLE: 1.0,
}
def __init__(self):
self._queue: list[ReviewCandidate] = []
def evaluate_for_review(
self,
response_id: str,
query_text: str,
response_text: str,
model_confidence: float,
ensemble_agreement: float,
has_negative_feedback: bool,
intent: str,
known_intents: set[str],
language: str = "en",
) -> Optional[ReviewCandidate]:
"""
Evaluate whether a response should be queued for review.
Returns the candidate if selected, None otherwise.
"""
reasons: list[tuple[SelectionReason, float]] = []
if has_negative_feedback:
reasons.append((
SelectionReason.NEGATIVE_FEEDBACK,
self.PRIORITY_SCORES[SelectionReason.NEGATIVE_FEEDBACK],
))
if intent not in known_intents:
reasons.append((
SelectionReason.NEW_INTENT,
self.PRIORITY_SCORES[SelectionReason.NEW_INTENT],
))
if ensemble_agreement < 0.50:
reasons.append((
SelectionReason.HIGH_DISAGREEMENT,
self.PRIORITY_SCORES[SelectionReason.HIGH_DISAGREEMENT],
))
if model_confidence < 0.50:
reasons.append((
SelectionReason.LOW_CONFIDENCE,
self.PRIORITY_SCORES[SelectionReason.LOW_CONFIDENCE],
))
if 0.65 <= model_confidence <= 0.75:
reasons.append((
SelectionReason.NEAR_THRESHOLD,
self.PRIORITY_SCORES[SelectionReason.NEAR_THRESHOLD],
))
if not reasons:
return None # High confidence, no issues, skip
# Use the highest-priority reason
best_reason, priority = max(reasons, key=lambda x: x[1])
# Language boost: JP content is harder to evaluate, prioritize
if language == "ja":
priority *= 1.2
candidate = ReviewCandidate(
priority=priority,
response_id=response_id,
query_text=query_text,
response_text=response_text,
selection_reason=best_reason.value,
model_confidence=model_confidence,
ensemble_agreement=ensemble_agreement,
language=language,
intent=intent,
)
self._enqueue(candidate)
return candidate
def _enqueue(self, candidate: ReviewCandidate) -> None:
"""Add to bounded priority queue."""
if len(self._queue) < self.MAX_QUEUE_SIZE:
heapq.heappush(self._queue, candidate)
elif candidate.priority > self._queue[0].priority:
heapq.heapreplace(self._queue, candidate)
def get_next_for_reviewer(
self,
reviewer_expertise: str = "general",
batch_size: int = 10,
) -> list[ReviewCandidate]:
"""
Get the next batch of responses for a specific reviewer.
Matches expertise (JP specialist gets JP content first).
"""
# Sort by priority descending
all_items = sorted(self._queue, key=lambda x: x.priority, reverse=True)
# Filter by reviewer expertise
if reviewer_expertise == "japanese":
preferred = [c for c in all_items if c.language == "ja"]
fallback = [c for c in all_items if c.language != "ja"]
ordered = preferred + fallback
elif reviewer_expertise == "senior":
# Senior gets edge cases: new intents and high disagreement.
preferred = [
c for c in all_items
if c.selection_reason in ("new_intent", "high_disagreement")
]
# Compare by identity, not ==: with order=True the dataclass's
# __eq__ looks only at `priority`.
preferred_ids = {id(c) for c in preferred}
fallback = [c for c in all_items if id(c) not in preferred_ids]
ordered = preferred + fallback
else:
ordered = all_items
batch = ordered[:batch_size]
# Remove the batch from the queue by identity. With order=True the
# dataclass __eq__ compares only `priority`, so list.remove() could
# drop a different candidate that happens to share a priority value.
batch_ids = {id(c) for c in batch}
self._queue = [c for c in self._queue if id(c) not in batch_ids]
heapq.heapify(self._queue)
return batch
@property
def queue_size(self) -> int:
return len(self._queue)
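`_enqueue` relies on a property of `heapq` worth calling out: Python's heapq is a min-heap, so `self._queue[0]` is always the *lowest*-priority candidate, and `heapreplace` evicts it when a better one arrives — a bounded top-K pattern. A minimal sketch of that pattern in isolation:

```python
import heapq

def keep_top_k(priorities, k):
    """Bounded min-heap: retain the k highest priorities seen in a stream,
    evicting the current minimum (heap[0]) when a higher one arrives."""
    heap = []
    for p in priorities:
        if len(heap) < k:
            heapq.heappush(heap, p)
        elif p > heap[0]:
            heapq.heapreplace(heap, p)  # pop the min and push p in one step
    return sorted(heap, reverse=True)

# With k=3, a stream of six priorities keeps only the three largest.
```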
4. Human Knowledge Injection
4.1 Curator Knowledge Flow
graph TD
subgraph "Curator Inputs"
GT[Genre Taxonomy<br/>Shonen, Seinen, Josei...]
AR[Author Relationships<br/>Ohba + Obata = Death Note]
CC[Cultural Context<br/>JP holidays, manga events]
SP[Staff Picks<br/>Curated recommendations]
GL[Glossary<br/>Manga-specific terms]
end
subgraph "Injection Points"
GT --> SYS[System Prompt<br/>Genre classification context]
AR --> RAG[RAG Knowledge Base<br/>Author/title metadata]
CC --> DYN[Dynamic Prompt<br/>Seasonal context injection]
SP --> OVR[Override Layer<br/>Staff picks supersede AI recs]
GL --> EMB[Embedding Index<br/>Domain-specific vocabulary]
end
subgraph "FM Pipeline"
SYS --> FM[Bedrock Claude 3]
RAG --> FM
DYN --> FM
OVR --> POST[Post-Processing<br/>Merge overrides with AI]
FM --> POST
EMB --> OS[OpenSearch<br/>Retrieval]
OS --> FM
end
style OVR fill:#e76f51,stroke:#f4a261,color:#fff
style FM fill:#264653,stroke:#2a9d8f,color:#fff
4.2 Knowledge Injection Comparison
| Injection Method | Latency Impact | Update Frequency | Scope | Best For |
|---|---|---|---|---|
| System prompt | 0ms (pre-loaded) | Weekly | All queries of a type | Genre taxonomy, tone guidelines |
| Dynamic prompt | +5ms (Redis lookup) | Daily/hourly | Contextual queries | Seasonal events, promotions |
| RAG knowledge base | +20ms (vector search) | Daily batch | Factual queries | Author data, release dates |
| Override layer | +2ms (Redis lookup) | Real-time | Specific responses | Staff picks, corrections |
| Embedding index | +8ms (re-index cost) | Weekly batch | Search quality | Domain vocabulary |
"""
MangaAssist Curator Knowledge Injection System.
Manages the flow of human expert knowledge into the FM pipeline
through multiple injection points: system prompts, dynamic context,
RAG knowledge base, and real-time overrides.
"""
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import boto3
import redis
logger = logging.getLogger(__name__)
class InjectionPoint(str, Enum):
SYSTEM_PROMPT = "system_prompt"
DYNAMIC_PROMPT = "dynamic_prompt"
RAG_KNOWLEDGE = "rag_knowledge"
OVERRIDE = "override"
EMBEDDING_INDEX = "embedding_index"
@dataclass
class KnowledgeEntry:
"""A single piece of curator-injected knowledge."""
entry_id: str
content: str
category: str # "genre_taxonomy", "author_info", "cultural_context", etc.
injection_point: str
curator_id: str
language: str = "en"
priority: int = 0 # Higher = more important
expires_at: Optional[int] = None # Unix timestamp, None = permanent
created_at: int = field(default_factory=lambda: int(time.time()))
class CuratorKnowledgeManager:
"""
Manages curator knowledge entries across injection points.
Knowledge flows:
1. Curator creates/updates entry via admin API
2. Entry stored in DynamoDB with injection_point classification
3. Injection workers push to appropriate target (Redis, OpenSearch, S3)
4. FM pipeline reads from targets at query time
"""
def __init__(
self,
dynamodb_resource=None,
redis_client: Optional[redis.Redis] = None,
table_name: str = "mangaassist_curator_knowledge",
):
self.dynamodb = dynamodb_resource or boto3.resource(
"dynamodb", region_name="ap-northeast-1"
)
self.table = self.dynamodb.Table(table_name)
self.redis = redis_client
def add_knowledge(self, entry: KnowledgeEntry) -> None:
"""Add a curator knowledge entry."""
item = {
"entry_id": entry.entry_id,
"content": entry.content,
"category": entry.category,
"injection_point": entry.injection_point,
"curator_id": entry.curator_id,
"language": entry.language,
"priority": entry.priority,
"created_at": entry.created_at,
}
if entry.expires_at:
item["expires_at"] = entry.expires_at
item["ttl"] = entry.expires_at # DynamoDB TTL
self.table.put_item(Item=item)
# Real-time injection for override and dynamic_prompt types
if entry.injection_point in (
InjectionPoint.OVERRIDE, InjectionPoint.DYNAMIC_PROMPT
):
self._push_to_redis(entry)
logger.info(
"Knowledge added: id=%s category=%s injection=%s",
entry.entry_id, entry.category, entry.injection_point,
)
def get_dynamic_context(
self, intent: str, language: str = "en"
) -> list[str]:
"""
Retrieve dynamic prompt context for a given intent and language.
Called at query time to enrich the FM prompt.
"""
if not self.redis:
return []
key = f"curator:dynamic:{intent}:{language}"
entries = self.redis.lrange(key, 0, 4) # Redis end index is inclusive: at most 5 entries
return [e.decode() for e in entries] if entries else []
def get_overrides(
self, query_text: str, intent: str
) -> Optional[str]:
"""
Check if a curator override exists for this intent.
Overrides completely replace the AI response.
query_text is accepted for future pattern-based matching but is
not used by the current intent-level lookup.
"""
if not self.redis:
return None
# Intent-level override. Note: _push_to_redis keys overrides by
# entry.category, so override entries must use the intent name as
# their category for this lookup to match.
key = f"curator:override:{intent}"
override = self.redis.get(key)
if override:
return override.decode()
return None
def build_system_prompt_context(
self, intent: str, language: str = "en"
) -> str:
"""
Build system prompt enrichment from curator knowledge.
Called during prompt template assembly (cached for 1 hour).
"""
response = self.table.query(
IndexName="injection_point-category-index",
KeyConditionExpression=(
"injection_point = :ip"
),
ExpressionAttributeValues={
":ip": InjectionPoint.SYSTEM_PROMPT,
},
Limit=20,
)
entries = response.get("Items", [])
relevant = [
e for e in entries
if e.get("language", "en") == language
]
if not relevant:
return ""
# Sort by priority descending
relevant.sort(key=lambda x: x.get("priority", 0), reverse=True)
context_parts = [e["content"] for e in relevant[:10]]
return "\n".join(context_parts)
def _push_to_redis(self, entry: KnowledgeEntry) -> None:
"""Push real-time entries to Redis for fast access."""
if not self.redis:
return
if entry.injection_point == InjectionPoint.OVERRIDE:
key = f"curator:override:{entry.category}"
self.redis.set(key, entry.content)
if entry.expires_at:
ttl = entry.expires_at - int(time.time())
if ttl > 0:
self.redis.expire(key, ttl)
elif entry.injection_point == InjectionPoint.DYNAMIC_PROMPT:
key = f"curator:dynamic:{entry.category}:{entry.language}"
self.redis.lpush(key, entry.content)
self.redis.ltrim(key, 0, 9) # Keep max 10 entries
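The dynamic-prompt key layout above (`curator:dynamic:{category}:{language}`, newest entry first, list capped at 10) can be illustrated with a tiny in-memory stand-in for the three Redis list operations the manager uses. `FakeRedis` here is a test double for illustration, not a real client:

```python
class FakeRedis:
    """Minimal in-memory stand-in for the three list ops used above."""
    def __init__(self):
        self.lists = {}
    def lpush(self, key, value):
        # Like Redis LPUSH: newest entry lands at index 0.
        self.lists.setdefault(key, []).insert(0, value.encode())
    def ltrim(self, key, start, end):
        # Like Redis LTRIM: end index is inclusive.
        self.lists[key] = self.lists[key][start:end + 1]
    def lrange(self, key, start, end):
        return self.lists.get(key, [])[start:end + 1]

r = FakeRedis()
key = "curator:dynamic:seasonal_context:ja"  # {category}:{language} layout
for i in range(12):  # curators push 12 entries over time
    r.lpush(key, f"entry-{i}")
    r.ltrim(key, 0, 9)  # keep only the 10 most recent
recent = [e.decode() for e in r.lrange(key, 0, 4)]  # read newest few at query time
# lpush puts the newest entry at index 0, so recent[0] == "entry-11"
```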
5. Curator Override Mechanisms
5.1 Override Priority Hierarchy
graph TD
subgraph "Override Levels (highest to lowest priority)"
E[Emergency Override<br/>Kill-switch, safety blocks<br/>Priority: 100]
S[Staff Pick Override<br/>Curator-curated responses<br/>Priority: 80]
C[Correction Override<br/>Verified ground truth<br/>Priority: 60]
P[Prompt Override<br/>System prompt modifications<br/>Priority: 40]
D[Default<br/>AI-generated response<br/>Priority: 0]
end
E --> |supersedes| S --> |supersedes| C --> |supersedes| P --> |supersedes| D
style E fill:#e76f51,stroke:#f4a261,color:#fff
style S fill:#264653,stroke:#2a9d8f,color:#fff
5.2 Override Implementation
"""
MangaAssist Curator Override Engine.
Implements a priority-based override system where human curators
can replace, modify, or block AI responses at multiple levels.
"""
import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import redis
logger = logging.getLogger(__name__)
class OverrideLevel(str, Enum):
EMERGENCY = "emergency" # Priority 100 — safety blocks
STAFF_PICK = "staff_pick" # Priority 80 — curated responses
CORRECTION = "correction" # Priority 60 — ground truth
PROMPT = "prompt" # Priority 40 — prompt mods
NONE = "none" # Priority 0 — use AI response
OVERRIDE_PRIORITY = {
OverrideLevel.EMERGENCY: 100,
OverrideLevel.STAFF_PICK: 80,
OverrideLevel.CORRECTION: 60,
OverrideLevel.PROMPT: 40,
OverrideLevel.NONE: 0,
}
@dataclass
class OverrideResult:
"""Result of checking for overrides."""
has_override: bool
level: OverrideLevel
override_response: Optional[str] = None
override_id: Optional[str] = None
curator_id: Optional[str] = None
reason: str = ""
class CuratorOverrideEngine:
"""
Checks for curator overrides before returning AI responses.
Override matching order:
1. Emergency blocks (query content matches blocked patterns)
2. Staff picks (exact intent + entity match)
3. Corrections (response_id match from feedback)
4. Prompt overrides (intent-level prompt modifications)
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_overrides(
self,
query_text: str,
intent: str,
entities: list[str],
response_id: str = "",
) -> OverrideResult:
"""Check all override levels and return the highest priority match."""
# Level 1: Emergency blocks
emergency = self._check_emergency(query_text)
if emergency:
return emergency
# Level 2: Staff picks
staff_pick = self._check_staff_picks(intent, entities)
if staff_pick:
return staff_pick
# Level 3: Corrections
correction = self._check_corrections(response_id, intent)
if correction:
return correction
# Level 4: Prompt overrides (return modification, not replacement)
prompt_override = self._check_prompt_overrides(intent)
if prompt_override:
return prompt_override
return OverrideResult(
has_override=False,
level=OverrideLevel.NONE,
)
def _check_emergency(self, query_text: str) -> Optional[OverrideResult]:
"""Check for emergency content blocks."""
blocked_patterns = self.redis.smembers("override:emergency:blocked_patterns")
query_lower = query_text.lower()
for pattern in blocked_patterns:
decoded = pattern.decode()
if decoded.lower() in query_lower:
response = self.redis.get("override:emergency:response")
return OverrideResult(
has_override=True,
level=OverrideLevel.EMERGENCY,
override_response=(
response.decode() if response
else "I'm sorry, I can't help with that request."
),
reason=f"Emergency block: matched pattern '{decoded}'",
)
return None
def _check_staff_picks(
self, intent: str, entities: list[str]
) -> Optional[OverrideResult]:
"""Check for curator staff pick responses."""
for entity in entities:
key = f"override:staff_pick:{intent}:{entity.lower()}"
pick_data = self.redis.get(key)
if pick_data:
pick = json.loads(pick_data)
return OverrideResult(
has_override=True,
level=OverrideLevel.STAFF_PICK,
override_response=pick["response"],
override_id=pick.get("override_id"),
curator_id=pick.get("curator_id"),
reason=f"Staff pick for {intent}:{entity}",
)
return None
def _check_corrections(
self, response_id: str, intent: str
) -> Optional[OverrideResult]:
"""Check for curator corrections to previous responses."""
if not response_id:
return None
key = f"override:correction:{response_id}"
correction = self.redis.get(key)
if correction:
data = json.loads(correction)
return OverrideResult(
has_override=True,
level=OverrideLevel.CORRECTION,
override_response=data["corrected_response"],
override_id=data.get("override_id"),
curator_id=data.get("curator_id"),
reason="Curator correction for previous response",
)
return None
def _check_prompt_overrides(
self, intent: str
) -> Optional[OverrideResult]:
"""Check for intent-level prompt modifications."""
key = f"override:prompt:{intent}"
prompt_mod = self.redis.get(key)
if prompt_mod:
return OverrideResult(
has_override=True,
level=OverrideLevel.PROMPT,
override_response=prompt_mod.decode(),
reason=f"Prompt override for intent: {intent}",
)
return None
def set_staff_pick(
self,
intent: str,
entity: str,
response: str,
curator_id: str,
expires_days: int = 30,
) -> None:
"""Set a curator staff pick override."""
key = f"override:staff_pick:{intent}:{entity.lower()}"
data = {
"response": response,
"curator_id": curator_id,
"override_id": f"sp:{intent}:{entity}:{int(time.time())}",
"created_at": int(time.time()),
}
self.redis.set(key, json.dumps(data))
self.redis.expire(key, expires_days * 86400)
logger.info(
"Staff pick set: intent=%s entity=%s curator=%s",
intent, entity, curator_id,
)
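The four `_check_*` calls run in descending priority order, which is equivalent to taking the highest-priority match across all levels. A minimal sketch of that equivalence, with values mirrored from `OVERRIDE_PRIORITY`:

```python
# Priority table mirrored from OVERRIDE_PRIORITY above.
PRIORITY = {"emergency": 100, "staff_pick": 80, "correction": 60,
            "prompt": 40, "none": 0}

def resolve(matched_levels):
    """Given the set of override levels that matched a query, return
    the winner. Checking levels in descending priority order (as
    check_overrides does) and taking the max are equivalent."""
    if not matched_levels:
        return "none"
    return max(matched_levels, key=PRIORITY.__getitem__)
```

For example, if both a correction and a staff pick match, the staff pick wins, matching the hierarchy diagram in 5.1.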
6. Feedback-Driven Prompt Refinement
6.1 Prompt Refinement Pipeline
sequenceDiagram
participant FB as Feedback Store
participant SEL as Batch Selector
participant EVAL as Evaluator (Sonnet)
participant REF as Prompt Refiner
participant S3 as S3 Prompt Store
participant PROD as Production
Note over FB,PROD: Weekly Batch Process (Step Functions)
FB->>SEL: Get responses with<br/>composite_score < 0.70
SEL->>SEL: Sample 200 low-quality<br/>responses by intent
loop For each intent category
SEL->>EVAL: Analyze failure patterns<br/>in low-quality responses
EVAL->>EVAL: Identify common issues:<br/>tone, accuracy, relevance
EVAL->>REF: Failure pattern report
REF->>REF: Generate improved<br/>system prompt variant
REF->>EVAL: A/B test variant on<br/>held-out evaluation set
EVAL-->>REF: Quality improvement: +8%
end
REF->>S3: Store approved prompt<br/>with version tag
S3->>PROD: Deploy via feature flag<br/>gradual rollout (10% -> 50% -> 100%)
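The gradual-rollout step in the diagram is typically implemented with deterministic hash bucketing, so a given user stays on the same prompt variant as the flag ramps from 10% to 50% to 100%. A sketch under that assumption (the function name and 100-bucket scheme are illustrative, not from the production code):

```python
import hashlib


def in_rollout(user_id: str, rollout_pct: int) -> bool:
    """Deterministically place a user in the first `rollout_pct` of 100 buckets."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100
    return bucket < rollout_pct


# A user's bucket never changes, so everyone in the 10% cohort
# remains in the 50% and 100% cohorts as the flag ramps up.
print(in_rollout("user-42", 100))  # True — 100% rollout includes everyone
```

Because bucketing is a pure function of the user ID, ramping the percentage only ever adds users to the variant, which keeps the A/B comparison clean across rollout stages.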
6.2 Prompt Refinement Implementation
"""
MangaAssist Feedback-Driven Prompt Refinement.
Analyzes patterns in low-quality responses and generates
improved system prompt variants. Runs as a weekly batch
process via Step Functions.
"""
import json
import logging
import time
from dataclasses import dataclass
import boto3
logger = logging.getLogger(__name__)
@dataclass
class FailurePattern:
"""A common failure pattern identified in low-quality responses."""
pattern_type: str  # "tone", "accuracy", "relevance", "completeness", "language"
description: str
example_count: int
example_queries: list[str]
@dataclass
class PromptVariant:
"""A new system prompt variant generated from failure analysis."""
intent: str
version: str
prompt_text: str
changes_summary: str
expected_quality_gain: float
class PromptRefiner:
"""
Generates improved system prompts based on feedback analysis.
Uses Sonnet as both the analyzer and prompt generator.
"""
def __init__(self, bedrock_client=None):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", region_name="ap-northeast-1"
)
def analyze_failures(
self,
low_quality_responses: list[dict],
intent: str,
) -> list[FailurePattern]:
"""
Analyze a batch of low-quality responses to identify patterns.
low_quality_responses: [{"query": str, "response": str, "score": float, "feedback": str}, ...]
"""
examples_text = "\n\n".join(
f"Query: {r['query']}\nResponse: {r['response']}\n"
f"Quality Score: {r['score']}\nFeedback: {r.get('feedback', 'None')}"
for r in low_quality_responses[:20]
)
prompt = (
"Analyze these low-quality chatbot responses for a manga store.\n"
f"All are from the '{intent}' intent category.\n\n"
f"{examples_text}\n\n"
"Identify the top 3 common failure patterns. For each pattern, specify:\n"
"1. pattern_type: one of [tone, accuracy, relevance, completeness, language]\n"
"2. description: what is going wrong\n"
"3. example_count: how many of the responses show this pattern\n\n"
"Respond with a JSON array only, no surrounding prose."
)
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
})
response = self.bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=body,
contentType="application/json",
accept="application/json",
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
try:
patterns_raw = json.loads(text)
return [
FailurePattern(
pattern_type=p.get("pattern_type", "unknown"),
description=p.get("description", ""),
example_count=p.get("example_count", 0),
example_queries=[],
)
for p in patterns_raw
]
except json.JSONDecodeError:
logger.warning("Failed to parse failure patterns: %s", text[:200])
return []
def generate_improved_prompt(
self,
current_prompt: str,
intent: str,
failure_patterns: list[FailurePattern],
) -> PromptVariant:
"""
Generate an improved system prompt that addresses
the identified failure patterns.
"""
patterns_text = "\n".join(
f"- {p.pattern_type}: {p.description} ({p.example_count} examples)"
for p in failure_patterns
)
prompt = (
"You are improving a system prompt for a manga store chatbot.\n\n"
f"Current system prompt for '{intent}' queries:\n"
f"---\n{current_prompt}\n---\n\n"
f"Identified failure patterns:\n{patterns_text}\n\n"
"Generate an improved system prompt that addresses these failures.\n"
"Keep the core behavior intact. Only add or modify instructions "
"that directly address the failure patterns.\n\n"
"Respond with JSON:\n"
'{"improved_prompt": "...", "changes_summary": "...", '
'"expected_quality_gain_pct": N}'
)
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2000,
"temperature": 0.2,
"messages": [{"role": "user", "content": prompt}],
})
response = self.bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=body,
contentType="application/json",
accept="application/json",
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
try:
parsed = json.loads(text)
return PromptVariant(
intent=intent,
version=f"v{int(time.time())}",
prompt_text=parsed["improved_prompt"],
changes_summary=parsed.get("changes_summary", ""),
expected_quality_gain=parsed.get("expected_quality_gain_pct", 0.0),
)
except (json.JSONDecodeError, KeyError):
logger.error("Failed to parse improved prompt: %s", text[:200])
return PromptVariant(
intent=intent,
version="failed",
prompt_text=current_prompt,
changes_summary="Generation failed — using current prompt",
expected_quality_gain=0.0,
)
7. Feedback Loop Bias Detection
7.1 Common Feedback Biases
graph TD
subgraph "Feedback Biases"
PB[Positivity Bias<br/>Customers more likely<br/>to rate good experiences]
NB[Negativity Bias<br/>Angry customers over-report<br/>via corrections]
SB[Selection Bias<br/>Only engaged users<br/>give feedback]
AB[Anchoring Bias<br/>First response sets<br/>quality expectation]
end
subgraph "Mitigation"
PB --> M1[Balance with random<br/>sampling of non-feedback<br/>responses]
NB --> M2[Weight corrections by<br/>curator verification status]
SB --> M3[Track demographic<br/>coverage of feedback<br/>providers]
AB --> M4[Randomize A/B test<br/>exposure order]
end
style PB fill:#e76f51,stroke:#f4a261,color:#fff
style NB fill:#e76f51,stroke:#f4a261,color:#fff
| Bias Type | Impact on MangaAssist | Detection Method | Mitigation |
|---|---|---|---|
| Positivity bias | Quality appears higher than reality | Compare feedback vs. curator ratings | Include random curator sampling |
| Negativity bias | Overcorrect toward conservative responses | Track correction rate vs. actual error rate | Verify corrections before applying |
| Selection bias | Feedback skews toward power users | Monitor feedback coverage by user segment | Stratified sampling for evaluation |
| Popularity bias | Popular manga get more feedback | Track feedback per-title distribution | Normalize by query volume |
| Recency bias | Recent feedback dominates weight updates | Check EMA half-life vs. feedback frequency | Use longer EMA decay for stable intents |
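The recency-bias row suggests checking the EMA half-life against feedback frequency. For an EMA with smoothing factor alpha, an old observation's weight decays by a factor of (1 - alpha) per update, so the alpha that yields a desired half-life (measured in updates) follows directly. A worked sketch; the function names are illustrative:

```python
def alpha_for_half_life(half_life_updates: float) -> float:
    """Smoothing factor so an observation's weight halves after N updates."""
    # Solve (1 - alpha) ** N == 0.5 for alpha.
    return 1 - 0.5 ** (1 / half_life_updates)


def ema_update(current: float, observation: float, alpha: float) -> float:
    """Standard EMA step: blend the new observation into the running value."""
    return alpha * observation + (1 - alpha) * current


# A 10-update half-life needs alpha ≈ 0.067; after 10 updates the
# original value retains exactly half its weight.
alpha = alpha_for_half_life(10)
print(round((1 - alpha) ** 10, 6))  # 0.5
```

A high-traffic intent that receives hundreds of signals per day with a 10-update half-life forgets last week entirely; stable intents warrant a longer half-life (larger N, smaller alpha) so sparse feedback does not whipsaw the weights.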
8. Comparison Table: Feedback Integration Methods
| Method | Latency to Effect | Quality Improvement | Risk | Complexity | Volume Needed |
|---|---|---|---|---|---|
| Cache invalidation | Immediate | Prevents repeat bad answers | Low | Low | 1 negative signal |
| Weight adjustment (EMA) | Real-time | +3-5% per model | Medium (drift) | Medium | 10+ signals |
| Prompt refinement | Weekly batch | +5-10% per intent | Medium (regression) | High | 200+ signals |
| RAG knowledge update | Daily batch | +8-15% for factual queries | Low | Medium | 50+ corrections |
| Override deployment | Immediate | 100% for targeted queries | High (stale overrides) | Low | Curator decision |
| Model fine-tuning | Monthly cycle | +10-20% domain-wide | High (catastrophic forgetting) | Very high | 10,000+ labels |
Key Takeaways
- Multi-signal feedback is more reliable than any single source — combining thumbs up/down, curator ratings, text corrections, and implicit signals with source-reliability weighting produces a composite score that is 40% more predictive of true quality than any individual signal.
- Active learning makes human review 10x more efficient — by prioritizing low-confidence, high-disagreement, and negative-feedback responses, MangaAssist's 500 daily human reviews cover the same learning surface as 5,000 random reviews.
- Bayesian priors prevent overreaction to sparse feedback — new responses start with a prior of 0.75 (assume OK until proven otherwise), requiring at least 3 signals before taking action, which prevents noisy single-vote feedback from causing unnecessary cache invalidation.
- Curator overrides must have an expiration mechanism — staff picks and corrections that never expire become stale as the product catalog changes; all overrides default to 30-day expiry with curator opt-in for extension.
- Feedback loop bias is the hidden danger — positivity bias makes quality look better than it is, while negativity bias in corrections can push the model toward overly conservative responses; random curator sampling and stratified evaluation counteract both.
- Prompt refinement is the highest-ROI feedback application — weekly batch analysis of low-quality responses with Sonnet-generated prompt improvements yields 5-10% quality gain per intent category, compounding over months.
- Knowledge injection has four speed tiers — emergency overrides (seconds), dynamic context (minutes), RAG updates (hours), system prompt refinement (days); each tier trades off speed for breadth of impact.
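The Bayesian-prior takeaway can be made concrete with a Beta-Bernoulli posterior: start from a prior mean of 0.75 and act only after enough signals accumulate. The prior mean (0.75) and minimum signal count (3) come from the takeaway above; the prior strength and invalidation threshold below are illustrative choices, not values from the source.

```python
def posterior_quality(pos: int, neg: int,
                      prior_mean: float = 0.75,
                      prior_strength: float = 4.0) -> float:
    """Beta posterior mean; stays near the prior until signals accumulate."""
    a0 = prior_mean * prior_strength        # pseudo-count of positive votes
    # Posterior mean of Beta(a0 + pos, b0 + neg), where a0 + b0 = prior_strength.
    return (a0 + pos) / (prior_strength + pos + neg)


def should_invalidate(pos: int, neg: int,
                      threshold: float = 0.5, min_signals: int = 3) -> bool:
    """Invalidate a cached response only after min_signals votes and a low posterior."""
    return (pos + neg) >= min_signals and posterior_quality(pos, neg) < threshold


print(should_invalidate(0, 1))  # False — a single downvote is not enough
print(should_invalidate(0, 3))  # True — posterior 3/7 ≈ 0.43 drops below 0.5
```

With no signals the posterior equals the 0.75 prior, so noisy single votes cannot trigger cache invalidation, matching the takeaway's "assume OK until proven otherwise" stance.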