Scenarios & Runbooks — Advanced GenAI Applications
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Detail |
|---|---|
| Domain | 2 — Implementation & Integration of GenAI Applications |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.5 — Advanced GenAI Applications |
| Focus | Troubleshooting agent chains, supervisor coordination, execution limits, race conditions, reflection loops |
| MangaAssist Scope | Five production scenarios with detection, root cause analysis, resolution code, and prevention strategies |
Mind Map
mindmap
root((Advanced GenAI<br/>Scenarios))
Scenario 1
Chain Break from Unexpected Format
FM Returns Markdown Instead of JSON
JSON Parse Failure Mid-Chain
Downstream Steps Receive None
Chain Halts or Produces Garbage
Scenario 2
Supervisor Losing Coordination
Agent Timeout Not Propagated
Orphaned Agent Tasks
Duplicate Responses
State Desynchronization
Scenario 3
Execution History Limit
Context Window Overflow
DynamoDB Item Size Exceeded
Memory Pressure on ECS
Truncated Conversation
Scenario 4
Parallel Chain Race Condition
Shared State Mutation
Non-Deterministic Result Order
Partial Failure Masking
Resource Contention
Scenario 5
Reflection Loop Not Converging
Self-Correction Cycles Forever
Token Budget Explosion
Quality Score Oscillates
Timeout Without Response
Scenario 1 — Chain Breaks from Unexpected FM Output Format
Problem Statement
The manga search chain expects the intent classification step to return valid JSON ({"intent": "search", "search_terms": ["鬼滅の刃"]}), but Claude occasionally returns the JSON wrapped in a markdown code block (```json...```) or adds explanatory text before the JSON. The downstream json.loads() call fails, causing the entire chain to halt with no response sent to the user.
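A minimal repro of the failure mode (the wrapped output below is a representative sample of the decorated responses, not a logged one):

import json

# Representative FM output: valid JSON wrapped in a markdown fence with a preamble.
raw_output = (
    "Here is the classification:\n"
    "```json\n"
    '{"intent": "search", "search_terms": ["鬼滅の刃"]}\n'
    "```"
)

json.loads(raw_output)  # raises json.JSONDecodeError: Expecting value: line 1 column 1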
Detection
graph TB
FM[Bedrock Claude Response<br/>Returns markdown-wrapped JSON] --> JP[JSON Parse<br/>json.loads fails]
JP --> EX[Exception<br/>JSONDecodeError]
EX --> CH[Chain Halts<br/>No downstream steps execute]
CH --> WS[WebSocket<br/>Empty response or timeout]
WS --> USR[User<br/>Sees loading spinner forever]
EX --> CW[CloudWatch<br/>Error rate spike on<br/>classify_intent step]
CW --> INV[Investigation<br/>Sample failed payloads]
INV --> RC[Root Cause<br/>FM wraps JSON in code blocks<br/>or adds preamble text]
style EX fill:#ff6b6b,color:#fff
style RC fill:#ffd43b,color:#333
Root Cause
Foundation models are non-deterministic in output formatting. Even with explicit instructions to "return only JSON," Claude sometimes wraps the output in markdown code fences, adds a brief preamble like "Here is the classification:", or includes trailing explanation text. The chain's json.loads() call on raw FM output fails because it cannot parse these decorations.
Resolution
"""
Resolution: Robust FM output parser with multiple extraction strategies.
Handles markdown blocks, preamble text, and malformed JSON gracefully.
"""
import json
import logging
import re
from typing import Any, Optional
logger = logging.getLogger(__name__)
class FMOutputParseError(Exception):
"""Raised when all parsing strategies fail."""
pass
class RobustFMOutputParser:
"""
Extracts structured data from non-deterministic FM outputs.
Tries multiple strategies in order of specificity.
"""
@staticmethod
def extract_json(
raw_output: str,
default: Any = None,
strict: bool = False,
) -> Any:
"""
Extract JSON from FM output using multiple strategies.
Strategies (in order):
1. Direct parse (output is pure JSON)
2. Markdown code block extraction (```json ... ```)
3. Generic code block extraction (``` ... ```)
4. First JSON object/array extraction via regex
5. Return default if all fail
"""
if not raw_output or not raw_output.strip():
if strict:
raise FMOutputParseError("Empty FM output")
return default
text = raw_output.strip()
# Strategy 1: Direct JSON parse
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Strategy 2: Extract from ```json ... ``` blocks
json_blocks = re.findall(r"```json\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
for block in json_blocks:
try:
return json.loads(block.strip())
except json.JSONDecodeError:
continue
# Strategy 3: Extract from generic ``` ... ``` blocks
code_blocks = re.findall(r"```\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
for block in code_blocks:
try:
return json.loads(block.strip())
except json.JSONDecodeError:
continue
        # Strategy 4: Find first JSON object or array via regex
        # (object pattern handles one nesting level; array pattern is non-greedy)
json_patterns = [
re.compile(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", re.DOTALL),
re.compile(r"\[.*?\]", re.DOTALL),
]
for pattern in json_patterns:
matches = pattern.findall(text)
for match in matches:
try:
return json.loads(match)
except json.JSONDecodeError:
continue
# Strategy 5: Try to fix common JSON issues
cleaned = text
# Remove trailing commas before } or ]
cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)
        # Swap single quotes for double quotes (naive last resort: this also
        # corrupts apostrophes inside string values)
        cleaned = cleaned.replace("'", '"')
try:
return json.loads(cleaned)
except json.JSONDecodeError:
pass
if strict:
raise FMOutputParseError(
f"All JSON extraction strategies failed. Raw output: {text[:200]}"
)
logger.warning(
"All JSON extraction strategies failed, returning default. "
"Raw output: %s...", text[:100],
)
return default
@staticmethod
def extract_text(raw_output: str) -> str:
"""Extract clean text from FM output, stripping code blocks."""
text = raw_output.strip()
# Remove code blocks
text = re.sub(r"```\w*\n?.*?\n?```", "", text, flags=re.DOTALL)
# Remove leading/trailing whitespace from each line
lines = [line.strip() for line in text.split("\n")]
return "\n".join(line for line in lines if line)
@staticmethod
def extract_list(raw_output: str, separator: str = "\n") -> list[str]:
"""Extract a list of items from FM output."""
text = RobustFMOutputParser.extract_text(raw_output)
# Try numbered list extraction
numbered = re.findall(r"^\d+[\.\)]\s*(.+)$", text, re.MULTILINE)
if numbered:
return numbered
# Try bullet list extraction
bulleted = re.findall(r"^[-*•]\s*(.+)$", text, re.MULTILINE)
if bulleted:
return bulleted
# Fall back to line splitting
return [line for line in text.split(separator) if line.strip()]
def safe_chain_step_parse(
step_output: Any,
expected_type: str = "json",
default: Any = None,
) -> Any:
"""
Safe parser for use between chain steps.
Wraps RobustFMOutputParser with step-specific error handling.
"""
if step_output is None:
return default
raw = str(step_output)
if expected_type == "json":
return RobustFMOutputParser.extract_json(raw, default=default)
elif expected_type == "text":
return RobustFMOutputParser.extract_text(raw)
elif expected_type == "list":
return RobustFMOutputParser.extract_list(raw)
else:
return raw
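A usage sketch between chain steps; `fm_response` is a hypothetical stand-in for the raw Bedrock invocation output:

# Hypothetical raw output from the classify_intent step.
fm_response = 'Sure! ```json\n{"intent": "search", "search_terms": ["ワンピース"]}\n```'

intent = safe_chain_step_parse(
    fm_response,
    expected_type="json",
    default={"intent": "unknown", "search_terms": []},  # degrade, never halt
)
assert intent["intent"] == "search"  # extracted from the fenced block

The fallback default means a parse failure downgrades the answer instead of killing the chain.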
Prevention
- Always use `RobustFMOutputParser.extract_json()` instead of raw `json.loads()` between chain steps.
- Add format enforcement in prompts: "Return ONLY a JSON object. No explanation, no code fences, no markdown."
- Add a `transform` step after every prompt step that normalizes the output format before downstream consumption.
- Set up CloudWatch alerts on chain step error rates — a spike in parse errors indicates a model behavior change (see the metric sketch after this list).
- Maintain a fallback default for every JSON extraction so the chain can continue with degraded quality rather than halting.
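A minimal sketch of the parse-error metric behind the alert bullet, assuming standard boto3 credentials; the namespace and dimension names are illustrative:

import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_parse_failure(step_name: str) -> None:
    """Record one parse failure; alarm on Sum over a 5-minute window."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Chains",  # illustrative namespace
        MetricData=[{
            "MetricName": "FMOutputParseFailures",
            "Dimensions": [{"Name": "StepName", "Value": step_name}],
            "Value": 1.0,
            "Unit": "Count",
        }],
    )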
Scenario 2 — Supervisor Agent Loses Coordination with Agent Squad
Problem Statement
The Supervisor agent delegates a search task to the Search Agent and a recommendation task to the Recommendation Agent in parallel. The Search Agent responds in 800ms, but the Recommendation Agent times out at 2500ms. The Supervisor does not properly handle the partial results: it either waits indefinitely for the timed-out agent, sends duplicate responses when the late agent eventually completes, or drops the successful result entirely.
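A minimal repro with stub agents that simulate the observed latencies (scaled down for brevity):

import asyncio

async def search_agent() -> str:
    await asyncio.sleep(0.1)  # stands in for the 800ms search
    return "search results"

async def recommendation_agent() -> str:
    await asyncio.sleep(10)   # simulated hang past its deadline
    return "recommendations"

async def naive_supervisor() -> list:
    # No return_exceptions, so the first TimeoutError propagates and the
    # search agent's already-completed result is silently discarded.
    return await asyncio.gather(
        search_agent(),
        asyncio.wait_for(recommendation_agent(), timeout=0.2),
    )

# asyncio.run(naive_supervisor()) raises TimeoutError; "search results" is lost.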
Detection
graph TB
SUP[Supervisor<br/>Dispatches parallel tasks] --> SA[Search Agent<br/>Responds in 800ms]
SUP --> RA[Recommendation Agent<br/>Times out at 2500ms]
SA --> SUP_WAIT[Supervisor Waiting<br/>For all agents to respond]
RA -->|Timeout| SUP_WAIT
SUP_WAIT --> P1[Problem 1<br/>Supervisor hangs<br/>waiting for RA]
SUP_WAIT --> P2[Problem 2<br/>RA responds late<br/>duplicate delivery]
SUP_WAIT --> P3[Problem 3<br/>Supervisor drops SA result<br/>sends error to user]
P1 --> CW[CloudWatch<br/>Supervisor latency spike]
P2 --> WS[WebSocket<br/>User sees two responses]
P3 --> USR[User<br/>Gets error despite<br/>search working fine]
style P1 fill:#ff6b6b,color:#fff
style P2 fill:#ff6b6b,color:#fff
style P3 fill:#ff6b6b,color:#fff
Root Cause
The Supervisor used asyncio.gather() without return_exceptions=True and without an explicit timeout wrapper. When one agent timed out, the first exception propagated immediately out of the gather, discarding the results of agents that had already completed and leaving the remaining coroutines running as orphaned tasks. There was no mechanism to accept partial results from agents that completed successfully.
Resolution
"""
Resolution: Robust parallel agent coordination with partial result handling.
Supervisor gracefully handles agent timeouts and delivers partial results.
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class AgentTaskResult:
"""Result from a delegated agent task."""
agent_id: str
success: bool
output: Any = None
error: str | None = None
latency_ms: float = 0.0
timed_out: bool = False
class RobustSupervisorCoordinator:
"""
Coordinates parallel agent execution with graceful timeout handling.
Delivers partial results when some agents time out.
"""
def __init__(
self,
agent_registry: dict[str, Any],
total_budget_ms: float = 2500.0,
per_agent_timeout_ms: float = 2000.0,
min_results_required: int = 1,
):
self.agent_registry = agent_registry
self.total_budget_ms = total_budget_ms
self.per_agent_timeout_ms = per_agent_timeout_ms
self.min_results_required = min_results_required
        self._delivered: set[str] = set()  # Delivered correlation IDs (evict periodically in production; this set grows unbounded)
async def dispatch_parallel(
self,
tasks: list[dict[str, Any]],
context: Any,
) -> list[AgentTaskResult]:
"""
Dispatch tasks to agents in parallel with robust timeout handling.
Returns all results (including partial) — never blocks indefinitely.
"""
if not tasks:
return []
results: list[AgentTaskResult] = []
start_time = time.monotonic()
# Create wrapped tasks with individual timeouts
async_tasks = []
for task_spec in tasks:
agent_id = task_spec["agent_id"]
agent = self.agent_registry.get(agent_id)
if agent is None:
results.append(AgentTaskResult(
agent_id=agent_id,
success=False,
error=f"Agent '{agent_id}' not found",
))
continue
async_tasks.append(
self._execute_with_timeout(
agent_id=agent_id,
agent=agent,
message=task_spec.get("message", ""),
context=context,
timeout_ms=min(self.per_agent_timeout_ms, self.total_budget_ms),
)
)
        # Execute all tasks with a global timeout.
        # Wrap coroutines in Tasks so results that finish before the global
        # deadline survive even when slower siblings must be cancelled.
        futures = [asyncio.ensure_future(coro) for coro in async_tasks]
        global_timeout = self.total_budget_ms / 1000.0
        done, pending = (
            await asyncio.wait(futures, timeout=global_timeout)
            if futures
            else (set(), set())
        )
        for fut in done:
            try:
                result = fut.result()
                if isinstance(result, AgentTaskResult):
                    results.append(result)
            except Exception as e:
                results.append(AgentTaskResult(
                    agent_id="unknown",
                    success=False,
                    error=str(e),
                ))
        for fut in pending:
            fut.cancel()
        if pending:
            logger.warning(
                "Global timeout reached (%.0fms). Cancelled %d task(s); "
                "delivering partial results.",
                self.total_budget_ms,
                len(pending),
            )
total_elapsed = (time.monotonic() - start_time) * 1000
logger.info(
"Parallel dispatch completed: %d/%d succeeded in %.0fms",
sum(1 for r in results if r.success),
len(tasks),
total_elapsed,
)
return results
async def _execute_with_timeout(
self,
agent_id: str,
agent: Any,
message: str,
context: Any,
timeout_ms: float,
) -> AgentTaskResult:
"""Execute a single agent with timeout protection."""
start = time.monotonic()
try:
result = await asyncio.wait_for(
agent.run(message, context),
timeout=timeout_ms / 1000.0,
)
latency = (time.monotonic() - start) * 1000
return AgentTaskResult(
agent_id=agent_id,
success=True,
output=result.content if hasattr(result, "content") else str(result),
latency_ms=round(latency, 2),
)
except asyncio.TimeoutError:
latency = (time.monotonic() - start) * 1000
logger.warning("Agent '%s' timed out after %.0fms", agent_id, latency)
return AgentTaskResult(
agent_id=agent_id,
success=False,
timed_out=True,
latency_ms=round(latency, 2),
error=f"Timed out after {timeout_ms}ms",
)
except Exception as e:
latency = (time.monotonic() - start) * 1000
logger.error("Agent '%s' failed: %s", agent_id, e)
return AgentTaskResult(
agent_id=agent_id,
success=False,
error=str(e),
latency_ms=round(latency, 2),
)
def aggregate_results(
self,
results: list[AgentTaskResult],
correlation_id: str,
) -> dict[str, Any]:
"""
Aggregate agent results into a unified response.
Handles partial results gracefully — never duplicate delivers.
"""
# Prevent duplicate delivery
if correlation_id in self._delivered:
logger.warning("Duplicate delivery prevented for %s", correlation_id)
return {"error": "Already delivered", "correlation_id": correlation_id}
self._delivered.add(correlation_id)
successful = [r for r in results if r.success]
timed_out = [r for r in results if r.timed_out]
failed = [r for r in results if not r.success and not r.timed_out]
if not successful:
return {
"status": "error",
"message": "All agents failed or timed out",
"details": [r.error for r in results],
}
# Build composite response from successful results
response_parts = [r.output for r in successful if r.output]
composite = "\n\n".join(str(part) for part in response_parts)
# Add notes about timed-out agents
if timed_out:
agent_names = [r.agent_id for r in timed_out]
composite += (
f"\n\n(Note: Some information may be incomplete. "
f"The {', '.join(agent_names)} service(s) are temporarily slow.)"
)
return {
"status": "partial" if timed_out else "complete",
"response": composite,
"agents_succeeded": [r.agent_id for r in successful],
"agents_timed_out": [r.agent_id for r in timed_out],
"agents_failed": [r.agent_id for r in failed],
"total_latency_ms": max((r.latency_ms for r in results), default=0),
}
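A usage sketch with stub agents; the stubs are hypothetical and assume the registry contract used above (each agent exposes an async run(message, context) method):

class StubAgent:
    """Hypothetical agent used only to exercise the coordinator."""
    def __init__(self, delay_s: float, reply: str):
        self.delay_s, self.reply = delay_s, reply

    async def run(self, message: str, context: Any) -> str:
        await asyncio.sleep(self.delay_s)
        return self.reply

async def demo() -> None:
    coordinator = RobustSupervisorCoordinator(
        agent_registry={
            "search": StubAgent(0.1, "Found 5 titles for 鬼滅の刃"),
            "recommend": StubAgent(5.0, "never delivered"),  # forces a timeout
        },
        total_budget_ms=500.0,
        per_agent_timeout_ms=400.0,
    )
    results = await coordinator.dispatch_parallel(
        [{"agent_id": "search", "message": "demon slayer"},
         {"agent_id": "recommend", "message": "similar titles"}],
        context={},
    )
    # status == "partial": search succeeded, recommend timed out, and the
    # user still gets a useful answer plus a slow-service note.
    print(coordinator.aggregate_results(results, correlation_id="req-123"))

# asyncio.run(demo())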
Prevention
- Always use `return_exceptions=True` in `asyncio.gather()`, or wrap coroutines in Tasks and collect them with `asyncio.wait()`, so one agent's failure never discards sibling results.
- Wrap every agent call in `asyncio.wait_for()` with per-agent timeouts.
- Track correlation IDs to prevent duplicate response delivery.
- Aggregate partial results — a response with 2 of 3 agents is better than no response.
- Log agent timeout rates per agent type to identify consistently slow specialists.
Scenario 3 — Execution History Exceeds Context Window Limit
Problem Statement
A user has a long conversation with MangaAssist (50+ turns) asking about various manga, comparing titles, and discussing recommendations. The accumulated conversation history exceeds Claude 3's context window when passed as conversation_history to the next Bedrock call, causing a ValidationException: Input is too long. Even before hitting the hard limit, the growing context inflates token costs and pushes latency beyond 3 seconds.
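The cost side is simple arithmetic against the stated Sonnet pricing ($3 per 1M input tokens), shown below before any output tokens are counted:

# Input cost per message as the resent history grows (Sonnet: $3 / 1M input tokens).
SONNET_INPUT_PER_TOKEN = 3.00 / 1_000_000

for history_tokens in (2_000, 20_000, 100_000):
    cost = history_tokens * SONNET_INPUT_PER_TOKEN
    print(f"{history_tokens:>7} history tokens -> ${cost:.3f} input cost per message")
# 2,000 -> $0.006; 20,000 -> $0.060; 100,000 -> $0.300 per message, every message.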
Detection
graph TB
CONV[Long Conversation<br/>50+ turns accumulated] --> CTX[Context Assembly<br/>Full history → prompt]
CTX --> TK[Token Count<br/>Exceeds model limit]
TK --> ERR[Bedrock Error<br/>ValidationException<br/>Input is too long]
ERR --> USR[User<br/>Chat breaks mid-conversation]
CTX --> COST[Cost Escalation<br/>$0.01+ per message<br/>from huge input]
COST --> LAT[Latency Increase<br/>4-5 seconds per response]
LAT --> TO[Timeout<br/>3-second budget breached]
ERR --> CW[CloudWatch<br/>ValidationException spike]
COST --> BILL[Billing Alert<br/>Daily cost 3x expected]
style ERR fill:#ff6b6b,color:#fff
style COST fill:#ffd43b,color:#333
Root Cause
The conversation history was stored verbatim in DynamoDB and passed in full to every Bedrock call. No truncation, summarization, or sliding window was applied. At 50+ turns with manga descriptions, the input ran past 100K tokens per call — a cost and latency blowout long before the longest sessions crossed the model's 200K-token hard limit and triggered the ValidationException.
Resolution
"""
Resolution: Conversation history manager with sliding window and summarization.
Prevents context overflow while maintaining conversation coherence.
"""
import json
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class ConversationWindow:
"""Manages a bounded conversation history window."""
max_turns: int = 10
max_tokens_estimate: int = 3000
summary: str = ""
recent_turns: list[dict[str, str]] = field(default_factory=list)
total_turns: int = 0
class ConversationHistoryManager:
"""
Manages conversation history with bounded context windows.
Prevents context overflow by summarizing old turns and keeping recent ones.
"""
    CHARS_PER_TOKEN_ESTIMATE = 4  # English-leaning heuristic; Japanese runs closer to 1-2 chars/token, so JP-heavy sessions need a lower value
def __init__(
self,
bedrock_client: Any,
max_recent_turns: int = 10,
max_context_tokens: int = 3000,
summary_model: str = "anthropic.claude-3-haiku-20240307-v1:0",
):
self.bedrock_client = bedrock_client
self.max_recent_turns = max_recent_turns
self.max_context_tokens = max_context_tokens
self.summary_model = summary_model
def _estimate_tokens(self, text: str) -> int:
return max(1, len(text) // self.CHARS_PER_TOKEN_ESTIMATE)
def _total_tokens(self, turns: list[dict[str, str]]) -> int:
return sum(
self._estimate_tokens(t.get("content", ""))
for t in turns
)
async def prepare_context(
self, full_history: list[dict[str, str]], existing_summary: str = ""
) -> ConversationWindow:
"""
Prepare a bounded conversation context from full history.
Strategy:
1. Keep the last N turns as-is (recent context)
2. Summarize older turns into a concise summary
3. Ensure total tokens stay within budget
"""
total_turns = len(full_history)
if total_turns <= self.max_recent_turns:
# No truncation needed
return ConversationWindow(
max_turns=self.max_recent_turns,
max_tokens_estimate=self._total_tokens(full_history),
summary="",
recent_turns=full_history,
total_turns=total_turns,
)
# Split into old and recent
recent = full_history[-self.max_recent_turns:]
old = full_history[:-self.max_recent_turns]
# Check if recent turns alone exceed budget
recent_tokens = self._total_tokens(recent)
if recent_tokens > self.max_context_tokens:
# Trim recent turns to fit
while recent and self._total_tokens(recent) > self.max_context_tokens:
recent.pop(0)
# Summarize old turns
summary = await self._summarize_turns(old, existing_summary)
return ConversationWindow(
max_turns=self.max_recent_turns,
max_tokens_estimate=self._estimate_tokens(summary) + self._total_tokens(recent),
summary=summary,
recent_turns=recent,
total_turns=total_turns,
)
async def _summarize_turns(
self, turns: list[dict[str, str]], existing_summary: str
) -> str:
"""Summarize older conversation turns using Haiku."""
if not turns:
return existing_summary
turns_text = "\n".join(
f"{t['role']}: {t['content'][:200]}" for t in turns[-20:]
)
prompt = (
"Summarize this manga store chat conversation in 2-3 sentences. "
"Focus on: manga titles discussed, user preferences, and any pending actions.\n\n"
)
if existing_summary:
prompt += f"Previous summary: {existing_summary}\n\n"
prompt += f"New conversation:\n{turns_text}\n\nConcise summary:"
response = await self.bedrock_client.invoke(
model_id=self.summary_model,
prompt=prompt,
max_tokens=200,
)
return response.get("text", existing_summary)
def build_messages(self, window: ConversationWindow) -> list[dict[str, str]]:
"""Build the messages array for Bedrock from a conversation window."""
messages = []
if window.summary:
messages.append({
"role": "user",
"content": f"[Previous conversation summary: {window.summary}]",
})
messages.append({
"role": "assistant",
"content": "I understand the context from our earlier conversation. How can I help you now?",
})
messages.extend(window.recent_turns)
return messages
# --- DynamoDB session size guard ---
MAX_DYNAMODB_ITEM_KB = 400 # DynamoDB 400KB item limit
def check_session_size(session_data: dict[str, Any]) -> dict[str, Any]:
"""Check if session data is approaching DynamoDB item size limit."""
serialized = json.dumps(session_data, ensure_ascii=False)
size_kb = len(serialized.encode("utf-8")) / 1024
return {
"size_kb": round(size_kb, 2),
"limit_kb": MAX_DYNAMODB_ITEM_KB,
"usage_pct": round(size_kb / MAX_DYNAMODB_ITEM_KB * 100, 1),
"needs_compaction": size_kb > MAX_DYNAMODB_ITEM_KB * 0.75,
"exceeds_limit": size_kb > MAX_DYNAMODB_ITEM_KB,
}
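A usage sketch tying the pieces together; `bedrock` is the same async wrapper the class already assumes (an awaitable invoke(model_id=..., prompt=..., max_tokens=...)):

async def handle_turn(
    bedrock: Any, full_history: list[dict[str, str]]
) -> list[dict[str, str]]:
    """Bound the context before every Bedrock call (illustrative flow)."""
    manager = ConversationHistoryManager(bedrock, max_recent_turns=10)
    window = await manager.prepare_context(full_history)

    size = check_session_size({"history": full_history})
    if size["needs_compaction"]:
        # Persist window.summary and drop the summarized turns from the
        # DynamoDB session item here.
        pass
    return manager.build_messages(window)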
Prevention
- Always use `ConversationHistoryManager` — never pass raw full history to Bedrock.
- Set `max_recent_turns=10` as the default window — enough for context without overflow.
- Summarize old turns with Haiku (cheap and fast) before they fall off the window.
- Monitor DynamoDB item sizes — alert when session items exceed 75% of the 400KB limit.
- Implement session compaction — periodically compress old messages and store summaries.
Scenario 4 — Parallel Chain Race Condition on Shared State
Problem Statement
Two parallel chain steps (Search Agent and Recommendation Agent) both read and write to the shared ChainState.data dictionary. The Search Agent writes data["manga_results"] = [...] while the Recommendation Agent reads data["manga_results"] expecting it to contain the search results. Due to parallel execution, the Recommendation Agent reads before the Search Agent writes, getting None instead of results. Alternatively, both agents write to the same key, and the last write wins, silently dropping the first agent's output.
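The interleaving is easy to reproduce with two stub coroutines (latencies scaled down):

import asyncio

shared: dict = {}

async def search_agent() -> None:
    await asyncio.sleep(0.8)  # Bedrock + OpenSearch latency
    shared["manga_results"] = ["鬼滅の刃", "呪術廻戦"]

async def recommendation_agent() -> None:
    await asyncio.sleep(0.2)  # finishes first
    results = shared.get("manga_results")  # None: read happens before the write
    print(f"Recommending based on: {results}")

async def main() -> None:
    await asyncio.gather(search_agent(), recommendation_agent())

# asyncio.run(main()) prints "Recommending based on: None"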
Detection
graph TB
PAR[Parallel Execution<br/>Search ∥ Recommendation] --> SA_W[Search Agent<br/>Writes manga_results]
PAR --> RA_R[Recommendation Agent<br/>Reads manga_results]
SA_W -->|Write at t=800ms| STATE[Shared ChainState.data]
RA_R -->|Read at t=200ms| STATE
STATE --> RACE[Race Condition<br/>RA reads before SA writes]
RACE --> P1[Problem<br/>RA gets None,<br/>generates generic recs]
SA_W --> OW[Overwrite Risk<br/>Both write to same key]
OW --> P2[Problem<br/>Last-write-wins<br/>drops SA output]
RACE --> USR[User<br/>Gets irrelevant recommendations<br/>that ignore search context]
style RACE fill:#ff6b6b,color:#fff
style P2 fill:#ff6b6b,color:#fff
Root Cause
The ChainState was passed by reference to parallel steps. Both steps received the same mutable dictionary and operated on it concurrently. Python's asyncio is single-threaded, but control switches between coroutines at every await point, so one coroutine can read a key before the coroutine responsible for it has written the value. Additionally, no namespacing prevented key collisions between agents writing to data.
Resolution
"""
Resolution: Immutable state management for parallel chain execution.
Each parallel step gets an isolated copy; results are merged safely.
"""
import asyncio
import copy
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class IsolatedStepState:
"""Isolated state snapshot for a single parallel step."""
step_name: str
input_data: dict[str, Any] # Frozen copy of shared state at dispatch time
output_data: dict[str, Any] = field(default_factory=dict) # Step's own outputs
def read(self, key: str, default: Any = None) -> Any:
"""Read from input data (frozen snapshot)."""
return self.input_data.get(key, default)
def write(self, key: str, value: Any) -> None:
"""Write to output data (step-local namespace)."""
namespaced_key = f"{self.step_name}.{key}"
self.output_data[namespaced_key] = value
# Also write un-namespaced for convenience
self.output_data[key] = value
class ParallelStateManager:
"""
Manages state isolation and merging for parallel chain steps.
Prevents race conditions by giving each step an immutable snapshot.
"""
def __init__(self, merge_strategy: str = "namespace"):
"""
Args:
merge_strategy: "namespace" prefixes keys with step name,
"last_write_wins" allows overwrites (not recommended),
"collect" groups all values per key into lists.
"""
self.merge_strategy = merge_strategy
def create_isolated_states(
self,
shared_state_data: dict[str, Any],
step_names: list[str],
) -> dict[str, IsolatedStepState]:
"""Create frozen state copies for each parallel step."""
isolated = {}
for name in step_names:
isolated[name] = IsolatedStepState(
step_name=name,
input_data=copy.deepcopy(shared_state_data),
)
return isolated
def merge_results(
self,
original_data: dict[str, Any],
isolated_states: dict[str, IsolatedStepState],
) -> dict[str, Any]:
"""Merge results from parallel steps back into shared state."""
merged = copy.deepcopy(original_data)
if self.merge_strategy == "namespace":
# Each step's outputs are namespaced: "step_name.key"
for step_name, state in isolated_states.items():
for key, value in state.output_data.items():
if "." not in key:
# Add namespaced version
merged[f"{step_name}.{key}"] = value
else:
merged[key] = value
# Also add the step's complete output under its name
merged[step_name] = state.output_data
elif self.merge_strategy == "collect":
# Collect all values per key into lists
for state in isolated_states.values():
for key, value in state.output_data.items():
if key not in merged:
merged[key] = [value]
elif isinstance(merged[key], list):
merged[key].append(value)
else:
merged[key] = [merged[key], value]
else: # last_write_wins
for state in isolated_states.values():
merged.update(state.output_data)
return merged
async def execute_parallel_steps_safely(
steps: list[dict[str, Any]],
shared_data: dict[str, Any],
state_manager: ParallelStateManager,
) -> dict[str, Any]:
"""
Execute parallel steps with isolated state and safe merging.
Each step receives a frozen copy of shared state.
Results are merged back with namespacing to prevent collisions.
"""
step_names = [s["name"] for s in steps]
isolated_states = state_manager.create_isolated_states(shared_data, step_names)
async def run_step(step_config: dict, iso_state: IsolatedStepState) -> None:
handler = step_config.get("handler")
if handler:
result = await handler(iso_state.input_data)
iso_state.write("result", result)
tasks = [
run_step(step, isolated_states[step["name"]])
for step in steps
]
await asyncio.gather(*tasks, return_exceptions=True)
merged = state_manager.merge_results(shared_data, isolated_states)
logger.info(
"Parallel merge complete: %d steps, %d output keys",
len(steps),
len(merged) - len(shared_data),
)
return merged
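A usage sketch with illustrative handlers standing in for the real agent calls:

async def search_handler(frozen: dict[str, Any]) -> list[str]:
    # Reads only from its frozen snapshot; returns its own result.
    return ["鬼滅の刃", "呪術廻戦"]

async def promo_handler(frozen: dict[str, Any]) -> str:
    return "Spring sale on shonen titles"

async def demo() -> None:
    merged = await execute_parallel_steps_safely(
        steps=[
            {"name": "search", "handler": search_handler},
            {"name": "promo", "handler": promo_handler},
        ],
        shared_data={"query": "demon slayer"},
        state_manager=ParallelStateManager(merge_strategy="namespace"),
    )
    # Namespaced keys cannot collide: "search.result" vs "promo.result".
    print(merged["search.result"], merged["promo.result"])

# asyncio.run(demo())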
Prevention
- Never share mutable state between parallel steps — always use `copy.deepcopy()` for isolation.
- Namespace all step outputs with the step name prefix to prevent key collisions.
- Use `IsolatedStepState` as the state interface for parallel steps — it enforces read-from-snapshot, write-to-local.
- Merge results explicitly after all parallel steps complete — never during execution.
- If one step depends on another's output, they cannot be parallel — use a sequential chain segment instead.
Scenario 5 — Reflection Loop Does Not Converge
Problem Statement
The Reflection Agent evaluates the Recommendation Agent's output, finds it has only 2 recommendations instead of the requested 3, and asks for regeneration. The regenerated output now has 3 recommendations but one title is in English only (missing Japanese). Reflection rejects again. The cycle repeats: each fix introduces a new issue, the quality score oscillates between 0.65 and 0.75 (never reaching the 0.8 threshold), and the 3-second budget is consumed entirely by reflection loops without ever sending a response.
Detection
graph TB
GEN[Generate Recommendations<br/>Iteration 1: 2 titles] --> REF1[Reflection 1<br/>Score: 0.65<br/>Needs 3+ titles]
REF1 --> REGEN1[Regenerate<br/>Iteration 2: 3 titles<br/>but missing JP names]
REGEN1 --> REF2[Reflection 2<br/>Score: 0.70<br/>Needs JP names]
REF2 --> REGEN2[Regenerate<br/>Iteration 3: JP names<br/>but only 2 titles again]
REGEN2 --> REF3[Reflection 3<br/>Score: 0.68<br/>Oscillating!]
REF3 --> TO[Timeout<br/>3-second budget exhausted<br/>No response sent]
TO --> CW[CloudWatch<br/>Reflection loop timeouts]
CW --> INV[Investigation<br/>Reflection iteration count > 3]
INV --> RC[Root Cause<br/>Quality criteria conflict<br/>No convergence guarantee]
style TO fill:#ff6b6b,color:#fff
style RC fill:#ffd43b,color:#333
Root Cause
The reflection loop had no maximum iteration limit, no convergence detection, and no "good enough" acceptance threshold. Each reflection cycle corrected one issue but regressed on another because the regeneration prompt did not carry forward all accumulated feedback — only the most recent critique.
Resolution
"""
Resolution: Bounded reflection loop with convergence detection and
accumulated feedback for MangaAssist quality control.
"""
import json
import logging
import time
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class ReflectionResult:
"""Result of a single reflection iteration."""
iteration: int
quality_score: float
issues: list[str]
accepted: bool
feedback: str
@dataclass
class ReflectionConfig:
"""Configuration for the bounded reflection loop."""
max_iterations: int = 3
target_score: float = 0.8
acceptable_score: float = 0.65 # "Good enough" threshold
convergence_epsilon: float = 0.02 # Min score improvement per iteration
budget_ms: float = 1500.0 # Max time for entire reflection loop
accumulate_feedback: bool = True # Carry forward all feedback
class BoundedReflectionLoop:
"""
Reflection loop with convergence detection and hard limits.
Prevents infinite loops by tracking score trends and enforcing budgets.
"""
def __init__(self, bedrock_client: Any, config: ReflectionConfig | None = None):
self.bedrock_client = bedrock_client
self.config = config or ReflectionConfig()
self.history: list[ReflectionResult] = []
async def evaluate_quality(self, response: str) -> tuple[float, list[str]]:
"""Evaluate response quality and return score + issues list."""
prompt = (
"Evaluate this manga recommendation response for quality. "
"Score 0-1 on these criteria:\n"
"1. Has 3+ manga titles (0.25 points)\n"
"2. Includes Japanese titles (0.25 points)\n"
"3. Has author names (0.25 points)\n"
"4. Includes pricing/availability (0.25 points)\n\n"
f"Response:\n{response}\n\n"
"Return JSON: {\"score\": 0.X, \"issues\": [\"issue1\", ...]}"
)
result = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
prompt=prompt,
max_tokens=200,
)
        try:
            parsed = json.loads(result.get("text", "{}"))
            return parsed.get("score", 0.5), parsed.get("issues", [])
        except (json.JSONDecodeError, AttributeError):
            return 0.5, ["Could not parse quality evaluation"]
def _detect_convergence(self) -> bool:
"""Check if scores are improving or oscillating."""
if len(self.history) < 2:
return False
recent_scores = [r.quality_score for r in self.history[-3:]]
if len(recent_scores) >= 2:
improvement = recent_scores[-1] - recent_scores[-2]
if abs(improvement) < self.config.convergence_epsilon:
logger.info("Reflection converged (delta=%.3f)", improvement)
return True
# Check for oscillation
if len(recent_scores) >= 3:
deltas = [recent_scores[i+1] - recent_scores[i] for i in range(len(recent_scores)-1)]
if all(d1 * d2 < 0 for d1, d2 in zip(deltas, deltas[1:])):
logger.warning("Reflection oscillating — stopping")
return True
return False
def _build_accumulated_feedback(self) -> str:
"""Build feedback that includes all prior issues."""
if not self.config.accumulate_feedback or not self.history:
return ""
all_issues = []
for r in self.history:
all_issues.extend(r.issues)
unique_issues = list(dict.fromkeys(all_issues))
return "Fix ALL of these issues simultaneously:\n" + "\n".join(
f"- {issue}" for issue in unique_issues
)
async def run(
self,
generate_func: Any,
initial_prompt: str,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Run the bounded reflection loop.
Returns the best response found, even if target score is not reached.
"""
self.history.clear()
start_time = time.monotonic()
best_response = ""
best_score = 0.0
for iteration in range(self.config.max_iterations):
elapsed_ms = (time.monotonic() - start_time) * 1000
if elapsed_ms >= self.config.budget_ms:
logger.warning(
"Reflection budget exhausted at iteration %d (%.0fms)",
iteration, elapsed_ms,
)
break
# Generate (or regenerate) response
if iteration == 0:
prompt = initial_prompt
else:
feedback = self._build_accumulated_feedback()
prompt = (
f"{initial_prompt}\n\n"
f"IMPORTANT — Previous attempt had quality issues. "
f"{feedback}"
)
response = await generate_func(prompt)
response_text = response if isinstance(response, str) else str(response)
# Evaluate quality
score, issues = await self.evaluate_quality(response_text)
# Track best response
if score > best_score:
best_score = score
best_response = response_text
result = ReflectionResult(
iteration=iteration,
quality_score=score,
issues=issues,
accepted=score >= self.config.target_score,
feedback="\n".join(issues),
)
self.history.append(result)
logger.info(
"Reflection iteration %d: score=%.2f, issues=%d, accepted=%s",
iteration, score, len(issues), result.accepted,
)
# Accept if target reached
if score >= self.config.target_score:
return {
"response": response_text,
"score": score,
"iterations": iteration + 1,
"accepted": True,
"reason": "Target score reached",
}
# Accept if "good enough" and not improving
if score >= self.config.acceptable_score and self._detect_convergence():
return {
"response": best_response,
"score": best_score,
"iterations": iteration + 1,
"accepted": True,
"reason": "Converged at acceptable quality",
}
# Check for oscillation even below acceptable
if self._detect_convergence():
break
# Return best response found
return {
"response": best_response,
"score": best_score,
"iterations": len(self.history),
"accepted": best_score >= self.config.acceptable_score,
"reason": "Max iterations or budget reached — returning best attempt",
}
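A usage sketch; `bedrock` is the same async wrapper the class assumes, and the generator stub stands in for the Recommendation Agent:

async def demo(bedrock: Any) -> None:
    async def generate(prompt: str) -> str:
        result = await bedrock.invoke(
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            prompt=prompt,
            max_tokens=600,
        )
        return result.get("text", "")

    loop = BoundedReflectionLoop(
        bedrock, ReflectionConfig(max_iterations=3, budget_ms=1500.0)
    )
    outcome = await loop.run(
        generate,
        "Recommend 3 manga similar to 鬼滅の刃 with Japanese titles and prices.",
    )
    # Always deliverable: outcome["response"] holds the best attempt even
    # when outcome["accepted"] is False.
    print(outcome["score"], outcome["iterations"], outcome["reason"])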
Prevention
- Hard limit at 3 reflection iterations — after that, return the best response seen.
- Accumulate feedback across iterations so each regeneration addresses ALL prior issues, not just the latest.
- Detect oscillation — if scores alternate up/down for 3+ iterations, stop and return the best.
- Set an "acceptable" threshold (0.65) below the target (0.8) — a "good enough" response is better than no response.
- Budget the reflection loop separately (1500ms max) from the total 3-second budget, leaving time for delivery.
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | FM output format is never guaranteed. Always use a robust parser with multiple extraction strategies (direct, code block, regex). | Every chain step that expects JSON from Claude uses RobustFMOutputParser.extract_json() with a fallback default. |
| 2 | Parallel agent coordination must handle partial results — a timeout from one agent should not kill the entire response. | The Supervisor aggregates successful results and notes which agents timed out, delivering partial but useful answers. |
| 3 | Conversation context must be bounded with sliding windows and summarization to prevent context overflow and cost explosion. | ConversationHistoryManager keeps the last 10 turns and summarizes older turns with Haiku, staying within 3K tokens. |
| 4 | Parallel steps need isolated state — shared mutable dictionaries cause race conditions even in single-threaded asyncio. | ParallelStateManager gives each step a frozen snapshot and merges results with namespaced keys after completion. |
| 5 | Reflection loops must be bounded with max iterations, convergence detection, and accumulated feedback to prevent oscillation. | BoundedReflectionLoop stops after 3 iterations or when it detects oscillation, returning the best response found. |