Autonomous Agent Architecture for Intelligent Systems
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Detail |
|---|---|
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.1 — Develop intelligent autonomous systems with appropriate memory and state management capabilities (for example, by using Strands Agents and AWS Agent Squad for multi-agent systems, MCP for agent-tool interactions) |
| This File | Autonomous agent architecture: Strands Agents, AWS Agent Squad multi-agent orchestration, memory patterns, and state management |
Skill Scope Statement
This skill requires the practitioner to architect and implement intelligent autonomous systems that can reason, plan, and execute multi-step tasks. The core capabilities include: configuring Strands Agents with tools, memory, and system prompts; orchestrating multi-agent systems with AWS Agent Squad for specialized task delegation; implementing short-term memory (Redis) and long-term memory (DynamoDB) patterns; managing conversation state, task execution state, and environment context; and integrating the Model Context Protocol (MCP) for standardized agent-tool interactions. MangaAssist demonstrates each of these patterns at a scale of 1M messages/day.
Mind Map — Autonomous Agent Architecture
mindmap
root((Skill 2.1.1<br/>Autonomous Agents))
Strands Agents Architecture
Agent Definition
System Prompt Configuration
Tool Binding
Model Selection (Sonnet vs Haiku)
Agent Lifecycle
Initialize → Plan → Act → Observe → Reflect
ReAct Loop Execution
Tool Call Dispatch
Built-in Capabilities
Structured Output Parsing
Streaming Response Support
Conversation History Management
AWS Agent Squad
Multi-Agent Coordination
Orchestrator Agent (Supervisor)
Specialized Sub-Agents
Agent Registry and Discovery
Routing Strategies
Classifier-Based Routing
Intent-Based Dispatch
Confidence-Threshold Fallback
Sub-Agent Definitions
ProductSearchAgent
OrderStatusAgent
RecommendationAgent
MangaQAAgent
Agent Memory Patterns
Short-Term Session Memory
ElastiCache Redis
TTL-Based Expiration (30 min)
Sliding Window Context
Token-Budget Trimming
Long-Term Persistent Memory
DynamoDB Sessions Table
User Preference History
Interaction Summaries
Cross-Session Continuity
Memory Retrieval
Recency-Weighted Recall
Semantic Similarity Search
Importance-Scored Filtering
State Management
Conversation State
Turn History
Active Intent Tracking
Slot Filling Progress
Task Execution State
Step Queue and Progress
Tool Call Results Cache
Retry and Rollback State
Environment Context
User Profile and Preferences
Cart and Session Metadata
Time-of-Day and Locale
MCP Protocol
Tool Definitions (JSON Schema)
Parameter Validation
Response Envelope Format
Error Propagation
Discovery Endpoint
Architecture Flowchart — MangaAssist Agent Request Lifecycle
flowchart TB
subgraph Client["Client Layer"]
USER["User via WebSocket"]
end
subgraph Gateway["API Gateway"]
WS["WebSocket API<br/>$connect / $default / $disconnect"]
end
subgraph Orchestrator["ECS Fargate — Agent Orchestrator"]
RECV["Message Receiver"]
SQUAD["AWS Agent Squad<br/>Supervisor Agent"]
ROUTER["Intent Classifier<br/>(Haiku — $0.25/1M)"]
subgraph SubAgents["Specialized Sub-Agents"]
PSA["ProductSearchAgent<br/>Strands Agent"]
OSA["OrderStatusAgent<br/>Strands Agent"]
RCA["RecommendationAgent<br/>Strands Agent"]
MQA["MangaQAAgent<br/>Strands Agent"]
end
MERGE["Response Merger"]
end
subgraph Memory["Memory Layer"]
REDIS["ElastiCache Redis<br/>Session Memory<br/>TTL 30 min"]
DDB["DynamoDB<br/>manga_sessions table<br/>Long-Term Memory"]
end
subgraph Tools["Tool Layer (MCP)"]
SEARCH_TOOL["OpenSearch Serverless<br/>Vector Search Tool"]
ORDER_TOOL["DynamoDB<br/>Order Lookup Tool"]
RECO_TOOL["Bedrock Sonnet<br/>Recommendation Tool"]
KB_TOOL["Bedrock KB<br/>Manga Knowledge Tool"]
end
subgraph Models["Foundation Models"]
SONNET["Claude 3 Sonnet<br/>Complex Reasoning<br/>$3 / $15 per 1M"]
HAIKU["Claude 3 Haiku<br/>Classification & Simple<br/>$0.25 / $1.25 per 1M"]
end
USER -->|WebSocket message| WS
WS -->|Lambda authorizer + route| RECV
RECV -->|Load session state| REDIS
RECV -->|Load user profile| DDB
RECV -->|User query + context| SQUAD
SQUAD -->|Classify intent| ROUTER
ROUTER -->|product_search| PSA
ROUTER -->|order_status| OSA
ROUTER -->|recommendation| RCA
ROUTER -->|manga_knowledge| MQA
PSA -->|MCP tool call| SEARCH_TOOL
OSA -->|MCP tool call| ORDER_TOOL
RCA -->|MCP tool call| RECO_TOOL
MQA -->|MCP tool call| KB_TOOL
PSA -->|Reasoning| SONNET
OSA -->|Simple lookup| HAIKU
RCA -->|Reasoning| SONNET
MQA -->|Simple Q&A| HAIKU
PSA --> MERGE
OSA --> MERGE
RCA --> MERGE
MQA --> MERGE
MERGE -->|Save turn| REDIS
MERGE -->|Persist summary| DDB
MERGE -->|Stream response| WS
WS -->|WebSocket frame| USER
1. Strands Agents — Core Agent Configuration for MangaAssist
1.1 What Is Strands Agents?
Strands Agents is an open-source Python SDK from AWS that implements the ReAct (Reason + Act) loop for building autonomous agents. Each agent has a system prompt, a set of tools, a memory buffer, and a model provider. The agent iterates: it reasons about the user query, decides which tool to call, observes the result, and repeats until it can produce a final answer.
For MangaAssist, each sub-agent is a Strands Agent with domain-specific tools and prompts.
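Before looking at a full agent definition, the ReAct loop itself can be sketched in a few lines of plain Python. This is a minimal illustration of the reason → act → observe cycle, not the Strands SDK; the `fake_model` stand-in and the `lookup_stock` tool are hypothetical:

```python
# Minimal ReAct loop sketch: reason -> act (tool call) -> observe -> repeat.
# A real agent calls an LLM where fake_model appears; the tool is hypothetical.

def lookup_stock(title: str) -> str:
    catalog = {"One Piece": "in stock", "Berserk": "out of stock"}
    return catalog.get(title, "unknown title")

TOOLS = {"lookup_stock": lookup_stock}

def fake_model(query: str, observations: list[str]) -> dict:
    # Stand-in for the LLM's reasoning step: decide to call a tool
    # on the first pass, then produce a final answer from the observation.
    if not observations:
        return {"action": "tool", "tool": "lookup_stock", "input": "Berserk"}
    return {"action": "final", "answer": f"Berserk is {observations[-1]}."}

def react_loop(query: str, max_steps: int = 3) -> str:
    observations: list[str] = []
    for _ in range(max_steps):
        decision = fake_model(query, observations)            # Reason
        if decision["action"] == "final":
            return decision["answer"]
        result = TOOLS[decision["tool"]](decision["input"])   # Act
        observations.append(result)                           # Observe
    return "Could not answer within the step budget."
```

The step budget plays the same role as the "limit tool calls to 3 per turn" rule in the ProductSearchAgent prompt below: it bounds latency when the model keeps requesting tools.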
1.2 Agent Definition — ProductSearchAgent
"""
MangaAssist ProductSearchAgent — Strands Agent configuration.
Handles all product discovery, search, and catalog queries.
"""
import json
import logging
from typing import Any
from strands import Agent
from strands.models.bedrock import BedrockModel
from strands.tools import tool
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# ---------------------------------------------------------------------------
# Tool Definitions (exposed via MCP to the agent)
# ---------------------------------------------------------------------------
@tool
def search_manga_catalog(
query: str,
genre: str = "",
author: str = "",
max_results: int = 5,
) -> dict[str, Any]:
"""
Search the MangaAssist product catalog using OpenSearch Serverless
vector search. Returns ranked manga titles with metadata.
Args:
query: Natural-language search query from the user.
genre: Optional genre filter (e.g., 'shonen', 'seinen', 'shojo').
author: Optional author name filter.
max_results: Maximum number of results to return (1-20).
Returns:
dict with 'results' list containing manga items and 'total_hits' count.
"""
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# Build the embedding for semantic search
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")
embedding_response = bedrock_runtime.invoke_model(
modelId="amazon.titan-embed-text-v2:0",
contentType="application/json",
accept="application/json",
body=json.dumps({"inputText": query}),
)
embedding = json.loads(embedding_response["body"].read())["embedding"]
# Connect to OpenSearch Serverless
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
"us-east-1",
"aoss",
session_token=credentials.token,
)
client = OpenSearch(
hosts=[{"host": "manga-search.us-east-1.aoss.amazonaws.com", "port": 443}],
http_auth=aws_auth,
use_ssl=True,
connection_class=RequestsHttpConnection,
)
# Build the hybrid query: kNN vector + keyword filters
must_clauses = []
if genre:
must_clauses.append({"term": {"genre.keyword": genre.lower()}})
if author:
must_clauses.append({"match": {"author": author}})
search_body = {
"size": min(max_results, 20),
"query": {
"bool": {
"must": must_clauses,
"should": [
{
"knn": {
"title_embedding": {
"vector": embedding,
"k": max_results,
}
}
}
],
}
},
"_source": [
"title", "author", "genre", "price", "volumes",
"rating", "synopsis", "cover_url", "in_stock",
],
}
response = client.search(index="manga-catalog", body=search_body)
hits = response.get("hits", {}).get("hits", [])
results = []
for hit in hits:
source = hit["_source"]
source["relevance_score"] = hit.get("_score", 0.0)
results.append(source)
return {
"results": results,
"total_hits": response["hits"]["total"]["value"],
"query_used": query,
}
@tool
def get_manga_details(title_id: str) -> dict[str, Any]:
"""
Retrieve full details for a specific manga title from DynamoDB.
Args:
title_id: The unique identifier for the manga title.
Returns:
dict with complete manga metadata including price, availability,
volumes, author biography, and related titles.
"""
import boto3
table = boto3.resource("dynamodb", region_name="us-east-1").Table(
"manga_products"
)
response = table.get_item(Key={"title_id": title_id})
if "Item" not in response:
return {"error": f"Manga title '{title_id}' not found", "found": False}
item = response["Item"]
# Convert Decimal types for JSON serialization
return {
"found": True,
"title": item["title"],
"author": item["author"],
"genre": item["genre"],
"price_usd": float(item["price_usd"]),
"volumes_available": int(item["volumes_available"]),
"total_volumes": int(item["total_volumes"]),
"rating": float(item["rating"]),
"synopsis": item["synopsis"],
"publisher": item["publisher"],
"in_stock": item["in_stock"],
"related_titles": item.get("related_titles", []),
}
# ---------------------------------------------------------------------------
# Agent Configuration
# ---------------------------------------------------------------------------
PRODUCT_SEARCH_SYSTEM_PROMPT = """You are the ProductSearchAgent for MangaAssist,
a Japanese manga store chatbot. Your role is to help customers find manga titles
they are looking for.
## Your Capabilities
- Search the manga catalog by title, genre, author, or description
- Retrieve detailed information about specific manga titles
- Provide availability and pricing information
- Suggest alternatives when a title is out of stock
## Behavioral Rules
1. Always search before answering — never guess about catalog availability.
2. Present results in a concise, scannable format.
3. Include price (USD) and stock status in every product mention.
4. If the user asks in Japanese, respond in Japanese.
5. Limit tool calls to 3 per turn to stay within the 3-second budget.
6. For ambiguous queries, ask one clarifying question rather than guessing.
## Response Format
Use markdown with bullet points. Include title, author, price, and stock status.
"""
# Model: use Sonnet for complex reasoning about product recommendations
sonnet_model = BedrockModel(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
region_name="us-east-1",
temperature=0.3,
max_tokens=1024,
)
product_search_agent = Agent(
model=sonnet_model,
system_prompt=PRODUCT_SEARCH_SYSTEM_PROMPT,
tools=[search_manga_catalog, get_manga_details],
# Memory: last 10 turns kept in context (short-term)
max_turns=10,
)
def handle_product_query(user_message: str, session_context: dict) -> str:
"""
Entry point for product search queries. Injects session context
and invokes the Strands Agent.
"""
# Prepend session context so the agent knows the user's history
context_prefix = ""
if session_context.get("recent_searches"):
recent = ", ".join(session_context["recent_searches"][-3:])
context_prefix = f"[User recently searched for: {recent}]\n\n"
if session_context.get("preferred_genres"):
genres = ", ".join(session_context["preferred_genres"])
context_prefix += f"[User prefers genres: {genres}]\n\n"
augmented_message = context_prefix + user_message
result = product_search_agent(augmented_message)
return str(result)
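The context-injection step in `handle_product_query` is easy to exercise in isolation. This standalone sketch reproduces the prefix-building logic with sample session data (the sample titles are illustrative only):

```python
def build_context_prefix(session_context: dict) -> str:
    # Mirrors the prefix logic inside handle_product_query above.
    prefix = ""
    if session_context.get("recent_searches"):
        recent = ", ".join(session_context["recent_searches"][-3:])
        prefix = f"[User recently searched for: {recent}]\n\n"
    if session_context.get("preferred_genres"):
        genres = ", ".join(session_context["preferred_genres"])
        prefix += f"[User prefers genres: {genres}]\n\n"
    return prefix

ctx = {
    "recent_searches": ["Naruto", "Bleach", "One Piece", "Vagabond"],
    "preferred_genres": ["seinen", "shonen"],
}
print(build_context_prefix(ctx) + "any seinen with swords?")
```

Note that only the last three searches survive the slice, which keeps the injected context small and predictable regardless of how long the session runs.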
1.3 Agent Definition — OrderStatusAgent
"""
MangaAssist OrderStatusAgent — lightweight Strands Agent using Haiku
for fast, inexpensive order lookups.
"""
from strands import Agent
from strands.models.bedrock import BedrockModel
from strands.tools import tool
from typing import Any
@tool
def lookup_order(order_id: str) -> dict[str, Any]:
"""
Look up an order by ID from the DynamoDB orders table.
Args:
order_id: The customer's order ID (format: MNG-XXXXXXXX).
Returns:
dict with order status, items, shipping info, and estimated delivery.
"""
import boto3
table = boto3.resource("dynamodb", region_name="us-east-1").Table(
"manga_orders"
)
response = table.get_item(Key={"order_id": order_id})
if "Item" not in response:
return {"found": False, "error": f"Order {order_id} not found"}
item = response["Item"]
return {
"found": True,
"order_id": item["order_id"],
"status": item["status"],
"items": item["items"],
"total_usd": float(item["total_usd"]),
"shipping_method": item["shipping_method"],
"tracking_number": item.get("tracking_number", "Not yet assigned"),
"estimated_delivery": item.get("estimated_delivery", "Pending"),
"placed_at": item["placed_at"],
}
@tool
def list_recent_orders(user_id: str, limit: int = 5) -> dict[str, Any]:
"""
List a user's recent orders.
Args:
user_id: The authenticated user's ID.
limit: Number of recent orders to retrieve (1-10).
Returns:
dict with list of recent orders, each with order_id, status, date, total.
"""
import boto3
from boto3.dynamodb.conditions import Key
table = boto3.resource("dynamodb", region_name="us-east-1").Table(
"manga_orders"
)
response = table.query(
IndexName="user_id-placed_at-index",
KeyConditionExpression=Key("user_id").eq(user_id),
ScanIndexForward=False, # Most recent first
Limit=min(limit, 10),
)
orders = []
for item in response.get("Items", []):
orders.append({
"order_id": item["order_id"],
"status": item["status"],
"total_usd": float(item["total_usd"]),
"placed_at": item["placed_at"],
"item_count": len(item["items"]),
})
return {"orders": orders, "count": len(orders)}
ORDER_STATUS_SYSTEM_PROMPT = """You are the OrderStatusAgent for MangaAssist.
You help customers check their order status, track shipments, and view order history.
## Rules
1. Always verify the order ID format (MNG-XXXXXXXX) before lookup.
2. If the user doesn't provide an order ID, list their recent orders first.
3. Be empathetic if an order is delayed — offer to escalate.
4. Never reveal internal system details or database schemas.
5. Keep responses brief — order status should be 2-3 sentences max.
"""
# Use Haiku for order lookups — simple retrieval, no complex reasoning needed
haiku_model = BedrockModel(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
region_name="us-east-1",
temperature=0.1,
max_tokens=512,
)
order_status_agent = Agent(
model=haiku_model,
system_prompt=ORDER_STATUS_SYSTEM_PROMPT,
tools=[lookup_order, list_recent_orders],
max_turns=5,
)
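Rule 1 of the OrderStatusAgent prompt says to verify the order ID format before lookup. A validator for this can be sketched as below — interpreting "MNG-XXXXXXXX" as exactly eight uppercase alphanumerics is an assumption, since the prompt does not spell out the character set:

```python
import re

# Assumed interpretation of "MNG-XXXXXXXX": eight uppercase
# alphanumeric characters after the "MNG-" prefix.
ORDER_ID_PATTERN = re.compile(r"^MNG-[A-Z0-9]{8}$")

def is_valid_order_id(order_id: str) -> bool:
    """Validate client-side before the DynamoDB lookup, avoiding a wasted call."""
    return bool(ORDER_ID_PATTERN.match(order_id.strip().upper()))
```

Validating before the tool call saves both a DynamoDB round trip and an agent turn spent explaining a "not found" result that was really a typo.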
2. AWS Agent Squad — Multi-Agent Orchestration
2.1 What Is AWS Agent Squad?
AWS Agent Squad (formerly Multi-Agent Orchestrator) is an AWS framework for coordinating multiple specialized agents under a single supervisor. The supervisor classifies user intents, routes to the right sub-agent, and merges responses. It supports:
- Classifier-based routing: An LLM (Haiku) classifies the intent and picks the best agent.
- Agent registry: Sub-agents are registered with descriptions, and the classifier uses those descriptions to route.
- Conversation memory: Shared memory across agents so context is not lost during hand-offs.
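The registry-driven routing in the second bullet works by concatenating each registered agent's description into the classifier prompt. A minimal, framework-agnostic sketch (the registry contents here are abbreviated versions of the descriptions used later):

```python
# Hypothetical mini-registry; Agent Squad builds this from add_agent() calls.
AGENT_REGISTRY = {
    "ProductSearchAgent": "Searches the manga catalog, prices, availability.",
    "OrderStatusAgent": "Checks order status, tracking, and order history.",
}

def build_classifier_prompt(user_message: str) -> str:
    # Each agent's description becomes a routing option for the classifier LLM.
    descriptions = "\n".join(
        f"- {name}: {desc}" for name, desc in AGENT_REGISTRY.items()
    )
    return (
        "Pick the best agent for the message.\n"
        f"Available agents:\n{descriptions}\n"
        f"User message: {user_message}\n"
        "Respond with only the agent name."
    )
```

This is why sub-agent descriptions must be written for the classifier, not for humans: vague or overlapping descriptions directly degrade routing accuracy.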
2.2 MangaAssist Agent Squad Configuration
"""
MangaAssist AWS Agent Squad — Supervisor orchestrating four specialized agents.
Production configuration for ECS Fargate deployment.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Any
from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
from multi_agent_orchestrator.classifiers import BedrockClassifier
from multi_agent_orchestrator.agents import (
BedrockLLMAgent,
BedrockLLMAgentOptions,
AgentResponse,
)
from multi_agent_orchestrator.storage import DynamoDbChatStorage
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared Memory Store (DynamoDB-backed for cross-session persistence)
# ---------------------------------------------------------------------------
chat_storage = DynamoDbChatStorage(
table_name="manga_agent_conversations",
region="us-east-1",
ttl_key="expires_at",
ttl_duration=86400, # 24-hour conversation retention
)
# ---------------------------------------------------------------------------
# Intent Classifier — Haiku for cost-efficiency at 1M msgs/day
# ---------------------------------------------------------------------------
# At 1M messages/day, classifier cost:
# Input: ~100 tokens/msg * 1M = 100M tokens * $0.25/1M = $25/day
# Output: ~20 tokens/msg * 1M = 20M tokens * $1.25/1M = $25/day
# Total classifier cost: ~$50/day
# ---------------------------------------------------------------------------
classifier = BedrockClassifier(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
region="us-east-1",
inference_config={
"temperature": 0.0, # Deterministic classification
"maxTokens": 100,
},
)
# ---------------------------------------------------------------------------
# Sub-Agent Definitions
# ---------------------------------------------------------------------------
product_search_agent_config = BedrockLLMAgent(
BedrockLLMAgentOptions(
name="ProductSearchAgent",
description=(
"Handles all product discovery queries: searching manga titles, "
"browsing by genre or author, checking prices and availability, "
"finding specific manga volumes. Use for any question about "
"what manga is available or product details."
),
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
region="us-east-1",
inference_config={
"temperature": 0.3,
"maxTokens": 1024,
},
streaming=True,
save_chat=True,
)
)
order_status_agent_config = BedrockLLMAgent(
BedrockLLMAgentOptions(
name="OrderStatusAgent",
description=(
"Handles order-related queries: checking order status, tracking "
"shipments, viewing order history, reporting delivery issues. "
"Use for any question about existing or past orders."
),
model_id="anthropic.claude-3-haiku-20240307-v1:0",
region="us-east-1",
inference_config={
"temperature": 0.1,
"maxTokens": 512,
},
streaming=False, # Order lookups are fast, no need to stream
save_chat=True,
)
)
recommendation_agent_config = BedrockLLMAgent(
BedrockLLMAgentOptions(
name="RecommendationAgent",
description=(
"Provides personalized manga recommendations based on user "
"preferences, reading history, and similar titles. Use when "
"the user asks for suggestions, 'what should I read next', "
"or wants similar manga to something they liked."
),
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
region="us-east-1",
inference_config={
"temperature": 0.7, # Higher creativity for recommendations
"maxTokens": 1024,
},
streaming=True,
save_chat=True,
)
)
manga_qa_agent_config = BedrockLLMAgent(
BedrockLLMAgentOptions(
name="MangaQAAgent",
description=(
"Answers general knowledge questions about manga: plot summaries, "
"character information, author backgrounds, genre explanations, "
"manga vs anime differences. Use for informational questions "
"that don't involve searching the product catalog."
),
model_id="anthropic.claude-3-haiku-20240307-v1:0",
region="us-east-1",
inference_config={
"temperature": 0.3,
"maxTokens": 768,
},
streaming=True,
save_chat=True,
)
)
# ---------------------------------------------------------------------------
# Orchestrator Assembly
# ---------------------------------------------------------------------------
def build_manga_orchestrator() -> MultiAgentOrchestrator:
"""
Build and return the fully configured MangaAssist Agent Squad orchestrator.
Call this once at ECS task startup and reuse across requests.
"""
orchestrator = MultiAgentOrchestrator(
classifier=classifier,
storage=chat_storage,
options={
"LOG_AGENT_CHAT": True,
"LOG_CLASSIFIER_CHAT": True,
"LOG_CLASSIFIER_RAW_OUTPUT": False,
"LOG_CLASSIFIER_OUTPUT": True,
"LOG_EXECUTION_TIMES": True,
"MAX_MESSAGE_PAIRS_PER_AGENT": 10,
},
)
orchestrator.add_agent(product_search_agent_config)
orchestrator.add_agent(order_status_agent_config)
orchestrator.add_agent(recommendation_agent_config)
orchestrator.add_agent(manga_qa_agent_config)
logger.info("MangaAssist Agent Squad initialized with 4 sub-agents")
return orchestrator
# ---------------------------------------------------------------------------
# Request Handler — called per WebSocket message
# ---------------------------------------------------------------------------
@dataclass
class AgentRequestContext:
"""Context passed with every agent request."""
user_id: str
session_id: str
locale: str = "en-US"
timestamp: float = field(default_factory=time.time)
user_tier: str = "standard" # standard | premium | vip
cart_items: list[str] = field(default_factory=list)
async def handle_user_message(
orchestrator: MultiAgentOrchestrator,
message: str,
context: AgentRequestContext,
) -> AgentResponse:
"""
Route a user message through the Agent Squad and return the response.
The orchestrator:
1. Classifies the intent using Haiku ($0.25/1M tokens)
2. Routes to the best sub-agent
3. Sub-agent reasons and calls tools as needed
4. Returns the merged response
"""
start_time = time.time()
response = await orchestrator.route_request(
user_input=message,
user_id=context.user_id,
session_id=context.session_id,
additional_params={
"locale": context.locale,
"user_tier": context.user_tier,
"cart_items": context.cart_items,
},
)
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
"Agent response in %.0f ms | agent=%s | user=%s | session=%s",
elapsed_ms,
response.metadata.get("agent_name", "unknown"),
context.user_id,
context.session_id,
)
# Alert if exceeding 3-second SLA
if elapsed_ms > 3000:
logger.warning(
"SLA BREACH: Response took %.0f ms (limit: 3000 ms) | agent=%s",
elapsed_ms,
response.metadata.get("agent_name", "unknown"),
)
return response
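The classifier cost estimate in the configuration comments above (1M messages/day through Haiku) can be sanity-checked with a few lines of arithmetic using the per-message token estimates from those same comments:

```python
MSGS_PER_DAY = 1_000_000
IN_TOKENS, OUT_TOKENS = 100, 20      # per-message estimates from the comments above
HAIKU_IN, HAIKU_OUT = 0.25, 1.25     # USD per 1M tokens

input_cost = MSGS_PER_DAY * IN_TOKENS / 1_000_000 * HAIKU_IN     # $25.00
output_cost = MSGS_PER_DAY * OUT_TOKENS / 1_000_000 * HAIKU_OUT  # $25.00
daily_total = input_cost + output_cost                           # $50.00/day
```

Running the same numbers through Sonnet ($3/$15 per 1M) would cost $30 + $300 = $330/day for classification alone, which is the economic argument for routing with Haiku.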
2.3 Multi-Agent Routing Logic — How the Orchestrator Decides
The Agent Squad classifier uses Haiku to analyze the user message plus recent conversation history, then selects the best sub-agent. The routing decision is based on the registered agent descriptions, the classifier's confidence score, and conversation continuity, with pattern-based fast routing and confidence-threshold fallbacks layered on top:
"""
MangaAssist intent classification and routing logic.
Demonstrates the internal classifier prompt and fallback strategy.
"""
from dataclasses import dataclass
from enum import Enum
class AgentIntent(str, Enum):
PRODUCT_SEARCH = "product_search"
ORDER_STATUS = "order_status"
RECOMMENDATION = "recommendation"
MANGA_QA = "manga_qa"
AMBIGUOUS = "ambiguous"
@dataclass
class ClassificationResult:
intent: AgentIntent
confidence: float
selected_agent: str
reasoning: str
# The classifier prompt template used internally by Agent Squad
CLASSIFIER_PROMPT_TEMPLATE = """You are a routing classifier for MangaAssist,
a Japanese manga store chatbot. Given the user's message and conversation
history, determine which specialized agent should handle this request.
Available agents:
{agent_descriptions}
Conversation history:
{conversation_history}
User message: {user_message}
Respond with ONLY the agent name and confidence (0.0-1.0).
Format: AgentName|confidence
Example: ProductSearchAgent|0.95
"""
class MangaAssistRouter:
"""
Custom routing logic that wraps Agent Squad's classifier with
MangaAssist-specific overrides and fallback strategies.
"""
# Keywords that bypass the classifier for instant routing
FAST_ROUTE_PATTERNS = {
AgentIntent.ORDER_STATUS: [
"order", "tracking", "shipment", "delivery", "MNG-",
"when will", "where is my",
],
AgentIntent.PRODUCT_SEARCH: [
"how much", "price", "in stock", "available", "buy",
"volumes", "find", "search",
],
AgentIntent.RECOMMENDATION: [
"recommend", "suggest", "similar to", "like", "should I read",
"what's good",
],
}
CONFIDENCE_THRESHOLD = 0.70 # Below this, use fallback strategy
def __init__(self):
self._routing_metrics = {
intent.value: 0 for intent in AgentIntent
}
def fast_route(self, message: str) -> ClassificationResult | None:
"""
Pattern-based fast routing for obvious intents.
Avoids a classifier LLM call (~50ms savings + cost savings).
"""
message_lower = message.lower()
for intent, patterns in self.FAST_ROUTE_PATTERNS.items():
for pattern in patterns:
if pattern.lower() in message_lower:
self._routing_metrics[intent.value] += 1
return ClassificationResult(
intent=intent,
confidence=0.90,
selected_agent=self._intent_to_agent(intent),
reasoning=f"Fast-route match on pattern: '{pattern}'",
)
return None
def apply_fallback_strategy(
self,
classification: ClassificationResult,
conversation_history: list[dict],
) -> ClassificationResult:
"""
If the classifier confidence is below threshold, apply fallbacks:
1. Use the same agent as the last turn (conversation continuity)
2. Default to ProductSearchAgent (most common intent)
"""
if classification.confidence >= self.CONFIDENCE_THRESHOLD:
return classification
# Fallback 1: Continue with the same agent as last turn
if conversation_history:
last_agent = conversation_history[-1].get("agent_name")
if last_agent:
return ClassificationResult(
intent=classification.intent,
confidence=0.75,
selected_agent=last_agent,
reasoning=(
f"Low confidence ({classification.confidence:.2f}), "
f"continuing with last agent: {last_agent}"
),
)
# Fallback 2: Default to ProductSearchAgent
return ClassificationResult(
intent=AgentIntent.PRODUCT_SEARCH,
confidence=0.60,
selected_agent="ProductSearchAgent",
reasoning="Low confidence, defaulting to ProductSearchAgent",
)
@staticmethod
def _intent_to_agent(intent: AgentIntent) -> str:
mapping = {
AgentIntent.PRODUCT_SEARCH: "ProductSearchAgent",
AgentIntent.ORDER_STATUS: "OrderStatusAgent",
AgentIntent.RECOMMENDATION: "RecommendationAgent",
AgentIntent.MANGA_QA: "MangaQAAgent",
}
return mapping.get(intent, "ProductSearchAgent")
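The fallback behaviour is easiest to see with concrete inputs. This standalone sketch mirrors the two-step strategy in `apply_fallback_strategy` above (continue with the previous turn's agent, else default to ProductSearchAgent):

```python
CONFIDENCE_THRESHOLD = 0.70

def fallback_agent(confidence: float, classified_agent: str,
                   history: list[dict]) -> str:
    # A confident classification wins outright.
    if confidence >= CONFIDENCE_THRESHOLD:
        return classified_agent
    # Fallback 1: conversational continuity with the previous agent.
    if history and history[-1].get("agent_name"):
        return history[-1]["agent_name"]
    # Fallback 2: the most common intent in the store.
    return "ProductSearchAgent"
```

Continuity matters because low-confidence messages are often short follow-ups ("and the second volume?") that only make sense in the context of the agent that answered last.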
3. Agent Memory Implementation
3.1 Short-Term Memory — ElastiCache Redis
Short-term memory holds the current conversation context. It must be fast (sub-millisecond reads), automatically expire (TTL), and support the token budget for the LLM context window.
"""
MangaAssist short-term session memory using ElastiCache Redis.
Handles turn history, sliding window, and token-budget trimming.
"""
import json
import hashlib
import logging
import time
from dataclasses import dataclass, field, asdict
from typing import Any
import redis
import tiktoken
logger = logging.getLogger(__name__)
# Token counting for Claude 3 context window management
# Claude 3 Sonnet context: 200K tokens, but we budget 4K for conversation history
ENCODER = tiktoken.encoding_for_model("gpt-4") # Approximate for Claude
MAX_HISTORY_TOKENS = 4096
MAX_TURNS = 20
SESSION_TTL_SECONDS = 1800 # 30 minutes sliding window
@dataclass
class ConversationTurn:
role: str # "user" or "assistant"
content: str
timestamp: float
agent_name: str = ""
tool_calls: list[dict] = field(default_factory=list)
token_count: int = 0
def __post_init__(self):
if self.token_count == 0:
self.token_count = len(ENCODER.encode(self.content))
class RedisSessionMemory:
"""
Short-term conversation memory backed by ElastiCache Redis.
Design decisions:
- Each session is a Redis list of JSON-serialized turns.
- TTL is refreshed on every read/write (sliding window).
- Token budget trimming removes oldest turns when context is too large.
- Separate hash for session metadata (user_id, locale, cart, etc.).
"""
def __init__(
self,
redis_host: str = "manga-redis.xxxxx.use1.cache.amazonaws.com",
redis_port: int = 6379,
redis_db: int = 0,
use_ssl: bool = True,
):
self._client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
ssl=use_ssl,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=1,
retry_on_timeout=True,
)
logger.info("Redis session memory connected to %s:%d", redis_host, redis_port)
def _session_key(self, session_id: str) -> str:
return f"manga:session:{session_id}:turns"
def _metadata_key(self, session_id: str) -> str:
return f"manga:session:{session_id}:meta"
async def add_turn(
self, session_id: str, turn: ConversationTurn
) -> None:
"""Append a turn and enforce token budget."""
key = self._session_key(session_id)
self._client.rpush(key, json.dumps(asdict(turn)))
self._client.expire(key, SESSION_TTL_SECONDS) # Refresh TTL
# Enforce max turns
current_length = self._client.llen(key)
if current_length > MAX_TURNS:
# Remove oldest turns beyond the limit
excess = current_length - MAX_TURNS
for _ in range(excess):
self._client.lpop(key)
# Enforce token budget
await self._trim_to_token_budget(session_id)
async def get_history(self, session_id: str) -> list[ConversationTurn]:
"""Retrieve all turns for a session, refreshing TTL."""
key = self._session_key(session_id)
self._client.expire(key, SESSION_TTL_SECONDS) # Sliding window
raw_turns = self._client.lrange(key, 0, -1)
turns = []
for raw in raw_turns:
data = json.loads(raw)
turns.append(ConversationTurn(**data))
return turns
async def get_formatted_history(self, session_id: str) -> list[dict]:
"""Return history in the format expected by Claude's messages API."""
turns = await self.get_history(session_id)
return [
{"role": turn.role, "content": turn.content}
for turn in turns
]
async def _trim_to_token_budget(self, session_id: str) -> None:
"""Remove oldest turns until total tokens fit within budget."""
key = self._session_key(session_id)
raw_turns = self._client.lrange(key, 0, -1)
total_tokens = 0
turns_data = []
for raw in raw_turns:
data = json.loads(raw)
total_tokens += data.get("token_count", 0)
turns_data.append(data)
while total_tokens > MAX_HISTORY_TOKENS and len(turns_data) > 2:
removed = turns_data.pop(0)
total_tokens -= removed.get("token_count", 0)
self._client.lpop(key)
logger.debug(
"Session %s: %d turns, %d tokens (budget: %d)",
session_id, len(turns_data), total_tokens, MAX_HISTORY_TOKENS,
)
async def set_metadata(
self, session_id: str, metadata: dict[str, Any]
) -> None:
"""Store session metadata (user profile, locale, cart, etc.)."""
key = self._metadata_key(session_id)
self._client.hset(key, mapping={
k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
for k, v in metadata.items()
})
self._client.expire(key, SESSION_TTL_SECONDS)
async def get_metadata(self, session_id: str) -> dict[str, Any]:
"""Retrieve session metadata."""
key = self._metadata_key(session_id)
self._client.expire(key, SESSION_TTL_SECONDS)
raw = self._client.hgetall(key)
result = {}
for k, v in raw.items():
try:
result[k] = json.loads(v)
except (json.JSONDecodeError, TypeError):
result[k] = v
return result
async def clear_session(self, session_id: str) -> None:
"""Delete all data for a session."""
self._client.delete(
self._session_key(session_id),
self._metadata_key(session_id),
)
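The trimming policy in `_trim_to_token_budget` can be verified offline. This sketch applies the same drop-oldest rule to a plain in-memory list, keeping at least two turns for coherence:

```python
def trim_to_budget(turns: list[dict], budget: int, min_turns: int = 2) -> list[dict]:
    # Drop the oldest turns until the total token count fits the budget,
    # always keeping at least min_turns so the model retains some context.
    turns = list(turns)
    total = sum(t["token_count"] for t in turns)
    while total > budget and len(turns) > min_turns:
        total -= turns.pop(0)["token_count"]
    return turns

history = [{"role": "user", "token_count": 1500},
           {"role": "assistant", "token_count": 2000},
           {"role": "user", "token_count": 800},
           {"role": "assistant", "token_count": 900}]
trimmed = trim_to_budget(history, budget=4096)  # drops only the first turn
```

Dropping whole turns from the oldest end (rather than truncating text mid-turn) keeps the remaining history well-formed for Claude's alternating-role messages API.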
3.2 Long-Term Memory — DynamoDB Persistent State
Long-term memory persists across sessions. It stores user preferences, interaction summaries, and historical context for personalization.
"""
MangaAssist long-term memory using DynamoDB.
Stores user preferences, interaction summaries, and cross-session context.
"""
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any
import boto3
from boto3.dynamodb.conditions import Key, Attr
logger = logging.getLogger(__name__)
@dataclass
class UserMemoryRecord:
"""A single memory record for a user."""
memory_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_id: str = ""
memory_type: str = "" # "preference", "interaction_summary", "feedback"
content: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
importance_score: float = 0.5 # 0.0 - 1.0
created_at: float = field(default_factory=time.time)
accessed_at: float = field(default_factory=time.time)
access_count: int = 0
ttl: int = 0 # 0 = no expiry
class DynamoDBLongTermMemory:
"""
Long-term user memory backed by DynamoDB.
Table schema:
PK: user_id (String)
SK: memory_id (String)
GSI1: user_id + memory_type (for filtered queries)
GSI2: user_id + importance_score (for ranked retrieval)
Capacity: On-demand (pay-per-request) for unpredictable access patterns.
"""
def __init__(
self,
table_name: str = "manga_user_memory",
region: str = "us-east-1",
):
self._table = boto3.resource(
"dynamodb", region_name=region
).Table(table_name)
self._table_name = table_name
logger.info("DynamoDB long-term memory connected: %s", table_name)
async def store_memory(self, record: UserMemoryRecord) -> None:
"""Store or update a memory record."""
item = {
"user_id": record.user_id,
"memory_id": record.memory_id,
"memory_type": record.memory_type,
"content": record.content,
"metadata": json.loads(json.dumps(record.metadata, default=str)),
"importance_score": Decimal(str(record.importance_score)),
"created_at": Decimal(str(record.created_at)),
"accessed_at": Decimal(str(record.accessed_at)),
"access_count": record.access_count,
}
if record.ttl > 0:
item["expires_at"] = int(time.time()) + record.ttl
self._table.put_item(Item=item)
logger.debug("Stored memory %s for user %s", record.memory_id, record.user_id)
async def recall_memories(
self,
user_id: str,
memory_type: str | None = None,
limit: int = 10,
min_importance: float = 0.0,
) -> list[UserMemoryRecord]:
"""
Retrieve memories for a user, optionally filtered by type and importance.
Results are ordered by importance_score descending.
"""
if memory_type:
response = self._table.query(
IndexName="user_id-memory_type-index",
KeyConditionExpression=(
Key("user_id").eq(user_id) &
Key("memory_type").eq(memory_type)
),
FilterExpression=Attr("importance_score").gte(
Decimal(str(min_importance))
),
                # DynamoDB applies Limit before FilterExpression, so a page
                # can contain fewer than `limit` matching items
                Limit=limit,
)
else:
response = self._table.query(
KeyConditionExpression=Key("user_id").eq(user_id),
FilterExpression=Attr("importance_score").gte(
Decimal(str(min_importance))
),
ScanIndexForward=False,
Limit=limit,
)
records = []
for item in response.get("Items", []):
record = UserMemoryRecord(
memory_id=item["memory_id"],
user_id=item["user_id"],
memory_type=item["memory_type"],
content=item["content"],
metadata=item.get("metadata", {}),
importance_score=float(item["importance_score"]),
created_at=float(item["created_at"]),
accessed_at=float(item["accessed_at"]),
access_count=int(item.get("access_count", 0)),
)
records.append(record)
            # Update access tracking (one write per recalled item; batch or
            # fire these asynchronously at high read volume)
self._table.update_item(
Key={"user_id": user_id, "memory_id": item["memory_id"]},
UpdateExpression=(
"SET accessed_at = :now, access_count = access_count + :one"
),
ExpressionAttributeValues={
":now": Decimal(str(time.time())),
":one": 1,
},
)
# Sort by importance descending
records.sort(key=lambda r: r.importance_score, reverse=True)
return records[:limit]
async def store_interaction_summary(
self,
user_id: str,
session_id: str,
summary: str,
topics: list[str],
sentiment: str,
) -> None:
"""
Store a compressed summary of a conversation session.
Called when a session ends or times out.
"""
record = UserMemoryRecord(
user_id=user_id,
memory_type="interaction_summary",
content=summary,
metadata={
"session_id": session_id,
"topics": topics,
"sentiment": sentiment,
},
importance_score=0.6,
ttl=90 * 86400, # 90 days retention
)
await self.store_memory(record)
async def store_user_preference(
self,
user_id: str,
preference_key: str,
preference_value: Any,
) -> None:
"""
Store or update a user preference (e.g., favorite genre, language).
Preferences have high importance and no expiry.
"""
# Check if preference already exists
existing = await self.recall_memories(
user_id=user_id,
memory_type="preference",
limit=50,
)
for record in existing:
if record.metadata.get("preference_key") == preference_key:
# Update existing preference
record.content = str(preference_value)
record.importance_score = 0.9
record.accessed_at = time.time()
await self.store_memory(record)
return
# Create new preference
record = UserMemoryRecord(
user_id=user_id,
memory_type="preference",
content=str(preference_value),
metadata={"preference_key": preference_key},
importance_score=0.9,
)
await self.store_memory(record)
async def build_user_context(self, user_id: str) -> str:
"""
Build a context string from the user's long-term memory
to inject into agent system prompts for personalization.
"""
preferences = await self.recall_memories(
user_id=user_id,
memory_type="preference",
limit=10,
)
summaries = await self.recall_memories(
user_id=user_id,
memory_type="interaction_summary",
limit=5,
min_importance=0.5,
)
context_parts = []
if preferences:
pref_lines = []
for p in preferences:
key = p.metadata.get("preference_key", "unknown")
pref_lines.append(f" - {key}: {p.content}")
context_parts.append("User Preferences:\n" + "\n".join(pref_lines))
if summaries:
summary_lines = []
for s in summaries:
topics = ", ".join(s.metadata.get("topics", []))
summary_lines.append(f" - Topics: {topics} | {s.content}")
context_parts.append(
"Recent Interactions:\n" + "\n".join(summary_lines)
)
return "\n\n".join(context_parts) if context_parts else ""
4. State Management Patterns
4.1 Conversation State Tracking
"""
MangaAssist conversation state management.
Tracks active intents, slot filling, and multi-turn dialogue state.
"""
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class ConversationPhase(str, Enum):
GREETING = "greeting"
INTENT_GATHERING = "intent_gathering"
SLOT_FILLING = "slot_filling"
TOOL_EXECUTION = "tool_execution"
RESPONSE_DELIVERY = "response_delivery"
FOLLOW_UP = "follow_up"
FAREWELL = "farewell"
class SlotStatus(str, Enum):
EMPTY = "empty"
PARTIALLY_FILLED = "partially_filled"
CONFIRMED = "confirmed"
@dataclass
class DialogueSlot:
"""A slot in a slot-filling dialogue (e.g., genre, price range)."""
name: str
value: Any = None
status: SlotStatus = SlotStatus.EMPTY
required: bool = False
prompt: str = "" # Question to ask if slot is empty
@dataclass
class ConversationState:
"""
Full conversation state for a MangaAssist session.
Serialized to Redis after every turn.
"""
session_id: str
user_id: str
phase: ConversationPhase = ConversationPhase.GREETING
active_intent: str = ""
active_agent: str = ""
slots: dict[str, DialogueSlot] = field(default_factory=dict)
turn_count: int = 0
last_activity: float = field(default_factory=time.time)
pending_clarification: str = ""
context_flags: dict[str, bool] = field(default_factory=dict)
def advance_phase(self, new_phase: ConversationPhase) -> None:
"""Transition to a new conversation phase."""
self.phase = new_phase
self.last_activity = time.time()
def set_intent(self, intent: str, agent: str) -> None:
"""Set the active intent and responsible agent."""
self.active_intent = intent
self.active_agent = agent
self.phase = ConversationPhase.INTENT_GATHERING
def add_slot(self, slot: DialogueSlot) -> None:
"""Register a slot for the current intent."""
self.slots[slot.name] = slot
def fill_slot(self, name: str, value: Any) -> None:
"""Fill a slot with a value."""
if name in self.slots:
self.slots[name].value = value
self.slots[name].status = SlotStatus.CONFIRMED
def get_missing_slots(self) -> list[DialogueSlot]:
"""Return all required slots that are not yet confirmed."""
return [
slot for slot in self.slots.values()
if slot.required and slot.status != SlotStatus.CONFIRMED
]
def all_slots_filled(self) -> bool:
"""Check if all required slots are filled."""
return len(self.get_missing_slots()) == 0
def increment_turn(self) -> None:
"""Advance the turn counter."""
self.turn_count += 1
self.last_activity = time.time()
def is_stale(self, timeout_seconds: int = 1800) -> bool:
"""Check if the conversation has been idle too long."""
return (time.time() - self.last_activity) > timeout_seconds
def to_context_string(self) -> str:
"""Serialize state to a human-readable context string for the agent."""
lines = [
f"Phase: {self.phase.value}",
f"Intent: {self.active_intent or 'none'}",
f"Turn: {self.turn_count}",
]
filled_slots = {
name: slot.value
for name, slot in self.slots.items()
if slot.status == SlotStatus.CONFIRMED
}
if filled_slots:
lines.append(f"Confirmed slots: {filled_slots}")
missing = self.get_missing_slots()
if missing:
lines.append(
f"Missing required slots: {[s.name for s in missing]}"
)
return " | ".join(lines)
4.2 Task Execution State
"""
MangaAssist task execution state — tracks multi-step tool execution
within a single agent turn.
"""
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class TaskStepStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
RETRYING = "retrying"
@dataclass
class TaskStep:
"""A single step in a multi-step task execution."""
step_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
tool_name: str = ""
parameters: dict[str, Any] = field(default_factory=dict)
status: TaskStepStatus = TaskStepStatus.PENDING
result: Any = None
error: str = ""
started_at: float = 0.0
completed_at: float = 0.0
retry_count: int = 0
max_retries: int = 2
@property
def duration_ms(self) -> float:
if self.started_at and self.completed_at:
return (self.completed_at - self.started_at) * 1000
return 0.0
def can_retry(self) -> bool:
return self.retry_count < self.max_retries
@dataclass
class TaskExecutionState:
"""
Tracks the execution state of a multi-step task within an agent turn.
For example: "Find manga similar to Naruto" might involve:
1. Search for Naruto to get its metadata
2. Extract genre and themes
3. Search for similar titles
4. Rank and format results
"""
task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
description: str = ""
steps: list[TaskStep] = field(default_factory=list)
current_step_index: int = 0
started_at: float = field(default_factory=time.time)
completed_at: float = 0.0
total_budget_ms: float = 3000.0 # 3-second SLA
def add_step(self, tool_name: str, parameters: dict) -> TaskStep:
step = TaskStep(tool_name=tool_name, parameters=parameters)
self.steps.append(step)
return step
def current_step(self) -> TaskStep | None:
if self.current_step_index < len(self.steps):
return self.steps[self.current_step_index]
return None
def advance(self) -> TaskStep | None:
self.current_step_index += 1
return self.current_step()
def remaining_budget_ms(self) -> float:
elapsed = (time.time() - self.started_at) * 1000
return max(0, self.total_budget_ms - elapsed)
def is_over_budget(self) -> bool:
return self.remaining_budget_ms() <= 0
def summary(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"description": self.description,
"total_steps": len(self.steps),
"completed_steps": sum(
1 for s in self.steps
if s.status == TaskStepStatus.COMPLETED
),
"failed_steps": sum(
1 for s in self.steps
if s.status == TaskStepStatus.FAILED
),
"remaining_budget_ms": self.remaining_budget_ms(),
}
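The budget check drives a skip-optional-work policy under the 3-second SLA. A self-contained sketch (the `TurnBudget` helper and the step list are illustrative):

```python
import time


class TurnBudget:
    """Wall-clock budget mirroring remaining_budget_ms / is_over_budget."""

    def __init__(self, total_ms: float = 3000.0) -> None:
        self.total_ms = total_ms
        self.started_at = time.time()

    def remaining_ms(self) -> float:
        elapsed_ms = (time.time() - self.started_at) * 1000
        return max(0.0, self.total_ms - elapsed_ms)


# (tool_name, required): optional steps are dropped when little time remains.
steps = [("search_products", True), ("rank_results", True), ("fetch_covers", False)]
budget = TurnBudget(total_ms=3000.0)
executed = []
for name, required in steps:
    if not required and budget.remaining_ms() < 500:
        continue  # shed nice-to-have work under time pressure
    executed.append(name)
```

Required steps always run; the agent degrades gracefully by returning results without cover images rather than blowing the SLA.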
4.3 Environment Context
"""
MangaAssist environment context — external state the agent reads
but does not directly control (user profile, cart, locale, etc.).
"""
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class UserProfile:
"""User profile loaded from DynamoDB at session start."""
user_id: str
display_name: str = "Guest"
email: str = ""
locale: str = "en-US"
preferred_language: str = "en"
tier: str = "standard" # standard | premium | vip
favorite_genres: list[str] = field(default_factory=list)
reading_history: list[str] = field(default_factory=list)
created_at: str = ""
@dataclass
class CartState:
"""Current shopping cart state from the e-commerce backend."""
items: list[dict[str, Any]] = field(default_factory=list)
total_usd: float = 0.0
item_count: int = 0
@dataclass
class EnvironmentContext:
"""
Immutable (per-turn) snapshot of the environment that agents can
read to make contextual decisions.
"""
    # UserProfile requires user_id, so a bare default_factory=UserProfile
    # would raise TypeError; supply a placeholder guest profile instead
    user_profile: UserProfile = field(
        default_factory=lambda: UserProfile(user_id="guest")
    )
cart: CartState = field(default_factory=CartState)
current_time_utc: float = field(default_factory=time.time)
request_locale: str = "en-US"
is_business_hours_jst: bool = True
active_promotions: list[str] = field(default_factory=list)
def to_agent_context_string(self) -> str:
"""
Format environment context for injection into the agent's
system prompt or user message prefix.
"""
lines = [
f"Customer: {self.user_profile.display_name} "
f"({self.user_profile.tier} tier)",
f"Language: {self.user_profile.preferred_language}",
f"Locale: {self.request_locale}",
]
if self.user_profile.favorite_genres:
lines.append(
f"Favorite genres: {', '.join(self.user_profile.favorite_genres)}"
)
if self.cart.item_count > 0:
lines.append(
f"Cart: {self.cart.item_count} items, "
f"${self.cart.total_usd:.2f}"
)
if self.active_promotions:
lines.append(
f"Active promotions: {', '.join(self.active_promotions)}"
)
if not self.is_business_hours_jst:
lines.append("Note: Outside JP business hours — escalations delayed")
return "\n".join(lines)
5. Comparison — Strands Agents vs AWS Agent Squad vs Custom Orchestration
| Dimension | Strands Agents | AWS Agent Squad | Custom Orchestration |
|---|---|---|---|
| Scope | Single agent with tools, memory, and ReAct loop | Multi-agent supervisor routing to specialized agents | Hand-coded routing, state machine, and tool dispatch |
| Best For | Single-domain tasks (one agent handles one intent) | Multi-domain chatbots with distinct capability areas | Unique routing logic, non-standard patterns |
| Routing | N/A (single agent) | Classifier-based intent routing via LLM or rules | Developer-defined (regex, ML model, rules engine) |
| Memory | Built-in turn history with sliding window | DynamoDB-backed shared conversation store | Developer-managed (Redis, DynamoDB, or any store) |
| Tool Integration | @tool decorator, MCP support | Agent-level tool binding | Direct API calls or MCP client library |
| Streaming | Native streaming support | Per-agent streaming configuration | Developer-implemented via WebSocket or SSE |
| Cost at 1M msgs/day | Depends on model choice per agent | +$50/day for Haiku classifier overhead | No framework overhead, but more engineering time |
| Observability | Built-in logging of reasoning steps | Built-in execution time and routing logs | Custom CloudWatch metrics and X-Ray tracing |
| Complexity | Low — single file per agent | Medium — orchestrator + sub-agents + classifier | High — everything is hand-built |
| MangaAssist Usage | Each sub-agent is a Strands Agent | Orchestrator wrapping all four sub-agents | Fallback router and SLA enforcement logic |
| Vendor Lock-in | Open-source, Bedrock-optimized | AWS-specific, uses Bedrock classifier | None (but higher maintenance) |
| Time to Production | Days | 1-2 weeks | 4-8 weeks |
6. Key Takeaways
- Strands Agents power each sub-agent in MangaAssist — the `@tool` decorator, system prompt, and model selection create a self-contained reasoning unit. Use Sonnet ($3/$15) for complex reasoning (product search, recommendations) and Haiku ($0.25/$1.25) for simple lookups (order status, Q&A).
- AWS Agent Squad provides the supervisory layer — a Haiku-based classifier routes each user message to the right sub-agent at ~$50/day overhead for 1M messages. The classifier uses agent descriptions (not code) to make routing decisions, so adding a new agent requires zero code changes to the router.
- Two-tier memory is essential at scale — ElastiCache Redis handles sub-millisecond session memory with a 30-minute TTL and token-budget trimming, while DynamoDB stores long-term user preferences and interaction summaries for personalization across sessions. Neither alone is sufficient.
- State management has three distinct layers — conversation state (intent, slots, phase), task execution state (tool call queue, results, budget), and environment context (user profile, cart, locale). Each layer is read by different parts of the system and updated at different frequencies.
- Fast-route patterns bypass the classifier — for obvious intents (order ID mentioned, explicit "search" keyword), pattern matching routes instantly without an LLM call, saving ~50ms latency and ~$25/day in classifier costs.
- The 3-second SLA drives architectural decisions — task execution state tracks the remaining time budget, agents are configured with `max_tokens` limits to prevent runaway generation, and the routing layer has confidence-threshold fallbacks to avoid re-classification loops.
Next file: 02-mcp-agent-tool-interactions.md — Deep dive into MCP protocol for standardized agent-tool interactions in MangaAssist.