LOCAL PREVIEW View on GitHub

Feedback Loops & Human Augmentation Patterns for Collaborative AI

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Certification Domain Task Skill This File
AWS AIP-C01 Domain 2 — Implementation & Integration Task 2.1 — Select and implement FM integration approaches Skill 2.1.5 — Design collaborative AI systems with human-in-the-loop Feedback loops, active learning, human knowledge injection, curator override mechanisms

Skill scope: Deep-dive into feedback augmentation patterns — how MangaAssist collects, processes, and applies human feedback to continuously improve FM responses. Covers feedback loop architecture, active learning for reviewer prioritization, curator knowledge injection into prompts and RAG, and override mechanisms that let human experts correct AI behavior in real-time.


Mind Map: Feedback & Augmentation Patterns

mindmap
  root((Feedback &<br/>Augmentation<br/>Patterns))
    Feedback Loop Architecture
      Collection Layer
        Binary thumbs up/down
        Multi-dimension ratings
        Free-text corrections
        Implicit signals (dwell time, CTR)
      Processing Pipeline
        Signal aggregation
        Noise filtering
        Batch vs real-time processing
        Feedback-to-label conversion
      Application Layer
        Prompt refinement
        RAG index updates
        Cache invalidation
        Weight adjustment
    Active Learning
      Uncertainty Sampling
        Low-confidence responses
        High-disagreement ensemble results
        Near-threshold classifications
      Query Strategy
        Pool-based sampling
        Stream-based selection
        Priority queue for reviewers
      Reviewer Routing
        Expertise matching
        Workload balancing
        SLA-based escalation
    Human Knowledge Injection
      Curator Expertise
        Genre taxonomy enrichment
        Author relationship graphs
        Cultural context annotations
        Seasonal trend tagging
      Prompt Enrichment
        Dynamic system prompt updates
        Few-shot example injection
        Domain glossary maintenance
      RAG Augmentation
        Expert-curated documents
        Verified ground truth entries
        Override knowledge base
    Override Mechanisms
      Real-Time Overrides
        Response replacement
        Cache entry correction
        Routing rule adjustment
      Batch Overrides
        Prompt template updates
        Knowledge base refresh
        Model weight recalibration
      Safety Overrides
        Content blocking rules
        Emergency response templates
        Kill switch for features

1. Feedback Loop Architecture

1.1 End-to-End Feedback Flow

graph TD
    subgraph "Collection"
        U[Customer] -->|thumbs up/down| FB1[Binary Feedback]
        U -->|correction text| FB2[Text Correction]
        U -->|click/no-click| FB3[Implicit Signal]
        CUR[Curator] -->|quality rating| FB4[Expert Rating]
    end

    subgraph "Ingestion"
        FB1 --> KIN[Kinesis Data Stream<br/>mangaassist-feedback]
        FB2 --> KIN
        FB3 --> KIN
        FB4 --> KIN
    end

    subgraph "Processing"
        KIN --> LAM[Lambda: Feedback Processor]
        LAM --> AGG[Aggregate Signals<br/>per response_id]
        AGG --> STORE[(DynamoDB<br/>feedback_signals)]
        AGG --> CW[CloudWatch Metrics]
    end

    subgraph "Application"
        STORE --> PROMPT[Prompt Refinement<br/>weekly batch]
        STORE --> RAG_UP[RAG Index Update<br/>daily batch]
        STORE --> CACHE_INV[Cache Invalidation<br/>real-time]
        STORE --> WEIGHT[Model Weight Update<br/>EMA — real-time]
    end

    style KIN fill:#264653,stroke:#2a9d8f,color:#fff
    style LAM fill:#2a9d8f,stroke:#264653,color:#fff
    style STORE fill:#e9c46a,stroke:#f4a261,color:#000

1.2 Feedback Signal Types

Signal Source Latency Volume Reliability MangaAssist Use
Thumbs up/down Customer UI Real-time ~5% of queries Medium (noisy) Quick quality pulse, cache invalidation
Star rating Post-interaction survey 5-30 min delay ~1% of queries High Weekly quality reports
Text correction Customer or curator Real-time ~0.5% of queries Very high Ground truth for active learning
Click-through Frontend analytics Real-time 100% of recommendations Medium (positional bias) Recommendation quality signal
Dwell time Frontend analytics Real-time 100% of queries Low (confounded) Engagement proxy
Curator rating Internal review tool 1-24 hr delay ~2% of queries (sampled) Very high Model evaluation, prompt tuning

1.3 Feedback Collection Implementation

"""
MangaAssist Feedback Collection and Processing Pipeline.

Collects multi-signal feedback, normalizes into a unified schema,
and routes to Kinesis for downstream processing.
"""

import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class FeedbackType(str, Enum):
    THUMBS_UP = "thumbs_up"
    THUMBS_DOWN = "thumbs_down"
    STAR_RATING = "star_rating"
    TEXT_CORRECTION = "text_correction"
    CLICK_THROUGH = "click_through"
    CURATOR_RATING = "curator_rating"


@dataclass
class FeedbackSignal:
    """Unified feedback signal schema."""
    feedback_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    response_id: str = ""
    query_text: str = ""
    response_text: str = ""
    feedback_type: str = ""
    # Binary: 1.0 = positive, 0.0 = negative
    sentiment_score: float = 0.5
    # Optional detailed ratings (0.0 - 1.0)
    accuracy_rating: Optional[float] = None
    helpfulness_rating: Optional[float] = None
    tone_rating: Optional[float] = None
    # Text correction if provided
    correction_text: Optional[str] = None
    # Metadata
    source: str = "customer"  # "customer" or "curator"
    language: str = "en"
    timestamp: int = field(default_factory=lambda: int(time.time()))
    model_id: str = ""
    intent: str = ""


