
Scenarios and Runbooks — Intelligent Autonomous Systems

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Detail |
|---|---|
| Certification | AWS AIP-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.1 — Develop intelligent autonomous systems with appropriate memory and state management capabilities |
| This File | Five production scenarios with detection flowcharts, root cause analysis, resolution code, and prevention strategies |

Skill Scope Statement

This file presents five real-world failure scenarios that MangaAssist has encountered (or would encounter) in production with autonomous agents, multi-agent routing, MCP tool interactions, and session state management. Each scenario includes: a problem statement, a mermaid detection flowchart, root cause analysis, Python resolution code, and prevention measures. These runbooks are designed for on-call engineers responding to alerts.


Mind Map — Autonomous Agent Failure Modes

mindmap
  root((Agent Failure<br/>Scenarios))
    Memory Corruption
      Redis Deserialization
      TTL Race Condition
      Stale Context Injection
    Multi-Agent Routing Loop
      Classifier Oscillation
      Confidence Below Threshold
      Agent Ping-Pong
    MCP Tool Timeout
      Cascading Timeout
      Hallucinated Result
      Partial Data Answer
    DynamoDB Session Size
      400KB Item Limit
      History Accumulation
      Serialization Bloat
    Agent Squad Deadlock
      Circular Dependency
      Shared Resource Contention
      Supervisor Starvation

Scenario Overview

| # | Scenario | Severity | Blast Radius | Typical Detection Time |
|---|---|---|---|---|
| 1 | Agent memory corruption (Redis deserialization failure) | P1 — Critical | All sessions on affected Redis node | 2-5 minutes via error rate alarm |
| 2 | Multi-agent routing loop (classifier oscillation) | P2 — High | Single user session, but burns tokens | 30-60 seconds via iteration counter |
| 3 | MCP tool timeout causing hallucination | P2 — High | Single user gets wrong answer | Post-hoc via quality audit |
| 4 | DynamoDB session state size limit exceeded | P3 — Medium | Users with very long sessions | Immediate on write failure |
| 5 | Agent Squad coordination deadlock | P1 — Critical | Multiple concurrent sessions | 1-3 minutes via latency spike |

Scenario 1: Agent Memory Corruption (Redis Deserialization Failure)

Problem

During a Redis ElastiCache maintenance window, connections are briefly interrupted. When sessions resume, the Strands Agent loads corrupted or partially-written session data from Redis, causing json.JSONDecodeError on deserialization. The agent crashes or hallucinates because it receives garbage context instead of a clean conversation history.

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>manga_session_errors > 50/min"] --> B{"Check error type<br/>in CloudWatch Logs"}
    B -->|"JSONDecodeError"| C["Redis Deserialization<br/>Failure Confirmed"]
    B -->|"RedisConnectionError"| D["Redis Connectivity<br/>Issue — Different Runbook"]
    B -->|"KeyError / TypeError"| E["Schema Mismatch —<br/>Deployment Issue"]
    C --> F{"Check Redis<br/>cluster events"}
    F -->|"Maintenance window<br/>or failover"| G["ROOT CAUSE:<br/>Partial writes during<br/>Redis failover"]
    F -->|"No events"| H["Check for<br/>concurrent write<br/>race condition"]
    G --> I["Execute Runbook 1"]
    H --> I

Root Cause

Redis replicates asynchronously, so writes acknowledged by the primary are not guaranteed to survive a failover. A session write issued as separate commands (RPUSH for the turn + EXPIRE for the TTL) can therefore be partially applied: the turn data is written but the key expires before the new TTL is set, or the data is truncated mid-write. When the agent reads this corrupted data, json.loads() fails.

Resolution

"""
Runbook 1: Resilient Redis session deserialization with corruption recovery.
Handles partial writes, truncated JSON, and stale data gracefully.
"""

import json
import logging

import redis

logger = logging.getLogger("manga_session_recovery")


class ResilientSessionLoader:
    """
    Loads session data from Redis with corruption detection and recovery.
    Falls back to a clean session if data is unrecoverable.
    """

    def __init__(self, redis_client: redis.Redis):
        self._redis = redis_client
        self._corruption_counter = 0

    def load_session(self, session_id: str) -> dict:
        """
        Load session with multi-layer corruption protection.

        Recovery cascade:
        1. Try normal deserialization
        2. Try loading from backup key
        3. Try partial recovery (trim corrupted tail)
        4. Return clean empty session
        """
        key = f"manga:session:{session_id}:turns"

        # Attempt 1: Normal load
        try:
            raw_turns = self._redis.lrange(key, 0, -1)
            turns = []
            for raw in raw_turns:
                turns.append(json.loads(raw))
            return {"turns": turns, "source": "primary", "recovered": False}
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning(
                "Session %s primary load failed: %s", session_id, str(e)
            )

        # Attempt 2: Load from backup key (written on every successful save)
        backup_key = f"manga:session:{session_id}:backup"
        try:
            backup_raw = self._redis.get(backup_key)
            if backup_raw:
                backup_data = json.loads(backup_raw)
                logger.info("Session %s recovered from backup", session_id)
                # Restore primary from backup
                self._restore_from_backup(key, backup_data)
                return {"turns": backup_data, "source": "backup", "recovered": True}
        except (json.JSONDecodeError, TypeError):
            logger.warning("Session %s backup also corrupted", session_id)

        # Attempt 3: Partial recovery — load what we can
        try:
            raw_turns = self._redis.lrange(key, 0, -1)
            valid_turns = []
            for raw in raw_turns:
                try:
                    valid_turns.append(json.loads(raw))
                except json.JSONDecodeError:
                    continue  # Skip corrupted entries
            if valid_turns:
                logger.info(
                    "Session %s partially recovered: %d/%d turns",
                    session_id, len(valid_turns), len(raw_turns),
                )
                return {
                    "turns": valid_turns,
                    "source": "partial_recovery",
                    "recovered": True,
                }
        except redis.RedisError:
            pass

        # Attempt 4: Clean slate
        self._corruption_counter += 1
        logger.error(
            "Session %s unrecoverable — starting fresh (corruption #%d)",
            session_id, self._corruption_counter,
        )
        self._redis.delete(key, backup_key)
        return {"turns": [], "source": "clean_slate", "recovered": True}

    def save_session_with_backup(
        self, session_id: str, turns: list[dict]
    ) -> None:
        """
        Save session data with an atomic backup.
        Uses Redis pipeline to minimize partial-write window.
        """
        key = f"manga:session:{session_id}:turns"
        backup_key = f"manga:session:{session_id}:backup"
        ttl = 1800  # 30 minutes

        pipe = self._redis.pipeline(transaction=True)
        try:
            # Write backup first (single key, atomic)
            pipe.setex(backup_key, ttl, json.dumps(turns, default=str))
            # Clear and rewrite primary
            pipe.delete(key)
            for turn in turns:
                pipe.rpush(key, json.dumps(turn, default=str))
            pipe.expire(key, ttl)
            pipe.execute()
        except redis.RedisError as e:
            logger.error("Failed to save session %s: %s", session_id, str(e))
            raise

    def _restore_from_backup(self, key: str, backup_data: list) -> None:
        """Restore primary session from backup data."""
        pipe = self._redis.pipeline(transaction=True)
        pipe.delete(key)
        for turn in backup_data:
            pipe.rpush(key, json.dumps(turn, default=str))
        pipe.expire(key, 1800)
        pipe.execute()

Prevention

  • Use Redis pipelines with transactions for multi-key writes to reduce the partial-write window.
  • Maintain a backup key (single JSON blob) updated on every successful save.
  • Enable Redis persistence (AOF with appendfsync everysec) to survive restarts.
  • Deploy Redis in Multi-AZ with automatic failover; test failover scenarios quarterly.
  • Add try/except around every json.loads call with a fallback path.
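The detection flowchart above keys off a manga_session_errors CloudWatch alarm, which only fires if the corruption counter is actually published. A minimal sketch of that publication path, assuming a hypothetical MangaAssist namespace and RedisNode dimension (the payload builder is pure so it can be unit-tested without AWS):

```python
def build_corruption_metric(corruption_count: int, node_id: str) -> dict:
    """Payload for put_metric_data feeding the manga_session_errors alarm."""
    return {
        "Namespace": "MangaAssist",  # hypothetical namespace, not from the source
        "MetricData": [
            {
                "MetricName": "manga_session_errors",
                "Dimensions": [{"Name": "RedisNode", "Value": node_id}],
                "Value": float(corruption_count),
                "Unit": "Count",
            }
        ],
    }


def publish_corruption_metric(corruption_count: int, node_id: str) -> None:
    """Ship the counter to CloudWatch (boto3 imported lazily for testability)."""
    import boto3

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(**build_corruption_metric(corruption_count, node_id))
```

Calling publish_corruption_metric from ResilientSessionLoader whenever _corruption_counter increments would give the alarm at the top of the flowchart something to trigger on.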

Scenario 2: Multi-Agent Routing Loop (Classifier Oscillation)

Problem

A user sends an ambiguous message like "What about that manga?" The Agent Squad classifier routes to ProductSearchAgent (confidence 0.55), which cannot resolve the query and returns a clarification. The classifier then re-evaluates and routes to MangaQAAgent (confidence 0.52), which also fails. The classifier bounces back to ProductSearchAgent, creating an infinite routing loop that burns Haiku tokens at $0.25/1M.

Detection

flowchart TD
    A["CloudWatch Metric:<br/>agent_routing_count > 3<br/>for single session turn"] --> B{"Check routing log<br/>for the session"}
    B -->|"Same 2 agents alternating"| C["Routing Oscillation<br/>Confirmed"]
    B -->|"Different agents each time"| D["Legitimate multi-agent<br/>query — not a loop"]
    C --> E{"Check classifier<br/>confidence scores"}
    E -->|"All scores < 0.70"| F["ROOT CAUSE:<br/>Ambiguous query with<br/>no confident classification"]
    E -->|"Scores fluctuating"| G["ROOT CAUSE:<br/>Agent responses modifying<br/>context and shifting classification"]
    F --> H["Execute Runbook 2"]
    G --> H

Root Cause

The Agent Squad classifier uses the conversation history (including the previous agent's response) as context. When Agent A returns a clarification, the classifier re-reads the full context and sometimes reclassifies to Agent B. Agent B's response further shifts the context, causing it to reclassify back to Agent A. The confidence never exceeds the threshold because the query is genuinely ambiguous.

Resolution

"""
Runbook 2: Routing loop detection and circuit-breaking for Agent Squad.
Prevents infinite oscillation between agents on ambiguous queries.
"""

import logging
from collections import Counter
from dataclasses import dataclass, field

logger = logging.getLogger("manga_routing_guard")


@dataclass
class RoutingLoopDetector:
    """
    Detects and breaks routing loops in the Agent Squad.
    Tracks routing decisions per session and enforces limits.
    """
    max_routes_per_turn: int = 3
    max_same_pair_oscillations: int = 2

    # Per-session tracking
    _turn_routes: dict[str, list[str]] = field(default_factory=dict)

    def record_route(self, session_id: str, agent_name: str) -> None:
        """Record a routing decision."""
        if session_id not in self._turn_routes:
            self._turn_routes[session_id] = []
        self._turn_routes[session_id].append(agent_name)

    def is_loop_detected(self, session_id: str) -> bool:
        """Check if a routing loop is happening."""
        routes = self._turn_routes.get(session_id, [])

        # Check 1: Too many routes in a single turn
        if len(routes) > self.max_routes_per_turn:
            logger.warning(
                "Session %s: routing loop — %d routes in single turn",
                session_id, len(routes),
            )
            return True

        # Check 2: Oscillation between same two agents
        if len(routes) >= 4:
            last_four = routes[-4:]
            if last_four[0] == last_four[2] and last_four[1] == last_four[3]:
                logger.warning(
                    "Session %s: oscillation between %s and %s",
                    session_id, last_four[0], last_four[1],
                )
                return True

        return False

    def get_fallback_agent(self, session_id: str) -> str:
        """
        When a loop is detected, choose the best fallback agent.
        Strategy: Use the agent that appeared most in the routing history.
        """
        routes = self._turn_routes.get(session_id, [])
        if not routes:
            return "ProductSearchAgent"  # Default fallback

        counter = Counter(routes)
        most_common = counter.most_common(1)[0][0]
        return most_common

    def clear_turn(self, session_id: str) -> None:
        """Clear routing history for a new turn."""
        self._turn_routes.pop(session_id, None)


class LoopBreakingOrchestrator:
    """
    Wraps the Agent Squad orchestrator with loop detection.
    When a loop is detected, forces a single agent and asks for clarification.
    """

    CLARIFICATION_PROMPT = (
        "I want to help but I'm not sure exactly what you're looking for. "
        "Could you tell me more? For example:\n"
        "- Are you looking for a specific manga title?\n"
        "- Do you want to check an order status?\n"
        "- Would you like a recommendation?\n"
    )

    def __init__(self, orchestrator, loop_detector: RoutingLoopDetector):
        self._orchestrator = orchestrator
        self._detector = loop_detector

    async def route_with_guard(
        self, message: str, user_id: str, session_id: str
    ) -> dict:
        """Route a message with loop detection."""
        if self._detector.is_loop_detected(session_id):
            fallback_agent = self._detector.get_fallback_agent(session_id)
            logger.info(
                "Loop detected for session %s — forcing %s with clarification",
                session_id, fallback_agent,
            )
            self._detector.clear_turn(session_id)
            return {
                "response": self.CLARIFICATION_PROMPT,
                "agent": fallback_agent,
                "loop_broken": True,
            }

        # Normal routing
        response = await self._orchestrator.route_request(
            user_input=message,
            user_id=user_id,
            session_id=session_id,
        )
        agent_name = response.metadata.get("agent_name", "unknown")
        self._detector.record_route(session_id, agent_name)

        return {
            "response": str(response),
            "agent": agent_name,
            "loop_broken": False,
        }

Prevention

  • Hard cap on routing attempts per turn (max 3) — after 3 routes, force a clarification response.
  • Pin the agent for follow-up turns — once an agent is selected, subsequent messages in the same intent go to the same agent unless the user explicitly changes topic.
  • Raise the confidence threshold from 0.50 to 0.70 — ambiguous queries below threshold trigger a clarification instead of a best-guess route.
  • Exclude previous agent responses from classifier context — the classifier should only see user messages, not agent responses.
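The last bullet (excluding agent responses from classifier context) can be sketched as a small pre-filter applied before each classification call. The role and turn-dict shapes below are assumptions; the real Agent Squad classifier API may differ:

```python
def classifier_context(turns: list[dict], max_turns: int = 10) -> list[dict]:
    """Context for the classifier: user messages only.

    Agent replies are excluded so that a clarification from Agent A cannot
    shift the next classification toward Agent B (the oscillation trigger
    described in the root cause above).
    """
    user_only = [t for t in turns if t.get("role") == "user"]
    return user_only[-max_turns:]
```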

Scenario 3: MCP Tool Timeout Causing Hallucination

Problem

The search_manga_catalog MCP tool times out after 800ms because OpenSearch Serverless is experiencing cold-start latency. The agent receives a timeout error but, instead of reporting the failure to the user, the reasoning engine hallucinates manga titles that do not exist in the catalog. The user sees plausible-looking but completely fabricated results.

Detection

flowchart TD
    A["User reports: 'I can't find<br/>the manga you suggested'"] --> B["Support team checks<br/>product catalog"]
    B -->|"Title does not exist"| C["Hallucination<br/>Confirmed"]
    C --> D{"Check reasoning trace<br/>for the session"}
    D --> E["Trace shows:<br/>search_manga_catalog TIMEOUT<br/>at iteration 1"]
    E --> F{"Did agent continue<br/>reasoning after timeout?"}
    F -->|"Yes — generated answer<br/>without tool data"| G["ROOT CAUSE:<br/>Agent reasoning continued<br/>after tool timeout without<br/>grounding data"]
    F -->|"No — returned error"| H["Different issue —<br/>check response pipeline"]
    G --> I["Execute Runbook 3"]

Root Cause

The Strands Agent receives a timeout error from the MCP tool and logs it as an observation ("Tool timed out"). However, the reasoning engine interprets this as "no results found" rather than "search was not performed." It then uses its parametric knowledge (Claude's training data) to generate manga recommendations, producing titles that may not exist in the MangaAssist catalog.

Resolution

"""
Runbook 3: Prevent hallucination after MCP tool timeout.
Force the agent to acknowledge the tool failure instead of
inventing results from parametric knowledge.
"""

import json
import logging

logger = logging.getLogger("manga_hallucination_guard")


class ToolTimeoutGuard:
    """
    Intercepts tool timeout responses and injects explicit instructions
    that prevent the agent from hallucinating replacement data.
    """

    # Tools where hallucination after timeout is dangerous
    GROUNDING_REQUIRED_TOOLS = {
        "search_manga_catalog",
        "get_manga_details",
        "lookup_order",
        "check_inventory",
    }

    TIMEOUT_OBSERVATION_TEMPLATE = (
        "TOOL TIMEOUT: The {tool_name} tool did not respond in time. "
        "CRITICAL: You MUST NOT generate manga titles, prices, availability, "
        "or order information from your own knowledge. Instead, you MUST either:\n"
        "1. Retry the tool if budget allows (remaining: {remaining_ms:.0f}ms)\n"
        "2. Tell the user: 'I'm having trouble searching right now. "
        "Please try again in a moment.'\n"
        "3. Offer cached popular picks if available.\n\n"
        "DO NOT invent manga titles or details."
    )

    def transform_timeout_observation(
        self,
        tool_name: str,
        original_error: dict,
        remaining_budget_ms: float,
    ) -> str:
        """
        Transform a raw timeout error into a strongly-worded observation
        that prevents the agent from hallucinating.
        """
        if tool_name not in self.GROUNDING_REQUIRED_TOOLS:
            return json.dumps(original_error)

        return self.TIMEOUT_OBSERVATION_TEMPLATE.format(
            tool_name=tool_name,
            remaining_ms=remaining_budget_ms,
        )

    def validate_response_grounding(
        self,
        response_text: str,
        tool_results: list[dict],
        failed_tools: list[str],
    ) -> dict:
        """
        Post-generation check: verify that any manga titles mentioned
        in the response actually came from tool results.

        Returns validation result with flagged hallucinations.
        """
        # Extract titles from successful tool results
        grounded_titles = set()
        for result in tool_results:
            if isinstance(result, dict):
                for item in result.get("results", []):
                    grounded_titles.add(item.get("title", "").lower())
                if "title" in result:
                    grounded_titles.add(result["title"].lower())

        # If critical tools failed and response mentions specific titles,
        # flag as potential hallucination
        if failed_tools and any(
            t in self.GROUNDING_REQUIRED_TOOLS for t in failed_tools
        ):
            # Simple check: if response mentions specific manga details
            # but no successful search was performed
            suspicious_patterns = [
                "available", "in stock", "$", "volumes",
                "rating", "price", "order",
            ]
            flags = []
            response_lower = response_text.lower()
            for pattern in suspicious_patterns:
                if pattern in response_lower and not grounded_titles:
                    flags.append(f"Response mentions '{pattern}' but no search succeeded")

            if flags:
                logger.warning(
                    "Potential hallucination detected: %s", flags
                )
                return {
                    "grounded": False,
                    "flags": flags,
                    "action": "REPLACE_WITH_FALLBACK",
                }

        return {"grounded": True, "flags": [], "action": "PASS"}

Prevention

  • Inject explicit anti-hallucination instructions into the timeout observation — tell the agent it MUST NOT generate product data from memory.
  • Post-generation grounding check — verify that any product details in the response came from actual tool results.
  • System prompt reinforcement — add "Never generate manga titles from your training data. Always use tool results." to the system prompt.
  • Separate "tool failure" from "no results" — use distinct observation types so the agent knows the difference.
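The last bullet (distinct observation types) might look like the following sketch; the enum names and result shape are assumptions, not part of any MCP specification:

```python
from enum import Enum
from typing import Optional


class ToolObservation(str, Enum):
    SUCCESS = "TOOL_SUCCESS"        # tool ran and returned data
    EMPTY = "TOOL_EMPTY_RESULT"     # tool ran, genuinely found nothing
    FAILURE = "TOOL_FAILURE"        # tool never completed — not "no results"


def classify_tool_result(result: Optional[dict], timed_out: bool) -> ToolObservation:
    """Map a raw tool outcome to a distinct observation type.

    FAILURE must never be presented to the agent as an empty result;
    otherwise it falls back on parametric knowledge and hallucinates.
    """
    if timed_out or result is None:
        return ToolObservation.FAILURE
    if not result.get("results"):
        return ToolObservation.EMPTY
    return ToolObservation.SUCCESS
```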

Scenario 4: DynamoDB Session State Size Limit Exceeded

Problem

A power user has a 2-hour conversation with 150+ turns. The session state, including full tool call history and shared results, exceeds DynamoDB's 400KB item size limit. The PutItem call fails with ValidationException: Item size has exceeded the maximum allowed size, and the session can no longer be saved. Subsequent turns lose context.

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>DynamoDB ValidationException<br/>spike on manga_sessions table"] --> B{"Check error message"}
    B -->|"Item size exceeded"| C["Session Size Limit<br/>Confirmed"]
    B -->|"Other validation error"| D["Different issue"]
    C --> E{"Query affected sessions"}
    E --> F["Sessions with<br/>turn_count > 100<br/>or active_duration > 1h"]
    F --> G["ROOT CAUSE:<br/>Unbounded conversation history<br/>accumulation in session item"]
    G --> H["Execute Runbook 4"]

Root Cause

The long-term memory system stores the complete conversation history (all turns, tool call results, and shared context) as a single DynamoDB item. Each turn adds ~1-3KB (user message + agent response + tool results). After 150 turns, the item grows to ~300-450KB, exceeding the 400KB limit. The system has no automatic compaction or overflow mechanism.
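As a back-of-envelope check on the numbers above, a helper like this (the ~2KB fixed item overhead is an assumed figure) shows how quickly the limit arrives:

```python
DDB_ITEM_LIMIT_BYTES = 400 * 1024  # DynamoDB's hard per-item limit (409,600 bytes)


def turns_until_limit(avg_turn_bytes: int, base_overhead_bytes: int = 2_000) -> int:
    """Rough number of turns that fit in one item before writes start failing."""
    return (DDB_ITEM_LIMIT_BYTES - base_overhead_bytes) // avg_turn_bytes
```

At ~2.5KB per turn this yields about 160 turns, consistent with the ~150-turn failure described above.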

Resolution

"""
Runbook 4: DynamoDB session size management with automatic compaction
and overflow to S3 for large sessions.
"""

import json
import time
import logging
from decimal import Decimal

import boto3

logger = logging.getLogger("manga_session_size")

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
s3_client = boto3.client("s3", region_name="us-east-1")
sessions_table = dynamodb.Table("manga_sessions")

MAX_ITEM_SIZE_BYTES = 380_000  # ~30KB headroom under the 409,600-byte (400KB) limit
COMPACTION_THRESHOLD_TURNS = 50
OVERFLOW_BUCKET = "manga-session-overflow"


class SessionSizeManager:
    """
    Manages DynamoDB session item size with compaction and overflow.

    Strategy:
    1. After 50 turns, summarize older turns into a compact summary.
    2. If still over limit, overflow full history to S3 and keep
       only the summary + last 10 turns in DynamoDB.
    3. On read, reconstruct from DynamoDB + S3 if needed.
    """

    def estimate_item_size(self, item: dict) -> int:
        """Estimate DynamoDB item size in bytes."""
        return len(json.dumps(item, default=str).encode("utf-8"))

    async def save_session(
        self, session_id: str, session_data: dict
    ) -> None:
        """Save session with automatic size management."""
        estimated_size = self.estimate_item_size(session_data)

        if estimated_size <= MAX_ITEM_SIZE_BYTES:
            # Normal save — fits in DynamoDB
            sessions_table.put_item(Item=self._prepare_item(session_data))
            return

        logger.info(
            "Session %s estimated at %d bytes — compacting",
            session_id, estimated_size,
        )

        # Step 1: Compact old turns into a summary
        compacted = await self._compact_history(session_data)
        compacted_size = self.estimate_item_size(compacted)

        if compacted_size <= MAX_ITEM_SIZE_BYTES:
            sessions_table.put_item(Item=self._prepare_item(compacted))
            logger.info(
                "Session %s compacted: %d -> %d bytes",
                session_id, estimated_size, compacted_size,
            )
            return

        # Step 2: Overflow to S3
        logger.info(
            "Session %s still %d bytes after compaction — overflowing to S3",
            session_id, compacted_size,
        )
        await self._overflow_to_s3(session_id, session_data, compacted)

    async def _compact_history(self, session_data: dict) -> dict:
        """Summarize old turns, keeping only the last 10 in full."""
        turns = session_data.get("turns", [])
        if len(turns) <= 10:
            return session_data

        old_turns = turns[:-10]
        recent_turns = turns[-10:]

        # Build a summary of old turns
        summary_parts = []
        topics_seen = set()
        for turn in old_turns:
            content = turn.get("content", "")[:100]
            if turn.get("role") == "user":
                summary_parts.append(f"User asked about: {content}")
            if turn.get("agent_name"):
                topics_seen.add(turn["agent_name"])

        summary = (
            f"Conversation summary ({len(old_turns)} earlier turns): "
            f"Topics covered by agents: {', '.join(topics_seen)}. "
            f"Key exchanges: {'; '.join(summary_parts[:10])}"
        )

        compacted = dict(session_data)
        compacted["turns"] = recent_turns
        compacted["history_summary"] = summary
        compacted["compacted_turn_count"] = len(old_turns)
        return compacted

    async def _overflow_to_s3(
        self,
        session_id: str,
        full_data: dict,
        compacted_data: dict,
    ) -> None:
        """Overflow full history to S3 and keep compacted data in DynamoDB."""
        # Write full history to S3
        s3_key = f"sessions/{session_id}/{int(time.time())}.json"
        s3_client.put_object(
            Bucket=OVERFLOW_BUCKET,
            Key=s3_key,
            Body=json.dumps(full_data, default=str),
            ContentType="application/json",
            ServerSideEncryption="aws:kms",
        )

        # Keep only summary + last 5 turns in DynamoDB
        minimal = dict(compacted_data)
        minimal["turns"] = compacted_data.get("turns", [])[-5:]
        minimal["s3_overflow_key"] = s3_key
        minimal["overflow_at"] = time.time()

        sessions_table.put_item(Item=self._prepare_item(minimal))
        logger.info(
            "Session %s overflowed to s3://%s/%s",
            session_id, OVERFLOW_BUCKET, s3_key,
        )

    def _prepare_item(self, data: dict) -> dict:
        """Prepare a dict for DynamoDB (convert floats to Decimal)."""
        return json.loads(json.dumps(data, default=str), parse_float=Decimal)

Prevention

  • Set a hard limit of 50 full turns in the DynamoDB item — compact older turns automatically.
  • Monitor item sizes with a CloudWatch custom metric: log the estimated byte size on every save.
  • Use S3 as the overflow tier for full conversation history; keep only a summary and recent turns in DynamoDB.
  • Set a session timeout at 30 minutes of inactivity — long sessions should be split into new sessions with a summary handoff.
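The last two bullets (summary handoff plus the 30-minute split) can be combined into a rotation check. The field names below are assumptions chosen to match the compaction code in this runbook:

```python
SESSION_IDLE_TIMEOUT_S = 30 * 60  # split sessions after 30 minutes of inactivity


def should_rotate_session(last_activity_ts: float, now_ts: float,
                          idle_timeout_s: float = SESSION_IDLE_TIMEOUT_S) -> bool:
    """True when the old session should be closed and a fresh one started."""
    return (now_ts - last_activity_ts) >= idle_timeout_s


def handoff_payload(old_session: dict) -> dict:
    """Seed the new session with only the compact summary, never full history."""
    return {
        "turns": [],
        "history_summary": old_session.get("history_summary", ""),
        "previous_session_id": old_session.get("session_id"),
    }
```

Rotating this way keeps every DynamoDB item bounded by construction, rather than relying on compaction to catch up after the fact.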

Scenario 5: Agent Squad Coordination Deadlock

Problem

Two concurrent requests from the same user arrive within milliseconds. One is routed to ProductSearchAgent and the other to RecommendationAgent. Both agents try to read and write the same session state in Redis simultaneously: Agent A reads the state, Agent B reads the same state, Agent A writes its update, and Agent B writes its update, overwriting Agent A's changes. The session state becomes inconsistent, and both agents enter a retry loop trying to reconcile conflicting state.

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>p99 latency > 5s on<br/>Agent Squad responses"] --> B{"Check concurrent<br/>request logs"}
    B -->|"Multiple requests same<br/>session_id within 100ms"| C["Concurrent Session<br/>Access Confirmed"]
    B -->|"Single request"| D["Latency issue —<br/>different runbook"]
    C --> E{"Check Redis WATCH<br/>failure logs"}
    E -->|"WatchError exceptions"| F["ROOT CAUSE:<br/>Optimistic locking conflict<br/>on session state"]
    E -->|"No WATCH errors"| G["ROOT CAUSE:<br/>Last-writer-wins<br/>state corruption"]
    F --> H["Execute Runbook 5"]
    G --> H

Root Cause

WebSocket connections can deliver multiple messages in rapid succession. When both messages are routed to different agents, they run concurrently on the ECS Fargate task. Both agents read the session state from Redis, modify it independently, and write back. Without distributed locking, the last write wins, discarding the other agent's updates.

Resolution

"""
Runbook 5: Distributed session locking for concurrent Agent Squad access.
Uses Redis WATCH/MULTI for optimistic locking with exponential backoff.
"""

import json
import logging
import asyncio
from typing import Callable

import redis

logger = logging.getLogger("manga_session_lock")


class DistributedSessionLock:
    """
    Optimistic locking for MangaAssist session state using Redis
    WATCH/MULTI/EXEC transactions. Prevents concurrent agent updates
    from corrupting session state.
    """

    MAX_RETRIES = 3
    BASE_BACKOFF_MS = 50

    def __init__(self, redis_client: redis.Redis):
        self._redis = redis_client

    async def update_with_lock(
        self,
        session_id: str,
        update_fn: Callable[[dict], dict],
    ) -> dict:
        """
        Perform an atomic read-modify-write on session state.

        Uses Redis WATCH to detect concurrent modifications.
        If another writer modifies the key between WATCH and EXEC,
        the transaction fails and we retry with backoff.
        """
        key = f"manga:session:{session_id}:state"

        for attempt in range(self.MAX_RETRIES):
            try:
                # Start watching the key for changes
                pipe = self._redis.pipeline(transaction=True)
                pipe.watch(key)

                # Read current state
                raw = pipe.get(key)
                current_state = json.loads(raw) if raw else {}

                # Apply the update function
                updated_state = update_fn(current_state)

                # Execute the write in a transaction
                pipe.multi()
                pipe.setex(
                    key,
                    1800,  # 30-min TTL
                    json.dumps(updated_state, default=str),
                )
                pipe.execute()

                logger.debug(
                    "Session %s updated (attempt %d)", session_id, attempt + 1
                )
                return updated_state

            except redis.WatchError:
                # Another writer modified the key — retry
                backoff_ms = self.BASE_BACKOFF_MS * (2 ** attempt)
                logger.info(
                    "Session %s WATCH conflict (attempt %d) — "
                    "retrying in %dms",
                    session_id, attempt + 1, backoff_ms,
                )
                await asyncio.sleep(backoff_ms / 1000)

            except redis.RedisError as e:
                logger.error(
                    "Session %s lock error: %s", session_id, str(e)
                )
                raise

        # All retries exhausted
        logger.error(
            "Session %s lock failed after %d attempts — "
            "falling back to last-writer-wins",
            session_id, self.MAX_RETRIES,
        )
        # Fallback: just write (last-writer-wins is better than failing)
        raw = self._redis.get(key)
        current_state = json.loads(raw) if raw else {}
        updated_state = update_fn(current_state)
        self._redis.setex(
            key, 1800, json.dumps(updated_state, default=str)
        )
        return updated_state


class RequestSerializer:
    """
    Serializes concurrent requests for the same session.
    Uses a per-session asyncio.Lock to ensure only one agent
    processes a request at a time per session.
    """

    def __init__(self):
        self._locks: dict[str, asyncio.Lock] = {}

    def get_lock(self, session_id: str) -> asyncio.Lock:
        """Get or create a per-session lock."""
        if session_id not in self._locks:
            self._locks[session_id] = asyncio.Lock()
        return self._locks[session_id]

    async def serialize(
        self,
        session_id: str,
        handler: Callable,
        *args,
        **kwargs,
    ):
        """
        Serialize handler execution for a session.
        Second concurrent request waits for the first to complete.
        """
        lock = self.get_lock(session_id)
        async with lock:
            return await handler(*args, **kwargs)

    def cleanup(self, session_id: str) -> None:
        """Remove lock when session ends."""
        self._locks.pop(session_id, None)

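The serialization guarantee is easiest to see in a standalone run. Below is a minimal sketch (the serializer is re-declared in condensed form, and the handler is a hypothetical stand-in, so the example runs on its own): two rapid messages on the same session complete in arrival order even though the first one is slower.

```python
import asyncio

# Condensed re-declaration of RequestSerializer so this sketch is standalone.
class _MiniSerializer:
    def __init__(self):
        self._locks: dict[str, asyncio.Lock] = {}

    def get_lock(self, session_id: str) -> asyncio.Lock:
        if session_id not in self._locks:
            self._locks[session_id] = asyncio.Lock()
        return self._locks[session_id]

    async def serialize(self, session_id, handler, *args):
        async with self.get_lock(session_id):
            return await handler(*args)

completed: list[str] = []

async def handle(tag: str, delay: float) -> str:
    # Hypothetical message handler: simulates variable processing time.
    await asyncio.sleep(delay)
    completed.append(tag)
    return tag

async def main():
    serializer = _MiniSerializer()
    # Two rapid messages on the same session: the second waits for the
    # first, even though the first takes longer to process.
    await asyncio.gather(
        serializer.serialize("sess-1", handle, "first", 0.05),
        serializer.serialize("sess-1", handle, "second", 0.0),
    )

asyncio.run(main())
```

Without the lock, "second" would finish first and could overwrite state the first message had not yet committed.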
Prevention

  • Serialize requests per session — use an asyncio lock so only one agent processes a message per session at a time. The second message waits for the first to complete.
  • Use Redis WATCH/MULTI for optimistic locking on session state writes.
  • API Gateway request throttling — limit to 1 concurrent request per WebSocket connection.
  • Add a version field to session state — each write increments the version; reject writes with stale versions.

Cross-Scenario Decision Tree

flowchart TD
    START["Agent System Alert"] --> Q1{"What is the<br/>error type?"}
    Q1 -->|"Deserialization / JSON error"| S1["Scenario 1:<br/>Memory Corruption"]
    Q1 -->|"High iteration count"| S2["Scenario 2:<br/>Routing Loop"]
    Q1 -->|"User reports wrong data"| S3["Scenario 3:<br/>Hallucination"]
    Q1 -->|"DynamoDB ValidationException"| S4["Scenario 4:<br/>Session Size"]
    Q1 -->|"Latency spike with<br/>concurrent requests"| S5["Scenario 5:<br/>Deadlock"]
    S1 --> R1["Load from backup key<br/>→ Partial recovery<br/>→ Clean slate"]
    S2 --> R2["Detect oscillation<br/>→ Force clarification<br/>→ Pin agent"]
    S3 --> R3["Inject anti-hallucination<br/>→ Post-generation check<br/>→ Grounding validation"]
    S4 --> R4["Compact old turns<br/>→ Overflow to S3<br/>→ Monitor item size"]
    S5 --> R5["Redis WATCH locking<br/>→ Request serialization<br/>→ Version field"]

Runbook Summary Table

| Scenario | Detection Signal | Immediate Action | Long-Term Fix | Owner |
|---|---|---|---|---|
| 1. Memory Corruption | JSONDecodeError rate spike | Load from backup key; fall back to clean session | Redis pipeline transactions + backup key on every save | Platform Team |
| 2. Routing Loop | Routing count > 3 per turn | Force clarification response; pin agent | Raise confidence threshold; exclude agent responses from classifier | ML Team |
| 3. Tool Timeout Hallucination | User reports non-existent manga | Replace response with fallback; flag session for review | Anti-hallucination observation injection; post-generation grounding check | AI Safety Team |
| 4. Session Size Exceeded | DynamoDB ValidationException | Compact old turns; overflow to S3 | Auto-compaction at 50 turns; session timeout at 30 min | Backend Team |
| 5. Coordination Deadlock | p99 latency > 5s with concurrent requests | Restart affected ECS tasks | Per-session request serialization; Redis optimistic locking | Platform Team |

Key Takeaways

  1. Memory corruption is inevitable at scale — serialization bugs, interrupted writes, and concurrent updates will eventually corrupt Redis-backed session state. Always have a recovery cascade (backup key, partial recovery, clean slate). Never assume json.loads() will succeed.

  2. Routing loops are an emergent behavior of multi-agent systems — they are not bugs in any single agent but in the interaction between the classifier and agent responses. Detection must happen at the orchestrator level, not the agent level.

  3. Tool timeout and hallucination are linked — when a grounding tool fails, the model defaults to its parametric knowledge, which may not match the actual catalog. The fix is both technical (anti-hallucination instructions) and architectural (post-generation grounding checks).

  4. DynamoDB's 400KB limit is a hard wall — design for it from day one with compaction and overflow strategies. Do not wait for the first failure in production.

  5. Concurrent access is the norm at 1M messages/day — especially on WebSocket connections where users send rapid follow-up messages. Per-session locking (not per-request) is essential for state consistency.

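Takeaway 1's recovery cascade condenses into a single loader. A sketch (the backup-key name follows the primary-key pattern used earlier in this file, but the exact suffix is an assumption):

```python
import json
import logging

logger = logging.getLogger("manga.session")

def load_session_state(redis_client, session_id: str) -> dict:
    """Recovery cascade: primary key -> backup key -> clean slate."""
    primary = f"manga:session:{session_id}:state"
    backup = f"{primary}:backup"  # assumed backup-key convention
    for key in (primary, backup):
        raw = redis_client.get(key)
        if raw is None:
            continue
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            logger.warning(
                "Corrupt session state at %s — trying next tier", key
            )
    # Clean slate: a fresh session beats a 500 to the user.
    return {}
```

Any client exposing get() works here, which also makes the cascade trivial to unit-test with a stub.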

Previous file: 02-mcp-agent-tool-interactions.md Back to overview: 01-autonomous-agent-architecture.md