ReAct Reasoning Architecture for Advanced Problem-Solving Systems
MangaAssist context: JP Manga store chatbot on AWS -- Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 -- Implementation and Integration of Foundation Models |
| Task | 2.1 -- Implement Agentic AI Solutions and Tool Integrations |
| Skill | 2.1.2 -- Create advanced problem-solving systems to give FMs the ability to break down and solve complex problems by following structured reasoning steps (e.g., Step Functions for ReAct patterns, chain-of-thought reasoning) |
| Applied Context | MangaAssist -- JP Manga store chatbot serving 1M messages/day |
Mind Map: ReAct and Structured Reasoning
Advanced Problem-Solving Systems (Skill 2.1.2)
|
+-- ReAct (Reasoning + Acting) Pattern
| +-- Thought: Analyze intent, plan next step
| +-- Action: Execute tool call (search, lookup, compute)
| +-- Observation: Evaluate tool output
| +-- Loop: Repeat until confident answer or max iterations
| +-- AWS Implementation: Step Functions + Lambda + Bedrock
|
+-- Chain-of-Thought (CoT) Reasoning
| +-- Zero-shot CoT: "Let's think step by step"
| +-- Few-shot CoT: Exemplar-driven reasoning traces
|   +-- Self-Consistency: Multiple reasoning paths, majority vote (see the sketch after this map)
| +-- MangaAssist: Genre analysis -> preference matching -> recommendation ranking
|
+-- Step Functions Orchestration
| +-- State Machine for ReAct loops
| +-- Parallel branches for sub-task decomposition
| +-- Error handling with retry and fallback states
| +-- Timeout guards per reasoning step
| +-- Express Workflows for sub-3-second latency
|
+-- Problem Decomposition Strategies
| +-- Recursive decomposition: Break complex query into atomic sub-queries
| +-- Parallel fan-out: Execute independent sub-tasks concurrently
| +-- Sequential refinement: Each step narrows the solution space
| +-- Hierarchical planning: High-level plan -> detailed sub-plans
|
+-- Thought-Action-Observation Loops
+-- Bounded iteration (max 5 loops for latency budget)
+-- Early termination on high-confidence answer
+-- Observation quality scoring
+-- Trace logging for debugging and evaluation
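The Self-Consistency branch above (multiple reasoning paths, majority vote) is the one CoT variant not implemented later in this section. Below is a minimal hedged sketch of how it could look on Bedrock; the helper name, prompt wording, and "ANSWER:" vote-extraction format are illustrative assumptions, not MangaAssist production code.

"""
Self-consistency sketch: sample N diverse CoT paths, majority-vote the answer.
Helper name and prompt format are illustrative assumptions.
"""
import json
from collections import Counter

import boto3

bedrock = boto3.client("bedrock-runtime")
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"


def self_consistent_answer(question: str, n_paths: int = 5) -> str:
    """Sample multiple reasoning paths and return the most common final answer."""
    answers = []
    for _ in range(n_paths):
        response = bedrock.invoke_model(
            modelId=HAIKU_MODEL_ID,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 300,
                "temperature": 0.7,  # >0 so the reasoning paths actually differ
                "messages": [{
                    "role": "user",
                    "content": f"{question}\n\nLet's think step by step. "
                               "End with a line 'ANSWER: <answer>'.",
                }],
            }),
        )
        text = json.loads(response["body"].read())["content"][0]["text"]
        # Keep only the final-answer line for voting
        for line in reversed(text.splitlines()):
            if line.startswith("ANSWER:"):
                answers.append(line.removeprefix("ANSWER:").strip())
                break
    return Counter(answers).most_common(1)[0][0] if answers else ""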
Architecture: ReAct Loop in MangaAssist
flowchart TD
A["User Query: 'Find me a manga like Naruto but darker'"] --> B["THOUGHT 1: Analyze intent"]
B --> B1["Intent: Recommendation request\nConstraints: Similar to Naruto, darker tone\nSub-tasks: Identify Naruto traits, find darker alternatives"]
B1 --> C["ACTION 1: Search manga knowledge base"]
C --> C1["Tool: OpenSearch vector search\nQuery: Naruto genre/themes/demographics"]
C1 --> D["OBSERVATION 1: Evaluate search results"]
D --> D1["Naruto traits: shounen, action, ninja, friendship,\ncoming-of-age, long-running (72 vols)"]
D1 --> E["THOUGHT 2: Refine search criteria"]
E --> E1["Need: action + darker themes\nFilters: seinen or mature shounen,\nsimilar art style, completed or ongoing"]
E1 --> F["ACTION 2: Search with refined criteria"]
F --> F1["Tool: OpenSearch semantic search\nQuery: 'dark action manga similar themes to Naruto'\nFilters: genre=seinen OR genre=dark-shounen"]
F1 --> G["OBSERVATION 2: Evaluate candidates"]
G --> G1["Candidates: Vinland Saga, Tokyo Ghoul,\nHunter x Hunter (Chimera Ant arc),\nAttack on Titan, Berserk"]
G1 --> H["THOUGHT 3: Rank and select"]
H --> H1["Ranking criteria: thematic similarity,\ntone match 'darker', availability in store,\npopularity/ratings"]
H1 --> I["ACTION 3: Check inventory and ratings"]
I --> I1["Tool: DynamoDB product lookup\n+ ElastiCache rating cache"]
I1 --> J["OBSERVATION 3: Final data collected"]
J --> J1["All 5 titles in stock\nRatings: 4.2-4.8 range\nVinland Saga: best thematic match"]
J1 --> K["FINAL ANSWER"]
K --> K1["Top 3 recommendations with reasoning:\n1. Vinland Saga - why it matches\n2. Tokyo Ghoul - why it matches\n3. Hunter x Hunter - why it matches"]
style A fill:#e1f5fe
style K1 fill:#c8e6c9
style B fill:#fff3e0
style E fill:#fff3e0
style H fill:#fff3e0
style C fill:#e8eaf6
style F fill:#e8eaf6
style I fill:#e8eaf6
style D fill:#fce4ec
style G fill:#fce4ec
style J fill:#fce4ec
Step Functions State Machine: ReAct Loop
The following ASL (Amazon States Language) definition implements a bounded ReAct loop with timeout guards and early termination.
{
"Comment": "MangaAssist ReAct Reasoning Loop - Skill 2.1.2",
"StartAt": "InitializeReasoningContext",
"States": {
"InitializeReasoningContext": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-init",
"Parameters": {
"user_query.$": "$.user_query",
"session_id.$": "$.session_id",
"user_id.$": "$.user_id",
"max_iterations": 5,
"timeout_per_step_ms": 500
},
"ResultPath": "$.reasoning_context",
"TimeoutSeconds": 2,
"Next": "ThoughtStep"
},
"ThoughtStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-thought",
"Parameters": {
"reasoning_context.$": "$.reasoning_context",
"observation_history.$": "$.reasoning_context.observations",
"iteration.$": "$.reasoning_context.current_iteration"
},
"ResultPath": "$.thought_result",
"TimeoutSeconds": 3,
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.Timeout"],
"ResultPath": "$.error",
"Next": "FallbackResponse"
}
],
"Next": "EvaluateThought"
},
"EvaluateThought": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.thought_result.decision",
"StringEquals": "FINAL_ANSWER",
"Next": "FormatFinalAnswer"
},
{
"Variable": "$.reasoning_context.current_iteration",
"NumericGreaterThanEquals": 5,
"Next": "MaxIterationsReached"
}
],
"Default": "ActionStep"
},
"ActionStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-action",
"Parameters": {
"thought.$": "$.thought_result",
"reasoning_context.$": "$.reasoning_context",
"available_tools": [
"opensearch_vector_search",
"dynamodb_product_lookup",
"elasticache_rating_cache",
"bedrock_genre_classifier"
]
},
"ResultPath": "$.action_result",
"TimeoutSeconds": 2,
"Catch": [
{
"ErrorEquals": ["ToolExecutionError"],
"ResultPath": "$.error",
"Next": "HandleToolError"
},
{
"ErrorEquals": ["States.Timeout"],
"ResultPath": "$.error",
"Next": "HandleToolTimeout"
}
],
"Next": "ObservationStep"
},
"ObservationStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-observation",
"Parameters": {
"action_result.$": "$.action_result",
"thought.$": "$.thought_result",
"reasoning_context.$": "$.reasoning_context"
},
"ResultPath": "$.observation_result",
"TimeoutSeconds": 2,
"Next": "UpdateReasoningContext"
},
"UpdateReasoningContext": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-update-context",
"Parameters": {
"reasoning_context.$": "$.reasoning_context",
"thought.$": "$.thought_result",
"action.$": "$.action_result",
"observation.$": "$.observation_result"
},
"ResultPath": "$.reasoning_context",
"Next": "ThoughtStep"
},
"HandleToolError": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-error-handler",
"Parameters": {
"error.$": "$.error",
"reasoning_context.$": "$.reasoning_context",
"fallback_strategy": "skip_and_continue"
},
"ResultPath": "$.observation_result",
"Next": "UpdateReasoningContext"
},
"HandleToolTimeout": {
"Type": "Pass",
"Result": {
"observation": "Tool timed out - proceeding with available information",
"quality_score": 0.3
},
"ResultPath": "$.observation_result",
"Next": "UpdateReasoningContext"
},
"MaxIterationsReached": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-best-effort",
"Parameters": {
"reasoning_context.$": "$.reasoning_context",
"strategy": "synthesize_from_available"
},
"ResultPath": "$.final_answer",
"Next": "LogReasoningTrace"
},
"FormatFinalAnswer": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-format-answer",
"Parameters": {
"thought.$": "$.thought_result",
"reasoning_context.$": "$.reasoning_context"
},
"ResultPath": "$.final_answer",
"Next": "LogReasoningTrace"
},
"LogReasoningTrace": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-log-trace",
"Parameters": {
"reasoning_context.$": "$.reasoning_context",
"final_answer.$": "$.final_answer",
"session_id.$": "$.session_id"
},
"ResultPath": "$.trace_id",
"Next": "ReturnResponse"
},
"ReturnResponse": {
"Type": "Succeed",
"OutputPath": "$.final_answer"
},
"FallbackResponse": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-fallback",
"Parameters": {
"user_query.$": "$.user_query",
"error.$": "$.error",
"strategy": "simple_search_fallback"
},
"End": true
}
}
}
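The state machine above is intended to run as an Express Workflow, which supports synchronous invocation via `start_sync_execution`. A minimal client sketch follows; the state machine ARN is a placeholder and `run_react_loop` is a hypothetical helper, not part of the documented codebase.

import json

import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN -- substitute the deployed Express state machine
STATE_MACHINE_ARN = (
    "arn:aws:states:us-east-1:123456789012:stateMachine:manga-react-loop"
)


def run_react_loop(user_query: str, session_id: str, user_id: str) -> dict:
    """Invoke the ReAct state machine synchronously (Express Workflows only)."""
    response = sfn.start_sync_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps({
            "user_query": user_query,
            "session_id": session_id,
            "user_id": user_id,
        }),
    )
    if response["status"] != "SUCCEEDED":
        raise RuntimeError(f"ReAct loop failed: {response.get('error')}")
    return json.loads(response["output"])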
Production Code: ReAct Agent Implementation
Core ReAct Agent with Bedrock Claude
"""
MangaAssist ReAct Agent - Structured reasoning for complex manga queries.
Uses Bedrock Claude 3 Sonnet for reasoning steps and tool orchestration.
Implements bounded Thought-Action-Observation loops with trace capture.
"""
import json
import time
import logging
import hashlib
from enum import Enum
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
import boto3
from botocore.config import Config
logger = logging.getLogger("manga_react_agent")
logger.setLevel(logging.INFO)
# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
BEDROCK_CONFIG = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=5,
connect_timeout=2,
)
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
MAX_REACT_ITERATIONS = 5
LATENCY_BUDGET_MS = 2800  # 3s target minus 200ms reserved for network overhead
PER_STEP_TIMEOUT_MS = 500
class ReActDecision(str, Enum):
"""Possible outcomes from a Thought step."""
CONTINUE = "CONTINUE" # Need more information, take another action
FINAL_ANSWER = "FINAL_ANSWER" # Confident enough to answer
CLARIFY = "CLARIFY" # Need user clarification
ESCALATE = "ESCALATE" # Cannot solve, escalate to human
@dataclass
class ThoughtResult:
"""Output from a reasoning (Thought) step."""
reasoning: str
decision: ReActDecision
next_action: Optional[str] = None
action_params: Optional[dict] = None
confidence: float = 0.0
final_answer: Optional[str] = None
@dataclass
class ActionResult:
"""Output from a tool execution (Action) step."""
tool_name: str
raw_output: Any
success: bool
latency_ms: float
error: Optional[str] = None
@dataclass
class ObservationResult:
"""Evaluated observation from an Action result."""
summary: str
quality_score: float # 0.0 - 1.0
relevant_facts: list = field(default_factory=list)
gaps: list = field(default_factory=list)
@dataclass
class ReasoningTrace:
"""Complete trace of a ReAct reasoning session."""
session_id: str
user_query: str
steps: list = field(default_factory=list)
total_latency_ms: float = 0.0
iterations: int = 0
final_decision: Optional[ReActDecision] = None
model_used: str = ""
total_input_tokens: int = 0
total_output_tokens: int = 0
class MangaAssistReActAgent:
"""
ReAct agent for MangaAssist chatbot.
Implements bounded Thought-Action-Observation loops using Bedrock Claude.
Designed for sub-3-second response times at 1M messages/day scale.
"""
def __init__(
self,
bedrock_client=None,
opensearch_client=None,
dynamodb_resource=None,
redis_client=None,
):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", config=BEDROCK_CONFIG
)
self.opensearch = opensearch_client
self.dynamodb = dynamodb_resource
self.redis = redis_client
self.tools = self._register_tools()
def _register_tools(self) -> dict:
"""Register available tools for the agent."""
return {
"opensearch_vector_search": {
"fn": self._tool_vector_search,
"description": "Semantic vector search over manga catalog",
"timeout_ms": 400,
},
"dynamodb_product_lookup": {
"fn": self._tool_product_lookup,
"description": "Look up specific manga product details",
"timeout_ms": 200,
},
"elasticache_rating_cache": {
"fn": self._tool_rating_lookup,
"description": "Get cached ratings and reviews for manga titles",
"timeout_ms": 100,
},
"genre_theme_classifier": {
"fn": self._tool_genre_classifier,
"description": "Classify manga genres, themes, and demographics",
"timeout_ms": 300,
},
"inventory_check": {
"fn": self._tool_inventory_check,
"description": "Check stock availability and pricing",
"timeout_ms": 150,
},
}
# ──────────────────────────────────────────────
# Main ReAct Loop
# ──────────────────────────────────────────────
async def solve(self, user_query: str, session_id: str, user_id: str) -> dict:
"""
Execute the ReAct loop for a user query.
Returns:
dict with 'answer', 'trace_id', 'latency_ms', 'iterations'
"""
start_time = time.monotonic()
trace = ReasoningTrace(
session_id=session_id,
user_query=user_query,
)
# Choose model based on query complexity
model_id = self._select_model(user_query)
trace.model_used = model_id
observations_history = []
thought_history = []
for iteration in range(MAX_REACT_ITERATIONS):
elapsed_ms = (time.monotonic() - start_time) * 1000
remaining_ms = LATENCY_BUDGET_MS - elapsed_ms
if remaining_ms < PER_STEP_TIMEOUT_MS:
logger.warning(
"Latency budget exhausted at iteration %d (%.0fms remaining)",
iteration, remaining_ms,
)
break
trace.iterations = iteration + 1
# ── THOUGHT ──
thought = await self._think(
user_query=user_query,
observations=observations_history,
thoughts=thought_history,
model_id=model_id,
iteration=iteration,
)
thought_history.append(thought)
trace.steps.append({
"type": "thought",
"iteration": iteration,
"reasoning": thought.reasoning,
"decision": thought.decision.value,
"confidence": thought.confidence,
"elapsed_ms": (time.monotonic() - start_time) * 1000,
})
# Check for terminal decisions
if thought.decision == ReActDecision.FINAL_ANSWER:
trace.final_decision = ReActDecision.FINAL_ANSWER
trace.total_latency_ms = (time.monotonic() - start_time) * 1000
return self._format_response(thought.final_answer, trace)
if thought.decision == ReActDecision.CLARIFY:
trace.final_decision = ReActDecision.CLARIFY
trace.total_latency_ms = (time.monotonic() - start_time) * 1000
return self._format_clarification(thought.final_answer, trace)
# ── ACTION ──
action = await self._act(
tool_name=thought.next_action,
params=thought.action_params,
remaining_ms=remaining_ms - PER_STEP_TIMEOUT_MS,
)
trace.steps.append({
"type": "action",
"iteration": iteration,
"tool": action.tool_name,
"success": action.success,
"latency_ms": action.latency_ms,
"elapsed_ms": (time.monotonic() - start_time) * 1000,
})
# ── OBSERVATION ──
observation = await self._observe(
action_result=action,
thought=thought,
user_query=user_query,
model_id=HAIKU_MODEL_ID, # Use Haiku for observation eval (cheaper)
)
observations_history.append(observation)
trace.steps.append({
"type": "observation",
"iteration": iteration,
"summary": observation.summary,
"quality_score": observation.quality_score,
"facts_found": len(observation.relevant_facts),
"gaps_remaining": len(observation.gaps),
"elapsed_ms": (time.monotonic() - start_time) * 1000,
})
# Max iterations reached -- synthesize best-effort answer
trace.final_decision = ReActDecision.CONTINUE
trace.total_latency_ms = (time.monotonic() - start_time) * 1000
best_effort = await self._synthesize_best_effort(
user_query, observations_history, thought_history, model_id
)
return self._format_response(best_effort, trace)
# ──────────────────────────────────────────────
# Thought Step
# ──────────────────────────────────────────────
async def _think(
self,
user_query: str,
observations: list[ObservationResult],
thoughts: list[ThoughtResult],
model_id: str,
iteration: int,
) -> ThoughtResult:
"""
Generate a reasoning step: analyze what we know, decide what to do next.
"""
obs_context = ""
for i, obs in enumerate(observations):
obs_context += (
f"\n--- Observation {i+1} ---\n"
f"Summary: {obs.summary}\n"
f"Quality: {obs.quality_score:.1f}\n"
f"Facts: {json.dumps(obs.relevant_facts)}\n"
f"Gaps: {json.dumps(obs.gaps)}\n"
)
tools_desc = "\n".join(
f"- {name}: {info['description']}"
for name, info in self.tools.items()
)
prompt = f"""You are the reasoning engine for MangaAssist, a JP manga store chatbot.
Your task is to analyze the user's query and decide the next step.
USER QUERY: {user_query}
AVAILABLE TOOLS:
{tools_desc}
PREVIOUS OBSERVATIONS:
{obs_context if obs_context else "None yet -- this is the first reasoning step."}
ITERATION: {iteration + 1} of {MAX_REACT_ITERATIONS}
Think carefully, then respond in this exact JSON format:
{{
"reasoning": "Your step-by-step reasoning about what you know and what you still need",
"decision": "CONTINUE | FINAL_ANSWER | CLARIFY",
"confidence": 0.0 to 1.0,
"next_action": "tool_name or null if FINAL_ANSWER/CLARIFY",
"action_params": {{}} or null,
"final_answer": "The answer text if decision is FINAL_ANSWER or CLARIFY, else null"
}}
Rules:
- Choose FINAL_ANSWER when confidence >= 0.8 and you have enough facts
- Choose CLARIFY only if the query is genuinely ambiguous
- Choose CONTINUE to gather more information via a tool
- Prefer opensearch_vector_search for finding similar manga
- Use elasticache_rating_cache for ratings (fastest tool)
- Use dynamodb_product_lookup for specific product details
- Consider cost: each iteration costs tokens. Be efficient."""
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 600,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
        # Track token usage (in production, accumulate these onto the
        # ReasoningTrace so per-session cost can be reported)
        input_tokens = result.get("usage", {}).get("input_tokens", 0)
        output_tokens = result.get("usage", {}).get("output_tokens", 0)
        logger.debug("Thought tokens: in=%d out=%d", input_tokens, output_tokens)
parsed = json.loads(text)
return ThoughtResult(
reasoning=parsed["reasoning"],
decision=ReActDecision(parsed["decision"]),
next_action=parsed.get("next_action"),
action_params=parsed.get("action_params"),
confidence=parsed.get("confidence", 0.0),
final_answer=parsed.get("final_answer"),
)
# ──────────────────────────────────────────────
# Action Step
# ──────────────────────────────────────────────
async def _act(
self, tool_name: str, params: dict, remaining_ms: float
) -> ActionResult:
"""
Execute the selected tool with the given parameters.
"""
if tool_name not in self.tools:
return ActionResult(
tool_name=tool_name,
raw_output=None,
success=False,
latency_ms=0,
error=f"Unknown tool: {tool_name}",
)
tool_info = self.tools[tool_name]
timeout_ms = min(tool_info["timeout_ms"], remaining_ms)
start = time.monotonic()
try:
result = await tool_info["fn"](params or {}, timeout_ms=timeout_ms)
latency = (time.monotonic() - start) * 1000
return ActionResult(
tool_name=tool_name,
raw_output=result,
success=True,
latency_ms=latency,
)
except TimeoutError:
latency = (time.monotonic() - start) * 1000
return ActionResult(
tool_name=tool_name,
raw_output=None,
success=False,
latency_ms=latency,
error="Tool execution timed out",
)
except Exception as e:
latency = (time.monotonic() - start) * 1000
logger.error("Tool %s failed: %s", tool_name, str(e))
return ActionResult(
tool_name=tool_name,
raw_output=None,
success=False,
latency_ms=latency,
error=str(e),
)
# ──────────────────────────────────────────────
# Observation Step
# ──────────────────────────────────────────────
async def _observe(
self,
action_result: ActionResult,
thought: ThoughtResult,
user_query: str,
model_id: str,
) -> ObservationResult:
"""
Evaluate the action result: extract facts, assess quality, identify gaps.
Uses Haiku for cost efficiency ($0.25/1M input tokens).
"""
if not action_result.success:
return ObservationResult(
summary=f"Tool {action_result.tool_name} failed: {action_result.error}",
quality_score=0.0,
relevant_facts=[],
gaps=[thought.reasoning],
)
prompt = f"""Evaluate this tool output for answering the user's manga query.
USER QUERY: {user_query}
REASONING CONTEXT: {thought.reasoning}
TOOL: {action_result.tool_name}
TOOL OUTPUT: {json.dumps(action_result.raw_output, default=str)[:2000]}
Respond in JSON:
{{
"summary": "Brief summary of what the tool returned",
"quality_score": 0.0 to 1.0 (how useful is this for answering the query),
"relevant_facts": ["fact1", "fact2", ...],
"gaps": ["what we still need to find out", ...]
}}"""
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 400,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
parsed = json.loads(text)
return ObservationResult(
summary=parsed["summary"],
quality_score=parsed["quality_score"],
relevant_facts=parsed.get("relevant_facts", []),
gaps=parsed.get("gaps", []),
)
# ──────────────────────────────────────────────
# Model Selection
# ──────────────────────────────────────────────
def _select_model(self, query: str) -> str:
"""
Route to Sonnet or Haiku based on query complexity.
Sonnet ($3/$15 per 1M tokens): Complex reasoning, multi-constraint queries
Haiku ($0.25/$1.25 per 1M tokens): Simple lookups, direct questions
Cost impact at 1M messages/day:
- All Sonnet: ~$18K/day (assuming avg 1K input + 500 output tokens)
- All Haiku: ~$875/day
- Smart routing (30% Sonnet / 70% Haiku): ~$6K/day
"""
complexity_signals = [
"similar to", "like", "recommend", "compare", "but",
"and also", "except", "between", "difference",
"completed", "under", "volumes", "available in",
]
complexity_count = sum(
1 for signal in complexity_signals if signal.lower() in query.lower()
)
if complexity_count >= 2:
return SONNET_MODEL_ID
return HAIKU_MODEL_ID
# ──────────────────────────────────────────────
# Tool Implementations
# ──────────────────────────────────────────────
async def _tool_vector_search(self, params: dict, timeout_ms: float) -> dict:
"""Semantic search over manga catalog using OpenSearch Serverless."""
query_text = params.get("query", "")
filters = params.get("filters", {})
top_k = params.get("top_k", 10)
# Generate embedding via Bedrock Titan Embeddings
embed_response = self.bedrock.invoke_model(
modelId="amazon.titan-embed-text-v2:0",
contentType="application/json",
accept="application/json",
body=json.dumps({"inputText": query_text, "dimensions": 1024}),
)
embedding = json.loads(embed_response["body"].read())["embedding"]
# Build OpenSearch knn query with filters
os_query = {
"size": top_k,
"query": {
"bool": {
"must": [
{
"knn": {
"manga_embedding": {
"vector": embedding,
"k": top_k,
}
}
}
],
"filter": self._build_os_filters(filters),
}
},
"_source": [
"title", "title_jp", "author", "genres", "themes",
"demographics", "volumes", "status", "synopsis", "rating",
],
}
results = self.opensearch.search(index="manga-catalog", body=os_query)
return {
"matches": [
{**hit["_source"], "score": hit["_score"]}
for hit in results["hits"]["hits"]
],
"total": results["hits"]["total"]["value"],
}
async def _tool_product_lookup(self, params: dict, timeout_ms: float) -> dict:
"""Look up specific manga product in DynamoDB."""
table = self.dynamodb.Table("manga-products")
manga_id = params.get("manga_id")
title = params.get("title")
if manga_id:
response = table.get_item(Key={"manga_id": manga_id})
return response.get("Item", {})
if title:
response = table.query(
IndexName="title-index",
KeyConditionExpression="title = :t",
ExpressionAttributeValues={":t": title},
Limit=5,
)
return {"items": response.get("Items", [])}
return {"error": "Provide manga_id or title"}
async def _tool_rating_lookup(self, params: dict, timeout_ms: float) -> dict:
"""Get cached ratings from ElastiCache Redis."""
manga_ids = params.get("manga_ids", [])
ratings = {}
for mid in manga_ids:
cache_key = f"rating:{mid}"
cached = self.redis.get(cache_key)
if cached:
ratings[mid] = json.loads(cached)
return ratings
async def _tool_genre_classifier(self, params: dict, timeout_ms: float) -> dict:
"""Classify manga by genre/theme using Bedrock Haiku (fast and cheap)."""
description = params.get("description", "")
prompt = f"""Classify this manga description into genres, themes, and demographics.
Description: {description}
Respond in JSON: {{"genres": [], "themes": [], "demographics": "", "tone": ""}}"""
response = self.bedrock.invoke_model(
modelId=HAIKU_MODEL_ID,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 200,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
return json.loads(result["content"][0]["text"])
async def _tool_inventory_check(self, params: dict, timeout_ms: float) -> dict:
"""Check stock availability and pricing."""
manga_ids = params.get("manga_ids", [])
table = self.dynamodb.Table("manga-inventory")
results = {}
for mid in manga_ids:
# Check Redis cache first
cache_key = f"inventory:{mid}"
cached = self.redis.get(cache_key)
if cached:
results[mid] = json.loads(cached)
else:
item = table.get_item(Key={"manga_id": mid}).get("Item", {})
results[mid] = {
"in_stock": item.get("quantity", 0) > 0,
"quantity": item.get("quantity", 0),
"price_jpy": item.get("price_jpy", 0),
"price_usd": item.get("price_usd", 0),
}
self.redis.setex(cache_key, 300, json.dumps(results[mid]))
return results
def _build_os_filters(self, filters: dict) -> list:
"""Build OpenSearch filter clauses from parameter dict."""
clauses = []
if "genre" in filters:
clauses.append({"term": {"genres": filters["genre"]}})
if "status" in filters:
clauses.append({"term": {"status": filters["status"]}})
if "max_volumes" in filters:
clauses.append({"range": {"volumes": {"lte": filters["max_volumes"]}}})
if "demographics" in filters:
clauses.append({"term": {"demographics": filters["demographics"]}})
if "min_rating" in filters:
clauses.append({"range": {"rating": {"gte": filters["min_rating"]}}})
return clauses
# ──────────────────────────────────────────────
# Response Formatting
# ──────────────────────────────────────────────
async def _synthesize_best_effort(
self,
user_query: str,
observations: list[ObservationResult],
thoughts: list[ThoughtResult],
model_id: str,
) -> str:
"""Synthesize a best-effort answer when max iterations are reached."""
all_facts = []
for obs in observations:
all_facts.extend(obs.relevant_facts)
prompt = f"""Based on the following gathered facts, provide the best possible
answer to the user's manga query. Be honest if information is incomplete.
USER QUERY: {user_query}
GATHERED FACTS: {json.dumps(all_facts)}
Provide a helpful, conversational answer with manga recommendations if possible."""
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"temperature": 0.3,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
return result["content"][0]["text"]
def _format_response(self, answer: str, trace: ReasoningTrace) -> dict:
"""Format the final response with metadata."""
trace_id = hashlib.sha256(
f"{trace.session_id}:{trace.user_query}:{time.time()}".encode()
).hexdigest()[:16]
return {
"answer": answer,
"trace_id": trace_id,
"latency_ms": round(trace.total_latency_ms, 1),
"iterations": trace.iterations,
"model": trace.model_used,
"decision": trace.final_decision.value if trace.final_decision else "UNKNOWN",
}
def _format_clarification(self, question: str, trace: ReasoningTrace) -> dict:
"""Format a clarification request."""
return {
"answer": question,
"needs_clarification": True,
"trace_id": hashlib.sha256(
f"{trace.session_id}:clarify:{time.time()}".encode()
).hexdigest()[:16],
"latency_ms": round(trace.total_latency_ms, 1),
}
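A usage sketch for the agent follows. The OpenSearch and Redis client objects are assumed to be wired up at service startup (shown here as hypothetical `my_opensearch_client` / `my_redis_client`). Note also that the agent's `async` methods call blocking boto3 APIs as written; a real async service would wrap those calls in `run_in_executor` or use an async SDK.

import asyncio

import boto3

# Hypothetical client wiring -- my_opensearch_client and my_redis_client
# stand in for connections created at startup.
agent = MangaAssistReActAgent(
    opensearch_client=my_opensearch_client,
    dynamodb_resource=boto3.resource("dynamodb"),
    redis_client=my_redis_client,
)

result = asyncio.run(agent.solve(
    user_query="Find me a manga like Naruto but darker",
    session_id="sess-123",
    user_id="user-456",
))
print(result["answer"], result["latency_ms"], result["iterations"])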
Chain-of-Thought Prompt Templates for MangaAssist
Manga Recommendation Reasoning Template
"""
Chain-of-Thought prompt templates that guide Claude through structured reasoning
for MangaAssist use cases. These are used inside the ReAct Thought step.
"""
COT_MANGA_RECOMMENDATION = """You are MangaAssist's recommendation reasoning engine.
Think through this recommendation request step by step.
USER REQUEST: {user_query}
USER HISTORY: {user_history}
CATALOG DATA: {catalog_context}
Follow these reasoning steps:
STEP 1 - PARSE CONSTRAINTS:
Identify all explicit and implicit constraints in the user's request.
- Explicit: stated genres, titles, volume counts, completion status
- Implicit: demographic preferences, tone preferences, reading level
STEP 2 - REFERENCE ANALYSIS:
If the user references a specific manga (e.g., "like Naruto"):
- Genre tags: {reference_genres}
- Core themes: {reference_themes}
- Demographics: {reference_demographics}
- Art style category: {reference_art_style}
- Emotional tone: {reference_tone}
STEP 3 - SIMILARITY SCORING:
For each candidate manga from the catalog:
a) Genre overlap score (0-1): How many genres match?
b) Theme similarity (0-1): How thematically close?
c) Tone match (0-1): Does the emotional tone align with what user wants?
(Important: if user says "but darker", penalize candidates with lighter tone)
d) Constraint satisfaction (0-1): Does it meet all explicit constraints?
e) Popularity bonus (0-0.2): Higher-rated manga get a small boost
STEP 4 - RANKING:
Weighted score = 0.25*genre + 0.30*theme + 0.25*tone + 0.15*constraints + 0.05*popularity
Rank candidates by weighted score descending.
STEP 5 - DIVERSITY CHECK:
Ensure top-3 are not all from the same author or sub-genre.
If too homogeneous, swap the 3rd pick for a more diverse option.
STEP 6 - EXPLANATION GENERATION:
For each top recommendation, generate a 1-2 sentence explanation connecting
it to the user's request. Reference specific similarities.
Provide your reasoning for each step, then your final recommendations."""
COT_ORDER_TROUBLESHOOTING = """You are MangaAssist's order troubleshooting engine.
Diagnose this customer's order issue step by step.
CUSTOMER ISSUE: {customer_issue}
ORDER DATA: {order_data}
CUSTOMER HISTORY: {customer_history}
Follow these diagnostic steps:
STEP 1 - CLASSIFY THE ISSUE:
Categorize into one of:
a) Shipping delay (order placed but not delivered on time)
b) Wrong item (received different manga than ordered)
c) Damaged item (manga arrived damaged)
d) Payment issue (charge problem, refund needed)
e) Availability (item went out of stock after ordering)
f) Account issue (login, address, payment method)
STEP 2 - GATHER RELEVANT DATA:
Based on the classification, identify which data points matter:
- Order ID, placed date, expected delivery, current status
- Shipping carrier, tracking number, last tracking event
- Payment method, charge amount, refund status
- Product SKU, title, quantity
STEP 3 - ROOT CAUSE ANALYSIS:
Based on the data, determine the most likely root cause:
- Is this a carrier issue? (tracking shows stuck at facility)
- Is this a warehouse issue? (not shipped yet, past SLA)
- Is this a system issue? (order stuck in processing)
- Is this a customer error? (wrong address, typo)
STEP 4 - RESOLUTION OPTIONS:
List possible resolutions ranked by customer satisfaction:
a) Immediate resolution (refund, reship, etc.)
b) Investigation needed (escalate to team)
c) Customer action required (provide info, wait)
STEP 5 - COMPOSE RESPONSE:
Draft a response that:
- Acknowledges the issue empathetically
- Explains what happened (if known)
- States the resolution clearly
- Provides next steps and timeline
- Offers proactive help (coupon, priority shipping)
Provide your reasoning for each step."""
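STEP 4 of the recommendation template defines a concrete weighted score. The short sketch below shows the arithmetic with illustrative component scores; only the weights come from the template, the candidate numbers are made up for demonstration.

def weighted_recommendation_score(
    genre: float, theme: float, tone: float, constraints: float, popularity: float
) -> float:
    """Weighted score from STEP 4 of COT_MANGA_RECOMMENDATION."""
    return (0.25 * genre + 0.30 * theme + 0.25 * tone
            + 0.15 * constraints + 0.05 * popularity)


# Illustrative candidate: strong theme overlap, fully dark tone,
# all constraints met, modest popularity bonus.
score = weighted_recommendation_score(
    genre=0.8, theme=0.9, tone=1.0, constraints=1.0, popularity=0.15
)
print(f"{score:.4f}")  # 0.20 + 0.27 + 0.25 + 0.15 + 0.0075 = 0.8775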
Step Functions Integration: Lambda Handlers
"""
Lambda handlers for each step in the Step Functions ReAct state machine.
Each handler is a self-contained function invoked by the state machine.
"""
import json
import time
import boto3
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock = boto3.client("bedrock-runtime")
dynamodb = boto3.resource("dynamodb")
sessions_table = dynamodb.Table("manga-reasoning-sessions")
# ──────────────────────────────────────────────
# Handler: Initialize Reasoning Context
# ──────────────────────────────────────────────
def handler_init(event, context):
"""
Initialize the reasoning context for a new ReAct session.
Called by: InitializeReasoningContext state
"""
session_id = event["session_id"]
user_query = event["user_query"]
user_id = event["user_id"]
max_iterations = event.get("max_iterations", 5)
# Load user preferences from cache/DB for context
user_prefs = _load_user_preferences(user_id)
reasoning_context = {
"session_id": session_id,
"user_query": user_query,
"user_id": user_id,
"user_preferences": user_prefs,
"max_iterations": max_iterations,
"current_iteration": 0,
"observations": [],
"thoughts": [],
"actions": [],
"start_time": datetime.now(timezone.utc).isoformat(),
"token_usage": {"input": 0, "output": 0},
}
# Persist initial context
sessions_table.put_item(Item={
"session_id": session_id,
"status": "in_progress",
"context": json.dumps(reasoning_context, default=str),
"created_at": reasoning_context["start_time"],
"ttl": int(time.time()) + 3600,
})
return reasoning_context
def _load_user_preferences(user_id: str) -> dict:
"""Load user preferences for personalized reasoning."""
prefs_table = dynamodb.Table("manga-user-preferences")
try:
response = prefs_table.get_item(Key={"user_id": user_id})
return response.get("Item", {}).get("preferences", {})
except Exception:
return {}
# ──────────────────────────────────────────────
# Handler: Thought Step
# ──────────────────────────────────────────────
def handler_thought(event, context):
"""
Execute the Thought step: analyze current state and decide next action.
Called by: ThoughtStep state
"""
ctx = event["reasoning_context"]
iteration = ctx["current_iteration"]
observations = ctx.get("observations", [])
model_id = _select_model_for_iteration(ctx["user_query"], iteration)
obs_summary = _format_observations(observations)
prompt = f"""You are MangaAssist's reasoning engine (iteration {iteration + 1}/{ctx['max_iterations']}).
USER QUERY: {ctx['user_query']}
USER PREFERENCES: {json.dumps(ctx.get('user_preferences', {}))}
OBSERVATIONS SO FAR:
{obs_summary}
Available tools: opensearch_vector_search, dynamodb_product_lookup,
elasticache_rating_cache, genre_theme_classifier, inventory_check
Respond in JSON:
{{
"reasoning": "step-by-step analysis",
"decision": "CONTINUE | FINAL_ANSWER | CLARIFY",
"confidence": 0.0-1.0,
"next_action": "tool_name or null",
"action_params": {{}},
"final_answer": "answer if FINAL_ANSWER/CLARIFY else null"
}}"""
response = bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 600,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
usage = result.get("usage", {})
parsed = json.loads(text)
parsed["model_used"] = model_id
parsed["tokens"] = {
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
}
return parsed
def _select_model_for_iteration(query: str, iteration: int) -> str:
"""First iteration uses Sonnet for deeper analysis, later ones use Haiku."""
if iteration == 0 and _is_complex_query(query):
return "anthropic.claude-3-sonnet-20240229-v1:0"
return "anthropic.claude-3-haiku-20240307-v1:0"
def _is_complex_query(query: str) -> bool:
"""Heuristic for query complexity."""
complex_keywords = ["similar", "like", "recommend", "compare", "but", "except"]
return sum(1 for kw in complex_keywords if kw in query.lower()) >= 2
def _format_observations(observations: list) -> str:
"""Format observation history for the prompt."""
if not observations:
return "No observations yet."
lines = []
for i, obs in enumerate(observations):
lines.append(f"[{i+1}] {obs.get('summary', 'N/A')} "
f"(quality: {obs.get('quality_score', 0):.1f})")
return "\n".join(lines)
# ──────────────────────────────────────────────
# Handler: Action Step
# ──────────────────────────────────────────────
def handler_action(event, context):
"""
Execute the selected tool action.
Called by: ActionStep state
"""
thought = event["thought"]
tool_name = thought["next_action"]
params = thought.get("action_params", {})
start = time.monotonic()
tool_handlers = {
"opensearch_vector_search": _execute_vector_search,
"dynamodb_product_lookup": _execute_product_lookup,
"elasticache_rating_cache": _execute_rating_lookup,
"genre_theme_classifier": _execute_genre_classifier,
"inventory_check": _execute_inventory_check,
}
handler = tool_handlers.get(tool_name)
if not handler:
return {
"tool_name": tool_name,
"success": False,
"error": f"Unknown tool: {tool_name}",
"latency_ms": 0,
}
try:
result = handler(params)
latency = (time.monotonic() - start) * 1000
return {
"tool_name": tool_name,
"success": True,
"output": result,
"latency_ms": round(latency, 1),
}
except Exception as e:
latency = (time.monotonic() - start) * 1000
logger.error("Tool %s failed: %s", tool_name, str(e))
return {
"tool_name": tool_name,
"success": False,
"error": str(e),
"latency_ms": round(latency, 1),
}
def _execute_vector_search(params: dict) -> dict:
"""Execute OpenSearch vector search (placeholder implementation)."""
# In production, invoke OpenSearch Serverless with knn query
return {"matches": [], "total": 0}
def _execute_product_lookup(params: dict) -> dict:
"""Execute DynamoDB product lookup."""
table = dynamodb.Table("manga-products")
manga_id = params.get("manga_id")
if manga_id:
resp = table.get_item(Key={"manga_id": manga_id})
return resp.get("Item", {})
return {}
def _execute_rating_lookup(params: dict) -> dict:
"""Execute ElastiCache rating lookup (placeholder)."""
return {}
def _execute_genre_classifier(params: dict) -> dict:
"""Classify genres using Haiku."""
return {}
def _execute_inventory_check(params: dict) -> dict:
"""Check inventory in DynamoDB."""
return {}
# ──────────────────────────────────────────────
# Handler: Observation Step
# ──────────────────────────────────────────────
def handler_observation(event, context):
"""
Evaluate the action result and produce a structured observation.
Uses Haiku for cost-efficient evaluation.
Called by: ObservationStep state
"""
action = event["action_result"]
thought = event["thought"]
ctx = event["reasoning_context"]
if not action.get("success", False):
return {
"summary": f"Tool {action['tool_name']} failed: {action.get('error', 'unknown')}",
"quality_score": 0.0,
"relevant_facts": [],
"gaps": [thought.get("reasoning", "")],
}
prompt = f"""Evaluate this tool output concisely.
USER QUERY: {ctx['user_query']}
TOOL: {action['tool_name']}
OUTPUT: {json.dumps(action.get('output', {}), default=str)[:1500]}
JSON response: {{"summary": "...", "quality_score": 0.0-1.0, "relevant_facts": [...], "gaps": [...]}}"""
response = bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 300,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
return json.loads(result["content"][0]["text"])
# ──────────────────────────────────────────────
# Handler: Update Reasoning Context
# ──────────────────────────────────────────────
def handler_update_context(event, context):
"""
Append the latest thought/action/observation to the reasoning context
and increment the iteration counter.
Called by: UpdateReasoningContext state
"""
ctx = event["reasoning_context"]
thought = event["thought"]
action = event["action"]
observation = event["observation"]
ctx["thoughts"].append(thought)
ctx["actions"].append(action)
ctx["observations"].append(observation)
ctx["current_iteration"] += 1
# Accumulate token usage
if "tokens" in thought:
ctx["token_usage"]["input"] += thought["tokens"].get("input", 0)
ctx["token_usage"]["output"] += thought["tokens"].get("output", 0)
# Persist updated context
sessions_table.update_item(
Key={"session_id": ctx["session_id"]},
UpdateExpression="SET context = :c, updated_at = :u",
ExpressionAttributeValues={
":c": json.dumps(ctx, default=str),
":u": datetime.now(timezone.utc).isoformat(),
},
)
return ctx
# ──────────────────────────────────────────────
# Handler: Log Reasoning Trace
# ──────────────────────────────────────────────
def handler_log_trace(event, context):
"""
Persist the complete reasoning trace for debugging and evaluation.
Called by: LogReasoningTrace state
"""
ctx = event["reasoning_context"]
final_answer = event["final_answer"]
    trace_id = f"{ctx['session_id']}:{int(time.time())}"
    trace = {
"session_id": ctx["session_id"],
"user_query": ctx["user_query"],
"iterations": ctx["current_iteration"],
"thoughts": ctx["thoughts"],
"actions": ctx["actions"],
"observations": ctx["observations"],
"final_answer": final_answer,
"token_usage": ctx["token_usage"],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# Write to DynamoDB traces table
traces_table = dynamodb.Table("manga-reasoning-traces")
traces_table.put_item(Item={
"trace_id": f"{ctx['session_id']}:{int(time.time())}",
"trace": json.dumps(trace, default=str),
"created_at": trace["timestamp"],
"ttl": int(time.time()) + 86400 * 7, # 7-day retention
})
logger.info(
"Reasoning trace: session=%s iterations=%d tokens_in=%d tokens_out=%d",
ctx["session_id"],
ctx["current_iteration"],
ctx["token_usage"]["input"],
ctx["token_usage"]["output"],
)
return {"trace_id": f"{ctx['session_id']}:{int(time.time())}"}
Problem Decomposition: Complex Query Example
"""
Decomposing "find me a manga like Naruto but darker" into structured sub-tasks.
Demonstrates how the ReAct agent breaks a single user query into atomic operations.
"""
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class SubTask:
"""An atomic sub-task derived from query decomposition."""
id: str
description: str
tool: str
params: dict
depends_on: list # IDs of sub-tasks this depends on
priority: int # Lower = higher priority
class QueryDecomposer:
"""
Decomposes complex manga queries into structured sub-task plans.
Example input: "Find me a manga like Naruto but darker"
Example output: A DAG of 5 sub-tasks with dependencies
"""
DECOMPOSITION_PROMPT = """Decompose this manga query into atomic sub-tasks.
USER QUERY: {query}
Return a JSON array of sub-tasks:
[
{{
"id": "task_1",
"description": "What this sub-task accomplishes",
"tool": "tool_name",
"params": {{}},
"depends_on": [],
"priority": 1
}}
]
Available tools:
- opensearch_vector_search: Semantic search for similar manga
- dynamodb_product_lookup: Get specific manga details
- elasticache_rating_cache: Get ratings
- genre_theme_classifier: Classify genre/theme/tone
- inventory_check: Check availability and pricing
Rules:
- Independent tasks should have no dependencies (can run in parallel)
- Tasks that need output from previous tasks list dependencies
- Keep total sub-tasks under 6 for latency budget
- Priority 1 = most important, 5 = least important"""
def __init__(self, bedrock_client):
self.bedrock = bedrock_client
def decompose(self, query: str) -> list[SubTask]:
"""
Decompose a user query into a list of sub-tasks.
"Find me a manga like Naruto but darker" becomes:
task_1: Retrieve Naruto's genre/theme profile (genre_theme_classifier)
depends_on: [] -- can start immediately
priority: 1
task_2: Semantic search for "dark action manga similar to Naruto"
depends_on: [] -- can start immediately (parallel with task_1)
priority: 1
task_3: Filter results by "darker" tone constraint
depends_on: [task_1, task_2] -- needs both profile and candidates
priority: 2
task_4: Get ratings for filtered candidates
depends_on: [task_3] -- needs the filtered list
priority: 3
task_5: Check inventory for top candidates
depends_on: [task_3] -- needs the filtered list
priority: 3 (parallel with task_4)
"""
response = self.bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 800,
"temperature": 0.0,
"messages": [{
"role": "user",
"content": self.DECOMPOSITION_PROMPT.format(query=query),
}],
}),
)
result = json.loads(response["body"].read())
tasks_json = json.loads(result["content"][0]["text"])
return [
SubTask(
id=t["id"],
description=t["description"],
tool=t["tool"],
params=t["params"],
depends_on=t.get("depends_on", []),
priority=t.get("priority", 3),
)
for t in tasks_json
]
def build_execution_plan(self, tasks: list[SubTask]) -> list[list[SubTask]]:
"""
Build a parallel execution plan (list of waves).
Each wave contains tasks that can run concurrently.
Returns:
[
[task_1, task_2], # Wave 1: no dependencies, run in parallel
[task_3], # Wave 2: depends on wave 1
[task_4, task_5], # Wave 3: both depend on task_3, run in parallel
]
"""
completed = set()
waves = []
remaining = list(tasks)
while remaining:
wave = []
for task in remaining:
if all(dep in completed for dep in task.depends_on):
wave.append(task)
            if not wave:
                # Circular dependency guard -- force progress by taking one task
                wave = remaining[:1]
for task in wave:
remaining.remove(task)
completed.add(task.id)
waves.append(wave)
return waves
# ──────────────────────────────────────────────
# Example: Full Decomposition Trace
# ──────────────────────────────────────────────
EXAMPLE_DECOMPOSITION = """
Query: "Find me a manga like Naruto but darker"
Decomposition:
Wave 1 (parallel):
[task_1] Classify Naruto: genre=shounen, themes=[ninja, friendship, coming-of-age],
tone=adventurous, demographics=teen
[task_2] Vector search: "dark action manga, ninja themes, mature tone"
-> Returns 15 candidates
Wave 2 (depends on wave 1):
[task_3] Filter candidates by:
- Must share >= 2 themes with Naruto
- Tone must be "dark", "gritty", or "mature"
- Exclude Naruto itself
-> Returns 7 candidates: Vinland Saga, Tokyo Ghoul, Berserk,
Attack on Titan, Hunter x Hunter, Jujutsu Kaisen, Claymore
Wave 3 (parallel, depends on wave 2):
[task_4] Get ratings: VS=4.8, TG=4.3, B=4.7, AoT=4.6, HxH=4.5, JJK=4.4, C=4.1
[task_5] Check inventory: All in stock except Claymore vol 15-20
Final ranking (computed in THOUGHT step):
1. Vinland Saga (4.8 rating, strong theme match, darker tone) -- BEST MATCH
2. Attack on Titan (4.6, action+dark+coming-of-age overlap)
3. Hunter x Hunter (4.5, specific arcs match "darker" criterion)
Total: 3 waves, 5 tool calls, ~1.8 seconds
"""
Reasoning Trace Capture and Logging
"""
Reasoning trace infrastructure for debugging, evaluation, and cost tracking.
Every ReAct loop produces a structured trace stored in DynamoDB with CloudWatch metrics.
"""
import json
import time
import logging
import boto3
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Optional
logger = logging.getLogger("reasoning_trace")
cloudwatch = boto3.client("cloudwatch")
dynamodb = boto3.resource("dynamodb")
traces_table = dynamodb.Table("manga-reasoning-traces")
@dataclass
class ReasoningStepLog:
"""A single step in the reasoning trace."""
step_type: str # "thought", "action", "observation"
iteration: int
timestamp: str
duration_ms: float
content: dict
model_id: Optional[str] = None
input_tokens: int = 0
output_tokens: int = 0
@dataclass
class FullReasoningTrace:
"""Complete reasoning trace for a ReAct session."""
trace_id: str
session_id: str
user_id: str
user_query: str
start_time: str
end_time: str = ""
total_duration_ms: float = 0.0
total_iterations: int = 0
final_decision: str = ""
total_input_tokens: int = 0
total_output_tokens: int = 0
estimated_cost_usd: float = 0.0
steps: list = field(default_factory=list)
success: bool = True
error: Optional[str] = None
class ReasoningTraceLogger:
"""
Captures and persists reasoning traces for every ReAct session.
Features:
- Structured step-by-step logging
- Token usage and cost estimation
- CloudWatch metrics emission
- DynamoDB persistence with TTL
- Query patterns for debugging specific sessions
"""
# Pricing per 1M tokens (Bedrock Claude 3)
PRICING = {
"anthropic.claude-3-sonnet-20240229-v1:0": {"input": 3.0, "output": 15.0},
"anthropic.claude-3-haiku-20240307-v1:0": {"input": 0.25, "output": 1.25},
}
def __init__(self):
self._current_traces: dict[str, FullReasoningTrace] = {}
def start_trace(
self, trace_id: str, session_id: str, user_id: str, user_query: str
) -> FullReasoningTrace:
"""Begin a new reasoning trace."""
trace = FullReasoningTrace(
trace_id=trace_id,
session_id=session_id,
user_id=user_id,
user_query=user_query,
start_time=datetime.now(timezone.utc).isoformat(),
)
self._current_traces[trace_id] = trace
return trace
def log_step(
self,
trace_id: str,
step_type: str,
iteration: int,
content: dict,
duration_ms: float,
        model_id: Optional[str] = None,
input_tokens: int = 0,
output_tokens: int = 0,
):
"""Log a single reasoning step."""
trace = self._current_traces.get(trace_id)
if not trace:
logger.warning("Trace %s not found", trace_id)
return
step = ReasoningStepLog(
step_type=step_type,
iteration=iteration,
timestamp=datetime.now(timezone.utc).isoformat(),
duration_ms=duration_ms,
content=content,
model_id=model_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
trace.steps.append(asdict(step))
trace.total_input_tokens += input_tokens
trace.total_output_tokens += output_tokens
def finalize_trace(
        self, trace_id: str, final_decision: str, success: bool = True, error: Optional[str] = None
):
"""Finalize and persist the reasoning trace."""
trace = self._current_traces.get(trace_id)
if not trace:
return
trace.end_time = datetime.now(timezone.utc).isoformat()
trace.final_decision = final_decision
trace.success = success
trace.error = error
trace.total_iterations = max(
(s["iteration"] for s in trace.steps), default=0
) + 1
# Calculate cost
trace.estimated_cost_usd = self._estimate_cost(trace)
# Calculate total duration
start = datetime.fromisoformat(trace.start_time)
end = datetime.fromisoformat(trace.end_time)
trace.total_duration_ms = (end - start).total_seconds() * 1000
# Persist to DynamoDB
self._persist_trace(trace)
# Emit CloudWatch metrics
self._emit_metrics(trace)
# Clean up memory
del self._current_traces[trace_id]
logger.info(
"Trace finalized: id=%s iterations=%d duration=%.0fms cost=$%.6f",
trace_id,
trace.total_iterations,
trace.total_duration_ms,
trace.estimated_cost_usd,
)
def _estimate_cost(self, trace: FullReasoningTrace) -> float:
"""Estimate the token cost for this trace."""
total_cost = 0.0
for step in trace.steps:
model_id = step.get("model_id")
if model_id and model_id in self.PRICING:
pricing = self.PRICING[model_id]
input_cost = (step["input_tokens"] / 1_000_000) * pricing["input"]
output_cost = (step["output_tokens"] / 1_000_000) * pricing["output"]
total_cost += input_cost + output_cost
return round(total_cost, 8)
def _persist_trace(self, trace: FullReasoningTrace):
"""Write the trace to DynamoDB."""
try:
traces_table.put_item(
Item={
"trace_id": trace.trace_id,
"session_id": trace.session_id,
"user_id": trace.user_id,
"user_query": trace.user_query,
"data": json.dumps(asdict(trace), default=str),
"iterations": trace.total_iterations,
"duration_ms": int(trace.total_duration_ms),
"cost_usd": str(trace.estimated_cost_usd),
"success": trace.success,
"created_at": trace.start_time,
"ttl": int(time.time()) + 86400 * 30, # 30-day retention
}
)
except Exception as e:
logger.error("Failed to persist trace %s: %s", trace.trace_id, str(e))
def _emit_metrics(self, trace: FullReasoningTrace):
"""Emit CloudWatch metrics for monitoring."""
try:
cloudwatch.put_metric_data(
Namespace="MangaAssist/Reasoning",
MetricData=[
{
"MetricName": "ReActIterations",
"Value": trace.total_iterations,
"Unit": "Count",
"Dimensions": [
{"Name": "Decision", "Value": trace.final_decision},
],
},
{
"MetricName": "ReActLatency",
"Value": trace.total_duration_ms,
"Unit": "Milliseconds",
},
{
"MetricName": "ReActTokenCost",
"Value": trace.estimated_cost_usd,
"Unit": "None",
},
{
"MetricName": "ReActSuccess",
"Value": 1 if trace.success else 0,
"Unit": "Count",
},
],
)
except Exception as e:
logger.error("Failed to emit metrics: %s", str(e))
Comparison: Reasoning Approaches
| Dimension | ReAct (Reasoning + Acting) | Chain-of-Thought (CoT) | Tree-of-Thought (ToT) | Plan-and-Execute |
|---|---|---|---|---|
| Core idea | Interleave reasoning with tool use in a loop | Linear step-by-step reasoning in a single prompt | Explore multiple reasoning branches, backtrack on dead ends | Create a full plan first, then execute steps sequentially |
| Tool use | Yes -- tools called between reasoning steps | No -- pure reasoning, no external calls | Optional -- can call tools at branch points | Yes -- tools called during execution phase |
| Latency | Medium (multiple LLM + tool calls) | Low (single LLM call) | High (many LLM calls for branching) | Medium-High (planning call + execution calls) |
| Cost per query | Medium -- 2-5 LLM calls typical | Low -- 1 LLM call | High -- 5-20+ LLM calls | Medium -- 1 planning + N execution calls |
| When to use | Complex queries needing external data | Questions answerable from model knowledge | Ambiguous problems with multiple valid paths | Multi-step workflows with known tool sequences |
| MangaAssist example | "Find manga like Naruto but darker" (needs search + filter + rank) | "What genre is Attack on Titan?" (model knows this) | "What should I read next?" with no constraints (many valid paths) | "Place an order for Demon Slayer vols 1-5" (clear step sequence) |
| Max iterations | Bounded (5 for MangaAssist) | 1 (single pass) | Configurable branching factor * depth | Fixed by plan length |
| Error recovery | Good -- observe failure, retry with different tool | Poor -- no recovery from bad reasoning | Good -- backtrack to previous branch | Moderate -- re-plan on step failure |
| AWS implementation | Step Functions + Lambda + Bedrock | Single Bedrock invoke with CoT prompt | Step Functions with Parallel + Choice states | Step Functions sequential workflow |
| Best for MangaAssist | Recommendation queries (70% of traffic) | Simple FAQ and factual queries (20%) | Open-ended exploration (5%) | Order management workflows (5%) |
| Token efficiency | Moderate -- context grows with observations | High -- single prompt, controlled output | Low -- many branches, redundant reasoning | Moderate -- plan is reusable context |
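The "Best for MangaAssist" row implies a front-door router that picks an approach per query. A hedged heuristic sketch follows; the keyword lists and thresholds are illustrative assumptions, not production routing logic.

def select_reasoning_approach(query: str) -> str:
    """Heuristic router over the four approaches in the table above."""
    q = query.lower()
    if any(kw in q for kw in ("order", "refund", "shipping", "track")):
        return "plan_and_execute"  # known multi-step tool sequence
    if any(kw in q for kw in ("like", "similar", "recommend", "darker")):
        return "react"             # needs search + filter + rank
    if "what should i read" in q:
        return "tree_of_thought"   # open-ended, many valid paths
    return "chain_of_thought"      # simple factual/FAQ query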
Key Takeaways
- ReAct is the primary reasoning pattern for MangaAssist -- most customer queries require external data (catalog search, inventory check, ratings) that the model cannot answer from parametric knowledge alone.
- Bound your loops aggressively -- with a 3-second latency target and 1M messages/day, every iteration costs both time and money. MangaAssist caps at 5 iterations with a latency budget that forces early termination if time runs out.
- Use model routing within the ReAct loop -- the first Thought step uses Sonnet for deep analysis ($3/1M input tokens); subsequent steps and all Observation evaluations use Haiku ($0.25/1M input tokens). This cuts costs by approximately 60% without meaningful quality degradation.
- Step Functions Express Workflows are the right orchestration choice -- Standard Workflows add per-state-transition latency and bill per transition, a poor fit for a chatty reasoning loop; Express Workflows have much lower transition overhead, bill by duration, and support the high throughput needed for 1M messages/day.
- Every reasoning session produces a trace -- stored in DynamoDB with a 30-day TTL, these traces are essential for debugging bad recommendations, evaluating reasoning quality, and optimizing cost. CloudWatch metrics on iteration count, latency, and cost per trace enable operational dashboards.
- Problem decomposition enables parallelism -- by breaking a complex query into independent sub-tasks (e.g., "classify reference manga" and "vector search for candidates" run in parallel), you recover latency that would otherwise be lost in a purely sequential ReAct loop.
- Chain-of-Thought is a building block, not a replacement -- CoT prompts are used inside each Thought step of the ReAct loop. The two patterns are complementary: CoT structures the reasoning within a single LLM call, while ReAct structures the overall problem-solving loop across multiple calls and tools.
- Fallback design is non-negotiable -- when the ReAct loop times out or a tool fails, the system must still return a useful answer. MangaAssist uses a "synthesize from available" strategy that produces a best-effort response from whatever observations were gathered before the budget expired.