class FeedbackCollector:
    """
    Collects feedback from multiple sources and publishes
    to Kinesis for downstream processing.
    """

    def __init__(
        self,
        kinesis_client=None,
        stream_name: str = "mangaassist-feedback",
        dynamodb_resource=None,
        table_name: str = "mangaassist_feedback",
    ):
        self.kinesis = kinesis_client or boto3.client(
            "kinesis", region_name="ap-northeast-1"
        )
        self.stream_name = stream_name
        self.dynamodb = dynamodb_resource or boto3.resource(
            "dynamodb", region_name="ap-northeast-1"
        )
        self.table = self.dynamodb.Table(table_name)

    def collect_thumbs(
        self,
        session_id: str,
        response_id: str,
        is_positive: bool,
        query_text: str = "",
        response_text: str = "",
        model_id: str = "",
        intent: str = "",
        language: str = "en",
    ) -> FeedbackSignal:
        """Collect binary thumbs up/down feedback."""
        signal = FeedbackSignal(
            session_id=session_id,
            response_id=response_id,
            query_text=query_text,
            response_text=response_text,
            feedback_type=FeedbackType.THUMBS_UP if is_positive else FeedbackType.THUMBS_DOWN,
            sentiment_score=1.0 if is_positive else 0.0,
            source="customer",
            language=language,
            model_id=model_id,
            intent=intent,
        )

        self._publish(signal)
        return signal

    def collect_curator_rating(
        self,
        response_id: str,
        curator_id: str,
        accuracy: float,
        helpfulness: float,
        tone: float,
        correction_text: Optional[str] = None,
        query_text: str = "",
        response_text: str = "",
    ) -> FeedbackSignal:
        """Collect detailed curator review."""
        overall = (accuracy + helpfulness + tone) / 3.0
        signal = FeedbackSignal(
            response_id=response_id,
            query_text=query_text,
            response_text=response_text,
            feedback_type=FeedbackType.CURATOR_RATING,
            sentiment_score=overall,
            accuracy_rating=accuracy,
            helpfulness_rating=helpfulness,
            tone_rating=tone,
            correction_text=correction_text,
            source=f"curator:{curator_id}",
        )

        self._publish(signal)
        return signal

    def collect_text_correction(
        self,
        response_id: str,
        correction_text: str,
        query_text: str = "",
        original_response: str = "",
        source: str = "customer",
    ) -> FeedbackSignal:
        """Collect a text correction (ground truth)."""
        signal = FeedbackSignal(
            response_id=response_id,
            query_text=query_text,
            response_text=original_response,
            feedback_type=FeedbackType.TEXT_CORRECTION,
            sentiment_score=0.0,  # Correction implies original was wrong
            correction_text=correction_text,
            source=source,
        )

        self._publish(signal)
        return signal

    def _publish(self, signal: FeedbackSignal) -> None:
        """Publish feedback signal to Kinesis and DynamoDB."""
        record = asdict(signal)

        # Kinesis for real-time processing
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(record),
            PartitionKey=signal.response_id or signal.feedback_id,
        )

        # DynamoDB for persistence and querying
        self.table.put_item(Item={
            k: v for k, v in record.items()
            if v is not None
        })

        logger.info(
            "Feedback collected: type=%s response_id=%s sentiment=%.1f",
            signal.feedback_type, signal.response_id, signal.sentiment_score,
        )

2. Feedback Processing Pipeline

2.1 Signal Aggregation and Noise Filtering

graph TD
    subgraph "Raw Signals"
        S1[Thumbs Down x 3]
        S2[Thumbs Up x 12]
        S3[Curator: accuracy=0.4]
        S4[Click-through: 2/10]
    end

    subgraph "Aggregation"
        S1 --> AGG[Signal Aggregator]
        S2 --> AGG
        S3 --> AGG
        S4 --> AGG
        AGG --> NOISE[Noise Filter<br/>Remove outliers,<br/>weight by source reliability]
    end

    subgraph "Composite Score"
        NOISE --> CS[Composite Quality Score<br/>weighted average]
        CS --> |score = 0.68| ACTION{Score < 0.70?}
        ACTION -->|Yes| REVIEW[Flag for review<br/>+ cache invalidation]
        ACTION -->|No| ARCHIVE[Archive for batch analysis]
    end

    style AGG fill:#264653,stroke:#2a9d8f,color:#fff
    style CS fill:#e9c46a,stroke:#f4a261,color:#000

2.2 Source Reliability Weights

Signal Source Weight Rationale
Curator rating 0.40 Highest expertise, most reliable
Text correction 0.30 Explicit ground truth
Thumbs up/down 0.15 High volume but noisy
Click-through 0.10 Positional bias affects reliability
Dwell time 0.05 Highly confounded, weak signal
"""
MangaAssist Feedback Signal Aggregator.

Aggregates multi-source feedback signals into a composite
quality score per response, with noise filtering and
source-reliability weighting.
"""

import logging
import statistics
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

# Source reliability weights (must sum to 1.0 when present)
SOURCE_WEIGHTS = {
    "curator": 0.40,
    "text_correction": 0.30,
    "thumbs": 0.15,
    "click_through": 0.10,
    "dwell_time": 0.05,
}


@dataclass
class AggregatedFeedback:
    """Aggregated feedback for a single response."""
    response_id: str
    composite_score: float
    signal_count: int
    source_breakdown: dict = field(default_factory=dict)
    needs_review: bool = False
    needs_cache_invalidation: bool = False
    confidence: float = 0.0  # How confident we are in the composite score


