Advanced GenAI Architecture — Strands Agents & Multi-Agent Systems
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Detail |
| --- | --- |
| Domain | 2 — Implementation & Integration of GenAI Applications |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.5 — Advanced GenAI Applications |
| Focus | Strands Agents, Agent Squad, agent design patterns, supervisor/planner-executor, prompt chaining |
| MangaAssist Scope | Agent lifecycle, multi-agent coordination for manga search, recommendations, and order management |
Mind Map
mindmap
root((Advanced GenAI<br/>Architecture))
Strands Agents
Agent Lifecycle
Initialization
Planning
Execution
Reflection
Termination
Tool Integration
Bedrock Tools
OpenSearch Tools
DynamoDB Tools
Redis Cache Tools
Memory Management
Short-Term Context
Long-Term Preferences
Episodic Memory
Agent Squad
Multi-Agent Patterns
Supervisor Pattern
Planner-Executor Pattern
Peer-to-Peer Collaboration
Hierarchical Delegation
Coordination
Message Passing
Shared State
Conflict Resolution
Consensus Protocol
Routing
Intent Classification
Skill-Based Routing
Load Balancing
Fallback Chains
Prompt Chaining
Chain Types
Sequential Chains
Parallel Chains
Conditional Chains
Map-Reduce Chains
Chain Management
State Propagation
Error Recovery
Timeout Handling
Cost Budgeting
Design Patterns
Supervisor Agent
Task Decomposition
Agent Selection
Result Aggregation
Quality Control
Planner-Executor
Plan Generation
Step Execution
Plan Revision
Completion Check
Reflection Loop
Self-Evaluation
Output Correction
Confidence Scoring
Iteration Limits
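The mind map's Prompt Chaining branch never gets a dedicated code section below, so here is a minimal sketch of the two most common chain types. The toy async steps stand in for Bedrock invocations; all names here are illustrative, not part of the framework code later in this document.

```python
import asyncio
from typing import Any, Awaitable, Callable

ChainStep = Callable[[Any], Awaitable[Any]]

async def sequential_chain(steps: list[ChainStep], value: Any) -> Any:
    """Pipe each step's output into the next (e.g. search -> summarize)."""
    for step in steps:
        value = await step(value)
    return value

async def parallel_chain(steps: list[ChainStep], value: Any) -> list[Any]:
    """Fan the same input out to independent steps (e.g. search + recommend)."""
    return list(await asyncio.gather(*(step(value) for step in steps)))

# Toy steps standing in for model calls
async def upper(text: str) -> str:
    return text.upper()

async def exclaim(text: str) -> str:
    return text + "!"

print(asyncio.run(sequential_chain([upper, exclaim], "one piece")))  # ONE PIECE!
print(asyncio.run(parallel_chain([upper, exclaim], "naruto")))       # ['NARUTO', 'naruto!']
```

Conditional chains branch on an intermediate result, and map-reduce chains run a `parallel_chain` over shards followed by a final reduce step.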
Architecture Overview
graph TB
subgraph UserLayer["User Interface"]
WS[API Gateway WebSocket]
USR[Manga Customer]
end
subgraph SupervisorLayer["Supervisor Agent"]
SUP[Supervisor Agent<br/>Intent Classification<br/>Task Decomposition]
PM[Plan Manager<br/>Step Tracking]
end
subgraph AgentSquad["Agent Squad"]
SA[Search Agent<br/>OpenSearch Vector<br/>Manga Discovery]
RA[Recommendation Agent<br/>Personalized Picks<br/>Collaborative Filtering]
OA[Order Agent<br/>DynamoDB Queries<br/>Status / Returns]
CA[Conversation Agent<br/>Greetings / FAQ<br/>Small Talk]
end
subgraph ToolLayer["Tool Layer"]
BK[Bedrock Claude 3<br/>Sonnet / Haiku]
OS[OpenSearch Serverless<br/>Vector Store]
DDB[DynamoDB<br/>Sessions / Products]
RC[ElastiCache Redis<br/>Cache / State]
end
subgraph ReflectionLayer["Reflection & Quality"]
RF[Reflection Agent<br/>Output Validation]
QC[Quality Checker<br/>Safety / Relevance]
end
USR -->|WebSocket| WS
WS --> SUP
SUP -->|Route| SA
SUP -->|Route| RA
SUP -->|Route| OA
SUP -->|Route| CA
SUP --> PM
SA --> BK
SA --> OS
RA --> BK
RA --> DDB
OA --> DDB
CA --> BK
SA --> RF
RA --> RF
OA --> RF
CA --> RF
RF --> QC
QC -->|Approved| WS
SA --> RC
RA --> RC
style SUP fill:#232f3e,color:#ff9900
style BK fill:#232f3e,color:#ff9900
style OS fill:#232f3e,color:#ff9900
Strands Agent Framework
Agent Lifecycle
stateDiagram-v2
[*] --> Initialized: Create Agent
Initialized --> Planning: Receive Task
Planning --> Executing: Plan Ready
Executing --> Reflecting: Step Complete
Reflecting --> Executing: Needs More Steps
Reflecting --> Responding: Quality OK
Responding --> [*]: Response Sent
Executing --> ErrorHandling: Step Failed
ErrorHandling --> Planning: Retry with New Plan
ErrorHandling --> Responding: Graceful Degradation
Planning --> TimedOut: Budget Exceeded
Executing --> TimedOut: Budget Exceeded
TimedOut --> Responding: Partial Response
Core Agent Implementation
"""
Strands Agent framework for MangaAssist multi-agent chatbot.
Implements agent lifecycle, tool integration, and coordination patterns.
"""
import asyncio
import json
import logging
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
logger = logging.getLogger(__name__)
class AgentState(Enum):
INITIALIZED = "initialized"
PLANNING = "planning"
EXECUTING = "executing"
REFLECTING = "reflecting"
RESPONDING = "responding"
ERROR = "error"
TIMED_OUT = "timed_out"
class AgentRole(Enum):
SUPERVISOR = "supervisor"
SEARCH = "search"
RECOMMENDATION = "recommendation"
ORDER = "order"
CONVERSATION = "conversation"
REFLECTION = "reflection"
@dataclass
class AgentMessage:
"""A message exchanged between agents."""
sender: str
receiver: str
content: Any
message_type: str = "task" # task, result, error, control
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class ExecutionStep:
"""A single step in an agent's execution plan."""
step_id: int
action: str
tool: str | None = None
parameters: dict[str, Any] = field(default_factory=dict)
result: Any = None
status: str = "pending" # pending, running, completed, failed
latency_ms: float = 0.0
@dataclass
class AgentContext:
"""Shared context available to agents during execution."""
session_id: str
user_id: str
conversation_history: list[dict[str, str]] = field(default_factory=list)
user_preferences: dict[str, Any] = field(default_factory=dict)
execution_budget_ms: float = 2500.0 # Leave 500ms for WebSocket delivery
tokens_budget: int = 2048
model_preference: str = "anthropic.claude-3-haiku-20240307-v1:0"
@dataclass
class Tool:
"""A tool available to an agent for executing actions."""
name: str
description: str
handler: Callable
parameters_schema: dict[str, Any] = field(default_factory=dict)
class BaseAgent(ABC):
"""Abstract base class for all MangaAssist agents."""
def __init__(
self,
agent_id: str,
role: AgentRole,
bedrock_client: Any,
tools: list[Tool] | None = None,
):
self.agent_id = agent_id
self.role = role
self.bedrock_client = bedrock_client
self.tools = {t.name: t for t in (tools or [])}
self.state = AgentState.INITIALIZED
self.execution_history: list[ExecutionStep] = []
self._start_time: float | None = None
def _elapsed_ms(self) -> float:
if self._start_time is None:
return 0.0
return (time.monotonic() - self._start_time) * 1000
def _budget_remaining_ms(self, context: AgentContext) -> float:
return max(0, context.execution_budget_ms - self._elapsed_ms())
@abstractmethod
async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
"""Generate an execution plan for the given task."""
...
@abstractmethod
async def execute_step(
self, step: ExecutionStep, context: AgentContext
) -> Any:
"""Execute a single step from the plan."""
...
@abstractmethod
async def reflect(
self, results: list[Any], context: AgentContext
) -> tuple[bool, str]:
"""Reflect on execution results. Returns (is_complete, assessment)."""
...
async def run(self, task: str, context: AgentContext) -> AgentMessage:
"""Execute the full agent lifecycle."""
self._start_time = time.monotonic()
self.state = AgentState.PLANNING
try:
# Planning phase
steps = await self.plan(task, context)
self.execution_history = steps
# Execution phase
self.state = AgentState.EXECUTING
results = []
for step in steps:
if self._budget_remaining_ms(context) < 200:
logger.warning(
"Agent %s budget exhausted at step %d/%d",
self.agent_id, step.step_id, len(steps),
)
self.state = AgentState.TIMED_OUT
break
step.status = "running"
step_start = time.monotonic()
try:
result = await self.execute_step(step, context)
step.result = result
step.status = "completed"
step.latency_ms = (time.monotonic() - step_start) * 1000
results.append(result)
except Exception as e:
step.status = "failed"
step.result = str(e)
step.latency_ms = (time.monotonic() - step_start) * 1000
logger.error("Agent %s step %d failed: %s", self.agent_id, step.step_id, e)
# Reflection phase
if self.state != AgentState.TIMED_OUT:
self.state = AgentState.REFLECTING
is_complete, assessment = await self.reflect(results, context)
                if not is_complete and self._budget_remaining_ms(context) > 500:
                    # Re-plan and execute remaining work (guarded so a late
                    # failure does not discard the partial results)
                    additional_steps = await self.plan(
                        f"Continue: {assessment}", context
                    )
                    for step in additional_steps:
                        if self._budget_remaining_ms(context) < 200:
                            break
                        try:
                            results.append(await self.execute_step(step, context))
                        except Exception as e:
                            logger.error(
                                "Agent %s re-plan step failed: %s", self.agent_id, e
                            )
                            break
            # Build response (capture timeout status before the state transition,
            # otherwise the flag below would always be False)
            timed_out = self.state == AgentState.TIMED_OUT
            self.state = AgentState.RESPONDING
            response_text = self._format_response(results)
            return AgentMessage(
                sender=self.agent_id,
                receiver="supervisor",
                content=response_text,
                message_type="result",
                metadata={
                    "role": self.role.value,
                    "steps_executed": len(
                        [s for s in self.execution_history if s.status == "completed"]
                    ),
                    "total_latency_ms": round(self._elapsed_ms(), 2),
                    "timed_out": timed_out,
                },
            )
except Exception as e:
self.state = AgentState.ERROR
logger.exception("Agent %s failed: %s", self.agent_id, e)
return AgentMessage(
sender=self.agent_id,
receiver="supervisor",
content=f"Agent error: {str(e)}",
message_type="error",
)
def _format_response(self, results: list[Any]) -> str:
"""Format execution results into a response string."""
parts = [str(r) for r in results if r is not None]
return "\n".join(parts) if parts else "No results generated."
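To see the lifecycle in isolation, the plan/execute/reflect loop inside `BaseAgent.run` compresses to a few lines. This is an illustrative synchronous sketch, not part of the framework itself; the budget and reserve values mirror the ones used above.

```python
import time

def run_lifecycle(plan, execute, reflect, budget_ms=2500.0, reserve_ms=200.0):
    """Compressed BaseAgent.run: plan, execute steps while budget remains,
    reflect once, and return partial results instead of looping forever."""
    start = time.monotonic()

    def remaining() -> float:
        return budget_ms - (time.monotonic() - start) * 1000

    results = []
    for step in plan():
        if remaining() < reserve_ms:  # mirrors the 200ms guard in run()
            break
        results.append(execute(step))
    is_complete, assessment = reflect(results)
    return results if is_complete else results + [f"partial: {assessment}"]

out = run_lifecycle(
    plan=lambda: ["vector_search", "format_results"],
    execute=lambda step: f"{step}:ok",
    reflect=lambda rs: (len(rs) == 2, "missing steps"),
)
# out == ['vector_search:ok', 'format_results:ok']
```

Reflection runs exactly once here, matching the bounded Reflection Loop in the mind map: a failed reflection degrades to a partial answer rather than triggering unbounded re-planning.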
Supervisor Agent Pattern
graph TB
subgraph Supervisor["Supervisor Agent"]
IC[Intent Classifier<br/>Haiku — fast routing]
TD[Task Decomposer<br/>Split complex queries]
AR[Agent Router<br/>Select specialist agent]
RA[Result Aggregator<br/>Merge agent outputs]
end
subgraph Intents["Detected Intents"]
I1[SEARCH<br/>Find manga by query]
I2[RECOMMEND<br/>Personalized picks]
I3[ORDER<br/>Status / returns]
I4[CHAT<br/>Greeting / FAQ]
I5[COMPLEX<br/>Multi-intent query]
end
subgraph Agents["Specialist Agents"]
SA[Search Agent]
RCA[Recommendation Agent]
OA[Order Agent]
CNA[Conversation Agent]
end
IC --> I1
IC --> I2
IC --> I3
IC --> I4
IC --> I5
I1 --> AR
I2 --> AR
I3 --> AR
I4 --> AR
I5 --> TD
TD --> AR
AR --> SA
AR --> RCA
AR --> OA
AR --> CNA
SA --> RA
RCA --> RA
OA --> RA
CNA --> RA
style Supervisor fill:#232f3e,color:#ff9900
Supervisor Agent Implementation
"""
Supervisor Agent for MangaAssist — coordinates the agent squad.
Handles intent classification, task decomposition, and result aggregation.
"""
class SupervisorAgent(BaseAgent):
"""
Top-level coordinator that routes user queries to specialist agents.
Uses Haiku for fast intent classification (< 500ms).
"""
INTENT_PROMPT = """Classify the user's intent for a Japanese manga store chatbot.
Return EXACTLY one JSON object with these fields:
- "intent": one of ["search", "recommend", "order", "chat", "complex"]
- "sub_intents": array of intents if complex (otherwise empty array)
- "confidence": float 0-1
- "requires_context": boolean — does this need conversation history?
User message: {message}
Previous context: {context_summary}
Return only the JSON object, no explanation."""
def __init__(
self,
bedrock_client: Any,
agent_registry: dict[str, "BaseAgent"],
):
super().__init__(
agent_id="supervisor",
role=AgentRole.SUPERVISOR,
bedrock_client=bedrock_client,
)
self.agent_registry = agent_registry
self._intent_model = "anthropic.claude-3-haiku-20240307-v1:0"
async def classify_intent(
self, message: str, context: AgentContext
) -> dict[str, Any]:
"""Classify user intent using Haiku for speed."""
context_summary = (
json.dumps(context.conversation_history[-3:])
if context.conversation_history
else "No prior context"
)
prompt = self.INTENT_PROMPT.format(
message=message,
context_summary=context_summary,
)
response = await self.bedrock_client.invoke(
model_id=self._intent_model,
prompt=prompt,
max_tokens=200,
)
try:
return json.loads(response["text"])
except json.JSONDecodeError:
logger.warning("Failed to parse intent classification, defaulting to chat")
return {
"intent": "chat",
"sub_intents": [],
"confidence": 0.5,
"requires_context": False,
}
async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
"""Create an execution plan based on intent classification."""
classification = await self.classify_intent(task, context)
intent = classification["intent"]
steps = []
if intent == "complex":
            # Decompose into sub-tasks (the base lifecycle runs them in order;
            # see ExecutorAgent below for dependency-aware parallel execution)
for i, sub_intent in enumerate(classification.get("sub_intents", ["chat"])):
steps.append(ExecutionStep(
step_id=i,
action=f"delegate_{sub_intent}",
tool=sub_intent,
parameters={"message": task, "sub_intent": sub_intent},
))
else:
steps.append(ExecutionStep(
step_id=0,
action=f"delegate_{intent}",
tool=intent,
parameters={"message": task},
))
return steps
async def execute_step(
self, step: ExecutionStep, context: AgentContext
) -> Any:
"""Delegate step execution to the appropriate specialist agent."""
agent_key = step.tool or "conversation"
agent_map = {
"search": "search",
"recommend": "recommendation",
"order": "order",
"chat": "conversation",
}
agent_name = agent_map.get(agent_key, "conversation")
agent = self.agent_registry.get(agent_name)
if agent is None:
logger.warning("No agent found for '%s', falling back to conversation", agent_name)
agent = self.agent_registry.get("conversation")
if agent is None:
return "I'm sorry, I'm having trouble processing your request right now."
# Delegate to specialist with remaining budget
remaining_budget = self._budget_remaining_ms(context)
delegated_context = AgentContext(
session_id=context.session_id,
user_id=context.user_id,
conversation_history=context.conversation_history,
user_preferences=context.user_preferences,
            execution_budget_ms=max(0.0, remaining_budget - 200),  # Reserve 200ms for aggregation
tokens_budget=context.tokens_budget,
model_preference=context.model_preference,
)
result = await agent.run(step.parameters.get("message", ""), delegated_context)
return result.content
async def reflect(
self, results: list[Any], context: AgentContext
) -> tuple[bool, str]:
"""Check if all sub-tasks produced satisfactory results."""
if not results:
return False, "No results from any agent"
empty_results = [r for r in results if not r or r == "No results generated."]
if len(empty_results) == len(results):
return False, "All agents returned empty results"
return True, "Results collected successfully"
async def handle_message(
self, message: str, context: AgentContext
) -> str:
"""Main entry point: handle a user message end-to-end."""
result = await self.run(message, context)
return result.content
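`classify_intent` falls back to a default on any parse failure. Claude sometimes wraps JSON in surrounding prose, so a slightly more tolerant parser can rescue those responses before giving up. This helper is illustrative and not part of the framework above:

```python
import json
import re
from typing import Any

def extract_json(text: str) -> Any:
    """Parse model output as JSON, tolerating surrounding prose.
    Tries the full string first, then the first {...} or [...] span."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}|\[.*\]", text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass
    return None

extract_json('Sure! Here is the JSON: {"intent": "search", "confidence": 0.9}')
# → {'intent': 'search', 'confidence': 0.9}
```

Swapping this in for the bare `json.loads` call keeps the "default to chat" fallback as a last resort rather than the common path.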
class SearchAgent(BaseAgent):
"""Specialist agent for manga search queries."""
SEARCH_PROMPT = """You are a helpful manga store assistant. Based on the search query,
generate a natural response with relevant manga information.
Search query: {query}
Search results: {results}
Respond in the same language as the query. Include title (Japanese and romaji),
author, volume count, and price if available. Keep response concise."""
def __init__(self, bedrock_client: Any, opensearch_client: Any):
super().__init__(
agent_id="search_agent",
role=AgentRole.SEARCH,
bedrock_client=bedrock_client,
)
self.opensearch_client = opensearch_client
async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
return [
ExecutionStep(step_id=0, action="vector_search", tool="opensearch", parameters={"query": task}),
ExecutionStep(step_id=1, action="format_results", tool="bedrock", parameters={"query": task}),
]
async def execute_step(self, step: ExecutionStep, context: AgentContext) -> Any:
if step.action == "vector_search":
# Execute OpenSearch vector search
results = await self._search_manga(step.parameters["query"])
return results
elif step.action == "format_results":
# Format results using Bedrock
search_results = self.execution_history[0].result if self.execution_history else []
return await self._format_results(step.parameters["query"], search_results)
return None
async def _search_manga(self, query: str) -> list[dict[str, Any]]:
"""Search manga catalog using OpenSearch vector search."""
# In production, this would call OpenSearch Serverless
return [{"title": query, "status": "mock_search_result"}]
async def _format_results(self, query: str, results: list[dict]) -> str:
"""Format search results into a natural language response."""
prompt = self.SEARCH_PROMPT.format(
query=query, results=json.dumps(results, ensure_ascii=False)
)
response = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
prompt=prompt,
max_tokens=500,
)
return response["text"]
async def reflect(self, results: list[Any], context: AgentContext) -> tuple[bool, str]:
if results and any(r for r in results if r):
return True, "Search results found and formatted"
return False, "No search results to display"
class RecommendationAgent(BaseAgent):
"""Specialist agent for personalized manga recommendations."""
RECOMMEND_PROMPT = """You are a manga recommendation expert. Based on the user's
preferences and request, suggest relevant manga titles.
User request: {request}
User preferences: {preferences}
Purchase history: {history}
Provide 3-5 recommendations with reasons. Include Japanese titles.
Match the user's language."""
def __init__(self, bedrock_client: Any, dynamodb_client: Any):
super().__init__(
agent_id="recommendation_agent",
role=AgentRole.RECOMMENDATION,
bedrock_client=bedrock_client,
)
self.dynamodb_client = dynamodb_client
async def plan(self, task: str, context: AgentContext) -> list[ExecutionStep]:
return [
ExecutionStep(step_id=0, action="fetch_preferences", tool="dynamodb",
parameters={"user_id": context.user_id}),
ExecutionStep(step_id=1, action="generate_recommendations", tool="bedrock",
parameters={"request": task}),
]
async def execute_step(self, step: ExecutionStep, context: AgentContext) -> Any:
if step.action == "fetch_preferences":
return context.user_preferences
elif step.action == "generate_recommendations":
prefs = self.execution_history[0].result if self.execution_history else {}
prompt = self.RECOMMEND_PROMPT.format(
request=step.parameters["request"],
preferences=json.dumps(prefs, ensure_ascii=False),
history="[]",
)
response = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
prompt=prompt,
max_tokens=800,
)
return response["text"]
return None
async def reflect(self, results: list[Any], context: AgentContext) -> tuple[bool, str]:
if results and len(results) >= 2 and results[-1]:
return True, "Recommendations generated"
return False, "Failed to generate recommendations"
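The delegation code above subtracts fixed reserves from the remaining budget at each hop. A standalone sketch of that cascade for the 3-second target; the stage names and reserve values mirror the code and takeaways, but treat the exact split as illustrative:

```python
def cascade(total_ms: float, reserves: list[tuple[str, float]]) -> dict[str, float]:
    """Walk the budget down the delegation chain, reserving time per stage;
    whatever is left goes to the specialist agents doing the real work."""
    remaining = total_ms
    allocation: dict[str, float] = {}
    for stage, reserve in reserves:
        allocation[stage] = reserve
        remaining = max(0.0, remaining - reserve)
    allocation["specialist_execution"] = remaining
    return allocation

print(cascade(3000.0, [
    ("websocket_delivery", 500.0),    # reserved before the agent even starts
    ("intent_classification", 500.0),
    ("result_aggregation", 200.0),
]))
# {'websocket_delivery': 500.0, 'intent_classification': 500.0,
#  'result_aggregation': 200.0, 'specialist_execution': 1800.0}
```

The point of clamping with `max(0.0, ...)` is the same as in the delegation code: a downstream stage should receive a zero budget and short-circuit, never a negative one.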
Planner-Executor Pattern
graph TB
subgraph Planner["Planner Agent"]
PG[Plan Generator<br/>Sonnet — complex reasoning]
PS[Plan Store<br/>Versioned plans in DynamoDB]
PR[Plan Reviser<br/>Adapt on failure]
end
subgraph Executor["Executor Agent"]
SE[Step Executor<br/>Haiku — fast actions]
SR[Step Reporter<br/>Status updates]
SV[Step Validator<br/>Output checks]
end
subgraph PlanLifecycle["Plan Lifecycle"]
P1[Draft Plan] --> P2[Validate Plan]
P2 --> P3[Execute Steps]
P3 --> P4[Report Results]
P4 --> P5{Complete?}
P5 -->|No| P6[Revise Plan]
P6 --> P3
P5 -->|Yes| P7[Final Response]
end
PG --> P1
PS --> P2
SE --> P3
SR --> P4
SV --> P5
PR --> P6
style PG fill:#339af0,color:#fff
style SE fill:#51cf66,color:#fff
Planner-Executor Implementation
"""
Planner-Executor pattern for complex multi-step manga queries.
Planner uses Sonnet for reasoning; Executor uses Haiku for speed.
"""
@dataclass
class ExecutionPlan:
"""A multi-step plan generated by the Planner agent."""
plan_id: str
goal: str
steps: list[ExecutionStep]
version: int = 1
status: str = "draft" # draft, validated, executing, completed, failed
created_at: float = field(default_factory=time.time)
class PlannerAgent:
"""
Generates execution plans using Sonnet for complex reasoning.
Plans are structured, versioned, and revisable.
"""
PLAN_PROMPT = """You are a planning agent for a manga store chatbot. Given the user's
complex query, create a step-by-step execution plan.
User query: {query}
Available tools: {tools}
Constraints: Must complete within {budget_ms}ms, max {max_steps} steps.
Return a JSON array of steps, each with:
- "step_id": integer
- "action": string description
- "tool": tool name to use
- "parameters": dict of parameters
- "depends_on": array of step_ids this depends on (for parallel execution)
Return only the JSON array."""
def __init__(self, bedrock_client: Any, available_tools: list[str]):
self.bedrock_client = bedrock_client
self.available_tools = available_tools
async def generate_plan(
self, query: str, context: AgentContext
) -> ExecutionPlan:
"""Generate an execution plan for a complex query."""
prompt = self.PLAN_PROMPT.format(
query=query,
tools=", ".join(self.available_tools),
budget_ms=int(context.execution_budget_ms),
max_steps=5,
)
response = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
prompt=prompt,
max_tokens=500,
)
try:
steps_data = json.loads(response["text"])
steps = [
ExecutionStep(
step_id=s["step_id"],
action=s["action"],
tool=s.get("tool"),
parameters=s.get("parameters", {}),
)
for s in steps_data
]
except (json.JSONDecodeError, KeyError):
# Fallback: single-step plan
steps = [ExecutionStep(
step_id=0,
action="direct_response",
tool="bedrock",
parameters={"query": query},
)]
return ExecutionPlan(
plan_id=str(uuid.uuid4())[:8],
goal=query,
steps=steps,
status="validated",
)
async def revise_plan(
self,
plan: ExecutionPlan,
failed_step: ExecutionStep,
error: str,
context: AgentContext,
) -> ExecutionPlan:
"""Revise a plan after a step failure."""
remaining_steps = [
s for s in plan.steps if s.status == "pending"
]
prompt = (
f"Step {failed_step.step_id} failed: {error}. "
f"Original goal: {plan.goal}. "
f"Remaining steps: {len(remaining_steps)}. "
f"Budget remaining: {context.execution_budget_ms}ms. "
"Generate an alternative approach. Return JSON array of steps."
)
response = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
prompt=prompt,
max_tokens=300,
)
try:
new_steps = json.loads(response["text"])
plan.steps = [
ExecutionStep(step_id=s["step_id"], action=s["action"], tool=s.get("tool"))
for s in new_steps
]
except (json.JSONDecodeError, KeyError):
pass # Keep existing remaining steps
plan.version += 1
plan.status = "validated"
return plan
class ExecutorAgent:
"""
Executes plan steps using Haiku for speed.
Reports progress and validates step outputs.
"""
def __init__(
self,
bedrock_client: Any,
tool_handlers: dict[str, Callable],
):
self.bedrock_client = bedrock_client
self.tool_handlers = tool_handlers
async def execute_plan(
self, plan: ExecutionPlan, context: AgentContext
) -> list[Any]:
"""Execute all steps in a plan, respecting dependencies."""
        plan.status = "executing"
        results: dict[int, Any] = {}
        # Track elapsed time against a deadline; the budget value itself
        # is never mutated, so comparing it to zero would never trigger
        deadline = time.monotonic() + context.execution_budget_ms / 1000
        # Group steps by dependency level for parallel execution
        levels = self._topological_sort(plan.steps)
        for level in levels:
            if time.monotonic() >= deadline:
                logger.warning("Plan execution budget exhausted")
                break
# Execute independent steps in parallel
tasks = [
self._execute_single_step(step, results, context)
for step in level
]
level_results = await asyncio.gather(*tasks, return_exceptions=True)
for step, result in zip(level, level_results):
if isinstance(result, Exception):
step.status = "failed"
step.result = str(result)
results[step.step_id] = None
else:
results[step.step_id] = result
plan.status = "completed"
return [results.get(s.step_id) for s in plan.steps]
async def _execute_single_step(
self,
step: ExecutionStep,
prior_results: dict[int, Any],
context: AgentContext,
) -> Any:
"""Execute a single plan step."""
step.status = "running"
start = time.monotonic()
handler = self.tool_handlers.get(step.tool or "")
if handler:
result = await handler(step.parameters, prior_results)
else:
# Default: use Bedrock to handle the step
prompt = f"Execute action: {step.action}. Parameters: {json.dumps(step.parameters)}"
response = await self.bedrock_client.invoke(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
prompt=prompt,
max_tokens=500,
)
result = response["text"]
step.status = "completed"
step.latency_ms = (time.monotonic() - start) * 1000
step.result = result
return result
def _topological_sort(
self, steps: list[ExecutionStep]
) -> list[list[ExecutionStep]]:
"""Sort steps into dependency levels for parallel execution."""
# Simple: all steps are sequential for now
return [[step] for step in steps]
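The placeholder above ignores the `depends_on` field that PLAN_PROMPT asks the model for. A Kahn-style level grouping makes the parallelism real. This sketch works on plain dicts shaped like the planner's JSON output; `ExecutionStep` itself does not carry a `depends_on` field, so a real version would read it from `parameters`:

```python
def group_by_dependency_level(steps: list[dict]) -> list[list[dict]]:
    """Group steps into levels: every step in a level depends only on
    earlier levels, so one level can run concurrently via asyncio.gather()."""
    deps = {s["step_id"]: set(s.get("depends_on", [])) for s in steps}
    by_id = {s["step_id"]: s for s in steps}
    levels: list[list[dict]] = []
    placed: set[int] = set()
    while len(placed) < len(deps):
        ready = sorted(
            sid for sid, d in deps.items() if sid not in placed and d <= placed
        )
        if not ready:
            # Cycle or dangling dependency: degrade to one step at a time
            ready = [min(sid for sid in deps if sid not in placed)]
        levels.append([by_id[sid] for sid in ready])
        placed.update(ready)
    return levels

plan = [
    {"step_id": 0, "depends_on": []},      # e.g. vector search
    {"step_id": 1, "depends_on": []},      # e.g. fetch preferences
    {"step_id": 2, "depends_on": [0, 1]},  # merge and respond
]
# Levels: [[step 0, step 1], [step 2]] — the first two run in parallel.
```

For a "search + recommend" query this is what cuts the wall-clock time roughly in half, as noted in the takeaways.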
Agent Coordination Patterns
graph LR
subgraph Patterns["Coordination Patterns"]
SP[Supervisor<br/>Central coordinator]
PP[Peer-to-Peer<br/>Direct messaging]
HP[Hierarchical<br/>Multi-level delegation]
BP[Blackboard<br/>Shared state]
end
subgraph UseCases["MangaAssist Use Cases"]
U1[Simple Query<br/>→ Single agent]
U2[Multi-Intent<br/>→ Parallel agents]
U3[Complex Research<br/>→ Plan-Execute]
U4[Conversational<br/>→ Context chain]
end
SP --> U1
SP --> U2
HP --> U3
BP --> U4
style SP fill:#339af0,color:#fff
style HP fill:#51cf66,color:#fff
Message Bus for Agent Communication
"""
Agent message bus for MangaAssist multi-agent coordination.
Enables asynchronous message passing between agents.
"""
class AgentMessageBus:
"""
Central message bus for agent-to-agent communication.
Supports direct messaging, broadcast, and topic-based pub/sub.
"""
def __init__(self):
self._queues: dict[str, asyncio.Queue] = {}
self._subscribers: dict[str, list[str]] = {} # topic -> [agent_ids]
self._message_log: list[AgentMessage] = []
def register_agent(self, agent_id: str) -> None:
"""Register an agent with the message bus."""
if agent_id not in self._queues:
self._queues[agent_id] = asyncio.Queue()
def subscribe(self, agent_id: str, topic: str) -> None:
"""Subscribe an agent to a topic."""
if topic not in self._subscribers:
self._subscribers[topic] = []
if agent_id not in self._subscribers[topic]:
self._subscribers[topic].append(agent_id)
async def send(self, message: AgentMessage) -> None:
"""Send a message to a specific agent."""
self._message_log.append(message)
queue = self._queues.get(message.receiver)
if queue:
await queue.put(message)
else:
logger.warning("No queue for agent '%s'", message.receiver)
async def broadcast(self, sender: str, topic: str, content: Any) -> None:
"""Broadcast a message to all subscribers of a topic."""
subscribers = self._subscribers.get(topic, [])
for sub_id in subscribers:
if sub_id != sender:
msg = AgentMessage(
sender=sender,
receiver=sub_id,
content=content,
message_type="broadcast",
)
await self.send(msg)
async def receive(self, agent_id: str, timeout: float = 5.0) -> AgentMessage | None:
"""Receive a message for a specific agent with timeout."""
queue = self._queues.get(agent_id)
if not queue:
return None
try:
return await asyncio.wait_for(queue.get(), timeout=timeout)
except asyncio.TimeoutError:
return None
def get_message_log(self) -> list[dict[str, Any]]:
"""Get the full message log for debugging."""
return [
{
"sender": m.sender,
"receiver": m.receiver,
"type": m.message_type,
"correlation_id": m.correlation_id,
"timestamp": m.timestamp,
"content_preview": str(m.content)[:100],
}
for m in self._message_log
]
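`AgentMessageBus.receive` leans on `asyncio.wait_for` semantics: a missing reply degrades into a timeout the supervisor can act on, which is what makes the fallback chains in the mind map possible. A standalone illustration of that mechanic, using a bare `asyncio.Queue` in place of the bus:

```python
import asyncio

async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    events = []
    # No specialist has replied yet: receive times out instead of hanging
    try:
        await asyncio.wait_for(queue.get(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout -> fall back to conversation agent")
    # A reply arrives: receive returns it immediately
    await queue.put("search agent: 3 results")
    events.append(await asyncio.wait_for(queue.get(), timeout=0.05))
    return events

print(asyncio.run(demo()))
```

In production the timeout would be derived from the remaining execution budget rather than a fixed constant.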
Key Takeaways
| # | Takeaway | MangaAssist Application |
| --- | --- | --- |
| 1 | The Supervisor pattern is the primary coordination model for MangaAssist — a central agent classifies intent and routes to specialists. | Haiku classifies intent in < 500ms; the supervisor delegates to Search, Recommendation, Order, or Conversation agents. |
| 2 | Agent budgets must cascade — the supervisor reserves time for aggregation and passes the remainder to specialists. | The 3-second total budget becomes: 500ms intent classification + 2000ms specialist execution + 500ms WebSocket delivery. |
| 3 | Planner-Executor separates reasoning from action — Sonnet plans, Haiku executes. | Complex multi-step queries (e.g., "find a manga like Attack on Titan and add volume 1 to my cart") get a Sonnet plan, then each step runs on Haiku. |
| 4 | Agent reflection enables self-correction but must be bounded to prevent infinite loops. | Reflection runs once after execution; if it fails, the agent returns a partial response rather than re-planning indefinitely. |
| 5 | Message buses decouple agents so they can be developed, tested, and scaled independently. | Each agent in the squad has its own message queue; the supervisor orchestrates via async message passing. |
| 6 | Parallel execution of independent sub-tasks is critical for complex queries within the 3-second budget. | A "search + recommend" complex query runs both agents in parallel via asyncio.gather(), cutting total time nearly in half. |
| 7 | Agent state machines make lifecycle management explicit and debuggable — every agent transitions through PLAN → EXECUTE → REFLECT → RESPOND. | CloudWatch logs include agent state transitions, making it easy to trace where a query stalled or failed. |