LOCAL PREVIEW View on GitHub

Advanced GenAI Architecture — Strands Agents & Multi-Agent Systems

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

Attribute Detail
Domain 2 — Implementation & Integration of GenAI Applications
Task 2.5 — Application Integration Patterns
Skill 2.5.5 — Advanced GenAI Applications
Focus Strands Agents, Agent Squad, agent design patterns, supervisor/planner-executor, prompt chaining
MangaAssist Scope Agent lifecycle, multi-agent coordination for manga search, recommendations, and order management

Mind Map

mindmap
  root((Advanced GenAI<br/>Architecture))
    Strands Agents
      Agent Lifecycle
        Initialization
        Planning
        Execution
        Reflection
        Termination
      Tool Integration
        Bedrock Tools
        OpenSearch Tools
        DynamoDB Tools
        Redis Cache Tools
      Memory Management
        Short-Term Context
        Long-Term Preferences
        Episodic Memory
    Agent Squad
      Multi-Agent Patterns
        Supervisor Pattern
        Planner-Executor Pattern
        Peer-to-Peer Collaboration
        Hierarchical Delegation
      Coordination
        Message Passing
        Shared State
        Conflict Resolution
        Consensus Protocol
      Routing
        Intent Classification
        Skill-Based Routing
        Load Balancing
        Fallback Chains
    Prompt Chaining
      Chain Types
        Sequential Chains
        Parallel Chains
        Conditional Chains
        Map-Reduce Chains
      Chain Management
        State Propagation
        Error Recovery
        Timeout Handling
        Cost Budgeting
    Design Patterns
      Supervisor Agent
        Task Decomposition
        Agent Selection
        Result Aggregation
        Quality Control
      Planner-Executor
        Plan Generation
        Step Execution
        Plan Revision
        Completion Check
      Reflection Loop
        Self-Evaluation
        Output Correction
        Confidence Scoring
        Iteration Limits

Architecture Overview

graph TB
    subgraph UserLayer["User Interface"]
        WS[API Gateway WebSocket]
        USR[Manga Customer]
    end

    subgraph SupervisorLayer["Supervisor Agent"]
        SUP[Supervisor Agent<br/>Intent Classification<br/>Task Decomposition]
        PM[Plan Manager<br/>Step Tracking]
    end

    subgraph AgentSquad["Agent Squad"]
        SA[Search Agent<br/>OpenSearch Vector<br/>Manga Discovery]
        RA[Recommendation Agent<br/>Personalized Picks<br/>Collaborative Filtering]
        OA[Order Agent<br/>DynamoDB Queries<br/>Status / Returns]
        CA[Conversation Agent<br/>Greetings / FAQ<br/>Small Talk]
    end

    subgraph ToolLayer["Tool Layer"]
        BK[Bedrock Claude 3<br/>Sonnet / Haiku]
        OS[OpenSearch Serverless<br/>Vector Store]
        DDB[DynamoDB<br/>Sessions / Products]
        RC[ElastiCache Redis<br/>Cache / State]
    end

    subgraph ReflectionLayer["Reflection & Quality"]
        RF[Reflection Agent<br/>Output Validation]
        QC[Quality Checker<br/>Safety / Relevance]
    end

    USR -->|WebSocket| WS
    WS --> SUP
    SUP -->|Route| SA
    SUP -->|Route| RA
    SUP -->|Route| OA
    SUP -->|Route| CA
    SUP --> PM

    SA --> BK
    SA --> OS
    RA --> BK
    RA --> DDB
    OA --> DDB
    CA --> BK

    SA --> RF
    RA --> RF
    OA --> RF
    RF --> QC
    QC -->|Approved| WS

    SA --> RC
    RA --> RC

    style SUP fill:#232f3e,color:#ff9900
    style BK fill:#232f3e,color:#ff9900
    style OS fill:#232f3e,color:#ff9900

Strands Agent Framework

Agent Lifecycle