class FeedbackAggregator:
    """
    Aggregates raw feedback signals into actionable quality scores.

    Key behaviors:
    - Weights signals by source reliability
    - Filters outlier signals (z-score > 2)
    - Requires minimum signal count for action
    - Applies Bayesian prior for low-sample responses
    """

    REVIEW_THRESHOLD = 0.70
    CACHE_INVALIDATION_THRESHOLD = 0.50
    MIN_SIGNALS_FOR_ACTION = 3
    BAYESIAN_PRIOR = 0.75  # Assume responses are OK until proven otherwise
    BAYESIAN_PRIOR_WEIGHT = 5  # Equivalent to 5 "average" signals

    def aggregate(
        self, response_id: str, signals: list[dict]
    ) -> AggregatedFeedback:
        """
        Aggregate feedback signals for a single response.

        signals: [{"type": str, "score": float, "source": str}, ...]
        """
        if not signals:
            return AggregatedFeedback(
                response_id=response_id,
                composite_score=self.BAYESIAN_PRIOR,
                signal_count=0,
                confidence=0.0,
            )

        # Group by source type
        grouped: dict[str, list[float]] = {}
        for sig in signals:
            source_type = self._classify_source(sig)
            grouped.setdefault(source_type, []).append(sig["score"])

        # Filter outliers within each group
        filtered: dict[str, list[float]] = {}
        for source_type, scores in grouped.items():
            filtered[source_type] = self._filter_outliers(scores)

        # Compute per-source averages
        source_averages: dict[str, float] = {}
        for source_type, scores in filtered.items():
            if scores:
                source_averages[source_type] = statistics.mean(scores)

        # Weighted composite score
        total_weight = 0.0
        weighted_sum = 0.0
        for source_type, avg in source_averages.items():
            weight = SOURCE_WEIGHTS.get(source_type, 0.05)
            weighted_sum += avg * weight
            total_weight += weight

        if total_weight > 0:
            raw_composite = weighted_sum / total_weight
        else:
            raw_composite = self.BAYESIAN_PRIOR

        # Apply Bayesian prior for low-sample responses
        n = len(signals)
        composite = (
            (self.BAYESIAN_PRIOR * self.BAYESIAN_PRIOR_WEIGHT + raw_composite * n)
            / (self.BAYESIAN_PRIOR_WEIGHT + n)
        )

        # Confidence increases with signal count
        confidence = min(1.0, n / (n + self.BAYESIAN_PRIOR_WEIGHT))

        result = AggregatedFeedback(
            response_id=response_id,
            composite_score=round(composite, 4),
            signal_count=n,
            source_breakdown=source_averages,
            needs_review=(
                composite < self.REVIEW_THRESHOLD
                and n >= self.MIN_SIGNALS_FOR_ACTION
            ),
            needs_cache_invalidation=(
                composite < self.CACHE_INVALIDATION_THRESHOLD
                and n >= self.MIN_SIGNALS_FOR_ACTION
            ),
            confidence=round(confidence, 4),
        )

        if result.needs_review:
            logger.warning(
                "Response %s flagged for review: composite=%.3f signals=%d",
                response_id, composite, n,
            )

        return result

    def _classify_source(self, signal: dict) -> str:
        """Map signal to source type for weighting."""
        sig_type = signal.get("type", "")
        if "curator" in sig_type:
            return "curator"
        if "correction" in sig_type:
            return "text_correction"
        if "thumb" in sig_type:
            return "thumbs"
        if "click" in sig_type:
            return "click_through"
        if "dwell" in sig_type:
            return "dwell_time"
        return "thumbs"  # Default

    def _filter_outliers(
        self, scores: list[float], z_threshold: float = 2.0
    ) -> list[float]:
        """Remove outlier scores using z-score filtering."""
        if len(scores) < 3:
            return scores

        mean = statistics.mean(scores)
        stdev = statistics.stdev(scores)
        if stdev == 0:
            return scores

        return [
            s for s in scores
            if abs(s - mean) / stdev <= z_threshold
        ]

3. Active Learning for Reviewer Prioritization

3.1 Active Learning Strategy

MangaAssist generates 1M responses/day. Human reviewers can assess ~500 responses/day. Active learning selects the most informative responses for review, maximizing learning per human hour.

graph TD
    subgraph "Response Pool (1M/day)"
        R1[High confidence<br/>No feedback issues]
        R2[Low confidence<br/>Ensemble disagreement]
        R3[Near threshold<br/>Borderline quality]
        R4[Negative feedback<br/>Customer reported issue]
        R5[New intent detected<br/>No training data]
    end

    subgraph "Active Learning Selector"
        R2 --> PQ[Priority Queue]
        R3 --> PQ
        R4 --> PQ
        R5 --> PQ
        R1 --> |sampled at 0.1%| PQ
    end

    subgraph "Reviewer Assignment"
        PQ --> RM[Reviewer Matcher<br/>Expertise + Workload]
        RM --> JP[JP Specialist<br/>Japanese content]
        RM --> GEN[General Reviewer<br/>English content]
        RM --> SR[Senior Reviewer<br/>Edge cases]
    end

    style PQ fill:#264653,stroke:#2a9d8f,color:#fff
    style RM fill:#2a9d8f,stroke:#264653,color:#fff

3.2 Priority Scoring

Selection Criterion Priority Score Volume/Day Rationale
Customer thumbs-down 10.0 ~15,000 Direct negative signal
Ensemble high disagreement 8.0 ~25,000 Models uncertain
Low model confidence (< 0.5) 7.0 ~40,000 Model knows it is guessing
Near quality threshold (0.65-0.75) 5.0 ~80,000 Borderline — review improves calibration
New/unknown intent 9.0 ~2,000 Critical for coverage expansion
Random sample (high confidence) 1.0 ~1,000 Baseline quality monitoring
"""
MangaAssist Active Learning Selector.

Prioritizes responses for human review based on
informativeness, uncertainty, and business impact.
"""

