
Autonomous Agent Architecture for Intelligent Systems

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Dimension Detail
Certification AWS AIF-C01 — AI Practitioner
Domain 2 — Development and Implementation of GenAI Solutions
Task 2.1 — Develop agentic AI solutions using AWS services
Skill 2.1.1 — Develop intelligent autonomous systems with appropriate memory and state management capabilities (for example, by using Strands Agents and AWS Agent Squad for multi-agent systems, MCP for agent-tool interactions)
This File Autonomous agent architecture: Strands Agents, AWS Agent Squad multi-agent orchestration, memory patterns, and state management

Skill Scope Statement

This skill requires the practitioner to architect and implement intelligent autonomous systems that can reason, plan, and execute multi-step tasks. The core capabilities include: configuring Strands Agents with tools, memory, and system prompts; orchestrating multi-agent systems with AWS Agent Squad for specialized task delegation; implementing short-term memory (Redis) and long-term memory (DynamoDB) patterns; managing conversation state, task execution state, and environment context; and integrating the Model Context Protocol (MCP) for standardized agent-tool interactions. MangaAssist demonstrates every pattern at 1M messages/day scale.


Mind Map — Autonomous Agent Architecture

mindmap
  root((Skill 2.1.1<br/>Autonomous Agents))
    Strands Agents Architecture
      Agent Definition
        System Prompt Configuration
        Tool Binding
        Model Selection (Sonnet vs Haiku)
      Agent Lifecycle
        Initialize → Plan → Act → Observe → Reflect
        ReAct Loop Execution
        Tool Call Dispatch
      Built-in Capabilities
        Structured Output Parsing
        Streaming Response Support
        Conversation History Management
    AWS Agent Squad
      Multi-Agent Coordination
        Orchestrator Agent (Supervisor)
        Specialized Sub-Agents
        Agent Registry and Discovery
      Routing Strategies
        Classifier-Based Routing
        Intent-Based Dispatch
        Confidence-Threshold Fallback
      Sub-Agent Definitions
        ProductSearchAgent
        OrderStatusAgent
        RecommendationAgent
        MangaQAAgent
    Agent Memory Patterns
      Short-Term Session Memory
        ElastiCache Redis
        TTL-Based Expiration (30 min)
        Sliding Window Context
        Token-Budget Trimming
      Long-Term Persistent Memory
        DynamoDB Sessions Table
        User Preference History
        Interaction Summaries
        Cross-Session Continuity
      Memory Retrieval
        Recency-Weighted Recall
        Semantic Similarity Search
        Importance-Scored Filtering
    State Management
      Conversation State
        Turn History
        Active Intent Tracking
        Slot Filling Progress
      Task Execution State
        Step Queue and Progress
        Tool Call Results Cache
        Retry and Rollback State
      Environment Context
        User Profile and Preferences
        Cart and Session Metadata
        Time-of-Day and Locale
    MCP Protocol
      Tool Definitions (JSON Schema)
      Parameter Validation
      Response Envelope Format
      Error Propagation
      Discovery Endpoint
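
The MCP branch above maps each tool to a JSON-Schema definition that the agent validates arguments against before dispatch. A minimal sketch of what such a definition and validation step might look like — the schema shape follows the MCP tool convention (`name`, `description`, `inputSchema`), but the field values and the `validate_arguments` helper are illustrative, not the production schema:

```python
"""Sketch of an MCP-style tool definition: a JSON-Schema parameter
contract the agent checks arguments against before dispatching the
tool call. Field values are illustrative, not the production schema."""

SEARCH_TOOL_DEFINITION = {
    "name": "search_manga_catalog",
    "description": "Search the manga catalog via vector search.",
    "inputSchema": {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "genre": {"type": "string"},
            "max_results": {"type": "integer", "minimum": 1, "maximum": 20},
        },
        "required": ["query"],
    },
}


def validate_arguments(definition: dict, arguments: dict) -> list[str]:
    """Minimal required-field and type check; a real MCP server would
    run a full JSON-Schema validator instead."""
    errors: list[str] = []
    schema = definition["inputSchema"]
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required parameter: {name}")
    type_map = {"string": str, "integer": int, "object": dict}
    for name, value in arguments.items():
        expected = schema["properties"].get(name, {}).get("type")
        if expected and not isinstance(value, type_map[expected]):
            errors.append(f"{name}: expected {expected}")
    return errors
```

Validation errors surface as MCP error envelopes rather than crashing the agent loop, which is what the "Error Propagation" node in the mind map refers to.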

Architecture Flowchart — MangaAssist Agent Request Lifecycle

flowchart TB
    subgraph Client["Client Layer"]
        USER["User via WebSocket"]
    end

    subgraph Gateway["API Gateway"]
        WS["WebSocket API<br/>$connect / $default / $disconnect"]
    end

    subgraph Orchestrator["ECS Fargate — Agent Orchestrator"]
        RECV["Message Receiver"]
        SQUAD["AWS Agent Squad<br/>Supervisor Agent"]
        ROUTER["Intent Classifier<br/>(Haiku — $0.25/1M)"]

        subgraph SubAgents["Specialized Sub-Agents"]
            PSA["ProductSearchAgent<br/>Strands Agent"]
            OSA["OrderStatusAgent<br/>Strands Agent"]
            RCA["RecommendationAgent<br/>Strands Agent"]
            MQA["MangaQAAgent<br/>Strands Agent"]
        end

        MERGE["Response Merger"]
    end

    subgraph Memory["Memory Layer"]
        REDIS["ElastiCache Redis<br/>Session Memory<br/>TTL 30 min"]
        DDB["DynamoDB<br/>manga_sessions table<br/>Long-Term Memory"]
    end

    subgraph Tools["Tool Layer (MCP)"]
        SEARCH_TOOL["OpenSearch Serverless<br/>Vector Search Tool"]
        ORDER_TOOL["DynamoDB<br/>Order Lookup Tool"]
        RECO_TOOL["Bedrock Sonnet<br/>Recommendation Tool"]
        KB_TOOL["Bedrock KB<br/>Manga Knowledge Tool"]
    end

    subgraph Models["Foundation Models"]
        SONNET["Claude 3 Sonnet<br/>Complex Reasoning<br/>$3 / $15 per 1M"]
        HAIKU["Claude 3 Haiku<br/>Classification & Simple<br/>$0.25 / $1.25 per 1M"]
    end

    USER -->|WebSocket message| WS
    WS -->|Lambda authorizer + route| RECV
    RECV -->|Load session state| REDIS
    RECV -->|Load user profile| DDB
    RECV -->|User query + context| SQUAD
    SQUAD -->|Classify intent| ROUTER
    ROUTER -->|product_search| PSA
    ROUTER -->|order_status| OSA
    ROUTER -->|recommendation| RCA
    ROUTER -->|manga_knowledge| MQA
    PSA -->|MCP tool call| SEARCH_TOOL
    OSA -->|MCP tool call| ORDER_TOOL
    RCA -->|MCP tool call| RECO_TOOL
    MQA -->|MCP tool call| KB_TOOL
    PSA -->|Reasoning| SONNET
    OSA -->|Simple lookup| HAIKU
    RCA -->|Reasoning| SONNET
    MQA -->|Simple Q&A| HAIKU
    PSA --> MERGE
    OSA --> MERGE
    RCA --> MERGE
    MQA --> MERGE
    MERGE -->|Save turn| REDIS
    MERGE -->|Persist summary| DDB
    MERGE -->|Stream response| WS
    WS -->|WebSocket frame| USER

1. Strands Agents — Core Agent Configuration for MangaAssist

1.1 What Is Strands Agents?

Strands Agents is an open-source Python SDK from AWS that implements the ReAct (Reason + Act) loop for building autonomous agents. Each agent has a system prompt, a set of tools, a memory buffer, and a model provider. The agent iterates: it reasons about the user query, decides which tool to call, observes the result, and repeats until it can produce a final answer.

For MangaAssist, each sub-agent is a Strands Agent with domain-specific tools and prompts.
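
The Initialize → Plan → Act → Observe cycle that Strands runs internally can be sketched as a plain Python loop. This is an illustration only, not the SDK's actual implementation: `call_model` stands in for a Bedrock invocation that returns either a final answer or a tool request, and the message shapes are simplified:

```python
"""Simplified sketch of the ReAct loop a Strands Agent runs internally.
`call_model` is a hypothetical stand-in for a Bedrock invocation."""
from typing import Any, Callable


def react_loop(
    user_query: str,
    tools: dict[str, Callable[..., Any]],
    call_model: Callable[[list[dict]], dict],
    max_iterations: int = 5,
) -> str:
    """Reason -> act -> observe until the model emits a final answer."""
    messages = [{"role": "user", "content": user_query}]

    for _ in range(max_iterations):
        decision = call_model(messages)  # model reasons over the transcript

        if decision["type"] == "final_answer":
            return decision["content"]

        # Act: dispatch the tool call the model requested
        tool_name = decision["tool"]
        result = tools[tool_name](**decision["arguments"])

        # Observe: feed the tool result back into the transcript
        messages.append({"role": "assistant", "content": f"Called {tool_name}"})
        messages.append({"role": "user", "content": f"Tool result: {result}"})

    return "Reached iteration limit without a final answer."
```

The `max_iterations` guard is the reason behavioral rules like "limit tool calls to 3 per turn" (section 1.2) are enforceable: the loop, not the model, owns the stopping condition.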

1.2 Agent Definition — ProductSearchAgent

"""
MangaAssist ProductSearchAgent — Strands Agent configuration.
Handles all product discovery, search, and catalog queries.
"""

import json
import logging
from typing import Any

from strands import Agent
from strands.models.bedrock import BedrockModel
from strands.tools import tool

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# ---------------------------------------------------------------------------
# Tool Definitions (exposed via MCP to the agent)
# ---------------------------------------------------------------------------

@tool
def search_manga_catalog(
    query: str,
    genre: str = "",
    author: str = "",
    max_results: int = 5,
) -> dict[str, Any]:
    """
    Search the MangaAssist product catalog using OpenSearch Serverless
    vector search. Returns ranked manga titles with metadata.

    Args:
        query: Natural-language search query from the user.
        genre: Optional genre filter (e.g., 'shonen', 'seinen', 'shojo').
        author: Optional author name filter.
        max_results: Maximum number of results to return (1-20).

    Returns:
        dict with 'results' list containing manga items and 'total_hits' count.
    """
    import boto3
    from opensearchpy import OpenSearch, RequestsHttpConnection
    from requests_aws4auth import AWS4Auth

    # Build the embedding for semantic search
    bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")
    embedding_response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": query}),
    )
    embedding = json.loads(embedding_response["body"].read())["embedding"]

    # Connect to OpenSearch Serverless
    credentials = boto3.Session().get_credentials()
    aws_auth = AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        "us-east-1",
        "aoss",
        session_token=credentials.token,
    )

    client = OpenSearch(
        hosts=[{"host": "manga-search.us-east-1.aoss.amazonaws.com", "port": 443}],
        http_auth=aws_auth,
        use_ssl=True,
        connection_class=RequestsHttpConnection,
    )

    # Build the hybrid query: kNN vector + keyword filters
    must_clauses = []
    if genre:
        must_clauses.append({"term": {"genre.keyword": genre.lower()}})
    if author:
        must_clauses.append({"match": {"author": author}})

    search_body = {
        "size": min(max_results, 20),
        "query": {
            "bool": {
                "must": must_clauses,
                "should": [
                    {
                        "knn": {
                            "title_embedding": {
                                "vector": embedding,
                                "k": max_results,
                            }
                        }
                    }
                ],
            }
        },
        "_source": [
            "title", "author", "genre", "price", "volumes",
            "rating", "synopsis", "cover_url", "in_stock",
        ],
    }

    response = client.search(index="manga-catalog", body=search_body)
    hits = response.get("hits", {}).get("hits", [])

    results = []
    for hit in hits:
        source = hit["_source"]
        source["relevance_score"] = hit.get("_score", 0.0)
        results.append(source)

    return {
        "results": results,
        "total_hits": response["hits"]["total"]["value"],
        "query_used": query,
    }


@tool
def get_manga_details(title_id: str) -> dict[str, Any]:
    """
    Retrieve full details for a specific manga title from DynamoDB.

    Args:
        title_id: The unique identifier for the manga title.

    Returns:
        dict with complete manga metadata including price, availability,
        volumes, author biography, and related titles.
    """
    import boto3

    table = boto3.resource("dynamodb", region_name="us-east-1").Table(
        "manga_products"
    )
    response = table.get_item(Key={"title_id": title_id})

    if "Item" not in response:
        return {"error": f"Manga title '{title_id}' not found", "found": False}

    item = response["Item"]
    # Convert Decimal types for JSON serialization
    return {
        "found": True,
        "title": item["title"],
        "author": item["author"],
        "genre": item["genre"],
        "price_usd": float(item["price_usd"]),
        "volumes_available": int(item["volumes_available"]),
        "total_volumes": int(item["total_volumes"]),
        "rating": float(item["rating"]),
        "synopsis": item["synopsis"],
        "publisher": item["publisher"],
        "in_stock": item["in_stock"],
        "related_titles": item.get("related_titles", []),
    }


# ---------------------------------------------------------------------------
# Agent Configuration
# ---------------------------------------------------------------------------

PRODUCT_SEARCH_SYSTEM_PROMPT = """You are the ProductSearchAgent for MangaAssist,
a Japanese manga store chatbot. Your role is to help customers find manga titles
they are looking for.

## Your Capabilities
- Search the manga catalog by title, genre, author, or description
- Retrieve detailed information about specific manga titles
- Provide availability and pricing information
- Suggest alternatives when a title is out of stock

## Behavioral Rules
1. Always search before answering — never guess about catalog availability.
2. Present results in a concise, scannable format.
3. Include price (USD) and stock status in every product mention.
4. If the user asks in Japanese, respond in Japanese.
5. Limit tool calls to 3 per turn to stay within the 3-second budget.
6. For ambiguous queries, ask one clarifying question rather than guessing.

## Response Format
Use markdown with bullet points. Include title, author, price, and stock status.
"""

# Model: use Sonnet for multi-step reasoning over search results and alternatives
sonnet_model = BedrockModel(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1",
    temperature=0.3,
    max_tokens=1024,
)

product_search_agent = Agent(
    model=sonnet_model,
    system_prompt=PRODUCT_SEARCH_SYSTEM_PROMPT,
    tools=[search_manga_catalog, get_manga_details],
    # Memory: last 10 turns kept in context (short-term)
    max_turns=10,
)


def handle_product_query(user_message: str, session_context: dict) -> str:
    """
    Entry point for product search queries. Injects session context
    and invokes the Strands Agent.
    """
    # Prepend session context so the agent knows the user's history
    context_prefix = ""
    if session_context.get("recent_searches"):
        recent = ", ".join(session_context["recent_searches"][-3:])
        context_prefix = f"[User recently searched for: {recent}]\n\n"

    if session_context.get("preferred_genres"):
        genres = ", ".join(session_context["preferred_genres"])
        context_prefix += f"[User prefers genres: {genres}]\n\n"

    augmented_message = context_prefix + user_message

    result = product_search_agent(augmented_message)
    return str(result)

1.3 Agent Definition — OrderStatusAgent

"""
MangaAssist OrderStatusAgent — lightweight Strands Agent using Haiku
for fast, inexpensive order lookups.
"""

from typing import Any

from strands import Agent
from strands.models.bedrock import BedrockModel
from strands.tools import tool


@tool
def lookup_order(order_id: str) -> dict[str, Any]:
    """
    Look up an order by ID from the DynamoDB orders table.

    Args:
        order_id: The customer's order ID (format: MNG-XXXXXXXX).

    Returns:
        dict with order status, items, shipping info, and estimated delivery.
    """
    import boto3

    table = boto3.resource("dynamodb", region_name="us-east-1").Table(
        "manga_orders"
    )
    response = table.get_item(Key={"order_id": order_id})

    if "Item" not in response:
        return {"found": False, "error": f"Order {order_id} not found"}

    item = response["Item"]
    return {
        "found": True,
        "order_id": item["order_id"],
        "status": item["status"],
        "items": item["items"],
        "total_usd": float(item["total_usd"]),
        "shipping_method": item["shipping_method"],
        "tracking_number": item.get("tracking_number", "Not yet assigned"),
        "estimated_delivery": item.get("estimated_delivery", "Pending"),
        "placed_at": item["placed_at"],
    }


@tool
def list_recent_orders(user_id: str, limit: int = 5) -> dict[str, Any]:
    """
    List a user's recent orders.

    Args:
        user_id: The authenticated user's ID.
        limit: Number of recent orders to retrieve (1-10).

    Returns:
        dict with list of recent orders, each with order_id, status, date, total.
    """
    import boto3
    from boto3.dynamodb.conditions import Key

    table = boto3.resource("dynamodb", region_name="us-east-1").Table(
        "manga_orders"
    )
    response = table.query(
        IndexName="user_id-placed_at-index",
        KeyConditionExpression=Key("user_id").eq(user_id),
        ScanIndexForward=False,  # Most recent first
        Limit=min(limit, 10),
    )

    orders = []
    for item in response.get("Items", []):
        orders.append({
            "order_id": item["order_id"],
            "status": item["status"],
            "total_usd": float(item["total_usd"]),
            "placed_at": item["placed_at"],
            "item_count": len(item["items"]),
        })

    return {"orders": orders, "count": len(orders)}


ORDER_STATUS_SYSTEM_PROMPT = """You are the OrderStatusAgent for MangaAssist.
You help customers check their order status, track shipments, and view order history.

## Rules
1. Always verify the order ID format (MNG-XXXXXXXX) before lookup.
2. If the user doesn't provide an order ID, list their recent orders first.
3. Be empathetic if an order is delayed — offer to escalate.
4. Never reveal internal system details or database schemas.
5. Keep responses brief — order status should be 2-3 sentences max.
"""

# Use Haiku for order lookups — simple retrieval, no complex reasoning needed
haiku_model = BedrockModel(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
    temperature=0.1,
    max_tokens=512,
)

order_status_agent = Agent(
    model=haiku_model,
    system_prompt=ORDER_STATUS_SYSTEM_PROMPT,
    tools=[lookup_order, list_recent_orders],
    max_turns=5,
)
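
Rule 1 of the system prompt asks the agent to verify the MNG-XXXXXXXX order-ID format before lookup. That check can also be enforced deterministically in code, before any DynamoDB call, rather than trusting the model alone. A sketch — the 8-character uppercase alphanumeric suffix is an assumption read off the XXXXXXXX placeholder, not a documented format:

```python
"""Sketch of a deterministic order-ID validator for the MNG-XXXXXXXX
format. The 8-character uppercase alphanumeric suffix is assumed from
the placeholder in the system prompt."""
import re

ORDER_ID_PATTERN = re.compile(r"^MNG-[A-Z0-9]{8}$")


def is_valid_order_id(order_id: str) -> bool:
    """True only for IDs matching 'MNG-' plus 8 uppercase alphanumerics."""
    return bool(ORDER_ID_PATTERN.fullmatch(order_id.strip()))
```

Running this guard inside `lookup_order` would turn a malformed ID into an immediate structured error instead of a wasted DynamoDB round trip.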

2. AWS Agent Squad — Multi-Agent Orchestration

2.1 What Is AWS Agent Squad?

AWS Agent Squad (formerly Multi-Agent Orchestrator) is an AWS framework for coordinating multiple specialized agents under a single supervisor. The supervisor classifies user intents, routes to the right sub-agent, and merges responses. It supports:

  • Classifier-based routing: An LLM (Haiku) classifies the intent and picks the best agent.
  • Agent registry: Sub-agents are registered with descriptions, and the classifier uses those descriptions to route.
  • Conversation memory: Shared memory across agents so context is not lost during hand-offs.

2.2 MangaAssist Agent Squad Configuration

"""
MangaAssist AWS Agent Squad — Supervisor orchestrating four specialized agents.
Production configuration for ECS Fargate deployment.
"""

import logging
import time
from dataclasses import dataclass, field
from typing import Any

from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
from multi_agent_orchestrator.classifiers import BedrockClassifier
from multi_agent_orchestrator.agents import (
    BedrockLLMAgent,
    BedrockLLMAgentOptions,
    AgentResponse,
)
from multi_agent_orchestrator.storage import DynamoDbChatStorage

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Shared Memory Store (DynamoDB-backed for cross-session persistence)
# ---------------------------------------------------------------------------

chat_storage = DynamoDbChatStorage(
    table_name="manga_agent_conversations",
    region="us-east-1",
    ttl_key="expires_at",
    ttl_duration=86400,  # 24-hour conversation retention
)

# ---------------------------------------------------------------------------
# Intent Classifier — Haiku for cost-efficiency at 1M msgs/day
# ---------------------------------------------------------------------------
# At 1M messages/day, classifier cost:
#   Input: ~100 tokens/msg * 1M = 100M tokens * $0.25/1M = $25/day
#   Output: ~20 tokens/msg * 1M = 20M tokens * $1.25/1M = $25/day
#   Total classifier cost: ~$50/day
# ---------------------------------------------------------------------------

classifier = BedrockClassifier(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region="us-east-1",
    inference_config={
        "temperature": 0.0,  # Deterministic classification
        "maxTokens": 100,
    },
)

# ---------------------------------------------------------------------------
# Sub-Agent Definitions
# ---------------------------------------------------------------------------

product_search_agent_config = BedrockLLMAgent(
    BedrockLLMAgentOptions(
        name="ProductSearchAgent",
        description=(
            "Handles all product discovery queries: searching manga titles, "
            "browsing by genre or author, checking prices and availability, "
            "finding specific manga volumes. Use for any question about "
            "what manga is available or product details."
        ),
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        region="us-east-1",
        inference_config={
            "temperature": 0.3,
            "maxTokens": 1024,
        },
        streaming=True,
        save_chat=True,
    )
)

order_status_agent_config = BedrockLLMAgent(
    BedrockLLMAgentOptions(
        name="OrderStatusAgent",
        description=(
            "Handles order-related queries: checking order status, tracking "
            "shipments, viewing order history, reporting delivery issues. "
            "Use for any question about existing or past orders."
        ),
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        region="us-east-1",
        inference_config={
            "temperature": 0.1,
            "maxTokens": 512,
        },
        streaming=False,  # Order lookups are fast, no need to stream
        save_chat=True,
    )
)

recommendation_agent_config = BedrockLLMAgent(
    BedrockLLMAgentOptions(
        name="RecommendationAgent",
        description=(
            "Provides personalized manga recommendations based on user "
            "preferences, reading history, and similar titles. Use when "
            "the user asks for suggestions, 'what should I read next', "
            "or wants similar manga to something they liked."
        ),
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        region="us-east-1",
        inference_config={
            "temperature": 0.7,  # Higher creativity for recommendations
            "maxTokens": 1024,
        },
        streaming=True,
        save_chat=True,
    )
)

manga_qa_agent_config = BedrockLLMAgent(
    BedrockLLMAgentOptions(
        name="MangaQAAgent",
        description=(
            "Answers general knowledge questions about manga: plot summaries, "
            "character information, author backgrounds, genre explanations, "
            "manga vs anime differences. Use for informational questions "
            "that don't involve searching the product catalog."
        ),
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        region="us-east-1",
        inference_config={
            "temperature": 0.3,
            "maxTokens": 768,
        },
        streaming=True,
        save_chat=True,
    )
)

# ---------------------------------------------------------------------------
# Orchestrator Assembly
# ---------------------------------------------------------------------------

def build_manga_orchestrator() -> MultiAgentOrchestrator:
    """
    Build and return the fully configured MangaAssist Agent Squad orchestrator.
    Call this once at ECS task startup and reuse across requests.
    """
    orchestrator = MultiAgentOrchestrator(
        classifier=classifier,
        storage=chat_storage,
        options={
            "LOG_AGENT_CHAT": True,
            "LOG_CLASSIFIER_CHAT": True,
            "LOG_CLASSIFIER_RAW_OUTPUT": False,
            "LOG_CLASSIFIER_OUTPUT": True,
            "LOG_EXECUTION_TIMES": True,
            "MAX_MESSAGE_PAIRS_PER_AGENT": 10,
        },
    )

    orchestrator.add_agent(product_search_agent_config)
    orchestrator.add_agent(order_status_agent_config)
    orchestrator.add_agent(recommendation_agent_config)
    orchestrator.add_agent(manga_qa_agent_config)

    logger.info("MangaAssist Agent Squad initialized with 4 sub-agents")
    return orchestrator


# ---------------------------------------------------------------------------
# Request Handler — called per WebSocket message
# ---------------------------------------------------------------------------

@dataclass
class AgentRequestContext:
    """Context passed with every agent request."""
    user_id: str
    session_id: str
    locale: str = "en-US"
    timestamp: float = field(default_factory=time.time)
    user_tier: str = "standard"  # standard | premium | vip
    cart_items: list[str] = field(default_factory=list)


async def handle_user_message(
    orchestrator: MultiAgentOrchestrator,
    message: str,
    context: AgentRequestContext,
) -> AgentResponse:
    """
    Route a user message through the Agent Squad and return the response.

    The orchestrator:
    1. Classifies the intent using Haiku ($0.25/1M tokens)
    2. Routes to the best sub-agent
    3. Sub-agent reasons and calls tools as needed
    4. Returns the merged response
    """
    start_time = time.time()

    response = await orchestrator.route_request(
        user_input=message,
        user_id=context.user_id,
        session_id=context.session_id,
        additional_params={
            "locale": context.locale,
            "user_tier": context.user_tier,
            "cart_items": context.cart_items,
        },
    )

    elapsed_ms = (time.time() - start_time) * 1000
    logger.info(
        "Agent response in %.0f ms | agent=%s | user=%s | session=%s",
        elapsed_ms,
        response.metadata.get("agent_name", "unknown"),
        context.user_id,
        context.session_id,
    )

    # Alert if exceeding 3-second SLA
    if elapsed_ms > 3000:
        logger.warning(
            "SLA BREACH: Response took %.0f ms (limit: 3000 ms) | agent=%s",
            elapsed_ms,
            response.metadata.get("agent_name", "unknown"),
        )

    return response

2.3 Multi-Agent Routing Logic — How the Orchestrator Decides

The Agent Squad classifier uses Haiku to analyze the user message plus recent conversation history, then selects the best sub-agent. The code below shows the classifier prompt template and a custom router that wraps it with fast-route patterns and a confidence-threshold fallback:

"""
MangaAssist intent classification and routing logic.
Demonstrates the internal classifier prompt and fallback strategy.
"""

from dataclasses import dataclass
from enum import Enum


class AgentIntent(str, Enum):
    PRODUCT_SEARCH = "product_search"
    ORDER_STATUS = "order_status"
    RECOMMENDATION = "recommendation"
    MANGA_QA = "manga_qa"
    AMBIGUOUS = "ambiguous"


@dataclass
class ClassificationResult:
    intent: AgentIntent
    confidence: float
    selected_agent: str
    reasoning: str


# The classifier prompt template used internally by Agent Squad
CLASSIFIER_PROMPT_TEMPLATE = """You are a routing classifier for MangaAssist,
a Japanese manga store chatbot. Given the user's message and conversation
history, determine which specialized agent should handle this request.

Available agents:
{agent_descriptions}

Conversation history:
{conversation_history}

User message: {user_message}

Respond with ONLY the agent name and confidence (0.0-1.0).
Format: AgentName|confidence
Example: ProductSearchAgent|0.95
"""


class MangaAssistRouter:
    """
    Custom routing logic that wraps Agent Squad's classifier with
    MangaAssist-specific overrides and fallback strategies.
    """

    # Keywords that bypass the classifier for instant routing
    FAST_ROUTE_PATTERNS = {
        AgentIntent.ORDER_STATUS: [
            "order", "tracking", "shipment", "delivery", "MNG-",
            "when will", "where is my",
        ],
        AgentIntent.PRODUCT_SEARCH: [
            "how much", "price", "in stock", "available", "buy",
            "volumes", "find", "search",
        ],
        AgentIntent.RECOMMENDATION: [
            "recommend", "suggest", "similar to", "like", "should I read",
            "what's good",
        ],
    }

    CONFIDENCE_THRESHOLD = 0.70  # Below this, use fallback strategy

    def __init__(self):
        self._routing_metrics = {
            intent.value: 0 for intent in AgentIntent
        }

    def fast_route(self, message: str) -> ClassificationResult | None:
        """
        Pattern-based fast routing for obvious intents.
        Avoids a classifier LLM call, saving roughly 50 ms of latency plus its token cost.
        """
        message_lower = message.lower()
        for intent, patterns in self.FAST_ROUTE_PATTERNS.items():
            for pattern in patterns:
                if pattern.lower() in message_lower:
                    self._routing_metrics[intent.value] += 1
                    return ClassificationResult(
                        intent=intent,
                        confidence=0.90,
                        selected_agent=self._intent_to_agent(intent),
                        reasoning=f"Fast-route match on pattern: '{pattern}'",
                    )
        return None

    def apply_fallback_strategy(
        self,
        classification: ClassificationResult,
        conversation_history: list[dict],
    ) -> ClassificationResult:
        """
        If the classifier confidence is below threshold, apply fallbacks:
        1. Use the same agent as the last turn (conversation continuity)
        2. Default to ProductSearchAgent (most common intent)
        """
        if classification.confidence >= self.CONFIDENCE_THRESHOLD:
            return classification

        # Fallback 1: Continue with the same agent as last turn
        if conversation_history:
            last_agent = conversation_history[-1].get("agent_name")
            if last_agent:
                return ClassificationResult(
                    intent=classification.intent,
                    confidence=0.75,
                    selected_agent=last_agent,
                    reasoning=(
                        f"Low confidence ({classification.confidence:.2f}), "
                        f"continuing with last agent: {last_agent}"
                    ),
                )

        # Fallback 2: Default to ProductSearchAgent
        return ClassificationResult(
            intent=AgentIntent.PRODUCT_SEARCH,
            confidence=0.60,
            selected_agent="ProductSearchAgent",
            reasoning="Low confidence, defaulting to ProductSearchAgent",
        )

    @staticmethod
    def _intent_to_agent(intent: AgentIntent) -> str:
        mapping = {
            AgentIntent.PRODUCT_SEARCH: "ProductSearchAgent",
            AgentIntent.ORDER_STATUS: "OrderStatusAgent",
            AgentIntent.RECOMMENDATION: "RecommendationAgent",
            AgentIntent.MANGA_QA: "MangaQAAgent",
        }
        return mapping.get(intent, "ProductSearchAgent")
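
The classifier is prompted to reply in the `AgentName|confidence` wire format defined by CLASSIFIER_PROMPT_TEMPLATE, so the orchestrator side needs a tolerant parser. A sketch of that step — the function is illustrative, not part of Agent Squad, and the malformed-output fallback to ProductSearchAgent mirrors the router's own default:

```python
"""Parse the `AgentName|confidence` line the classifier is prompted to
emit. Malformed replies fall back to ProductSearchAgent at zero
confidence, mirroring the router's default. Illustrative sketch."""

KNOWN_AGENTS = {
    "ProductSearchAgent",
    "OrderStatusAgent",
    "RecommendationAgent",
    "MangaQAAgent",
}


def parse_classifier_output(raw: str) -> tuple[str, float]:
    """Return (agent_name, confidence), defaulting on any malformed reply."""
    fallback = ("ProductSearchAgent", 0.0)
    parts = raw.strip().split("|")
    if len(parts) != 2:
        return fallback
    agent, conf_text = parts[0].strip(), parts[1].strip()
    if agent not in KNOWN_AGENTS:
        return fallback
    try:
        confidence = float(conf_text)
    except ValueError:
        return fallback
    # Clamp to the 0.0-1.0 range the prompt requests
    return agent, max(0.0, min(1.0, confidence))
```

A confidence returned here below CONFIDENCE_THRESHOLD (0.70) is exactly what triggers `apply_fallback_strategy` above.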

3. Agent Memory Implementation

3.1 Short-Term Memory — ElastiCache Redis

Short-term memory holds the current conversation context. It must be fast (sub-millisecond reads), automatically expire (TTL), and support the token budget for the LLM context window.

"""
MangaAssist short-term session memory using ElastiCache Redis.
Handles turn history, sliding window, and token-budget trimming.
"""

import json
import hashlib
import logging
import time
from dataclasses import dataclass, field, asdict
from typing import Any

import redis
import tiktoken

logger = logging.getLogger(__name__)

# Token counting for Claude 3 context window management
# Claude 3 Sonnet context: 200K tokens, but we budget 4K for conversation history
ENCODER = tiktoken.encoding_for_model("gpt-4")  # Approximate for Claude
MAX_HISTORY_TOKENS = 4096
MAX_TURNS = 20
SESSION_TTL_SECONDS = 1800  # 30 minutes sliding window


@dataclass
class ConversationTurn:
    role: str  # "user" or "assistant"
    content: str
    timestamp: float
    agent_name: str = ""
    tool_calls: list[dict] = field(default_factory=list)
    token_count: int = 0

    def __post_init__(self):
        if self.token_count == 0:
            self.token_count = len(ENCODER.encode(self.content))


class RedisSessionMemory:
    """
    Short-term conversation memory backed by ElastiCache Redis.

    Design decisions:
    - Each session is a Redis list of JSON-serialized turns.
    - TTL is refreshed on every read/write (sliding window).
    - Token budget trimming removes oldest turns when context is too large.
    - Separate hash for session metadata (user_id, locale, cart, etc.).
    """

    def __init__(
        self,
        redis_host: str = "manga-redis.xxxxx.use1.cache.amazonaws.com",
        redis_port: int = 6379,
        redis_db: int = 0,
        use_ssl: bool = True,
    ):
        self._client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            ssl=use_ssl,
            decode_responses=True,
            socket_connect_timeout=2,
            socket_timeout=1,
            retry_on_timeout=True,
        )
        logger.info("Redis session memory connected to %s:%d", redis_host, redis_port)

    def _session_key(self, session_id: str) -> str:
        return f"manga:session:{session_id}:turns"

    def _metadata_key(self, session_id: str) -> str:
        return f"manga:session:{session_id}:meta"

    async def add_turn(
        self, session_id: str, turn: ConversationTurn
    ) -> None:
        """Append a turn and enforce token budget."""
        key = self._session_key(session_id)
        self._client.rpush(key, json.dumps(asdict(turn)))
        self._client.expire(key, SESSION_TTL_SECONDS)  # Refresh TTL

        # Enforce max turns
        current_length = self._client.llen(key)
        if current_length > MAX_TURNS:
            # Remove oldest turns beyond the limit
            excess = current_length - MAX_TURNS
            for _ in range(excess):
                self._client.lpop(key)

        # Enforce token budget
        await self._trim_to_token_budget(session_id)

    async def get_history(self, session_id: str) -> list[ConversationTurn]:
        """Retrieve all turns for a session, refreshing TTL."""
        key = self._session_key(session_id)
        self._client.expire(key, SESSION_TTL_SECONDS)  # Sliding window

        raw_turns = self._client.lrange(key, 0, -1)
        turns = []
        for raw in raw_turns:
            data = json.loads(raw)
            turns.append(ConversationTurn(**data))
        return turns

    async def get_formatted_history(self, session_id: str) -> list[dict]:
        """Return history in the format expected by Claude's messages API."""
        turns = await self.get_history(session_id)
        return [
            {"role": turn.role, "content": turn.content}
            for turn in turns
        ]

    async def _trim_to_token_budget(self, session_id: str) -> None:
        """Remove oldest turns until total tokens fit within budget."""
        key = self._session_key(session_id)
        raw_turns = self._client.lrange(key, 0, -1)

        total_tokens = 0
        turns_data = []
        for raw in raw_turns:
            data = json.loads(raw)
            total_tokens += data.get("token_count", 0)
            turns_data.append(data)

        while total_tokens > MAX_HISTORY_TOKENS and len(turns_data) > 2:
            removed = turns_data.pop(0)
            total_tokens -= removed.get("token_count", 0)
            self._client.lpop(key)

        logger.debug(
            "Session %s: %d turns, %d tokens (budget: %d)",
            session_id, len(turns_data), total_tokens, MAX_HISTORY_TOKENS,
        )

    async def set_metadata(
        self, session_id: str, metadata: dict[str, Any]
    ) -> None:
        """Store session metadata (user profile, locale, cart, etc.)."""
        key = self._metadata_key(session_id)
        self._client.hset(key, mapping={
            k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
            for k, v in metadata.items()
        })
        self._client.expire(key, SESSION_TTL_SECONDS)

    async def get_metadata(self, session_id: str) -> dict[str, Any]:
        """Retrieve session metadata."""
        key = self._metadata_key(session_id)
        self._client.expire(key, SESSION_TTL_SECONDS)
        raw = self._client.hgetall(key)
        result = {}
        for k, v in raw.items():
            try:
                result[k] = json.loads(v)
            except (json.JSONDecodeError, TypeError):
                result[k] = v
        return result

    async def clear_session(self, session_id: str) -> None:
        """Delete all data for a session."""
        self._client.delete(
            self._session_key(session_id),
            self._metadata_key(session_id),
        )

3.2 Long-Term Memory — DynamoDB Persistent State

Long-term memory persists across sessions. It stores user preferences, interaction summaries, and historical context for personalization.

"""
MangaAssist long-term memory using DynamoDB.
Stores user preferences, interaction summaries, and cross-session context.
"""

import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any

import boto3
from boto3.dynamodb.conditions import Key, Attr

logger = logging.getLogger(__name__)


@dataclass
class UserMemoryRecord:
    """A single memory record for a user."""
    memory_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    memory_type: str = ""  # "preference", "interaction_summary", "feedback"
    content: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    importance_score: float = 0.5  # 0.0 - 1.0
    created_at: float = field(default_factory=time.time)
    accessed_at: float = field(default_factory=time.time)
    access_count: int = 0
    ttl: int = 0  # 0 = no expiry


class DynamoDBLongTermMemory:
    """
    Long-term user memory backed by DynamoDB.

    Table schema:
        PK: user_id (String)
        SK: memory_id (String)
        GSI1: user_id + memory_type (for filtered queries)
        GSI2: user_id + importance_score (for ranked retrieval)

    Capacity: On-demand (pay-per-request) for unpredictable access patterns.
    """

    def __init__(
        self,
        table_name: str = "manga_user_memory",
        region: str = "us-east-1",
    ):
        self._table = boto3.resource(
            "dynamodb", region_name=region
        ).Table(table_name)
        self._table_name = table_name
        logger.info("DynamoDB long-term memory connected: %s", table_name)

    async def store_memory(self, record: UserMemoryRecord) -> None:
        """Store or update a memory record."""
        item = {
            "user_id": record.user_id,
            "memory_id": record.memory_id,
            "memory_type": record.memory_type,
            "content": record.content,
            "metadata": json.loads(json.dumps(record.metadata, default=str)),
            "importance_score": Decimal(str(record.importance_score)),
            "created_at": Decimal(str(record.created_at)),
            "accessed_at": Decimal(str(record.accessed_at)),
            "access_count": record.access_count,
        }
        if record.ttl > 0:
            item["expires_at"] = int(time.time()) + record.ttl

        self._table.put_item(Item=item)
        logger.debug("Stored memory %s for user %s", record.memory_id, record.user_id)

    async def recall_memories(
        self,
        user_id: str,
        memory_type: str | None = None,
        limit: int = 10,
        min_importance: float = 0.0,
    ) -> list[UserMemoryRecord]:
        """
        Retrieve memories for a user, optionally filtered by type and importance.
        Results are ordered by importance_score descending.
        """
        if memory_type:
            # NOTE: DynamoDB applies Limit before FilterExpression, so the
            # query may return fewer than `limit` items after filtering.
            response = self._table.query(
                IndexName="user_id-memory_type-index",
                KeyConditionExpression=(
                    Key("user_id").eq(user_id) &
                    Key("memory_type").eq(memory_type)
                ),
                FilterExpression=Attr("importance_score").gte(
                    Decimal(str(min_importance))
                ),
                Limit=limit,
            )
        else:
            response = self._table.query(
                KeyConditionExpression=Key("user_id").eq(user_id),
                FilterExpression=Attr("importance_score").gte(
                    Decimal(str(min_importance))
                ),
                ScanIndexForward=False,
                Limit=limit,
            )

        records = []
        for item in response.get("Items", []):
            record = UserMemoryRecord(
                memory_id=item["memory_id"],
                user_id=item["user_id"],
                memory_type=item["memory_type"],
                content=item["content"],
                metadata=item.get("metadata", {}),
                importance_score=float(item["importance_score"]),
                created_at=float(item["created_at"]),
                accessed_at=float(item["accessed_at"]),
                access_count=int(item.get("access_count", 0)),
            )
            records.append(record)

            # Update access tracking
            self._table.update_item(
                Key={"user_id": user_id, "memory_id": item["memory_id"]},
                UpdateExpression=(
                    "SET accessed_at = :now, access_count = access_count + :one"
                ),
                ExpressionAttributeValues={
                    ":now": Decimal(str(time.time())),
                    ":one": 1,
                },
            )

        # Sort by importance descending
        records.sort(key=lambda r: r.importance_score, reverse=True)
        return records[:limit]

    async def store_interaction_summary(
        self,
        user_id: str,
        session_id: str,
        summary: str,
        topics: list[str],
        sentiment: str,
    ) -> None:
        """
        Store a compressed summary of a conversation session.
        Called when a session ends or times out.
        """
        record = UserMemoryRecord(
            user_id=user_id,
            memory_type="interaction_summary",
            content=summary,
            metadata={
                "session_id": session_id,
                "topics": topics,
                "sentiment": sentiment,
            },
            importance_score=0.6,
            ttl=90 * 86400,  # 90 days retention
        )
        await self.store_memory(record)

    async def store_user_preference(
        self,
        user_id: str,
        preference_key: str,
        preference_value: Any,
    ) -> None:
        """
        Store or update a user preference (e.g., favorite genre, language).
        Preferences have high importance and no expiry.
        """
        # Check if preference already exists
        existing = await self.recall_memories(
            user_id=user_id,
            memory_type="preference",
            limit=50,
        )
        for record in existing:
            if record.metadata.get("preference_key") == preference_key:
                # Update existing preference
                record.content = str(preference_value)
                record.importance_score = 0.9
                record.accessed_at = time.time()
                await self.store_memory(record)
                return

        # Create new preference
        record = UserMemoryRecord(
            user_id=user_id,
            memory_type="preference",
            content=str(preference_value),
            metadata={"preference_key": preference_key},
            importance_score=0.9,
        )
        await self.store_memory(record)

    async def build_user_context(self, user_id: str) -> str:
        """
        Build a context string from the user's long-term memory
        to inject into agent system prompts for personalization.
        """
        preferences = await self.recall_memories(
            user_id=user_id,
            memory_type="preference",
            limit=10,
        )
        summaries = await self.recall_memories(
            user_id=user_id,
            memory_type="interaction_summary",
            limit=5,
            min_importance=0.5,
        )

        context_parts = []
        if preferences:
            pref_lines = []
            for p in preferences:
                key = p.metadata.get("preference_key", "unknown")
                pref_lines.append(f"  - {key}: {p.content}")
            context_parts.append("User Preferences:\n" + "\n".join(pref_lines))

        if summaries:
            summary_lines = []
            for s in summaries:
                topics = ", ".join(s.metadata.get("topics", []))
                summary_lines.append(f"  - Topics: {topics} | {s.content}")
            context_parts.append(
                "Recent Interactions:\n" + "\n".join(summary_lines)
            )

        return "\n\n".join(context_parts) if context_parts else ""

4. State Management Patterns

4.1 Conversation State Tracking

"""
MangaAssist conversation state management.
Tracks active intents, slot filling, and multi-turn dialogue state.
"""

import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class ConversationPhase(str, Enum):
    GREETING = "greeting"
    INTENT_GATHERING = "intent_gathering"
    SLOT_FILLING = "slot_filling"
    TOOL_EXECUTION = "tool_execution"
    RESPONSE_DELIVERY = "response_delivery"
    FOLLOW_UP = "follow_up"
    FAREWELL = "farewell"


class SlotStatus(str, Enum):
    EMPTY = "empty"
    PARTIALLY_FILLED = "partially_filled"
    CONFIRMED = "confirmed"


@dataclass
class DialogueSlot:
    """A slot in a slot-filling dialogue (e.g., genre, price range)."""
    name: str
    value: Any = None
    status: SlotStatus = SlotStatus.EMPTY
    required: bool = False
    prompt: str = ""  # Question to ask if slot is empty


@dataclass
class ConversationState:
    """
    Full conversation state for a MangaAssist session.
    Serialized to Redis after every turn.
    """
    session_id: str
    user_id: str
    phase: ConversationPhase = ConversationPhase.GREETING
    active_intent: str = ""
    active_agent: str = ""
    slots: dict[str, DialogueSlot] = field(default_factory=dict)
    turn_count: int = 0
    last_activity: float = field(default_factory=time.time)
    pending_clarification: str = ""
    context_flags: dict[str, bool] = field(default_factory=dict)

    def advance_phase(self, new_phase: ConversationPhase) -> None:
        """Transition to a new conversation phase."""
        self.phase = new_phase
        self.last_activity = time.time()

    def set_intent(self, intent: str, agent: str) -> None:
        """Set the active intent and responsible agent."""
        self.active_intent = intent
        self.active_agent = agent
        self.phase = ConversationPhase.INTENT_GATHERING

    def add_slot(self, slot: DialogueSlot) -> None:
        """Register a slot for the current intent."""
        self.slots[slot.name] = slot

    def fill_slot(self, name: str, value: Any) -> None:
        """Fill a slot with a value."""
        if name in self.slots:
            self.slots[name].value = value
            self.slots[name].status = SlotStatus.CONFIRMED

    def get_missing_slots(self) -> list[DialogueSlot]:
        """Return all required slots that are not yet confirmed."""
        return [
            slot for slot in self.slots.values()
            if slot.required and slot.status != SlotStatus.CONFIRMED
        ]

    def all_slots_filled(self) -> bool:
        """Check if all required slots are filled."""
        return len(self.get_missing_slots()) == 0

    def increment_turn(self) -> None:
        """Advance the turn counter."""
        self.turn_count += 1
        self.last_activity = time.time()

    def is_stale(self, timeout_seconds: int = 1800) -> bool:
        """Check if the conversation has been idle too long."""
        return (time.time() - self.last_activity) > timeout_seconds

    def to_context_string(self) -> str:
        """Serialize state to a human-readable context string for the agent."""
        lines = [
            f"Phase: {self.phase.value}",
            f"Intent: {self.active_intent or 'none'}",
            f"Turn: {self.turn_count}",
        ]
        filled_slots = {
            name: slot.value
            for name, slot in self.slots.items()
            if slot.status == SlotStatus.CONFIRMED
        }
        if filled_slots:
            lines.append(f"Confirmed slots: {filled_slots}")
        missing = self.get_missing_slots()
        if missing:
            lines.append(
                f"Missing required slots: {[s.name for s in missing]}"
            )
        return " | ".join(lines)

4.2 Task Execution State

"""
MangaAssist task execution state — tracks multi-step tool execution
within a single agent turn.
"""

import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class TaskStepStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
    RETRYING = "retrying"


@dataclass
class TaskStep:
    """A single step in a multi-step task execution."""
    step_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    tool_name: str = ""
    parameters: dict[str, Any] = field(default_factory=dict)
    status: TaskStepStatus = TaskStepStatus.PENDING
    result: Any = None
    error: str = ""
    started_at: float = 0.0
    completed_at: float = 0.0
    retry_count: int = 0
    max_retries: int = 2

    @property
    def duration_ms(self) -> float:
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at) * 1000
        return 0.0

    def can_retry(self) -> bool:
        return self.retry_count < self.max_retries


@dataclass
class TaskExecutionState:
    """
    Tracks the execution state of a multi-step task within an agent turn.
    For example: "Find manga similar to Naruto" might involve:
      1. Search for Naruto to get its metadata
      2. Extract genre and themes
      3. Search for similar titles
      4. Rank and format results
    """
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    steps: list[TaskStep] = field(default_factory=list)
    current_step_index: int = 0
    started_at: float = field(default_factory=time.time)
    completed_at: float = 0.0
    total_budget_ms: float = 3000.0  # 3-second SLA

    def add_step(self, tool_name: str, parameters: dict) -> TaskStep:
        step = TaskStep(tool_name=tool_name, parameters=parameters)
        self.steps.append(step)
        return step

    def current_step(self) -> TaskStep | None:
        if self.current_step_index < len(self.steps):
            return self.steps[self.current_step_index]
        return None

    def advance(self) -> TaskStep | None:
        self.current_step_index += 1
        return self.current_step()

    def remaining_budget_ms(self) -> float:
        elapsed = (time.time() - self.started_at) * 1000
        return max(0, self.total_budget_ms - elapsed)

    def is_over_budget(self) -> bool:
        return self.remaining_budget_ms() <= 0

    def summary(self) -> dict[str, Any]:
        return {
            "task_id": self.task_id,
            "description": self.description,
            "total_steps": len(self.steps),
            "completed_steps": sum(
                1 for s in self.steps
                if s.status == TaskStepStatus.COMPLETED
            ),
            "failed_steps": sum(
                1 for s in self.steps
                if s.status == TaskStepStatus.FAILED
            ),
            "remaining_budget_ms": self.remaining_budget_ms(),
        }
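The budget accounting above implies an execution loop that stops dispatching tools once the SLA budget is exhausted. A condensed, self-contained sketch of that loop — the simplified callable-per-step signature is illustrative, not the Strands dispatch API:

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class BudgetedRunner:
    """Run a queue of tool callables, skipping the rest once over budget."""
    total_budget_ms: float = 3000.0  # Mirrors the 3-second SLA above
    started_at: float = field(default_factory=time.time)
    results: list[Any] = field(default_factory=list)

    def remaining_ms(self) -> float:
        elapsed_ms = (time.time() - self.started_at) * 1000
        return max(0.0, self.total_budget_ms - elapsed_ms)

    def run(self, steps: list[Callable[[], Any]]) -> list[Any]:
        for step in steps:
            if self.remaining_ms() <= 0:
                self.results.append(None)  # Skipped: over budget
                continue
            self.results.append(step())
        return self.results
```

In MangaAssist terms, a skipped step would be marked `TaskStepStatus.SKIPPED` and the agent would answer with whatever partial results completed in time.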

4.3 Environment Context

"""
MangaAssist environment context — external state the agent reads
but does not directly control (user profile, cart, locale, etc.).
"""

import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class UserProfile:
    """User profile loaded from DynamoDB at session start."""
    user_id: str = ""  # Default keeps EnvironmentContext's default_factory valid
    display_name: str = "Guest"
    email: str = ""
    locale: str = "en-US"
    preferred_language: str = "en"
    tier: str = "standard"  # standard | premium | vip
    favorite_genres: list[str] = field(default_factory=list)
    reading_history: list[str] = field(default_factory=list)
    created_at: str = ""


@dataclass
class CartState:
    """Current shopping cart state from the e-commerce backend."""
    items: list[dict[str, Any]] = field(default_factory=list)
    total_usd: float = 0.0
    item_count: int = 0


@dataclass
class EnvironmentContext:
    """
    Immutable (per-turn) snapshot of the environment that agents can
    read to make contextual decisions.
    """
    user_profile: UserProfile = field(default_factory=UserProfile)
    cart: CartState = field(default_factory=CartState)
    current_time_utc: float = field(default_factory=time.time)
    request_locale: str = "en-US"
    is_business_hours_jst: bool = True
    active_promotions: list[str] = field(default_factory=list)

    def to_agent_context_string(self) -> str:
        """
        Format environment context for injection into the agent's
        system prompt or user message prefix.
        """
        lines = [
            f"Customer: {self.user_profile.display_name} "
            f"({self.user_profile.tier} tier)",
            f"Language: {self.user_profile.preferred_language}",
            f"Locale: {self.request_locale}",
        ]

        if self.user_profile.favorite_genres:
            lines.append(
                f"Favorite genres: {', '.join(self.user_profile.favorite_genres)}"
            )

        if self.cart.item_count > 0:
            lines.append(
                f"Cart: {self.cart.item_count} items, "
                f"${self.cart.total_usd:.2f}"
            )

        if self.active_promotions:
            lines.append(
                f"Active promotions: {', '.join(self.active_promotions)}"
            )

        if not self.is_business_hours_jst:
            lines.append("Note: Outside JP business hours — escalations delayed")

        return "\n".join(lines)

5. Comparison — Strands Agents vs AWS Agent Squad vs Custom Orchestration

| Dimension | Strands Agents | AWS Agent Squad | Custom Orchestration |
|---|---|---|---|
| Scope | Single agent with tools, memory, and ReAct loop | Multi-agent supervisor routing to specialized agents | Hand-coded routing, state machine, and tool dispatch |
| Best For | Single-domain tasks (one agent handles one intent) | Multi-domain chatbots with distinct capability areas | Unique routing logic, non-standard patterns |
| Routing | N/A (single agent) | Classifier-based intent routing via LLM or rules | Developer-defined (regex, ML model, rules engine) |
| Memory | Built-in turn history with sliding window | DynamoDB-backed shared conversation store | Developer-managed (Redis, DynamoDB, or any store) |
| Tool Integration | @tool decorator, MCP support | Agent-level tool binding | Direct API calls or MCP client library |
| Streaming | Native streaming support | Per-agent streaming configuration | Developer-implemented via WebSocket or SSE |
| Cost at 1M msgs/day | Depends on model choice per agent | +$50/day for Haiku classifier overhead | No framework overhead, but more engineering time |
| Observability | Built-in logging of reasoning steps | Built-in execution time and routing logs | Custom CloudWatch metrics and X-Ray tracing |
| Complexity | Low — single file per agent | Medium — orchestrator + sub-agents + classifier | High — everything is hand-built |
| MangaAssist Usage | Each sub-agent is a Strands Agent | Orchestrator wrapping all four sub-agents | Fallback router and SLA enforcement logic |
| Vendor Lock-in | Open-source, Bedrock-optimized | AWS-specific, uses Bedrock classifier | None (but higher maintenance) |
| Time to Production | Days | 1-2 weeks | 4-8 weeks |

6. Key Takeaways

  1. Strands Agents power each sub-agent in MangaAssist — the @tool decorator, system prompt, and model selection create a self-contained reasoning unit. Use Sonnet ($3/$15) for complex reasoning (product search, recommendations) and Haiku ($0.25/$1.25) for simple lookups (order status, Q&A).
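The per-message economics behind that split can be computed from the Bedrock prices quoted above. The 2,000-input/500-output token profile below is an assumed example, not a measured MangaAssist average:

```python
# Prices per 1M tokens (input, output), from the MangaAssist context above.
PRICES = {"sonnet": (3.00, 15.00), "haiku": (0.25, 1.25)}


def message_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of a single LLM call at Bedrock per-1M-token pricing."""
    in_price, out_price = PRICES[model]
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000


# Assumed profile: 2,000 input tokens (prompt + history), 500 output tokens.
sonnet = message_cost_usd("sonnet", 2_000, 500)  # $0.0135 per message
haiku = message_cost_usd("haiku", 2_000, 500)    # $0.001125 per message
```

At 1M messages/day that profile costs $13,500/day if everything hits Sonnet versus $1,125/day on Haiku, which is why routing simple lookups to the cheaper model matters.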

  2. AWS Agent Squad provides the supervisory layer — a Haiku-based classifier routes each user message to the right sub-agent at ~$50/day overhead for 1M messages. The classifier uses agent descriptions (not code) to make routing decisions, so adding a new agent requires zero code changes to the router.

  3. Two-tier memory is essential at scale — ElastiCache Redis handles sub-millisecond session memory with 30-minute TTL and token-budget trimming, while DynamoDB stores long-term user preferences and interaction summaries for personalization across sessions. Neither alone is sufficient.
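How the two tiers meet at prompt-build time can be sketched as a pure function. The names here are illustrative; in MangaAssist the inputs would come from `RedisSessionMemory.get_formatted_history` and `DynamoDBLongTermMemory.build_user_context`:

```python
def assemble_claude_request(
    base_system_prompt: str,
    user_context: str,    # Long-term tier (DynamoDB): preferences, summaries
    history: list[dict],  # Short-term tier (Redis): messages-API turn list
    user_message: str,
) -> dict:
    """Combine both memory tiers into a Claude messages-API payload."""
    system = base_system_prompt
    if user_context:
        # Long-term memory is injected into the system prompt, not the turns.
        system += "\n\n# Known about this customer\n" + user_context
    return {
        "system": system,
        "messages": history + [{"role": "user", "content": user_message}],
    }
```

The design choice is deliberate: long-term facts live in the system prompt where they survive history trimming, while short-term turns stay in `messages` where the token-budget trimmer can drop them.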

  4. State management has three distinct layers — conversation state (intent, slots, phase), task execution state (tool call queue, results, budget), and environment context (user profile, cart, locale). Each layer is read by different parts of the system and updated at different frequencies.

  5. Fast-route patterns bypass the classifier — for obvious intents (order ID mentioned, explicit "search" keyword), pattern matching routes instantly without an LLM call, saving ~50ms latency and ~$25/day in classifier costs.

  6. The 3-second SLA drives architectural decisions — task execution state tracks remaining time budget, agents are configured with max_tokens limits to prevent runaway generation, and the routing layer has confidence-threshold fallbacks to avoid re-classification loops.


Next file: 02-mcp-agent-tool-interactions.md — Deep dive into MCP protocol for standardized agent-tool interactions in MangaAssist.