stateDiagram-v2
    [*] --> Initialized: Create Agent
    Initialized --> Planning: Receive Task
    Planning --> Executing: Plan Ready
    Executing --> Reflecting: Step Complete
    Reflecting --> Executing: Needs More Steps
    Reflecting --> Responding: Quality OK
    Responding --> [*]: Response Sent

    Executing --> ErrorHandling: Step Failed
    ErrorHandling --> Planning: Retry with New Plan
    ErrorHandling --> Responding: Graceful Degradation

    Planning --> TimedOut: Budget Exceeded
    Executing --> TimedOut: Budget Exceeded
    TimedOut --> Responding: Partial Response

Core Agent Implementation

"""
Strands Agent framework for MangaAssist multi-agent chatbot.
Implements agent lifecycle, tool integration, and coordination patterns.
"""

import asyncio
import json
import logging
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


class AgentState(Enum):
    INITIALIZED = "initialized"
    PLANNING = "planning"
    EXECUTING = "executing"
    REFLECTING = "reflecting"
    RESPONDING = "responding"
    ERROR = "error"
    TIMED_OUT = "timed_out"


class AgentRole(Enum):
    SUPERVISOR = "supervisor"
    SEARCH = "search"
    RECOMMENDATION = "recommendation"
    ORDER = "order"
    CONVERSATION = "conversation"
    REFLECTION = "reflection"


@dataclass
class AgentMessage:
    """A message exchanged between agents."""
    sender: str
    receiver: str
    content: Any
    message_type: str = "task"  # task, result, error, control
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class ExecutionStep:
    """A single step in an agent's execution plan."""
    step_id: int
    action: str
    tool: str | None = None
    parameters: dict[str, Any] = field(default_factory=dict)
    result: Any = None
    status: str = "pending"  # pending, running, completed, failed
    latency_ms: float = 0.0


@dataclass
class AgentContext:
    """Shared context available to agents during execution."""
    session_id: str
    user_id: str
    conversation_history: list[dict[str, str]] = field(default_factory=list)
    user_preferences: dict[str, Any] = field(default_factory=dict)
    execution_budget_ms: float = 2500.0  # Leave 500ms for WebSocket delivery
    tokens_budget: int = 2048
    model_preference: str = "anthropic.claude-3-haiku-20240307-v1:0"


@dataclass
class Tool:
    """A tool available to an agent for executing actions."""
    name: str
    description: str
    handler: Callable
    parameters_schema: dict[str, Any] = field(default_factory=dict)