import heapq
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class SelectionReason(str, Enum):
    NEGATIVE_FEEDBACK = "negative_feedback"
    HIGH_DISAGREEMENT = "high_disagreement"
    LOW_CONFIDENCE = "low_confidence"
    NEAR_THRESHOLD = "near_threshold"
    NEW_INTENT = "new_intent"
    RANDOM_SAMPLE = "random_sample"


@dataclass(order=True)
class ReviewCandidate:
    """A response prioritized for human review."""
    priority: float = field(compare=True)  # Higher = more important
    response_id: str = field(compare=False)
    query_text: str = field(compare=False)
    response_text: str = field(compare=False)
    selection_reason: str = field(compare=False)
    model_confidence: float = field(compare=False, default=0.0)
    ensemble_agreement: float = field(compare=False, default=0.0)
    language: str = field(compare=False, default="en")
    intent: str = field(compare=False, default="unknown")
    timestamp: int = field(compare=False, default_factory=lambda: int(time.time()))


class ActiveLearningSelector:
    """
    Selects the most informative responses for human review
    using uncertainty sampling and business-impact scoring.

    Maintains a bounded priority queue (max 2000 items)
    that reviewers consume throughout the day.
    """

    MAX_QUEUE_SIZE = 2000

    PRIORITY_SCORES = {
        SelectionReason.NEGATIVE_FEEDBACK: 10.0,
        SelectionReason.NEW_INTENT: 9.0,
        SelectionReason.HIGH_DISAGREEMENT: 8.0,
        SelectionReason.LOW_CONFIDENCE: 7.0,
        SelectionReason.NEAR_THRESHOLD: 5.0,
        SelectionReason.RANDOM_SAMPLE: 1.0,
    }

    def __init__(self):
        self._queue: list[ReviewCandidate] = []

    def evaluate_for_review(
        self,
        response_id: str,
        query_text: str,
        response_text: str,
        model_confidence: float,
        ensemble_agreement: float,
        has_negative_feedback: bool,
        intent: str,
        known_intents: set[str],
        language: str = "en",
    ) -> Optional[ReviewCandidate]:
        """
        Evaluate whether a response should be queued for review.
        Returns the candidate if selected, None otherwise.
        """
        reasons: list[tuple[SelectionReason, float]] = []

        if has_negative_feedback:
            reasons.append((
                SelectionReason.NEGATIVE_FEEDBACK,
                self.PRIORITY_SCORES[SelectionReason.NEGATIVE_FEEDBACK],
            ))

        if intent not in known_intents:
            reasons.append((
                SelectionReason.NEW_INTENT,
                self.PRIORITY_SCORES[SelectionReason.NEW_INTENT],
            ))

        if ensemble_agreement < 0.50:
            reasons.append((
                SelectionReason.HIGH_DISAGREEMENT,
                self.PRIORITY_SCORES[SelectionReason.HIGH_DISAGREEMENT],
            ))

        if model_confidence < 0.50:
            reasons.append((
                SelectionReason.LOW_CONFIDENCE,
                self.PRIORITY_SCORES[SelectionReason.LOW_CONFIDENCE],
            ))

        if 0.65 <= model_confidence <= 0.75:
            reasons.append((
                SelectionReason.NEAR_THRESHOLD,
                self.PRIORITY_SCORES[SelectionReason.NEAR_THRESHOLD],
            ))

        if not reasons:
            return None  # High confidence, no issues, skip

        # Use the highest-priority reason
        best_reason, priority = max(reasons, key=lambda x: x[1])

        # Language boost: JP content is harder to evaluate, prioritize
        if language == "ja":
            priority *= 1.2

        candidate = ReviewCandidate(
            priority=priority,
            response_id=response_id,
            query_text=query_text,
            response_text=response_text,
            selection_reason=best_reason.value,
            model_confidence=model_confidence,
            ensemble_agreement=ensemble_agreement,
            language=language,
            intent=intent,
        )

        self._enqueue(candidate)
        return candidate

    def _enqueue(self, candidate: ReviewCandidate) -> None:
        """Add to bounded priority queue."""
        if len(self._queue) < self.MAX_QUEUE_SIZE:
            heapq.heappush(self._queue, candidate)
        elif candidate.priority > self._queue[0].priority:
            heapq.heapreplace(self._queue, candidate)

    def get_next_for_reviewer(
        self,
        reviewer_expertise: str = "general",
        batch_size: int = 10,
    ) -> list[ReviewCandidate]:
        """
        Get the next batch of responses for a specific reviewer.
        Matches expertise (JP specialist gets JP content first).
        """
        # Sort by priority descending
        all_items = sorted(self._queue, key=lambda x: x.priority, reverse=True)

        # Filter by reviewer expertise
        if reviewer_expertise == "japanese":
            preferred = [c for c in all_items if c.language == "ja"]
            fallback = [c for c in all_items if c.language != "ja"]
            ordered = preferred + fallback
        elif reviewer_expertise == "senior":
            # Senior gets edge cases: new intents and high disagreement
            preferred = [
                c for c in all_items
                if c.selection_reason in ("new_intent", "high_disagreement")
            ]
            fallback = [c for c in all_items if c not in preferred]
            ordered = preferred + fallback
        else:
            ordered = all_items

        batch = ordered[:batch_size]

        # Remove from queue
        for item in batch:
            if item in self._queue:
                self._queue.remove(item)
        heapq.heapify(self._queue)

        return batch

    @property
    def queue_size(self) -> int:
        return len(self._queue)

