
Scenarios & Runbooks — Advanced GenAI Applications

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

Attribute | Detail
Domain | 2 — Implementation & Integration of GenAI Applications
Task | 2.5 — Application Integration Patterns
Skill | 2.5.5 — Advanced GenAI Applications
Focus | Troubleshooting agent chains, supervisor coordination, execution limits, race conditions, reflection loops
MangaAssist Scope | Five production scenarios with detection, root cause analysis, resolution code, and prevention strategies

Mind Map

mindmap
  root((Advanced GenAI<br/>Scenarios))
    Scenario 1
      Chain Break from Unexpected Format
        FM Returns Markdown Instead of JSON
        JSON Parse Failure Mid-Chain
        Downstream Steps Receive None
        Chain Halts or Produces Garbage
    Scenario 2
      Supervisor Losing Coordination
        Agent Timeout Not Propagated
        Orphaned Agent Tasks
        Duplicate Responses
        State Desynchronization
    Scenario 3
      Execution History Limit
        Context Window Overflow
        DynamoDB Item Size Exceeded
        Memory Pressure on ECS
        Truncated Conversation
    Scenario 4
      Parallel Chain Race Condition
        Shared State Mutation
        Non-Deterministic Result Order
        Partial Failure Masking
        Resource Contention
    Scenario 5
      Reflection Loop Not Converging
        Self-Correction Cycles Forever
        Token Budget Explosion
        Quality Score Oscillates
        Timeout Without Response

Scenario 1 — Chain Breaks from Unexpected FM Output Format

Problem Statement

The manga search chain expects the intent classification step to return valid JSON ({"intent": "search", "search_terms": ["鬼滅の刃"]}), but Claude occasionally returns the JSON wrapped in a markdown code block (```json...```) or adds explanatory text before the JSON. The downstream json.loads() call fails, causing the entire chain to halt with no response sent to the user.

Detection

graph TB
    FM[Bedrock Claude Response<br/>Returns markdown-wrapped JSON] --> JP[JSON Parse<br/>json.loads fails]
    JP --> EX[Exception<br/>JSONDecodeError]
    EX --> CH[Chain Halts<br/>No downstream steps execute]
    CH --> WS[WebSocket<br/>Empty response or timeout]
    WS --> USR[User<br/>Sees loading spinner forever]

    EX --> CW[CloudWatch<br/>Error rate spike on<br/>classify_intent step]
    CW --> INV[Investigation<br/>Sample failed payloads]
    INV --> RC[Root Cause<br/>FM wraps JSON in code blocks<br/>or adds preamble text]

    style EX fill:#ff6b6b,color:#fff
    style RC fill:#ffd43b,color:#333

Root Cause

Foundation models are non-deterministic in output formatting. Even with explicit instructions to "return only JSON," Claude sometimes wraps the output in markdown code fences, adds a brief preamble like "Here is the classification:", or includes trailing explanation text. The chain's json.loads() call on raw FM output fails because it cannot parse these decorations.
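
A minimal repro of the failure mode — the raw output below is an illustrative sample, not a captured response:

import json

# Illustrative markdown-wrapped classification, the kind of output Claude
# sometimes produces despite "return only JSON" instructions.
raw_fm_output = '''Here is the classification:
```json
{"intent": "search", "search_terms": ["鬼滅の刃"]}
```'''

try:
    json.loads(raw_fm_output)
except json.JSONDecodeError as e:
    print(e)  # Expecting value: line 1 column 1 (char 0)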

Resolution

"""
Resolution: Robust FM output parser with multiple extraction strategies.
Handles markdown blocks, preamble text, and malformed JSON gracefully.
"""

import json
import logging
import re
from typing import Any

logger = logging.getLogger(__name__)


class FMOutputParseError(Exception):
    """Raised when all parsing strategies fail."""
    pass


class RobustFMOutputParser:
    """
    Extracts structured data from non-deterministic FM outputs.
    Tries multiple strategies in order of specificity.
    """

    @staticmethod
    def extract_json(
        raw_output: str,
        default: Any = None,
        strict: bool = False,
    ) -> Any:
        """
        Extract JSON from FM output using multiple strategies.

        Strategies (in order):
        1. Direct parse (output is pure JSON)
        2. Markdown code block extraction (```json ... ```)
        3. Generic code block extraction (``` ... ```)
        4. First JSON object/array extraction via regex
        5. Return default if all fail
        """
        if not raw_output or not raw_output.strip():
            if strict:
                raise FMOutputParseError("Empty FM output")
            return default

        text = raw_output.strip()

        # Strategy 1: Direct JSON parse
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        # Strategy 2: Extract from ```json ... ``` blocks
        json_blocks = re.findall(r"```json\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
        for block in json_blocks:
            try:
                return json.loads(block.strip())
            except json.JSONDecodeError:
                continue

        # Strategy 3: Extract from generic ``` ... ``` blocks
        code_blocks = re.findall(r"```\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
        for block in code_blocks:
            try:
                return json.loads(block.strip())
            except json.JSONDecodeError:
                continue

        # Strategy 4: Find first JSON object or array via regex
        # Match outermost { ... } or [ ... ]
        json_patterns = [
            re.compile(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", re.DOTALL),
            re.compile(r"\[.*?\]", re.DOTALL),
        ]
        for pattern in json_patterns:
            matches = pattern.findall(text)
            for match in matches:
                try:
                    return json.loads(match)
                except json.JSONDecodeError:
                    continue

        # Strategy 5: Try to fix common JSON issues
        cleaned = text
        # Remove trailing commas before } or ]
        cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)
        # Convert single quotes to double quotes (naive: this can corrupt
        # values that legitimately contain apostrophes)
        cleaned = cleaned.replace("'", '"')
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            pass

        if strict:
            raise FMOutputParseError(
                f"All JSON extraction strategies failed. Raw output: {text[:200]}"
            )

        logger.warning(
            "All JSON extraction strategies failed, returning default. "
            "Raw output: %s...", text[:100],
        )
        return default

    @staticmethod
    def extract_text(raw_output: str) -> str:
        """Extract clean text from FM output, stripping code blocks."""
        text = raw_output.strip()
        # Remove code blocks
        text = re.sub(r"```\w*\n?.*?\n?```", "", text, flags=re.DOTALL)
        # Remove leading/trailing whitespace from each line
        lines = [line.strip() for line in text.split("\n")]
        return "\n".join(line for line in lines if line)

    @staticmethod
    def extract_list(raw_output: str, separator: str = "\n") -> list[str]:
        """Extract a list of items from FM output."""
        text = RobustFMOutputParser.extract_text(raw_output)
        # Try numbered list extraction
        numbered = re.findall(r"^\d+[\.\)]\s*(.+)$", text, re.MULTILINE)
        if numbered:
            return numbered
        # Try bullet list extraction
        bulleted = re.findall(r"^[-*•]\s*(.+)$", text, re.MULTILINE)
        if bulleted:
            return bulleted
        # Fall back to line splitting
        return [line for line in text.split(separator) if line.strip()]


def safe_chain_step_parse(
    step_output: Any,
    expected_type: str = "json",
    default: Any = None,
) -> Any:
    """
    Safe parser for use between chain steps.
    Wraps RobustFMOutputParser with step-specific error handling.
    """
    if step_output is None:
        return default

    raw = str(step_output)

    if expected_type == "json":
        return RobustFMOutputParser.extract_json(raw, default=default)
    elif expected_type == "text":
        return RobustFMOutputParser.extract_text(raw)
    elif expected_type == "list":
        return RobustFMOutputParser.extract_list(raw)
    else:
        return raw

Prevention

  1. Always use RobustFMOutputParser.extract_json() instead of raw json.loads() between chain steps (see the usage sketch after this list).
  2. Add format enforcement in prompts: "Return ONLY a JSON object. No explanation, no code fences, no markdown."
  3. Add a transform step after every prompt step that normalizes the output format before downstream consumption.
  4. Set up CloudWatch alerts on chain step error rates — a spike in parse errors indicates a model behavior change.
  5. Maintain a fallback default for every JSON extraction so the chain can continue with degraded quality rather than halting.
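
Prevention item 1 in practice — a minimal sketch of a chain step feeding the parser a fallback default (the intent values are illustrative):

# Hypothetical chain step: classify intent from raw Claude output.
raw = '```json\n{"intent": "search", "search_terms": ["ワンピース"]}\n```'

parsed = RobustFMOutputParser.extract_json(
    raw,
    default={"intent": "unknown", "search_terms": []},  # degrade instead of halting
)
intent = parsed.get("intent", "unknown")
# intent == "search" here; on an unparseable response it falls back to "unknown",
# letting the chain route to a clarification prompt instead of crashing.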

Scenario 2 — Supervisor Agent Loses Coordination with Agent Squad

Problem Statement

The Supervisor agent delegates a search task to the Search Agent and a recommendation task to the Recommendation Agent in parallel. The Search Agent responds in 800ms, but the Recommendation Agent times out at 2500ms. The Supervisor does not properly handle the partial results: it either waits indefinitely for the timed-out agent, sends duplicate responses when the late agent eventually completes, or drops the successful result entirely.

Detection

graph TB
    SUP[Supervisor<br/>Dispatches parallel tasks] --> SA[Search Agent<br/>Responds in 800ms]
    SUP --> RA[Recommendation Agent<br/>Times out at 2500ms]

    SA --> SUP_WAIT[Supervisor Waiting<br/>For all agents to respond]
    RA -->|Timeout| SUP_WAIT

    SUP_WAIT --> P1[Problem 1<br/>Supervisor hangs<br/>waiting for RA]
    SUP_WAIT --> P2[Problem 2<br/>RA responds late<br/>duplicate delivery]
    SUP_WAIT --> P3[Problem 3<br/>Supervisor drops SA result<br/>sends error to user]

    P1 --> CW[CloudWatch<br/>Supervisor latency spike]
    P2 --> WS[WebSocket<br/>User sees two responses]
    P3 --> USR[User<br/>Gets error despite<br/>search working fine]

    style P1 fill:#ff6b6b,color:#fff
    style P2 fill:#ff6b6b,color:#fff
    style P3 fill:#ff6b6b,color:#fff

Root Cause

The Supervisor used asyncio.gather() without return_exceptions=True and without an explicit timeout wrapper. When one agent timed out, the first exception propagated out of the gather call, discarding the result of the agent that had already completed; if the surrounding coroutine was then torn down, the still-running agent task was cancelled as well. There was no mechanism to accept partial results from agents that completed successfully.
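
A stripped-down repro of that behavior, with agent calls stubbed by asyncio.sleep (timings are illustrative): without return_exceptions=True the first timeout propagates out of gather and the successful agent's result is lost.

import asyncio

async def fake_agent(name: str, delay_s: float, timeout_s: float) -> str:
    # Stand-in for a Bedrock-backed agent call wrapped in its own timeout.
    await asyncio.wait_for(asyncio.sleep(delay_s), timeout=timeout_s)
    return f"{name} result"

async def main() -> None:
    try:
        # Recommendation agent sleeps 3s against a 2s timeout and raises;
        # the completed search result is discarded along with it.
        await asyncio.gather(
            fake_agent("search", 0.8, 2.0),
            fake_agent("recommendations", 3.0, 2.0),
        )
    except asyncio.TimeoutError:
        print("gather raised — no partial results available")

    # With return_exceptions=True the TimeoutError comes back in-place
    # and the successful result survives.
    results = await asyncio.gather(
        fake_agent("search", 0.8, 2.0),
        fake_agent("recommendations", 3.0, 2.0),
        return_exceptions=True,
    )
    print(results)  # ['search result', TimeoutError()]

asyncio.run(main())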

Resolution

"""
Resolution: Robust parallel agent coordination with partial result handling.
Supervisor gracefully handles agent timeouts and delivers partial results.
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class AgentTaskResult:
    """Result from a delegated agent task."""
    agent_id: str
    success: bool
    output: Any = None
    error: str | None = None
    latency_ms: float = 0.0
    timed_out: bool = False


class RobustSupervisorCoordinator:
    """
    Coordinates parallel agent execution with graceful timeout handling.
    Delivers partial results when some agents time out.
    """

    def __init__(
        self,
        agent_registry: dict[str, Any],
        total_budget_ms: float = 2500.0,
        per_agent_timeout_ms: float = 2000.0,
        min_results_required: int = 1,
    ):
        self.agent_registry = agent_registry
        self.total_budget_ms = total_budget_ms
        self.per_agent_timeout_ms = per_agent_timeout_ms
        self.min_results_required = min_results_required
        self._delivered: set[str] = set()  # Track delivered correlation IDs

    async def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        context: Any,
    ) -> list[AgentTaskResult]:
        """
        Dispatch tasks to agents in parallel with robust timeout handling.

        Returns all results (including partial) — never blocks indefinitely.
        """
        if not tasks:
            return []

        results: list[AgentTaskResult] = []
        start_time = time.monotonic()

        # Create wrapped tasks with individual timeouts
        async_tasks = []
        for task_spec in tasks:
            agent_id = task_spec["agent_id"]
            agent = self.agent_registry.get(agent_id)
            if agent is None:
                results.append(AgentTaskResult(
                    agent_id=agent_id,
                    success=False,
                    error=f"Agent '{agent_id}' not found",
                ))
                continue
            async_tasks.append(
                self._execute_with_timeout(
                    agent_id=agent_id,
                    agent=agent,
                    message=task_spec.get("message", ""),
                    context=context,
                    timeout_ms=min(self.per_agent_timeout_ms, self.total_budget_ms),
                )
            )

        # Execute all tasks under a global deadline. asyncio.wait (rather than
        # wait_for around gather) preserves results from tasks that finished
        # before the deadline, so a global timeout still yields partial results.
        global_timeout = self.total_budget_ms / 1000.0
        if async_tasks:
            task_objs = [asyncio.ensure_future(t) for t in async_tasks]
            done, pending = await asyncio.wait(task_objs, timeout=global_timeout)
            for task in done:
                # _execute_with_timeout never raises; errors and per-agent
                # timeouts are already converted into AgentTaskResult objects.
                results.append(task.result())
            for task in pending:
                task.cancel()
            if pending:
                logger.warning(
                    "Global timeout reached (%.0fms). %d task(s) cancelled; "
                    "returning partial results.",
                    self.total_budget_ms,
                    len(pending),
                )

        total_elapsed = (time.monotonic() - start_time) * 1000
        logger.info(
            "Parallel dispatch completed: %d/%d succeeded in %.0fms",
            sum(1 for r in results if r.success),
            len(tasks),
            total_elapsed,
        )

        return results

    async def _execute_with_timeout(
        self,
        agent_id: str,
        agent: Any,
        message: str,
        context: Any,
        timeout_ms: float,
    ) -> AgentTaskResult:
        """Execute a single agent with timeout protection."""
        start = time.monotonic()
        try:
            result = await asyncio.wait_for(
                agent.run(message, context),
                timeout=timeout_ms / 1000.0,
            )
            latency = (time.monotonic() - start) * 1000
            return AgentTaskResult(
                agent_id=agent_id,
                success=True,
                output=result.content if hasattr(result, "content") else str(result),
                latency_ms=round(latency, 2),
            )
        except asyncio.TimeoutError:
            latency = (time.monotonic() - start) * 1000
            logger.warning("Agent '%s' timed out after %.0fms", agent_id, latency)
            return AgentTaskResult(
                agent_id=agent_id,
                success=False,
                timed_out=True,
                latency_ms=round(latency, 2),
                error=f"Timed out after {timeout_ms}ms",
            )
        except Exception as e:
            latency = (time.monotonic() - start) * 1000
            logger.error("Agent '%s' failed: %s", agent_id, e)
            return AgentTaskResult(
                agent_id=agent_id,
                success=False,
                error=str(e),
                latency_ms=round(latency, 2),
            )

    def aggregate_results(
        self,
        results: list[AgentTaskResult],
        correlation_id: str,
    ) -> dict[str, Any]:
        """
        Aggregate agent results into a unified response.
        Handles partial results gracefully — never duplicate delivers.
        """
        # Prevent duplicate delivery
        if correlation_id in self._delivered:
            logger.warning("Duplicate delivery prevented for %s", correlation_id)
            return {"error": "Already delivered", "correlation_id": correlation_id}
        self._delivered.add(correlation_id)

        successful = [r for r in results if r.success]
        timed_out = [r for r in results if r.timed_out]
        failed = [r for r in results if not r.success and not r.timed_out]

        if not successful:
            return {
                "status": "error",
                "message": "All agents failed or timed out",
                "details": [r.error for r in results],
            }

        # Build composite response from successful results
        response_parts = [r.output for r in successful if r.output]
        composite = "\n\n".join(str(part) for part in response_parts)

        # Add notes about timed-out agents
        if timed_out:
            agent_names = [r.agent_id for r in timed_out]
            composite += (
                f"\n\n(Note: Some information may be incomplete. "
                f"The {', '.join(agent_names)} service(s) are temporarily slow.)"
            )

        return {
            "status": "partial" if timed_out else "complete",
            "response": composite,
            "agents_succeeded": [r.agent_id for r in successful],
            "agents_timed_out": [r.agent_id for r in timed_out],
            "agents_failed": [r.agent_id for r in failed],
            "total_latency_ms": max((r.latency_ms for r in results), default=0),
        }

Prevention

  1. Always use return_exceptions=True in asyncio.gather() for parallel agent dispatch.
  2. Wrap every agent call in asyncio.wait_for() with per-agent timeouts.
  3. Track correlation IDs to prevent duplicate response delivery.
  4. Aggregate partial results — a response with 2 of 3 agents is better than no response.
  5. Log agent timeout rates per agent type to identify consistently slow specialists.
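
A minimal usage sketch of the coordinator inside the orchestrator (search_agent, rec_agent, and the session/message objects are assumptions — any agents exposing an async run(message, context) will do):

async def handle_user_message(message_id: str, session_context: dict) -> dict:
    # Hypothetical orchestrator entry point.
    coordinator = RobustSupervisorCoordinator(
        agent_registry={"search": search_agent, "recommendations": rec_agent},
        total_budget_ms=2500.0,
        per_agent_timeout_ms=2000.0,
    )
    results = await coordinator.dispatch_parallel(
        tasks=[
            {"agent_id": "search", "message": "鬼滅の刃 在庫"},
            {"agent_id": "recommendations", "message": "鬼滅の刃に似た作品"},
        ],
        context=session_context,
    )
    # "complete" or "partial" — deliver over the WebSocket either way.
    return coordinator.aggregate_results(results, correlation_id=message_id)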

Scenario 3 — Execution History Exceeds Context Window Limit

Problem Statement

A user has a long conversation with MangaAssist (50+ turns) asking about various manga, comparing titles, and discussing recommendations. The accumulated conversation history exceeds Claude 3's context window when passed as conversation_history to the next Bedrock call, causing a ValidationException: Input is too long. Even before hitting the hard limit, the growing context inflates token costs and pushes latency beyond 3 seconds.

Detection

graph TB
    CONV[Long Conversation<br/>50+ turns accumulated] --> CTX[Context Assembly<br/>Full history → prompt]
    CTX --> TK[Token Count<br/>Exceeds model limit]
    TK --> ERR[Bedrock Error<br/>ValidationException<br/>Input is too long]
    ERR --> USR[User<br/>Chat breaks mid-conversation]

    CTX --> COST[Cost Escalation<br/>$0.01+ per message<br/>from huge input]
    COST --> LAT[Latency Increase<br/>4-5 seconds per response]
    LAT --> TO[Timeout<br/>3-second budget breached]

    ERR --> CW[CloudWatch<br/>ValidationException spike]
    COST --> BILL[Billing Alert<br/>Daily cost 3x expected]

    style ERR fill:#ff6b6b,color:#fff
    style COST fill:#ffd43b,color:#333

Root Cause

The conversation history was stored verbatim in DynamoDB and passed in full to every Bedrock call. No truncation, summarization, or sliding window was applied. At 50+ turns with manga descriptions, the input grew past 100K tokens — blowing the cost and latency budget on every call and, left unchecked, heading for the model's context limit.
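
Back-of-the-envelope input cost at that point, using the Sonnet pricing from the context line above (the per-turn token count is an assumption for illustration):

# Rough input-cost arithmetic — turn sizes are illustrative assumptions.
SONNET_INPUT_USD_PER_1M = 3.00      # from the pricing in the MangaAssist context

tokens_per_turn = 2_000             # assumed: manga descriptions make turns token-heavy
turns = 50
input_tokens = tokens_per_turn * turns                        # 100,000 tokens
cost_per_message = input_tokens * SONNET_INPUT_USD_PER_1M / 1_000_000
print(f"${cost_per_message:.2f} input cost per message")      # $0.30 before any output tokens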

Resolution

"""
Resolution: Conversation history manager with sliding window and summarization.
Prevents context overflow while maintaining conversation coherence.
"""

import json
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class ConversationWindow:
    """Manages a bounded conversation history window."""
    max_turns: int = 10
    max_tokens_estimate: int = 3000
    summary: str = ""
    recent_turns: list[dict[str, str]] = field(default_factory=list)
    total_turns: int = 0


class ConversationHistoryManager:
    """
    Manages conversation history with bounded context windows.
    Prevents context overflow by summarizing old turns and keeping recent ones.
    """

    CHARS_PER_TOKEN_ESTIMATE = 4  # Rough EN average; JP text yields more tokens per char, so this is a lower bound

    def __init__(
        self,
        bedrock_client: Any,
        max_recent_turns: int = 10,
        max_context_tokens: int = 3000,
        summary_model: str = "anthropic.claude-3-haiku-20240307-v1:0",
    ):
        self.bedrock_client = bedrock_client
        self.max_recent_turns = max_recent_turns
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model

    def _estimate_tokens(self, text: str) -> int:
        return max(1, len(text) // self.CHARS_PER_TOKEN_ESTIMATE)

    def _total_tokens(self, turns: list[dict[str, str]]) -> int:
        return sum(
            self._estimate_tokens(t.get("content", ""))
            for t in turns
        )

    async def prepare_context(
        self, full_history: list[dict[str, str]], existing_summary: str = ""
    ) -> ConversationWindow:
        """
        Prepare a bounded conversation context from full history.

        Strategy:
        1. Keep the last N turns as-is (recent context)
        2. Summarize older turns into a concise summary
        3. Ensure total tokens stay within budget
        """
        total_turns = len(full_history)

        if total_turns <= self.max_recent_turns:
            # No truncation needed
            return ConversationWindow(
                max_turns=self.max_recent_turns,
                max_tokens_estimate=self._total_tokens(full_history),
                summary="",
                recent_turns=full_history,
                total_turns=total_turns,
            )

        # Split into old and recent
        recent = full_history[-self.max_recent_turns:]
        old = full_history[:-self.max_recent_turns]

        # Check if recent turns alone exceed budget
        recent_tokens = self._total_tokens(recent)
        if recent_tokens > self.max_context_tokens:
            # Trim recent turns to fit
            while recent and self._total_tokens(recent) > self.max_context_tokens:
                recent.pop(0)

        # Summarize old turns
        summary = await self._summarize_turns(old, existing_summary)

        return ConversationWindow(
            max_turns=self.max_recent_turns,
            max_tokens_estimate=self._estimate_tokens(summary) + self._total_tokens(recent),
            summary=summary,
            recent_turns=recent,
            total_turns=total_turns,
        )

    async def _summarize_turns(
        self, turns: list[dict[str, str]], existing_summary: str
    ) -> str:
        """Summarize older conversation turns using Haiku."""
        if not turns:
            return existing_summary

        turns_text = "\n".join(
            f"{t['role']}: {t['content'][:200]}" for t in turns[-20:]
        )
        prompt = (
            "Summarize this manga store chat conversation in 2-3 sentences. "
            "Focus on: manga titles discussed, user preferences, and any pending actions.\n\n"
        )
        if existing_summary:
            prompt += f"Previous summary: {existing_summary}\n\n"
        prompt += f"New conversation:\n{turns_text}\n\nConcise summary:"

        response = await self.bedrock_client.invoke(
            model_id=self.summary_model,
            prompt=prompt,
            max_tokens=200,
        )
        return response.get("text", existing_summary)

    def build_messages(self, window: ConversationWindow) -> list[dict[str, str]]:
        """Build the messages array for Bedrock from a conversation window."""
        messages = []
        if window.summary:
            messages.append({
                "role": "user",
                "content": f"[Previous conversation summary: {window.summary}]",
            })
            messages.append({
                "role": "assistant",
                "content": "I understand the context from our earlier conversation. How can I help you now?",
            })
        messages.extend(window.recent_turns)
        return messages


# --- DynamoDB session size guard ---

MAX_DYNAMODB_ITEM_KB = 400  # DynamoDB 400KB item limit


def check_session_size(session_data: dict[str, Any]) -> dict[str, Any]:
    """Check if session data is approaching DynamoDB item size limit."""
    serialized = json.dumps(session_data, ensure_ascii=False)
    size_kb = len(serialized.encode("utf-8")) / 1024

    return {
        "size_kb": round(size_kb, 2),
        "limit_kb": MAX_DYNAMODB_ITEM_KB,
        "usage_pct": round(size_kb / MAX_DYNAMODB_ITEM_KB * 100, 1),
        "needs_compaction": size_kb > MAX_DYNAMODB_ITEM_KB * 0.75,
        "exceeds_limit": size_kb > MAX_DYNAMODB_ITEM_KB,
    }

Prevention

  1. Always use ConversationHistoryManager — never pass raw full history to Bedrock (see the usage sketch after this list).
  2. Set max_recent_turns=10 as the default window — enough for context without overflow.
  3. Summarize old turns with Haiku (cheap and fast) before they fall off the window.
  4. Monitor DynamoDB item sizes — alert when session items exceed 75% of the 400KB limit.
  5. Implement session compaction — periodically compress old messages and store summaries.
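
A minimal usage sketch at the orchestrator's message handler (the bedrock_client wrapper and session dict shape are assumptions consistent with the resolution code above):

async def build_bounded_context(session: dict, bedrock_client) -> list[dict[str, str]]:
    history_manager = ConversationHistoryManager(bedrock_client, max_recent_turns=10)

    window = await history_manager.prepare_context(
        full_history=session["messages"],             # assumed list of {"role", "content"} dicts
        existing_summary=session.get("summary", ""),
    )
    messages = history_manager.build_messages(window)  # bounded payload for the next Bedrock call

    # Guard the DynamoDB item before writing the session back.
    if check_session_size(session)["needs_compaction"]:
        session["messages"] = window.recent_turns       # drop old turns, keep the rolling summary
        session["summary"] = window.summary

    return messages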

Scenario 4 — Parallel Chain Race Condition on Shared State

Problem Statement

Two parallel chain steps (Search Agent and Recommendation Agent) both read and write to the shared ChainState.data dictionary. The Search Agent writes data["manga_results"] = [...] while the Recommendation Agent reads data["manga_results"] expecting it to contain the search results. Due to parallel execution, the Recommendation Agent reads before the Search Agent writes, getting None instead of results. Alternatively, both agents write to the same key, and the last write wins, silently dropping the first agent's output.

Detection

graph TB
    PAR[Parallel Execution<br/>Search ∥ Recommendation] --> SA_W[Search Agent<br/>Writes manga_results]
    PAR --> RA_R[Recommendation Agent<br/>Reads manga_results]

    SA_W -->|Write at t=800ms| STATE[Shared ChainState.data]
    RA_R -->|Read at t=200ms| STATE

    STATE --> RACE[Race Condition<br/>RA reads before SA writes]
    RACE --> P1[Problem<br/>RA gets None,<br/>generates generic recs]

    SA_W --> OW[Overwrite Risk<br/>Both write to same key]
    OW --> P2[Problem<br/>Last-write-wins<br/>drops SA output]

    RACE --> USR[User<br/>Gets irrelevant recommendations<br/>that ignore search context]

    style RACE fill:#ff6b6b,color:#fff
    style P2 fill:#ff6b6b,color:#fff

Root Cause

The ChainState was passed by reference to parallel steps. Both steps received the same mutable dictionary and operated on it concurrently. Python's asyncio is single-threaded, but tasks interleave at await points, so one coroutine can read a key before the coroutine responsible for writing it has run. Additionally, nothing namespaced the agents' keys, so two agents writing to data could silently overwrite each other.
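
A condensed sketch of the interleaving (timings are illustrative): the recommendation step hits its await first, reads the shared key, and gets None because the search step has not written it yet.

import asyncio

shared_state: dict = {"query": "鬼滅の刃"}        # one mutable dict handed to both steps

async def search_step(state: dict) -> None:
    await asyncio.sleep(0.8)                      # stand-in for Bedrock + OpenSearch latency
    state["manga_results"] = ["鬼滅の刃 1巻", "鬼滅の刃 2巻"]

async def recommendation_step(state: dict) -> None:
    await asyncio.sleep(0.2)                      # finishes first
    results = state.get("manga_results")          # None — search hasn't written yet
    print(f"recommending based on: {results}")

async def main() -> None:
    await asyncio.gather(search_step(shared_state), recommendation_step(shared_state))

asyncio.run(main())                               # prints "recommending based on: None"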

Resolution

"""
Resolution: Immutable state management for parallel chain execution.
Each parallel step gets an isolated copy; results are merged safely.
"""

import asyncio
import copy
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class IsolatedStepState:
    """Isolated state snapshot for a single parallel step."""
    step_name: str
    input_data: dict[str, Any]   # Frozen copy of shared state at dispatch time
    output_data: dict[str, Any] = field(default_factory=dict)  # Step's own outputs

    def read(self, key: str, default: Any = None) -> Any:
        """Read from input data (frozen snapshot)."""
        return self.input_data.get(key, default)

    def write(self, key: str, value: Any) -> None:
        """Write to output data (step-local namespace)."""
        namespaced_key = f"{self.step_name}.{key}"
        self.output_data[namespaced_key] = value
        # Also write un-namespaced for convenience
        self.output_data[key] = value


class ParallelStateManager:
    """
    Manages state isolation and merging for parallel chain steps.
    Prevents race conditions by giving each step an immutable snapshot.
    """

    def __init__(self, merge_strategy: str = "namespace"):
        """
        Args:
            merge_strategy: "namespace" prefixes keys with step name,
                           "last_write_wins" allows overwrites (not recommended),
                           "collect" groups all values per key into lists.
        """
        self.merge_strategy = merge_strategy

    def create_isolated_states(
        self,
        shared_state_data: dict[str, Any],
        step_names: list[str],
    ) -> dict[str, IsolatedStepState]:
        """Create frozen state copies for each parallel step."""
        isolated = {}
        for name in step_names:
            isolated[name] = IsolatedStepState(
                step_name=name,
                input_data=copy.deepcopy(shared_state_data),
            )
        return isolated

    def merge_results(
        self,
        original_data: dict[str, Any],
        isolated_states: dict[str, IsolatedStepState],
    ) -> dict[str, Any]:
        """Merge results from parallel steps back into shared state."""
        merged = copy.deepcopy(original_data)

        if self.merge_strategy == "namespace":
            # Each step's outputs are namespaced: "step_name.key"
            for step_name, state in isolated_states.items():
                for key, value in state.output_data.items():
                    if "." not in key:
                        # Add namespaced version
                        merged[f"{step_name}.{key}"] = value
                    else:
                        merged[key] = value
                # Also add the step's complete output under its name
                merged[step_name] = state.output_data

        elif self.merge_strategy == "collect":
            # Collect all values per key into lists
            for state in isolated_states.values():
                for key, value in state.output_data.items():
                    if key not in merged:
                        merged[key] = [value]
                    elif isinstance(merged[key], list):
                        merged[key].append(value)
                    else:
                        merged[key] = [merged[key], value]

        else:  # last_write_wins
            for state in isolated_states.values():
                merged.update(state.output_data)

        return merged


async def execute_parallel_steps_safely(
    steps: list[dict[str, Any]],
    shared_data: dict[str, Any],
    state_manager: ParallelStateManager,
) -> dict[str, Any]:
    """
    Execute parallel steps with isolated state and safe merging.

    Each step receives a frozen copy of shared state.
    Results are merged back with namespacing to prevent collisions.
    """
    step_names = [s["name"] for s in steps]
    isolated_states = state_manager.create_isolated_states(shared_data, step_names)

    async def run_step(step_config: dict, iso_state: IsolatedStepState) -> None:
        handler = step_config.get("handler")
        if handler:
            result = await handler(iso_state.input_data)
            iso_state.write("result", result)

    tasks = [
        run_step(step, isolated_states[step["name"]])
        for step in steps
    ]
    await asyncio.gather(*tasks, return_exceptions=True)

    merged = state_manager.merge_results(shared_data, isolated_states)

    logger.info(
        "Parallel merge complete: %d steps, %d output keys",
        len(steps),
        len(merged) - len(shared_data),
    )
    return merged

Prevention

  1. Never share mutable state between parallel steps — always use copy.deepcopy() for isolation.
  2. Namespace all step outputs with the step name prefix to prevent key collisions.
  3. Use IsolatedStepState as the state interface for parallel steps — it enforces read-from-snapshot, write-to-local.
  4. Merge results explicitly after all parallel steps complete — never during execution.
  5. If one step depends on another's output, they cannot be parallel — use a sequential chain segment instead.
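
A minimal sketch of wiring two MangaAssist steps through the safe executor (the handlers are illustrative stand-ins for the real agent calls):

# Hypothetical handlers — each receives a frozen snapshot and returns its own output.
async def run_search(snapshot: dict) -> list[str]:
    return ["鬼滅の刃", "呪術廻戦"]               # stand-in for the OpenSearch-backed search

async def run_recommendations(snapshot: dict) -> list[str]:
    return ["チェンソーマン"]                     # stand-in for the recommendation agent

async def run_parallel_steps() -> dict:
    state_manager = ParallelStateManager(merge_strategy="namespace")
    merged = await execute_parallel_steps_safely(
        steps=[
            {"name": "search", "handler": run_search},
            {"name": "recommend", "handler": run_recommendations},
        ],
        shared_data={"query": "鬼滅の刃"},
        state_manager=state_manager,
    )
    # merged["search.result"] and merged["recommend.result"] never collide,
    # and the original shared_data dict is never mutated mid-flight.
    return merged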

Scenario 5 — Reflection Loop Does Not Converge

Problem Statement

The Reflection Agent evaluates the Recommendation Agent's output, finds it has only 2 recommendations instead of the requested 3, and asks for regeneration. The regenerated output now has 3 recommendations but one title is in English only (missing Japanese). Reflection rejects again. The cycle repeats: each fix introduces a new issue, the quality score oscillates between 0.65 and 0.75 (never reaching the 0.8 threshold), and the 3-second budget is consumed entirely by reflection loops without ever sending a response.

Detection

graph TB
    GEN[Generate Recommendations<br/>Iteration 1: 2 titles] --> REF1[Reflection 1<br/>Score: 0.65<br/>Needs 3+ titles]
    REF1 --> REGEN1[Regenerate<br/>Iteration 2: 3 titles<br/>but missing JP names]
    REGEN1 --> REF2[Reflection 2<br/>Score: 0.70<br/>Needs JP names]
    REF2 --> REGEN2[Regenerate<br/>Iteration 3: JP names<br/>but only 2 titles again]
    REGEN2 --> REF3[Reflection 3<br/>Score: 0.68<br/>Oscillating!]
    REF3 --> TO[Timeout<br/>3-second budget exhausted<br/>No response sent]

    TO --> CW[CloudWatch<br/>Reflection loop timeouts]
    CW --> INV[Investigation<br/>Reflection iteration count > 3]
    INV --> RC[Root Cause<br/>Quality criteria conflict<br/>No convergence guarantee]

    style TO fill:#ff6b6b,color:#fff
    style RC fill:#ffd43b,color:#333

Root Cause

The reflection loop had no maximum iteration limit, no convergence detection, and no "good enough" acceptance threshold. Each reflection cycle corrected one issue but regressed on another because the regeneration prompt did not carry forward all accumulated feedback — only the most recent critique.
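
The prompt difference in miniature (the critique strings are illustrative):

# Illustrative critique history — each iteration fixed one issue and regressed on another.
critiques = ["needs 3+ titles", "missing Japanese titles"]

latest_only = f"Regenerate. Fix: {critiques[-1]}"                        # what the broken loop sent
accumulated = "Regenerate. Fix ALL of these:\n" + "\n".join(f"- {c}" for c in critiques)
# The accumulated form is what _build_accumulated_feedback() produces below,
# so fixing the newest issue can no longer silently undo an earlier fix.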

Resolution

"""
Resolution: Bounded reflection loop with convergence detection and
accumulated feedback for MangaAssist quality control.
"""

import json
import logging
import time
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class ReflectionResult:
    """Result of a single reflection iteration."""
    iteration: int
    quality_score: float
    issues: list[str]
    accepted: bool
    feedback: str


@dataclass
class ReflectionConfig:
    """Configuration for the bounded reflection loop."""
    max_iterations: int = 3
    target_score: float = 0.8
    acceptable_score: float = 0.65      # "Good enough" threshold
    convergence_epsilon: float = 0.02   # Min score improvement per iteration
    budget_ms: float = 1500.0           # Max time for entire reflection loop
    accumulate_feedback: bool = True     # Carry forward all feedback


class BoundedReflectionLoop:
    """
    Reflection loop with convergence detection and hard limits.
    Prevents infinite loops by tracking score trends and enforcing budgets.
    """

    def __init__(self, bedrock_client: Any, config: ReflectionConfig | None = None):
        self.bedrock_client = bedrock_client
        self.config = config or ReflectionConfig()
        self.history: list[ReflectionResult] = []

    async def evaluate_quality(self, response: str) -> tuple[float, list[str]]:
        """Evaluate response quality and return score + issues list."""
        prompt = (
            "Evaluate this manga recommendation response for quality. "
            "Score 0-1 on these criteria:\n"
            "1. Has 3+ manga titles (0.25 points)\n"
            "2. Includes Japanese titles (0.25 points)\n"
            "3. Has author names (0.25 points)\n"
            "4. Includes pricing/availability (0.25 points)\n\n"
            f"Response:\n{response}\n\n"
            "Return JSON: {\"score\": 0.X, \"issues\": [\"issue1\", ...]}"
        )
        result = await self.bedrock_client.invoke(
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
            prompt=prompt,
            max_tokens=200,
        )

        try:
            parsed = json.loads(result.get("text", "{}"))
            return parsed.get("score", 0.5), parsed.get("issues", [])
        except (json.JSONDecodeError, AttributeError):
            return 0.5, ["Could not parse quality evaluation"]

    def _detect_convergence(self) -> bool:
        """Check if scores are improving or oscillating."""
        if len(self.history) < 2:
            return False
        recent_scores = [r.quality_score for r in self.history[-3:]]
        if len(recent_scores) >= 2:
            improvement = recent_scores[-1] - recent_scores[-2]
            if abs(improvement) < self.config.convergence_epsilon:
                logger.info("Reflection converged (delta=%.3f)", improvement)
                return True
        # Check for oscillation
        if len(recent_scores) >= 3:
            deltas = [recent_scores[i+1] - recent_scores[i] for i in range(len(recent_scores)-1)]
            if all(d1 * d2 < 0 for d1, d2 in zip(deltas, deltas[1:])):
                logger.warning("Reflection oscillating — stopping")
                return True
        return False

    def _build_accumulated_feedback(self) -> str:
        """Build feedback that includes all prior issues."""
        if not self.config.accumulate_feedback or not self.history:
            return ""
        all_issues = []
        for r in self.history:
            all_issues.extend(r.issues)
        unique_issues = list(dict.fromkeys(all_issues))
        return "Fix ALL of these issues simultaneously:\n" + "\n".join(
            f"- {issue}" for issue in unique_issues
        )

    async def run(
        self,
        generate_func: Any,
        initial_prompt: str,
        context: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Run the bounded reflection loop.

        Returns the best response found, even if target score is not reached.
        """
        self.history.clear()
        start_time = time.monotonic()
        best_response = ""
        best_score = 0.0

        for iteration in range(self.config.max_iterations):
            elapsed_ms = (time.monotonic() - start_time) * 1000
            if elapsed_ms >= self.config.budget_ms:
                logger.warning(
                    "Reflection budget exhausted at iteration %d (%.0fms)",
                    iteration, elapsed_ms,
                )
                break

            # Generate (or regenerate) response
            if iteration == 0:
                prompt = initial_prompt
            else:
                feedback = self._build_accumulated_feedback()
                prompt = (
                    f"{initial_prompt}\n\n"
                    f"IMPORTANT — Previous attempt had quality issues. "
                    f"{feedback}"
                )

            response = await generate_func(prompt)
            response_text = response if isinstance(response, str) else str(response)

            # Evaluate quality
            score, issues = await self.evaluate_quality(response_text)

            # Track best response
            if score > best_score:
                best_score = score
                best_response = response_text

            result = ReflectionResult(
                iteration=iteration,
                quality_score=score,
                issues=issues,
                accepted=score >= self.config.target_score,
                feedback="\n".join(issues),
            )
            self.history.append(result)

            logger.info(
                "Reflection iteration %d: score=%.2f, issues=%d, accepted=%s",
                iteration, score, len(issues), result.accepted,
            )

            # Accept if target reached
            if score >= self.config.target_score:
                return {
                    "response": response_text,
                    "score": score,
                    "iterations": iteration + 1,
                    "accepted": True,
                    "reason": "Target score reached",
                }

            # Accept if "good enough" and not improving
            if score >= self.config.acceptable_score and self._detect_convergence():
                return {
                    "response": best_response,
                    "score": best_score,
                    "iterations": iteration + 1,
                    "accepted": True,
                    "reason": "Converged at acceptable quality",
                }

            # Check for oscillation even below acceptable
            if self._detect_convergence():
                break

        # Return best response found
        return {
            "response": best_response,
            "score": best_score,
            "iterations": len(self.history),
            "accepted": best_score >= self.config.acceptable_score,
            "reason": "Max iterations or budget reached — returning best attempt",
        }

Prevention

  1. Hard limit at 3 reflection iterations — after that, return the best response seen.
  2. Accumulate feedback across iterations so each regeneration addresses ALL prior issues, not just the latest.
  3. Detect oscillation — if scores alternate up/down for 3+ iterations, stop and return the best.
  4. Set an "acceptable" threshold (0.65) below the target (0.8) — a "good enough" response is better than no response.
  5. Budget the reflection loop separately (1500ms max) from the total 3-second budget, leaving time for delivery.
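
A minimal usage sketch (the generation callable and bedrock_client wrapper are assumptions consistent with the resolution code above):

async def recommend_with_quality_gate(bedrock_client, user_query: str) -> dict:
    # The loop only needs an async callable: prompt in, text out.
    async def generate(prompt: str) -> str:
        result = await bedrock_client.invoke(
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            prompt=prompt,
            max_tokens=600,
        )
        return result.get("text", "")

    loop = BoundedReflectionLoop(
        bedrock_client,
        ReflectionConfig(max_iterations=3, target_score=0.8, budget_ms=1500.0),
    )
    outcome = await loop.run(generate, f"Recommend 3 manga similar to: {user_query}")
    # outcome["response"] holds the best attempt even if the 0.8 target was never reached.
    return outcome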

Key Takeaways

# | Takeaway | MangaAssist Application
1 | FM output format is never guaranteed. Always use a robust parser with multiple extraction strategies (direct, code block, regex). | Every chain step that expects JSON from Claude uses RobustFMOutputParser.extract_json() with a fallback default.
2 | Parallel agent coordination must handle partial results — a timeout from one agent should not kill the entire response. | The Supervisor aggregates successful results and notes which agents timed out, delivering partial but useful answers.
3 | Conversation context must be bounded with sliding windows and summarization to prevent context overflow and cost explosion. | ConversationHistoryManager keeps the last 10 turns and summarizes older turns with Haiku, staying within 3K tokens.
4 | Parallel steps need isolated state — shared mutable dictionaries cause race conditions even in single-threaded asyncio. | ParallelStateManager gives each step a frozen snapshot and merges results with namespaced keys after completion.
5 | Reflection loops must be bounded with max iterations, convergence detection, and accumulated feedback to prevent oscillation. | BoundedReflectionLoop stops after 3 iterations or when it detects oscillation, returning the best response found.