class BaseAgent(ABC):
    """Abstract base class for all MangaAssist agents."""

    def __init__(
        self,
        agent_id: str,
        role: AgentRole,
        bedrock_client: Any,
        tools: list[Tool] | None = None,
    ):
        self.agent_id = agent_id
        self.role = role
        self.bedrock_client = bedrock_client
        self.tools = {t.name: t for t in (tools or [])}
        self.state = AgentState.INITIALIZED
        self.execution_history: list[ExecutionStep] = []
        self._start_time: float | None = None

    def _elapsed_ms(self) -> float:
        if self._start_time is None:
            return 0.0
        return (time.monotonic() - self._start_time) * 1000

    def _budget_remaining_ms(self, context: AgentContext) -> float:
        return max(0, context.execution_budget_ms - self._elapsed_ms())

    @abstractmethod
    async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
        """Generate an execution plan for the given task."""
        ...

    @abstractmethod
    async def execute_step(
        self, step: ExecutionStep, context: AgentContext
    ) -> Any:
        """Execute a single step from the plan."""
        ...

    @abstractmethod
    async def reflect(
        self, results: list[Any], context: AgentContext
    ) -> tuple[bool, str]:
        """Reflect on execution results. Returns (is_complete, assessment)."""
        ...

    async def run(self, task: str, context: AgentContext) -> AgentMessage:
        """Execute the full agent lifecycle."""
        self._start_time = time.monotonic()
        self.state = AgentState.PLANNING

        try:
            # Planning phase
            steps = await self.plan(task, context)
            self.execution_history = steps

            # Execution phase
            self.state = AgentState.EXECUTING
            results = []
            for step in steps:
                if self._budget_remaining_ms(context) < 200:
                    logger.warning(
                        "Agent %s budget exhausted at step %d/%d",
                        self.agent_id, step.step_id, len(steps),
                    )
                    self.state = AgentState.TIMED_OUT
                    break

                step.status = "running"
                step_start = time.monotonic()
                try:
                    result = await self.execute_step(step, context)
                    step.result = result
                    step.status = "completed"
                    step.latency_ms = (time.monotonic() - step_start) * 1000
                    results.append(result)
                except Exception as e:
                    step.status = "failed"
                    step.result = str(e)
                    step.latency_ms = (time.monotonic() - step_start) * 1000
                    logger.error("Agent %s step %d failed: %s", self.agent_id, step.step_id, e)

            # Reflection phase
            if self.state != AgentState.TIMED_OUT:
                self.state = AgentState.REFLECTING
                is_complete, assessment = await self.reflect(results, context)

                if not is_complete and self._budget_remaining_ms(context) > 500:
                    # Re-plan and execute remaining work
                    additional_steps = await self.plan(
                        f"Continue: {assessment}", context
                    )
                    for step in additional_steps:
                        if self._budget_remaining_ms(context) < 200:
                            break
                        result = await self.execute_step(step, context)
                        results.append(result)

            # Build response
            self.state = AgentState.RESPONDING
            response_text = self._format_response(results)

            return AgentMessage(
                sender=self.agent_id,
                receiver="supervisor",
                content=response_text,
                message_type="result",
                metadata={
                    "role": self.role.value,
                    "steps_executed": len([s for s in self.execution_history if s.status == "completed"]),
                    "total_latency_ms": round(self._elapsed_ms(), 2),
                    "timed_out": self.state == AgentState.TIMED_OUT,
                },
            )

        except Exception as e:
            self.state = AgentState.ERROR
            logger.exception("Agent %s failed: %s", self.agent_id, e)
            return AgentMessage(
                sender=self.agent_id,
                receiver="supervisor",
                content=f"Agent error: {str(e)}",
                message_type="error",
            )

    def _format_response(self, results: list[Any]) -> str:
        """Format execution results into a response string."""
        parts = [str(r) for r in results if r is not None]
        return "\n".join(parts) if parts else "No results generated."

Supervisor Agent Pattern

graph TB
    subgraph Supervisor["Supervisor Agent"]
        IC[Intent Classifier<br/>Haiku — fast routing]
        TD[Task Decomposer<br/>Split complex queries]
        AR[Agent Router<br/>Select specialist agent]
        RA[Result Aggregator<br/>Merge agent outputs]
    end

    subgraph Intents["Detected Intents"]
        I1[SEARCH<br/>Find manga by query]
        I2[RECOMMEND<br/>Personalized picks]
        I3[ORDER<br/>Status / returns]
        I4[CHAT<br/>Greeting / FAQ]
        I5[COMPLEX<br/>Multi-intent query]
    end

    subgraph Agents["Specialist Agents"]
        SA[Search Agent]
        RCA[Recommendation Agent]
        OA[Order Agent]
        CNA[Conversation Agent]
    end

    IC --> I1
    IC --> I2
    IC --> I3
    IC --> I4
    IC --> I5
    I1 --> AR
    I2 --> AR
    I3 --> AR
    I4 --> AR
    I5 --> TD
    TD --> AR
    AR --> SA
    AR --> RCA
    AR --> OA
    AR --> CNA
    SA --> RA
    RCA --> RA
    OA --> RA
    CNA --> RA

    style Supervisor fill:#232f3e,color:#ff9900

Supervisor Agent Implementation

"""
Supervisor Agent for MangaAssist — coordinates the agent squad.
Handles intent classification, task decomposition, and result aggregation.
"""