4. Human Knowledge Injection

4.1 Curator Knowledge Flow

graph TD
    subgraph "Curator Inputs"
        GT[Genre Taxonomy<br/>Shonen, Seinen, Josei...]
        AR[Author Relationships<br/>Ohba + Obata = Death Note]
        CC[Cultural Context<br/>JP holidays, manga events]
        SP[Staff Picks<br/>Curated recommendations]
        GL[Glossary<br/>Manga-specific terms]
    end

    subgraph "Injection Points"
        GT --> SYS[System Prompt<br/>Genre classification context]
        AR --> RAG[RAG Knowledge Base<br/>Author/title metadata]
        CC --> DYN[Dynamic Prompt<br/>Seasonal context injection]
        SP --> OVR[Override Layer<br/>Staff picks supersede AI recs]
        GL --> EMB[Embedding Index<br/>Domain-specific vocabulary]
    end

    subgraph "FM Pipeline"
        SYS --> FM[Bedrock Claude 3]
        RAG --> FM
        DYN --> FM
        OVR --> POST[Post-Processing<br/>Merge overrides with AI]
        FM --> POST
        EMB --> OS[OpenSearch<br/>Retrieval]
        OS --> FM
    end

    style OVR fill:#e76f51,stroke:#f4a261,color:#fff
    style FM fill:#264653,stroke:#2a9d8f,color:#fff

4.2 Knowledge Injection Comparison

Injection Method Latency Impact Update Frequency Scope Best For
System prompt 0ms (pre-loaded) Weekly All queries of a type Genre taxonomy, tone guidelines
Dynamic prompt +5ms (Redis lookup) Daily/hourly Contextual queries Seasonal events, promotions
RAG knowledge base +20ms (vector search) Daily batch Factual queries Author data, release dates
Override layer +2ms (Redis lookup) Real-time Specific responses Staff picks, corrections
Embedding index +8ms (re-index cost) Weekly batch Search quality Domain vocabulary
"""
MangaAssist Curator Knowledge Injection System.

Manages the flow of human expert knowledge into the FM pipeline
through multiple injection points: system prompts, dynamic context,
RAG knowledge base, and real-time overrides.
"""

import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

import boto3
import redis

logger = logging.getLogger(__name__)


class InjectionPoint(str, Enum):
    SYSTEM_PROMPT = "system_prompt"
    DYNAMIC_PROMPT = "dynamic_prompt"
    RAG_KNOWLEDGE = "rag_knowledge"
    OVERRIDE = "override"
    EMBEDDING_INDEX = "embedding_index"


@dataclass
class KnowledgeEntry:
    """A single piece of curator-injected knowledge."""
    entry_id: str
    content: str
    category: str  # "genre_taxonomy", "author_info", "cultural_context", etc.
    injection_point: str
    curator_id: str
    language: str = "en"
    priority: int = 0  # Higher = more important
    expires_at: Optional[int] = None  # Unix timestamp, None = permanent
    created_at: int = field(default_factory=lambda: int(time.time()))


class CuratorKnowledgeManager:
    """
    Manages curator knowledge entries across injection points.

    Knowledge flows:
    1. Curator creates/updates entry via admin API
    2. Entry stored in DynamoDB with injection_point classification
    3. Injection workers push to appropriate target (Redis, OpenSearch, S3)
    4. FM pipeline reads from targets at query time
    """

    def __init__(
        self,
        dynamodb_resource=None,
        redis_client: Optional[redis.Redis] = None,
        table_name: str = "mangaassist_curator_knowledge",
    ):
        self.dynamodb = dynamodb_resource or boto3.resource(
            "dynamodb", region_name="ap-northeast-1"
        )
        self.table = self.dynamodb.Table(table_name)
        self.redis = redis_client

    def add_knowledge(self, entry: KnowledgeEntry) -> None:
        """Add a curator knowledge entry."""
        item = {
            "entry_id": entry.entry_id,
            "content": entry.content,
            "category": entry.category,
            "injection_point": entry.injection_point,
            "curator_id": entry.curator_id,
            "language": entry.language,
            "priority": entry.priority,
            "created_at": entry.created_at,
        }
        if entry.expires_at:
            item["expires_at"] = entry.expires_at
            item["ttl"] = entry.expires_at  # DynamoDB TTL

        self.table.put_item(Item=item)

        # Real-time injection for override and dynamic_prompt types
        if entry.injection_point in (
            InjectionPoint.OVERRIDE, InjectionPoint.DYNAMIC_PROMPT
        ):
            self._push_to_redis(entry)

        logger.info(
            "Knowledge added: id=%s category=%s injection=%s",
            entry.entry_id, entry.category, entry.injection_point,
        )

    def get_dynamic_context(
        self, intent: str, language: str = "en"
    ) -> list[str]:
        """
        Retrieve dynamic prompt context for a given intent and language.
        Called at query time to enrich the FM prompt.
        """
        if not self.redis:
            return []

        key = f"curator:dynamic:{intent}:{language}"
        entries = self.redis.lrange(key, 0, 5)  # Max 5 context entries

        return [e.decode() for e in entries] if entries else []

    def get_overrides(
        self, query_text: str, intent: str
    ) -> Optional[str]:
        """
        Check if a curator override exists for this query/intent.
        Overrides completely replace the AI response.
        """
        if not self.redis:
            return None

        # Check intent-level override
        key = f"curator:override:{intent}"
        override = self.redis.get(key)
        if override:
            return override.decode()

        return None

    def build_system_prompt_context(
        self, intent: str, language: str = "en"
    ) -> str:
        """
        Build system prompt enrichment from curator knowledge.
        Called during prompt template assembly (cached for 1 hour).
        """
        response = self.table.query(
            IndexName="injection_point-category-index",
            KeyConditionExpression=(
                "injection_point = :ip"
            ),
            ExpressionAttributeValues={
                ":ip": InjectionPoint.SYSTEM_PROMPT,
            },
            Limit=20,
        )

        entries = response.get("Items", [])
        relevant = [
            e for e in entries
            if e.get("language", "en") == language
        ]

        if not relevant:
            return ""

        # Sort by priority descending
        relevant.sort(key=lambda x: x.get("priority", 0), reverse=True)

        context_parts = [e["content"] for e in relevant[:10]]
        return "\n".join(context_parts)

    def _push_to_redis(self, entry: KnowledgeEntry) -> None:
        """Push real-time entries to Redis for fast access."""
        if not self.redis:
            return

        if entry.injection_point == InjectionPoint.OVERRIDE:
            key = f"curator:override:{entry.category}"
            self.redis.set(key, entry.content)
            if entry.expires_at:
                ttl = entry.expires_at - int(time.time())
                if ttl > 0:
                    self.redis.expire(key, ttl)

        elif entry.injection_point == InjectionPoint.DYNAMIC_PROMPT:
            key = f"curator:dynamic:{entry.category}:{entry.language}"
            self.redis.lpush(key, entry.content)
            self.redis.ltrim(key, 0, 9)  # Keep max 10 entries

