Scenarios and Runbooks — Intelligent Autonomous Systems
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Detail |
|---|---|
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.1 — Develop intelligent autonomous systems with appropriate memory and state management capabilities |
| This File | Five production scenarios with detection flowcharts, root cause analysis, resolution code, and prevention strategies |
Skill Scope Statement
This file presents five real-world failure scenarios that MangaAssist has encountered (or would encounter) in production with autonomous agents, multi-agent routing, MCP tool interactions, and session state management. Each scenario includes: a problem statement, a mermaid detection flowchart, root cause analysis, Python resolution code, and prevention measures. These runbooks are designed for on-call engineers responding to alerts.
Mind Map — Autonomous Agent Failure Modes
mindmap
root((Agent Failure<br/>Scenarios))
Memory Corruption
Redis Deserialization
TTL Race Condition
Stale Context Injection
Multi-Agent Routing Loop
Classifier Oscillation
Confidence Below Threshold
Agent Ping-Pong
MCP Tool Timeout
Cascading Timeout
Hallucinated Result
Partial Data Answer
DynamoDB Session Size
400KB Item Limit
History Accumulation
Serialization Bloat
Agent Squad Deadlock
Circular Dependency
Shared Resource Contention
Supervisor Starvation
Scenario Overview
| # | Scenario | Severity | Blast Radius | Typical Detection Time |
|---|---|---|---|---|
| 1 | Agent memory corruption (Redis deserialization failure) | P1 — Critical | All sessions on affected Redis node | 2-5 minutes via error rate alarm |
| 2 | Multi-agent routing loop (classifier oscillation) | P2 — High | Single user session, but burns tokens | 30-60 seconds via iteration counter |
| 3 | MCP tool timeout causing hallucination | P2 — High | Single user gets wrong answer | Post-hoc via quality audit |
| 4 | DynamoDB session state size limit exceeded | P3 — Medium | Users with very long sessions | Immediate on write failure |
| 5 | Agent Squad coordination deadlock | P1 — Critical | Multiple concurrent sessions | 1-3 minutes via latency spike |
Scenario 1: Agent Memory Corruption (Redis Deserialization Failure)
Problem
During a Redis ElastiCache maintenance window, connections are briefly interrupted. When sessions resume, the Strands Agent loads corrupted or partially-written session data from Redis, causing json.JSONDecodeError on deserialization. The agent crashes or hallucinates because it receives garbage context instead of a clean conversation history.
Detection
flowchart TD
A["CloudWatch Alarm:<br/>manga_session_errors > 50/min"] --> B{"Check error type<br/>in CloudWatch Logs"}
B -->|"JSONDecodeError"| C["Redis Deserialization<br/>Failure Confirmed"]
B -->|"RedisConnectionError"| D["Redis Connectivity<br/>Issue — Different Runbook"]
B -->|"KeyError / TypeError"| E["Schema Mismatch —<br/>Deployment Issue"]
C --> F{"Check Redis<br/>cluster events"}
F -->|"Maintenance window<br/>or failover"| G["ROOT CAUSE:<br/>Partial writes during<br/>Redis failover"]
F -->|"No events"| H["Check for<br/>concurrent write<br/>race condition"]
G --> I["Execute Runbook 1"]
H --> I
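
The alarm in the first node of this flowchart can be created with a few lines of boto3. A minimal sketch follows, assuming the orchestrator publishes a manga_session_errors custom metric under a MangaAssist namespace (the namespace, metric name, and SNS topic ARN are illustrative assumptions, not confirmed infrastructure).

```python
"""Sketch: CloudWatch alarm behind the manga_session_errors > 50/min detection signal.
Namespace, metric name, and SNS topic are assumptions for illustration."""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

cloudwatch.put_metric_alarm(
    AlarmName="manga-session-errors-spike",
    Namespace="MangaAssist",                   # assumed custom namespace
    MetricName="manga_session_errors",         # assumed custom metric emitted by the orchestrator
    Statistic="Sum",
    Period=60,                                 # 1-minute buckets
    EvaluationPeriods=1,
    Threshold=50,                              # matches the > 50/min signal in the flowchart
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:111122223333:manga-oncall"],  # hypothetical on-call topic
)
```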
Root Cause
When session writes are issued as separate commands rather than a single transaction, Redis gives no atomicity guarantee across them, and replication to the standby is asynchronous. During a failover, a session write (RPUSH for the turn + EXPIRE for the TTL) can therefore be partially applied: the turn data is written but the key expires before the new TTL is set, or the payload is truncated mid-write. When the agent reads this corrupted data, json.loads() fails.
Resolution
"""
Runbook 1: Resilient Redis session deserialization with corruption recovery.
Handles partial writes, truncated JSON, and stale data gracefully.
"""
import json
import logging
import time
from typing import Optional
import redis
logger = logging.getLogger("manga_session_recovery")
class ResilientSessionLoader:
"""
Loads session data from Redis with corruption detection and recovery.
Falls back to a clean session if data is unrecoverable.
"""
def __init__(self, redis_client: redis.Redis):
self._redis = redis_client
self._corruption_counter = 0
def load_session(self, session_id: str) -> dict:
"""
Load session with multi-layer corruption protection.
Recovery cascade:
1. Try normal deserialization
2. Try loading from backup key
3. Try partial recovery (trim corrupted tail)
4. Return clean empty session
"""
key = f"manga:session:{session_id}:turns"
# Attempt 1: Normal load
try:
raw_turns = self._redis.lrange(key, 0, -1)
turns = []
for raw in raw_turns:
turns.append(json.loads(raw))
return {"turns": turns, "source": "primary", "recovered": False}
except (json.JSONDecodeError, TypeError) as e:
logger.warning(
"Session %s primary load failed: %s", session_id, str(e)
)
# Attempt 2: Load from backup key (written on every successful save)
backup_key = f"manga:session:{session_id}:backup"
try:
backup_raw = self._redis.get(backup_key)
if backup_raw:
backup_data = json.loads(backup_raw)
logger.info("Session %s recovered from backup", session_id)
# Restore primary from backup
self._restore_from_backup(key, backup_data)
return {"turns": backup_data, "source": "backup", "recovered": True}
except (json.JSONDecodeError, TypeError):
logger.warning("Session %s backup also corrupted", session_id)
# Attempt 3: Partial recovery — load what we can
try:
raw_turns = self._redis.lrange(key, 0, -1)
valid_turns = []
for raw in raw_turns:
try:
valid_turns.append(json.loads(raw))
except json.JSONDecodeError:
continue # Skip corrupted entries
if valid_turns:
logger.info(
"Session %s partially recovered: %d/%d turns",
session_id, len(valid_turns), len(raw_turns),
)
return {
"turns": valid_turns,
"source": "partial_recovery",
"recovered": True,
}
except redis.RedisError:
pass
# Attempt 4: Clean slate
self._corruption_counter += 1
logger.error(
"Session %s unrecoverable — starting fresh (corruption #%d)",
session_id, self._corruption_counter,
)
self._redis.delete(key, backup_key)
return {"turns": [], "source": "clean_slate", "recovered": True}
def save_session_with_backup(
self, session_id: str, turns: list[dict]
) -> None:
"""
Save session data with an atomic backup.
Uses Redis pipeline to minimize partial-write window.
"""
key = f"manga:session:{session_id}:turns"
backup_key = f"manga:session:{session_id}:backup"
ttl = 1800 # 30 minutes
pipe = self._redis.pipeline(transaction=True)
try:
# Write backup first (single key, atomic)
pipe.setex(backup_key, ttl, json.dumps(turns, default=str))
# Clear and rewrite primary
pipe.delete(key)
for turn in turns:
pipe.rpush(key, json.dumps(turn, default=str))
pipe.expire(key, ttl)
pipe.execute()
except redis.RedisError as e:
logger.error("Failed to save session %s: %s", session_id, str(e))
raise
def _restore_from_backup(self, key: str, backup_data: list) -> None:
"""Restore primary session from backup data."""
pipe = self._redis.pipeline(transaction=True)
pipe.delete(key)
for turn in backup_data:
pipe.rpush(key, json.dumps(turn, default=str))
pipe.expire(key, 1800)
pipe.execute()
Prevention
- Use Redis pipelines with transactions for multi-key writes to reduce the partial-write window.
- Maintain a backup key (single JSON blob) updated on every successful save.
- Enable Redis persistence (AOF with appendfsync everysec) to survive restarts.
- Deploy Redis in Multi-AZ with automatic failover; test failover scenarios quarterly.
- Add try/except around every json.loads call with a fallback path (see the sketch below).
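
A minimal sketch of the last bullet above, assuming a hypothetical safe_loads helper wrapped around every Redis read; the fallback value is whatever a clean session turn would be.

```python
"""Sketch: guarded deserialization for session reads (helper name is hypothetical)."""
import json
import logging
from typing import Any

logger = logging.getLogger("manga_session_recovery")

def safe_loads(raw: Any, fallback: Any = None) -> Any:
    """Deserialize a Redis payload without letting corruption crash the agent."""
    if raw is None:
        return fallback
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError) as exc:
        logger.warning("Dropping corrupted session payload: %s", exc)
        return fallback

# Usage: keep only the turns that deserialize cleanly.
# turns = [t for t in (safe_loads(r) for r in raw_turns) if t is not None]
```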
Scenario 2: Multi-Agent Routing Loop (Classifier Oscillation)
Problem
A user sends an ambiguous message like "What about that manga?" The Agent Squad classifier routes to ProductSearchAgent (confidence 0.55), which cannot resolve the query and returns a clarification. The classifier then re-evaluates and routes to MangaQAAgent (confidence 0.52), which also fails. The classifier bounces back to ProductSearchAgent, creating an infinite routing loop that burns Haiku tokens at $0.25/1M.
Detection
flowchart TD
A["CloudWatch Metric:<br/>agent_routing_count > 3<br/>for single session turn"] --> B{"Check routing log<br/>for the session"}
B -->|"Same 2 agents alternating"| C["Routing Oscillation<br/>Confirmed"]
B -->|"Different agents each time"| D["Legitimate multi-agent<br/>query — not a loop"]
C --> E{"Check classifier<br/>confidence scores"}
E -->|"All scores < 0.70"| F["ROOT CAUSE:<br/>Ambiguous query with<br/>no confident classification"]
E -->|"Scores fluctuating"| G["ROOT CAUSE:<br/>Agent responses modifying<br/>context and shifting classification"]
F --> H["Execute Runbook 2"]
G --> H
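
The agent_routing_count signal this flowchart starts from has to be emitted by the orchestrator itself. A sketch of a per-turn publisher follows, assuming a MangaAssist namespace (namespace and metric name are assumptions). Publishing once per completed turn with no per-session dimension keeps metric cardinality manageable at 1M messages/day; the alarm then watches the Maximum statistic for values above 3.

```python
"""Sketch: publish how many routing decisions a single turn consumed.
Namespace and metric name are assumptions for illustration."""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

def emit_routing_count(routes_this_turn: int) -> None:
    """Publish the routing count once the turn completes; alarm on Maximum > 3."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist",                   # assumed custom namespace
        MetricData=[{
            "MetricName": "agent_routing_count",   # assumed metric name from the flowchart
            "Value": float(routes_this_turn),
            "Unit": "Count",
        }],
    )
```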
Root Cause
The Agent Squad classifier uses the conversation history (including the previous agent's response) as context. When Agent A returns a clarification, the classifier re-reads the full context and sometimes reclassifies to Agent B. Agent B's response further shifts the context, causing it to reclassify back to Agent A. The confidence never exceeds the threshold because the query is genuinely ambiguous.
Resolution
"""
Runbook 2: Routing loop detection and circuit-breaking for Agent Squad.
Prevents infinite oscillation between agents on ambiguous queries.
"""
import time
import logging
from dataclasses import dataclass, field
from collections import Counter
logger = logging.getLogger("manga_routing_guard")
@dataclass
class RoutingLoopDetector:
"""
Detects and breaks routing loops in the Agent Squad.
Tracks routing decisions per session and enforces limits.
"""
max_routes_per_turn: int = 3
max_same_pair_oscillations: int = 2
# Per-session tracking
_turn_routes: dict[str, list[str]] = field(default_factory=dict)
def record_route(self, session_id: str, agent_name: str) -> None:
"""Record a routing decision."""
if session_id not in self._turn_routes:
self._turn_routes[session_id] = []
self._turn_routes[session_id].append(agent_name)
def is_loop_detected(self, session_id: str) -> bool:
"""Check if a routing loop is happening."""
routes = self._turn_routes.get(session_id, [])
# Check 1: Too many routes in a single turn
if len(routes) > self.max_routes_per_turn:
logger.warning(
"Session %s: routing loop — %d routes in single turn",
session_id, len(routes),
)
return True
# Check 2: Oscillation between same two agents
if len(routes) >= 4:
last_four = routes[-4:]
if last_four[0] == last_four[2] and last_four[1] == last_four[3]:
logger.warning(
"Session %s: oscillation between %s and %s",
session_id, last_four[0], last_four[1],
)
return True
return False
def get_fallback_agent(self, session_id: str) -> str:
"""
When a loop is detected, choose the best fallback agent.
Strategy: Use the agent that appeared most in the routing history.
"""
routes = self._turn_routes.get(session_id, [])
if not routes:
return "ProductSearchAgent" # Default fallback
counter = Counter(routes)
most_common = counter.most_common(1)[0][0]
return most_common
def clear_turn(self, session_id: str) -> None:
"""Clear routing history for a new turn."""
self._turn_routes.pop(session_id, None)
class LoopBreakingOrchestrator:
"""
Wraps the Agent Squad orchestrator with loop detection.
When a loop is detected, forces a single agent and asks for clarification.
"""
CLARIFICATION_PROMPT = (
"I want to help but I'm not sure exactly what you're looking for. "
"Could you tell me more? For example:\n"
"- Are you looking for a specific manga title?\n"
"- Do you want to check an order status?\n"
"- Would you like a recommendation?\n"
)
def __init__(self, orchestrator, loop_detector: RoutingLoopDetector):
self._orchestrator = orchestrator
self._detector = loop_detector
async def route_with_guard(
self, message: str, user_id: str, session_id: str
) -> dict:
"""Route a message with loop detection."""
if self._detector.is_loop_detected(session_id):
fallback_agent = self._detector.get_fallback_agent(session_id)
logger.info(
"Loop detected for session %s — forcing %s with clarification",
session_id, fallback_agent,
)
self._detector.clear_turn(session_id)
return {
"response": self.CLARIFICATION_PROMPT,
"agent": fallback_agent,
"loop_broken": True,
}
# Normal routing
response = await self._orchestrator.route_request(
user_input=message,
user_id=user_id,
session_id=session_id,
)
agent_name = response.metadata.get("agent_name", "unknown")
self._detector.record_route(session_id, agent_name)
return {
"response": str(response),
"agent": agent_name,
"loop_broken": False,
}
Prevention
- Hard cap on routing attempts per turn (max 3) — after 3 routes, force a clarification response.
- Pin the agent for follow-up turns — once an agent is selected, subsequent messages in the same intent go to the same agent unless the user explicitly changes topic.
- Raise the confidence threshold from 0.50 to 0.70 — ambiguous queries below threshold trigger a clarification instead of a best-guess route.
- Exclude previous agent responses from classifier context — the classifier should only see user messages, not agent responses.
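
A minimal sketch of the last prevention item, assuming conversation turns are stored as dicts with "role" and "content" keys (the turn shape and the classifier call are assumptions, not the Agent Squad API).

```python
"""Sketch: feed the classifier user messages only, never agent responses.
The turn dict shape and classifier interface are assumptions."""

def classifier_context(turns: list[dict], max_messages: int = 10) -> list[dict]:
    """Return only recent user messages so agent replies cannot shift the routing."""
    user_only = [t for t in turns if t.get("role") == "user"]
    return user_only[-max_messages:]

# Hypothetical usage:
# classification = classifier.classify(message, history=classifier_context(session["turns"]))
```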
Scenario 3: MCP Tool Timeout Causing Hallucination
Problem
The search_manga_catalog MCP tool times out after 800ms because OpenSearch Serverless is experiencing cold-start latency. The agent receives a timeout error but, instead of reporting the failure to the user, the reasoning engine hallucinates manga titles that do not exist in the catalog. The user sees plausible-looking but completely fabricated results.
Detection
flowchart TD
A["User reports: 'I can't find<br/>the manga you suggested'"] --> B["Support team checks<br/>product catalog"]
B -->|"Title does not exist"| C["Hallucination<br/>Confirmed"]
C --> D{"Check reasoning trace<br/>for the session"}
D --> E["Trace shows:<br/>search_manga_catalog TIMEOUT<br/>at iteration 1"]
E --> F{"Did agent continue<br/>reasoning after timeout?"}
F -->|"Yes — generated answer<br/>without tool data"| G["ROOT CAUSE:<br/>Agent reasoning continued<br/>after tool timeout without<br/>grounding data"]
F -->|"No — returned error"| H["Different issue —<br/>check response pipeline"]
G --> I["Execute Runbook 3"]
Root Cause
The Strands Agent receives a timeout error from the MCP tool and logs it as an observation ("Tool timed out"). However, the reasoning engine interprets this as "no results found" rather than "search was not performed." It then uses its parametric knowledge (Claude's training data) to generate manga recommendations, producing titles that may not exist in the MangaAssist catalog.
Resolution
"""
Runbook 3: Prevent hallucination after MCP tool timeout.
Force the agent to acknowledge the tool failure instead of
inventing results from parametric knowledge.
"""
import json
import logging
logger = logging.getLogger("manga_hallucination_guard")
class ToolTimeoutGuard:
"""
Intercepts tool timeout responses and injects explicit instructions
that prevent the agent from hallucinating replacement data.
"""
# Tools where hallucination after timeout is dangerous
GROUNDING_REQUIRED_TOOLS = {
"search_manga_catalog",
"get_manga_details",
"lookup_order",
"check_inventory",
}
TIMEOUT_OBSERVATION_TEMPLATE = (
"TOOL TIMEOUT: The {tool_name} tool did not respond in time. "
"CRITICAL: You MUST NOT generate manga titles, prices, availability, "
"or order information from your own knowledge. Instead, you MUST either:\n"
"1. Retry the tool if budget allows (remaining: {remaining_ms:.0f}ms)\n"
"2. Tell the user: 'I'm having trouble searching right now. "
"Please try again in a moment.'\n"
"3. Offer cached popular picks if available.\n\n"
"DO NOT invent manga titles or details."
)
def transform_timeout_observation(
self,
tool_name: str,
original_error: dict,
remaining_budget_ms: float,
) -> str:
"""
Transform a raw timeout error into a strongly-worded observation
that prevents the agent from hallucinating.
"""
if tool_name not in self.GROUNDING_REQUIRED_TOOLS:
return json.dumps(original_error)
return self.TIMEOUT_OBSERVATION_TEMPLATE.format(
tool_name=tool_name,
remaining_ms=remaining_budget_ms,
)
def validate_response_grounding(
self,
response_text: str,
tool_results: list[dict],
failed_tools: list[str],
) -> dict:
"""
Post-generation check: verify that any manga titles mentioned
in the response actually came from tool results.
Returns validation result with flagged hallucinations.
"""
# Extract titles from successful tool results
grounded_titles = set()
for result in tool_results:
if isinstance(result, dict):
for item in result.get("results", []):
grounded_titles.add(item.get("title", "").lower())
if "title" in result:
grounded_titles.add(result["title"].lower())
# If critical tools failed and response mentions specific titles,
# flag as potential hallucination
if failed_tools and any(
t in self.GROUNDING_REQUIRED_TOOLS for t in failed_tools
):
# Simple check: if response mentions specific manga details
# but no successful search was performed
suspicious_patterns = [
"available", "in stock", "$", "volumes",
"rating", "price", "order",
]
flags = []
response_lower = response_text.lower()
for pattern in suspicious_patterns:
if pattern in response_lower and not grounded_titles:
flags.append(f"Response mentions '{pattern}' but no search succeeded")
if flags:
logger.warning(
"Potential hallucination detected: %s", flags
)
return {
"grounded": False,
"flags": flags,
"action": "REPLACE_WITH_FALLBACK",
}
return {"grounded": True, "flags": [], "action": "PASS"}
Prevention
- Inject explicit anti-hallucination instructions into the timeout observation — tell the agent it MUST NOT generate product data from memory.
- Post-generation grounding check — verify that any product details in the response came from actual tool results.
- System prompt reinforcement — add "Never generate manga titles from your training data. Always use tool results." to the system prompt.
- Separate "tool failure" from "no results" — use distinct observation types so the agent knows the difference.
Scenario 4: DynamoDB Session State Size Limit Exceeded
Problem
A power user has a 2-hour conversation with 150+ turns. The session state, including full tool call history and shared results, exceeds DynamoDB's 400KB item size limit. The PutItem call fails with ValidationException: Item size has exceeded the maximum allowed size, and the session can no longer be saved. Subsequent turns lose context.
Detection
flowchart TD
A["CloudWatch Alarm:<br/>DynamoDB ValidationException<br/>spike on manga_sessions table"] --> B{"Check error message"}
B -->|"Item size exceeded"| C["Session Size Limit<br/>Confirmed"]
B -->|"Other validation error"| D["Different issue"]
C --> E{"Query affected sessions"}
E --> F["Sessions with<br/>turn_count > 100<br/>or active_duration > 1h"]
F --> G["ROOT CAUSE:<br/>Unbounded conversation history<br/>accumulation in session item"]
G --> H["Execute Runbook 4"]
Root Cause
The long-term memory system stores the complete conversation history (all turns, tool call results, and shared context) as a single DynamoDB item. Each turn adds ~1-3KB (user message + agent response + tool results); at the upper end of that range, 150 turns push the item to ~300-450KB, over the 400KB limit. The system has no automatic compaction or overflow mechanism.
Resolution
"""
Runbook 4: DynamoDB session size management with automatic compaction
and overflow to S3 for large sessions.
"""
import json
import sys
import time
import logging
import hashlib
from typing import Any
import boto3
from decimal import Decimal
logger = logging.getLogger("manga_session_size")
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
s3_client = boto3.client("s3", region_name="us-east-1")
sessions_table = dynamodb.Table("manga_sessions")
MAX_ITEM_SIZE_BYTES = 380_000 # Leave 20KB headroom from 400KB limit
COMPACTION_THRESHOLD_TURNS = 50
OVERFLOW_BUCKET = "manga-session-overflow"
class SessionSizeManager:
"""
Manages DynamoDB session item size with compaction and overflow.
Strategy:
1. After 50 turns, summarize older turns into a compact summary.
2. If still over limit, overflow full history to S3 and keep
only the summary + last 10 turns in DynamoDB.
3. On read, reconstruct from DynamoDB + S3 if needed.
"""
def estimate_item_size(self, item: dict) -> int:
"""Estimate DynamoDB item size in bytes."""
return len(json.dumps(item, default=str).encode("utf-8"))
async def save_session(
self, session_id: str, session_data: dict
) -> None:
"""Save session with automatic size management."""
estimated_size = self.estimate_item_size(session_data)
if estimated_size <= MAX_ITEM_SIZE_BYTES:
# Normal save — fits in DynamoDB
sessions_table.put_item(Item=self._prepare_item(session_data))
return
logger.info(
"Session %s estimated at %d bytes — compacting",
session_id, estimated_size,
)
# Step 1: Compact old turns into a summary
compacted = await self._compact_history(session_data)
compacted_size = self.estimate_item_size(compacted)
if compacted_size <= MAX_ITEM_SIZE_BYTES:
sessions_table.put_item(Item=self._prepare_item(compacted))
logger.info(
"Session %s compacted: %d -> %d bytes",
session_id, estimated_size, compacted_size,
)
return
# Step 2: Overflow to S3
logger.info(
"Session %s still %d bytes after compaction — overflowing to S3",
session_id, compacted_size,
)
await self._overflow_to_s3(session_id, session_data, compacted)
async def _compact_history(self, session_data: dict) -> dict:
"""Summarize old turns, keeping only the last 10 in full."""
turns = session_data.get("turns", [])
if len(turns) <= 10:
return session_data
old_turns = turns[:-10]
recent_turns = turns[-10:]
# Build a summary of old turns
summary_parts = []
topics_seen = set()
for turn in old_turns:
content = turn.get("content", "")[:100]
if turn.get("role") == "user":
summary_parts.append(f"User asked about: {content}")
if turn.get("agent_name"):
topics_seen.add(turn["agent_name"])
summary = (
f"Conversation summary ({len(old_turns)} earlier turns): "
f"Topics covered by agents: {', '.join(topics_seen)}. "
f"Key exchanges: {'; '.join(summary_parts[:10])}"
)
compacted = dict(session_data)
compacted["turns"] = recent_turns
compacted["history_summary"] = summary
compacted["compacted_turn_count"] = len(old_turns)
return compacted
async def _overflow_to_s3(
self,
session_id: str,
full_data: dict,
compacted_data: dict,
) -> None:
"""Overflow full history to S3 and keep compacted data in DynamoDB."""
# Write full history to S3
s3_key = f"sessions/{session_id}/{int(time.time())}.json"
s3_client.put_object(
Bucket=OVERFLOW_BUCKET,
Key=s3_key,
Body=json.dumps(full_data, default=str),
ContentType="application/json",
ServerSideEncryption="aws:kms",
)
# Keep only summary + last 5 turns in DynamoDB
minimal = dict(compacted_data)
minimal["turns"] = compacted_data.get("turns", [])[-5:]
minimal["s3_overflow_key"] = s3_key
minimal["overflow_at"] = time.time()
sessions_table.put_item(Item=self._prepare_item(minimal))
logger.info(
"Session %s overflowed to s3://%s/%s",
session_id, OVERFLOW_BUCKET, s3_key,
)
def _prepare_item(self, data: dict) -> dict:
"""Prepare a dict for DynamoDB (convert floats to Decimal)."""
return json.loads(json.dumps(data, default=str), parse_float=Decimal)
Prevention
- Set a hard limit of 50 full turns in the DynamoDB item — compact older turns automatically.
- Monitor item sizes with a CloudWatch custom metric: log the estimated byte size on every save.
- Use S3 as the overflow tier for full conversation history; keep only a summary and recent turns in DynamoDB.
- Set a session timeout at 30 minutes of inactivity — long sessions should be split into new sessions with a summary handoff.
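
A sketch of the monitoring bullet above: publish the estimated item size on every save so an alarm can fire well before the 400KB wall (namespace, metric name, and the 350KB alarm threshold are assumptions).

```python
"""Sketch: custom metric for DynamoDB session item size.
Namespace, metric name, and threshold are assumptions for illustration."""
import json
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

def record_session_size(session_data: dict) -> int:
    """Estimate the serialized item size, publish it, and return the byte count."""
    size_bytes = len(json.dumps(session_data, default=str).encode("utf-8"))
    cloudwatch.put_metric_data(
        Namespace="MangaAssist",                        # assumed namespace
        MetricData=[{
            "MetricName": "session_item_size_bytes",    # assumed metric name
            "Value": float(size_bytes),
            "Unit": "Bytes",
        }],
    )
    return size_bytes

# Alarm idea: Maximum of session_item_size_bytes > 350_000 for 5 minutes pages the Backend Team.
```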
Scenario 5: Agent Squad Coordination Deadlock
Problem
Two concurrent requests from the same user arrive within milliseconds. Both are routed to ProductSearchAgent and RecommendationAgent respectively. Both agents try to read and write the same session state in Redis simultaneously. Agent A reads the state, Agent B reads the same state, Agent A writes its update, Agent B writes its update (overwriting Agent A's changes). The session state becomes inconsistent, and both agents enter a retry loop trying to reconcile conflicting state.
Detection
flowchart TD
A["CloudWatch Alarm:<br/>p99 latency > 5s on<br/>Agent Squad responses"] --> B{"Check concurrent<br/>request logs"}
B -->|"Multiple requests same<br/>session_id within 100ms"| C["Concurrent Session<br/>Access Confirmed"]
B -->|"Single request"| D["Latency issue —<br/>different runbook"]
C --> E{"Check Redis WATCH<br/>failure logs"}
E -->|"WatchError exceptions"| F["ROOT CAUSE:<br/>Optimistic locking conflict<br/>on session state"]
E -->|"No WATCH errors"| G["ROOT CAUSE:<br/>Last-writer-wins<br/>state corruption"]
F --> H["Execute Runbook 5"]
G --> H
Root Cause
WebSocket connections can deliver multiple messages in rapid succession. When both messages are routed to different agents, they run concurrently on the ECS Fargate task. Both agents read the session state from Redis, modify it independently, and write back. Without distributed locking, the last write wins, discarding the other agent's updates.
Resolution
"""
Runbook 5: Distributed session locking for concurrent Agent Squad access.
Uses Redis WATCH/MULTI for optimistic locking with exponential backoff.
"""
import time
import json
import logging
import asyncio
from typing import Optional, Callable
import redis
logger = logging.getLogger("manga_session_lock")
class DistributedSessionLock:
"""
Optimistic locking for MangaAssist session state using Redis
WATCH/MULTI/EXEC transactions. Prevents concurrent agent updates
from corrupting session state.
"""
MAX_RETRIES = 3
BASE_BACKOFF_MS = 50
def __init__(self, redis_client: redis.Redis):
self._redis = redis_client
async def update_with_lock(
self,
session_id: str,
update_fn: Callable[[dict], dict],
) -> dict:
"""
Perform an atomic read-modify-write on session state.
Uses Redis WATCH to detect concurrent modifications.
If another writer modifies the key between WATCH and EXEC,
the transaction fails and we retry with backoff.
"""
key = f"manga:session:{session_id}:state"
for attempt in range(self.MAX_RETRIES):
try:
# Start watching the key for changes
pipe = self._redis.pipeline(True) # transaction=True
pipe.watch(key)
# Read current state
raw = pipe.get(key)
current_state = json.loads(raw) if raw else {}
# Apply the update function
updated_state = update_fn(current_state)
# Execute the write in a transaction
pipe.multi()
pipe.setex(
key,
1800, # 30-min TTL
json.dumps(updated_state, default=str),
)
pipe.execute()
logger.debug(
"Session %s updated (attempt %d)", session_id, attempt + 1
)
return updated_state
except redis.WatchError:
# Another writer modified the key — retry
backoff_ms = self.BASE_BACKOFF_MS * (2 ** attempt)
logger.info(
"Session %s WATCH conflict (attempt %d) — "
"retrying in %dms",
session_id, attempt + 1, backoff_ms,
)
await asyncio.sleep(backoff_ms / 1000)
except redis.RedisError as e:
logger.error(
"Session %s lock error: %s", session_id, str(e)
)
raise
# All retries exhausted
logger.error(
"Session %s lock failed after %d attempts — "
"falling back to last-writer-wins",
session_id, self.MAX_RETRIES,
)
# Fallback: just write (last-writer-wins is better than failing)
raw = self._redis.get(key)
current_state = json.loads(raw) if raw else {}
updated_state = update_fn(current_state)
self._redis.setex(
key, 1800, json.dumps(updated_state, default=str)
)
return updated_state
class RequestSerializer:
"""
Serializes concurrent requests for the same session.
Uses a per-session asyncio.Lock to ensure only one agent
processes a request at a time per session.
"""
def __init__(self):
self._locks: dict[str, asyncio.Lock] = {}
def get_lock(self, session_id: str) -> asyncio.Lock:
"""Get or create a per-session lock."""
if session_id not in self._locks:
self._locks[session_id] = asyncio.Lock()
return self._locks[session_id]
async def serialize(
self,
session_id: str,
handler: Callable,
*args,
**kwargs,
):
"""
Serialize handler execution for a session.
Second concurrent request waits for the first to complete.
"""
lock = self.get_lock(session_id)
async with lock:
return await handler(*args, **kwargs)
def cleanup(self, session_id: str) -> None:
"""Remove lock when session ends."""
self._locks.pop(session_id, None)
Prevention
- Serialize requests per session — use an asyncio lock so only one agent processes a message per session at a time. The second message waits for the first to complete.
- Use Redis WATCH/MULTI for optimistic locking on session state writes.
- API Gateway request throttling — limit to 1 concurrent request per WebSocket connection.
- Add a version field to session state — each write increments the version; reject writes with stale versions (see the sketch below).
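
A minimal sketch of the version-field bullet, layered on top of update_with_lock from the runbook code: the update function bumps a version counter and refuses to apply changes computed from a stale read (the version field name and StaleSessionStateError are assumptions).

```python
"""Sketch: version-checked update_fn for use with DistributedSessionLock.update_with_lock.
The 'version' field and the exception type are assumptions."""

class StaleSessionStateError(Exception):
    """Raised when an update was computed against an out-of-date session version."""

def versioned_update(expected_version: int, changes: dict):
    """Build an update_fn that increments the version and rejects stale writes."""
    def update_fn(current_state: dict) -> dict:
        current_version = current_state.get("version", 0)
        if current_version != expected_version:
            # Another agent committed first; the caller should re-read and rebuild its changes.
            raise StaleSessionStateError(
                f"expected version {expected_version}, found {current_version}"
            )
        new_state = {**current_state, **changes}
        new_state["version"] = current_version + 1
        return new_state
    return update_fn

# Hypothetical usage inside an agent:
# await lock.update_with_lock(session_id, versioned_update(expected_version=7, changes={"cart": cart}))
```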
Cross-Scenario Decision Tree
flowchart TD
START["Agent System Alert"] --> Q1{"What is the<br/>error type?"}
Q1 -->|"Deserialization / JSON error"| S1["Scenario 1:<br/>Memory Corruption"]
Q1 -->|"High iteration count"| S2["Scenario 2:<br/>Routing Loop"]
Q1 -->|"User reports wrong data"| S3["Scenario 3:<br/>Hallucination"]
Q1 -->|"DynamoDB ValidationException"| S4["Scenario 4:<br/>Session Size"]
Q1 -->|"Latency spike with<br/>concurrent requests"| S5["Scenario 5:<br/>Deadlock"]
S1 --> R1["Load from backup key<br/>→ Partial recovery<br/>→ Clean slate"]
S2 --> R2["Detect oscillation<br/>→ Force clarification<br/>→ Pin agent"]
S3 --> R3["Inject anti-hallucination<br/>→ Post-generation check<br/>→ Grounding validation"]
S4 --> R4["Compact old turns<br/>→ Overflow to S3<br/>→ Monitor item size"]
S5 --> R5["Redis WATCH locking<br/>→ Request serialization<br/>→ Version field"]
Runbook Summary Table
| Scenario | Detection Signal | Immediate Action | Long-Term Fix | Owner |
|---|---|---|---|---|
| 1. Memory Corruption | JSONDecodeError rate spike | Load from backup key; fall back to clean session | Redis pipeline transactions + backup key on every save | Platform Team |
| 2. Routing Loop | Routing count > 3 per turn | Force clarification response; pin agent | Raise confidence threshold; exclude agent responses from classifier | ML Team |
| 3. Tool Timeout Hallucination | User reports non-existent manga | Replace response with fallback; flag session for review | Anti-hallucination observation injection; post-generation grounding check | AI Safety Team |
| 4. Session Size Exceeded | DynamoDB ValidationException | Compact old turns; overflow to S3 | Auto-compaction at 50 turns; session timeout at 30 min | Backend Team |
| 5. Coordination Deadlock | p99 latency > 5s with concurrent requests | Restart affected ECS tasks | Per-session request serialization; Redis optimistic locking | Platform Team |
Key Takeaways
- Memory corruption is inevitable with Redis — always have a recovery cascade (backup key, partial recovery, clean slate). Never assume json.loads() will succeed.
- Routing loops are an emergent behavior of multi-agent systems — they are not bugs in any single agent but in the interaction between the classifier and agent responses. Detection must happen at the orchestrator level, not the agent level.
- Tool timeout and hallucination are linked — when a grounding tool fails, the model defaults to its parametric knowledge, which may not match the actual catalog. The fix is both technical (anti-hallucination instructions) and architectural (post-generation grounding checks).
- DynamoDB's 400KB limit is a hard wall — design for it from day one with compaction and overflow strategies. Do not wait for the first failure in production.
- Concurrent access is the norm at 1M messages/day — especially on WebSocket connections where users send rapid follow-up messages. Per-session locking (not per-request) is essential for state consistency.
Previous file: 02-mcp-agent-tool-interactions.md Back to overview: 01-autonomous-agent-architecture.md