class SupervisorAgent(BaseAgent):
    """
    Top-level coordinator that routes user queries to specialist agents.
    Uses Haiku for fast intent classification (< 500ms).
    """

    INTENT_PROMPT = """Classify the user's intent for a Japanese manga store chatbot.
Return EXACTLY one JSON object with these fields:
- "intent": one of ["search", "recommend", "order", "chat", "complex"]
- "sub_intents": array of intents if complex (otherwise empty array)
- "confidence": float 0-1
- "requires_context": boolean — does this need conversation history?

User message: {message}

Previous context: {context_summary}

Return only the JSON object, no explanation."""

    def __init__(
        self,
        bedrock_client: Any,
        agent_registry: dict[str, "BaseAgent"],
    ):
        super().__init__(
            agent_id="supervisor",
            role=AgentRole.SUPERVISOR,
            bedrock_client=bedrock_client,
        )
        self.agent_registry = agent_registry
        self._intent_model = "anthropic.claude-3-haiku-20240307-v1:0"

    async def classify_intent(
        self, message: str, context: AgentContext
    ) -> dict[str, Any]:
        """Classify user intent using Haiku for speed."""
        context_summary = (
            json.dumps(context.conversation_history[-3:])
            if context.conversation_history
            else "No prior context"
        )
        prompt = self.INTENT_PROMPT.format(
            message=message,
            context_summary=context_summary,
        )
        response = await self.bedrock_client.invoke(
            model_id=self._intent_model,
            prompt=prompt,
            max_tokens=200,
        )
        try:
            return json.loads(response["text"])
        except json.JSONDecodeError:
            logger.warning("Failed to parse intent classification, defaulting to chat")
            return {
                "intent": "chat",
                "sub_intents": [],
                "confidence": 0.5,
                "requires_context": False,
            }

    async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
        """Create an execution plan based on intent classification."""
        classification = await self.classify_intent(task, context)
        intent = classification["intent"]
        steps = []

        if intent == "complex":
            # Decompose into sub-tasks, execute in parallel where possible
            for i, sub_intent in enumerate(classification.get("sub_intents", ["chat"])):
                steps.append(ExecutionStep(
                    step_id=i,
                    action=f"delegate_{sub_intent}",
                    tool=sub_intent,
                    parameters={"message": task, "sub_intent": sub_intent},
                ))
        else:
            steps.append(ExecutionStep(
                step_id=0,
                action=f"delegate_{intent}",
                tool=intent,
                parameters={"message": task},
            ))

        return steps

    async def execute_step(
        self, step: ExecutionStep, context: AgentContext
    ) -> Any:
        """Delegate step execution to the appropriate specialist agent."""
        agent_key = step.tool or "conversation"
        agent_map = {
            "search": "search",
            "recommend": "recommendation",
            "order": "order",
            "chat": "conversation",
        }
        agent_name = agent_map.get(agent_key, "conversation")
        agent = self.agent_registry.get(agent_name)

        if agent is None:
            logger.warning("No agent found for '%s', falling back to conversation", agent_name)
            agent = self.agent_registry.get("conversation")

        if agent is None:
            return "I'm sorry, I'm having trouble processing your request right now."

        # Delegate to specialist with remaining budget
        remaining_budget = self._budget_remaining_ms(context)
        delegated_context = AgentContext(
            session_id=context.session_id,
            user_id=context.user_id,
            conversation_history=context.conversation_history,
            user_preferences=context.user_preferences,
            execution_budget_ms=remaining_budget - 200,  # Reserve for aggregation
            tokens_budget=context.tokens_budget,
            model_preference=context.model_preference,
        )

        result = await agent.run(step.parameters.get("message", ""), delegated_context)
        return result.content

    async def reflect(
        self, results: list[Any], context: AgentContext
    ) -> tuple[bool, str]:
        """Check if all sub-tasks produced satisfactory results."""
        if not results:
            return False, "No results from any agent"
        empty_results = [r for r in results if not r or r == "No results generated."]
        if len(empty_results) == len(results):
            return False, "All agents returned empty results"
        return True, "Results collected successfully"

    async def handle_message(
        self, message: str, context: AgentContext
    ) -> str:
        """Main entry point: handle a user message end-to-end."""
        result = await self.run(message, context)
        return result.content