5. Curator Override Mechanisms

5.1 Override Priority Hierarchy

graph TD
    subgraph "Override Levels (highest to lowest priority)"
        E[Emergency Override<br/>Kill-switch, safety blocks<br/>Priority: 100]
        S[Staff Pick Override<br/>Curator-curated responses<br/>Priority: 80]
        C[Correction Override<br/>Verified ground truth<br/>Priority: 60]
        P[Prompt Override<br/>System prompt modifications<br/>Priority: 40]
        D[Default<br/>AI-generated response<br/>Priority: 0]
    end

    E --> |supersedes| S --> |supersedes| C --> |supersedes| P --> |supersedes| D

    style E fill:#e76f51,stroke:#f4a261,color:#fff
    style S fill:#264653,stroke:#2a9d8f,color:#fff

5.2 Override Implementation

"""
MangaAssist Curator Override Engine.

Implements a priority-based override system where human curators
can replace, modify, or block AI responses at multiple levels.
"""

import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import redis

logger = logging.getLogger(__name__)


class OverrideLevel(str, Enum):
    EMERGENCY = "emergency"        # Priority 100 — safety blocks
    STAFF_PICK = "staff_pick"      # Priority 80 — curated responses
    CORRECTION = "correction"      # Priority 60 — ground truth
    PROMPT = "prompt"              # Priority 40 — prompt mods
    NONE = "none"                  # Priority 0 — use AI response


OVERRIDE_PRIORITY = {
    OverrideLevel.EMERGENCY: 100,
    OverrideLevel.STAFF_PICK: 80,
    OverrideLevel.CORRECTION: 60,
    OverrideLevel.PROMPT: 40,
    OverrideLevel.NONE: 0,
}


@dataclass
class OverrideResult:
    """Result of checking for overrides."""
    has_override: bool
    level: OverrideLevel
    override_response: Optional[str] = None
    override_id: Optional[str] = None
    curator_id: Optional[str] = None
    reason: str = ""


class CuratorOverrideEngine:
    """
    Checks for curator overrides before returning AI responses.

    Override matching order:
    1. Emergency blocks (query content matches blocked patterns)
    2. Staff picks (exact intent + entity match)
    3. Corrections (response_id match from feedback)
    4. Prompt overrides (intent-level prompt modifications)
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_overrides(
        self,
        query_text: str,
        intent: str,
        entities: list[str],
        response_id: str = "",
    ) -> OverrideResult:
        """Check all override levels and return the highest priority match."""
        # Level 1: Emergency blocks
        emergency = self._check_emergency(query_text)
        if emergency:
            return emergency

        # Level 2: Staff picks
        staff_pick = self._check_staff_picks(intent, entities)
        if staff_pick:
            return staff_pick

        # Level 3: Corrections
        correction = self._check_corrections(response_id, intent)
        if correction:
            return correction

        # Level 4: Prompt overrides (return modification, not replacement)
        prompt_override = self._check_prompt_overrides(intent)
        if prompt_override:
            return prompt_override

        return OverrideResult(
            has_override=False,
            level=OverrideLevel.NONE,
        )

    def _check_emergency(self, query_text: str) -> Optional[OverrideResult]:
        """Check for emergency content blocks."""
        blocked_patterns = self.redis.smembers("override:emergency:blocked_patterns")
        for pattern in blocked_patterns:
            if pattern.decode().lower() in query_text.lower():
                response = self.redis.get("override:emergency:response")
                return OverrideResult(
                    has_override=True,
                    level=OverrideLevel.EMERGENCY,
                    override_response=(
                        response.decode() if response
                        else "I'm sorry, I can't help with that request."
                    ),
                    reason=f"Emergency block: matched pattern",
                )
        return None

    def _check_staff_picks(
        self, intent: str, entities: list[str]
    ) -> Optional[OverrideResult]:
        """Check for curator staff pick responses."""
        for entity in entities:
            key = f"override:staff_pick:{intent}:{entity.lower()}"
            pick_data = self.redis.get(key)
            if pick_data:
                pick = json.loads(pick_data)
                return OverrideResult(
                    has_override=True,
                    level=OverrideLevel.STAFF_PICK,
                    override_response=pick["response"],
                    override_id=pick.get("override_id"),
                    curator_id=pick.get("curator_id"),
                    reason=f"Staff pick for {intent}:{entity}",
                )
        return None

    def _check_corrections(
        self, response_id: str, intent: str
    ) -> Optional[OverrideResult]:
        """Check for curator corrections to previous responses."""
        if not response_id:
            return None

        key = f"override:correction:{response_id}"
        correction = self.redis.get(key)
        if correction:
            data = json.loads(correction)
            return OverrideResult(
                has_override=True,
                level=OverrideLevel.CORRECTION,
                override_response=data["corrected_response"],
                override_id=data.get("override_id"),
                curator_id=data.get("curator_id"),
                reason="Curator correction for previous response",
            )
        return None

    def _check_prompt_overrides(
        self, intent: str
    ) -> Optional[OverrideResult]:
        """Check for intent-level prompt modifications."""
        key = f"override:prompt:{intent}"
        prompt_mod = self.redis.get(key)
        if prompt_mod:
            return OverrideResult(
                has_override=True,
                level=OverrideLevel.PROMPT,
                override_response=prompt_mod.decode(),
                reason=f"Prompt override for intent: {intent}",
            )
        return None

    def set_staff_pick(
        self,
        intent: str,
        entity: str,
        response: str,
        curator_id: str,
        expires_days: int = 30,
    ) -> None:
        """Set a curator staff pick override."""
        key = f"override:staff_pick:{intent}:{entity.lower()}"
        data = {
            "response": response,
            "curator_id": curator_id,
            "override_id": f"sp:{intent}:{entity}:{int(time.time())}",
            "created_at": int(time.time()),
        }
        self.redis.set(key, json.dumps(data))
        self.redis.expire(key, expires_days * 86400)

        logger.info(
            "Staff pick set: intent=%s entity=%s curator=%s",
            intent, entity, curator_id,
        )

6. Feedback-Driven Prompt Refinement

6.1 Prompt Refinement Pipeline

sequenceDiagram
    participant FB as Feedback Store
    participant SEL as Batch Selector
    participant EVAL as Evaluator (Sonnet)
    participant REF as Prompt Refiner
    participant S3 as S3 Prompt Store
    participant PROD as Production

    Note over FB,PROD: Weekly Batch Process (Step Functions)

    FB->>SEL: Get responses with<br/>composite_score < 0.70
    SEL->>SEL: Sample 200 low-quality<br/>responses by intent

    loop For each intent category
        SEL->>EVAL: Analyze failure patterns<br/>in low-quality responses
        EVAL->>EVAL: Identify common issues:<br/>tone, accuracy, relevance
        EVAL->>REF: Failure pattern report
        REF->>REF: Generate improved<br/>system prompt variant
        REF->>EVAL: A/B test variant on<br/>held-out evaluation set
        EVAL-->>REF: Quality improvement: +8%
    end

    REF->>S3: Store approved prompt<br/>with version tag
    S3->>PROD: Deploy via feature flag<br/>gradual rollout (10% -> 50% -> 100%)

6.2 Prompt Refinement Implementation

"""
MangaAssist Feedback-Driven Prompt Refinement.