class SearchAgent(BaseAgent):
    """Specialist agent for manga search queries."""

    SEARCH_PROMPT = """You are a helpful manga store assistant. Based on the search query,
generate a natural response with relevant manga information.

Search query: {query}
Search results: {results}

Respond in the same language as the query. Include title (Japanese and romaji),
author, volume count, and price if available. Keep response concise."""

    def __init__(self, bedrock_client: Any, opensearch_client: Any):
        super().__init__(
            agent_id="search_agent",
            role=AgentRole.SEARCH,
            bedrock_client=bedrock_client,
        )
        self.opensearch_client = opensearch_client

    async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
        return [
            ExecutionStep(step_id=0, action="vector_search", tool="opensearch", parameters={"query": task}),
            ExecutionStep(step_id=1, action="format_results", tool="bedrock", parameters={"query": task}),
        ]

    async def execute_step(self, step: ExecutionStep, context: AgentContext) -> Any:
        if step.action == "vector_search":
            # Execute OpenSearch vector search
            results = await self._search_manga(step.parameters["query"])
            return results
        elif step.action == "format_results":
            # Format results using Bedrock
            search_results = self.execution_history[0].result if self.execution_history else []
            return await self._format_results(step.parameters["query"], search_results)
        return None

    async def _search_manga(self, query: str) -> list[dict[str, Any]]:
        """Search manga catalog using OpenSearch vector search."""
        # In production, this would call OpenSearch Serverless
        return [{"title": query, "status": "mock_search_result"}]

    async def _format_results(self, query: str, results: list[dict]) -> str:
        """Format search results into a natural language response."""
        prompt = self.SEARCH_PROMPT.format(
            query=query, results=json.dumps(results, ensure_ascii=False)
        )
        response = await self.bedrock_client.invoke(
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
            prompt=prompt,
            max_tokens=500,
        )
        return response["text"]

    async def reflect(self, results: list[Any], context: AgentContext) -> tuple[bool, str]:
        if results and any(r for r in results if r):
            return True, "Search results found and formatted"
        return False, "No search results to display"


class RecommendationAgent(BaseAgent):
    """Specialist agent for personalized manga recommendations."""

    RECOMMEND_PROMPT = """You are a manga recommendation expert. Based on the user's
preferences and request, suggest relevant manga titles.

User request: {request}
User preferences: {preferences}
Purchase history: {history}

Provide 3-5 recommendations with reasons. Include Japanese titles.
Match the user's language."""

    def __init__(self, bedrock_client: Any, dynamodb_client: Any):
        super().__init__(
            agent_id="recommendation_agent",
            role=AgentRole.RECOMMENDATION,
            bedrock_client=bedrock_client,
        )
        self.dynamodb_client = dynamodb_client

    async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
        return [
            ExecutionStep(step_id=0, action="fetch_preferences", tool="dynamodb",
                         parameters={"user_id": context.user_id}),
            ExecutionStep(step_id=1, action="generate_recommendations", tool="bedrock",
                         parameters={"request": task}),
        ]

    async def execute_step(self, step: ExecutionStep, context: AgentContext) -> Any:
        if step.action == "fetch_preferences":
            return context.user_preferences
        elif step.action == "generate_recommendations":
            prefs = self.execution_history[0].result if self.execution_history else {}
            prompt = self.RECOMMEND_PROMPT.format(
                request=step.parameters["request"],
                preferences=json.dumps(prefs, ensure_ascii=False),
                history="[]",
            )
            response = await self.bedrock_client.invoke(
                model_id="anthropic.claude-3-sonnet-20240229-v1:0",
                prompt=prompt,
                max_tokens=800,
            )
            return response["text"]
        return None

    async def reflect(self, results: list[Any], context: AgentContext) -> tuple[bool, str]:
        if results and len(results) >= 2 and results[-1]:
            return True, "Recommendations generated"
        return False, "Failed to generate recommendations"