Analyzes patterns in low-quality responses and generates
improved system prompt variants. Runs as a weekly batch
process via Step Functions.
"""

import json
import logging
from dataclasses import dataclass

import boto3

logger = logging.getLogger(__name__)


@dataclass
class FailurePattern:
    """A common failure pattern identified in low-quality responses."""
    pattern_type: str  # "tone", "accuracy", "relevance", "completeness"
    description: str
    example_count: int
    example_queries: list[str]


@dataclass
class PromptVariant:
    """A new system prompt variant generated from failure analysis."""
    intent: str
    version: str
    prompt_text: str
    changes_summary: str
    expected_quality_gain: float


class PromptRefiner:
    """
    Generates improved system prompts based on feedback analysis.
    Uses Sonnet as both the analyzer and prompt generator.
    """

    def __init__(self, bedrock_client=None):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", region_name="ap-northeast-1"
        )

    def analyze_failures(
        self,
        low_quality_responses: list[dict],
        intent: str,
    ) -> list[FailurePattern]:
        """
        Analyze a batch of low-quality responses to identify patterns.

        low_quality_responses: [{"query": str, "response": str, "score": float, "feedback": str}, ...]
        """
        examples_text = "\n\n".join(
            f"Query: {r['query']}\nResponse: {r['response']}\n"
            f"Quality Score: {r['score']}\nFeedback: {r.get('feedback', 'None')}"
            for r in low_quality_responses[:20]
        )

        prompt = (
            "Analyze these low-quality chatbot responses for a manga store.\n"
            f"All are from the '{intent}' intent category.\n\n"
            f"{examples_text}\n\n"
            "Identify the top 3 common failure patterns. For each pattern, specify:\n"
            "1. pattern_type: one of [tone, accuracy, relevance, completeness, language]\n"
            "2. description: what is going wrong\n"
            "3. example_count: how many of the responses show this pattern\n\n"
            "Respond with JSON array."
        )

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "temperature": 0.1,
            "messages": [{"role": "user", "content": prompt}],
        })

        response = self.bedrock.invoke_model(
            modelId="anthropic.claude-3-sonnet-20240229-v1:0",
            body=body,
            contentType="application/json",
            accept="application/json",
        )

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]

        try:
            patterns_raw = json.loads(text)
            return [
                FailurePattern(
                    pattern_type=p.get("pattern_type", "unknown"),
                    description=p.get("description", ""),
                    example_count=p.get("example_count", 0),
                    example_queries=[],
                )
                for p in patterns_raw
            ]
        except json.JSONDecodeError:
            logger.warning("Failed to parse failure patterns: %s", text[:200])
            return []

    def generate_improved_prompt(
        self,
        current_prompt: str,
        intent: str,
        failure_patterns: list[FailurePattern],
    ) -> PromptVariant:
        """
        Generate an improved system prompt that addresses
        the identified failure patterns.
        """
        patterns_text = "\n".join(
            f"- {p.pattern_type}: {p.description} ({p.example_count} examples)"
            for p in failure_patterns
        )

        prompt = (
            "You are improving a system prompt for a manga store chatbot.\n\n"
            f"Current system prompt for '{intent}' queries:\n"
            f"---\n{current_prompt}\n---\n\n"
            f"Identified failure patterns:\n{patterns_text}\n\n"
            "Generate an improved system prompt that addresses these failures.\n"
            "Keep the core behavior intact. Only add or modify instructions "
            "that directly address the failure patterns.\n\n"
            "Respond with JSON:\n"
            '{{"improved_prompt": "...", "changes_summary": "...", '
            '"expected_quality_gain_pct": N}}'
        )

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2000,
            "temperature": 0.2,
            "messages": [{"role": "user", "content": prompt}],
        })

        response = self.bedrock.invoke_model(
            modelId="anthropic.claude-3-sonnet-20240229-v1:0",
            body=body,
            contentType="application/json",
            accept="application/json",
        )

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]

        try:
            parsed = json.loads(text)
            return PromptVariant(
                intent=intent,
                version=f"v{int(time.time())}",
                prompt_text=parsed["improved_prompt"],
                changes_summary=parsed.get("changes_summary", ""),
                expected_quality_gain=parsed.get("expected_quality_gain_pct", 0.0),
            )
        except (json.JSONDecodeError, KeyError):
            logger.error("Failed to parse improved prompt: %s", text[:200])
            return PromptVariant(
                intent=intent,
                version="failed",
                prompt_text=current_prompt,
                changes_summary="Generation failed — using current prompt",
                expected_quality_gain=0.0,
            )

7. Feedback Loop Bias Detection

7.1 Common Feedback Biases

graph TD
    subgraph "Feedback Biases"
        PB[Positivity Bias<br/>Customers more likely<br/>to rate good experiences]
        NB[Negativity Bias<br/>Angry customers over-report<br/>via corrections]
        SB[Selection Bias<br/>Only engaged users<br/>give feedback]
        AB[Anchoring Bias<br/>First response sets<br/>quality expectation]
    end

    subgraph "Mitigation"
        PB --> M1[Balance with random<br/>sampling of non-feedback<br/>responses]
        NB --> M2[Weight corrections by<br/>curator verification status]
        SB --> M3[Track demographic<br/>coverage of feedback<br/>providers]
        AB --> M4[Randomize A/B test<br/>exposure order]
    end

    style PB fill:#e76f51,stroke:#f4a261,color:#fff
    style NB fill:#e76f51,stroke:#f4a261,color:#fff
Bias Type Impact on MangaAssist Detection Method Mitigation
Positivity bias Quality appears higher than reality Compare feedback vs. curator ratings Include random curator sampling
Negativity bias Overcorrect toward conservative responses Track correction rate vs. actual error rate Verify corrections before applying
Selection bias Feedback skews toward power users Monitor feedback coverage by user segment Stratified sampling for evaluation
Popularity bias Popular manga get more feedback Track feedback per-title distribution Normalize by query volume
Recency bias Recent feedback dominates weight updates Check EMA half-life vs. feedback frequency Use longer EMA decay for stable intents

8. Comparison Table: Feedback Integration Methods

Method Latency to Effect Quality Improvement Risk Complexity Volume Needed
Cache invalidation Immediate Prevents repeat bad answers Low Low 1 negative signal
Weight adjustment (EMA) Real-time +3-5% per model Medium (drift) Medium 10+ signals
Prompt refinement Weekly batch +5-10% per intent Medium (regression) High 200+ signals
RAG knowledge update Daily batch +8-15% for factual queries Low Medium 50+ corrections
Override deployment Immediate 100% for targeted queries High (stale overrides) Low Curator decision
Model fine-tuning Monthly cycle +10-20% domain-wide High (catastrophic forgetting) Very high 10,000+ labels

Key Takeaways

  1. Multi-signal feedback is more reliable than any single source — combining thumbs up/down, curator ratings, text corrections, and implicit signals with source-reliability weighting produces a composite score that is 40% more predictive of true quality than any individual signal.

  2. Active learning makes human review 10x more efficient — by prioritizing low-confidence, high-disagreement, and negative-feedback responses, MangaAssist's 500 daily human reviews cover the same learning surface as 5,000 random reviews.

  3. Bayesian priors prevent overreaction to sparse feedback — new responses start with a prior of 0.75 (assume OK until proven otherwise), requiring at least 3 signals before taking action, which prevents noisy single-vote feedback from causing unnecessary cache invalidation.

  4. Curator overrides must have an expiration mechanism — staff picks and corrections that never expire become stale as the product catalog changes; all overrides default to 30-day expiry with curator opt-in for extension.

  5. Feedback loop bias is the hidden danger — positivity bias makes quality look better than it is, while negativity bias in corrections can push the model toward overly conservative responses; random curator sampling and stratified evaluation counteract both.

  6. Prompt refinement is the highest-ROI feedback application — weekly batch analysis of low-quality responses with Sonnet-generated prompt improvements yields 5-10% quality gain per intent category, compounding over months.

  7. Knowledge injection has four speed tiers — emergency overrides (seconds), dynamic context (minutes), RAG updates (hours), system prompt refinement (days); each tier trades off speed for breadth of impact.