Planner-Executor Pattern

graph TB
    subgraph Planner["Planner Agent"]
        PG[Plan Generator<br/>Sonnet — complex reasoning]
        PS[Plan Store<br/>Versioned plans in DynamoDB]
        PR[Plan Reviser<br/>Adapt on failure]
    end

    subgraph Executor["Executor Agent"]
        SE[Step Executor<br/>Haiku — fast actions]
        SR[Step Reporter<br/>Status updates]
        SV[Step Validator<br/>Output checks]
    end

    subgraph PlanLifecycle["Plan Lifecycle"]
        P1[Draft Plan] --> P2[Validate Plan]
        P2 --> P3[Execute Steps]
        P3 --> P4[Report Results]
        P4 --> P5{Complete?}
        P5 -->|No| P6[Revise Plan]
        P6 --> P3
        P5 -->|Yes| P7[Final Response]
    end

    PG --> P1
    PS --> P2
    SE --> P3
    SR --> P4
    SV --> P5
    PR --> P6

    style PG fill:#339af0,color:#fff
    style SE fill:#51cf66,color:#fff

Planner-Executor Implementation

"""
Planner-Executor pattern for complex multi-step manga queries.
Planner uses Sonnet for reasoning; Executor uses Haiku for speed.
"""


@dataclass
class ExecutionPlan:
    """A multi-step plan generated by the Planner agent."""
    plan_id: str
    goal: str
    steps: list[ExecutionStep]
    version: int = 1
    status: str = "draft"  # draft, validated, executing, completed, failed
    created_at: float = field(default_factory=time.time)


class PlannerAgent:
    """
    Generates execution plans using Sonnet for complex reasoning.
    Plans are structured, versioned, and revisable.
    """

    PLAN_PROMPT = """You are a planning agent for a manga store chatbot. Given the user's
complex query, create a step-by-step execution plan.

User query: {query}
Available tools: {tools}
Constraints: Must complete within {budget_ms}ms, max {max_steps} steps.

Return a JSON array of steps, each with:
- "step_id": integer
- "action": string description
- "tool": tool name to use
- "parameters": dict of parameters
- "depends_on": array of step_ids this depends on (for parallel execution)

Return only the JSON array."""

    def __init__(self, bedrock_client: Any, available_tools: list[str]):
        self.bedrock_client = bedrock_client
        self.available_tools = available_tools

    async def generate_plan(
        self, query: str, context: AgentContext
    ) -> ExecutionPlan:
        """Generate an execution plan for a complex query."""
        prompt = self.PLAN_PROMPT.format(
            query=query,
            tools=", ".join(self.available_tools),
            budget_ms=int(context.execution_budget_ms),
            max_steps=5,
        )
        response = await self.bedrock_client.invoke(
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            prompt=prompt,
            max_tokens=500,
        )

        try:
            steps_data = json.loads(response["text"])
            steps = [
                ExecutionStep(
                    step_id=s["step_id"],
                    action=s["action"],
                    tool=s.get("tool"),
                    parameters=s.get("parameters", {}),
                )
                for s in steps_data
            ]
        except (json.JSONDecodeError, KeyError):
            # Fallback: single-step plan
            steps = [ExecutionStep(
                step_id=0,
                action="direct_response",
                tool="bedrock",
                parameters={"query": query},
            )]

        return ExecutionPlan(
            plan_id=str(uuid.uuid4())[:8],
            goal=query,
            steps=steps,
            status="validated",
        )

    async def revise_plan(
        self,
        plan: ExecutionPlan,
        failed_step: ExecutionStep,
        error: str,
        context: AgentContext,
    ) -> ExecutionPlan:
        """Revise a plan after a step failure."""
        remaining_steps = [
            s for s in plan.steps if s.status == "pending"
        ]
        prompt = (
            f"Step {failed_step.step_id} failed: {error}. "
            f"Original goal: {plan.goal}. "
            f"Remaining steps: {len(remaining_steps)}. "
            f"Budget remaining: {context.execution_budget_ms}ms. "
            "Generate an alternative approach. Return JSON array of steps."
        )
        response = await self.bedrock_client.invoke(
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
            prompt=prompt,
            max_tokens=300,
        )

        try:
            new_steps = json.loads(response["text"])
            plan.steps = [
                ExecutionStep(step_id=s["step_id"], action=s["action"], tool=s.get("tool"))
                for s in new_steps
            ]
        except (json.JSONDecodeError, KeyError):
            pass  # Keep existing remaining steps

        plan.version += 1
        plan.status = "validated"
        return plan


class ExecutorAgent:
    """
    Executes plan steps using Haiku for speed.
    Reports progress and validates step outputs.
    """

    def __init__(
        self,
        bedrock_client: Any,
        tool_handlers: dict[str, Callable],
    ):
        self.bedrock_client = bedrock_client
        self.tool_handlers = tool_handlers

    async def execute_plan(
        self, plan: ExecutionPlan, context: AgentContext
    ) -> list[Any]:
        """Execute all steps in a plan, respecting dependencies."""
        plan.status = "executing"
        results: dict[int, Any] = {}

        # Group steps by dependency level for parallel execution
        levels = self._topological_sort(plan.steps)

        for level in levels:
            if context.execution_budget_ms <= 0:
                logger.warning("Plan execution budget exhausted")
                break

            # Execute independent steps in parallel
            tasks = [
                self._execute_single_step(step, results, context)
                for step in level
            ]
            level_results = await asyncio.gather(*tasks, return_exceptions=True)

            for step, result in zip(level, level_results):
                if isinstance(result, Exception):
                    step.status = "failed"
                    step.result = str(result)
                    results[step.step_id] = None
                else:
                    results[step.step_id] = result

        plan.status = "completed"
        return [results.get(s.step_id) for s in plan.steps]

    async def _execute_single_step(
        self,
        step: ExecutionStep,
        prior_results: dict[int, Any],
        context: AgentContext,
    ) -> Any:
        """Execute a single plan step."""
        step.status = "running"
        start = time.monotonic()

        handler = self.tool_handlers.get(step.tool or "")
        if handler:
            result = await handler(step.parameters, prior_results)
        else:
            # Default: use Bedrock to handle the step
            prompt = f"Execute action: {step.action}. Parameters: {json.dumps(step.parameters)}"
            response = await self.bedrock_client.invoke(
                model_id="anthropic.claude-3-haiku-20240307-v1:0",
                prompt=prompt,
                max_tokens=500,
            )
            result = response["text"]

        step.status = "completed"
        step.latency_ms = (time.monotonic() - start) * 1000
        step.result = result
        return result

    def _topological_sort(
        self, steps: list[ExecutionStep]
    ) -> list[list[ExecutionStep]]:
        """Sort steps into dependency levels for parallel execution."""
        # Simple: all steps are sequential for now
        return [[step] for step in steps]

Agent Coordination Patterns

graph LR
    subgraph Patterns["Coordination Patterns"]
        SP[Supervisor<br/>Central coordinator]
        PP[Peer-to-Peer<br/>Direct messaging]
        HP[Hierarchical<br/>Multi-level delegation]
        BP[Blackboard<br/>Shared state]
    end

    subgraph UseCases["MangaAssist Use Cases"]
        U1[Simple Query<br/>→ Single agent]
        U2[Multi-Intent<br/>→ Parallel agents]
        U3[Complex Research<br/>→ Plan-Execute]
        U4[Conversational<br/>→ Context chain]
    end

    SP --> U1
    SP --> U2
    HP --> U3
    BP --> U4

    style SP fill:#339af0,color:#fff
    style HP fill:#51cf66,color:#fff

Message Bus for Agent Communication

"""
Agent message bus for MangaAssist multi-agent coordination.
Enables asynchronous message passing between agents.
"""


class AgentMessageBus:
    """
    Central message bus for agent-to-agent communication.
    Supports direct messaging, broadcast, and topic-based pub/sub.
    """

    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}
        self._subscribers: dict[str, list[str]] = {}  # topic -> [agent_ids]
        self._message_log: list[AgentMessage] = []

    def register_agent(self, agent_id: str) -> None:
        """Register an agent with the message bus."""
        if agent_id not in self._queues:
            self._queues[agent_id] = asyncio.Queue()

    def subscribe(self, agent_id: str, topic: str) -> None:
        """Subscribe an agent to a topic."""
        if topic not in self._subscribers:
            self._subscribers[topic] = []
        if agent_id not in self._subscribers[topic]:
            self._subscribers[topic].append(agent_id)

    async def send(self, message: AgentMessage) -> None:
        """Send a message to a specific agent."""
        self._message_log.append(message)
        queue = self._queues.get(message.receiver)
        if queue:
            await queue.put(message)
        else:
            logger.warning("No queue for agent '%s'", message.receiver)

    async def broadcast(self, sender: str, topic: str, content: Any) -> None:
        """Broadcast a message to all subscribers of a topic."""
        subscribers = self._subscribers.get(topic, [])
        for sub_id in subscribers:
            if sub_id != sender:
                msg = AgentMessage(
                    sender=sender,
                    receiver=sub_id,
                    content=content,
                    message_type="broadcast",
                )
                await self.send(msg)

    async def receive(self, agent_id: str, timeout: float = 5.0) -> AgentMessage | None:
        """Receive a message for a specific agent with timeout."""
        queue = self._queues.get(agent_id)
        if not queue:
            return None
        try:
            return await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return None

    def get_message_log(self) -> list[dict[str, Any]]:
        """Get the full message log for debugging."""
        return [
            {
                "sender": m.sender,
                "receiver": m.receiver,
                "type": m.message_type,
                "correlation_id": m.correlation_id,
                "timestamp": m.timestamp,
                "content_preview": str(m.content)[:100],
            }
            for m in self._message_log
        ]

Key Takeaways

# Takeaway MangaAssist Application
1 The Supervisor pattern is the primary coordination model for MangaAssist — a central agent classifies intent and routes to specialists. Haiku classifies intent in < 500ms; the supervisor delegates to Search, Recommendation, Order, or Conversation agents.
2 Agent budgets must cascade — the supervisor reserves time for aggregation, passes the remainder to specialists. The 3-second total budget becomes: 500ms intent classification + 2000ms specialist execution + 500ms WebSocket delivery.
3 Planner-Executor separates reasoning from action — Sonnet plans, Haiku executes. Complex multi-step queries (e.g., "find a manga like Attack on Titan and add volume 1 to my cart") get a Sonnet plan, then each step runs on Haiku.
4 Agent reflection enables self-correction but must be bounded to prevent infinite loops. Reflection runs once after execution; if it fails, the agent returns a partial response rather than re-planning indefinitely.
5 Message buses decouple agents so they can be developed, tested, and scaled independently. Each agent in the squad has its own message queue; the supervisor orchestrates via async message passing.
6 Parallel execution of independent sub-tasks is critical for complex queries within the 3-second budget. A "search + recommend" complex query runs both agents in parallel via asyncio.gather(), cutting total time nearly in half.
7 Agent state machines make lifecycle management explicit and debuggable — every agent transitions through PLAN → EXECUTE → REFLECT → RESPOND. CloudWatch logs include agent state transitions, making it easy to trace where a query stalled or failed.