Chain-of-Thought Reasoning and Step Functions Orchestration
MangaAssist context: JP Manga store chatbot on AWS -- Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 -- Implementation and Integration of Foundation Models |
| Task | 2.1 -- Implement Agentic AI Solutions and Tool Integrations |
| Skill | 2.1.2 -- Create advanced problem-solving systems to give FMs the ability to break down and solve complex problems by following structured reasoning steps (e.g., Step Functions for ReAct patterns, chain-of-thought reasoning) |
| Applied Context | MangaAssist -- JP Manga store chatbot serving 1M messages/day |
Mind Map: Chain-of-Thought and Step Functions Orchestration
mindmap
root((CoT & Step Functions<br/>Orchestration))
Chain-of-Thought Techniques
Zero-Shot CoT
"Let's think step by step"
No exemplars needed
Lower token cost
Good for straightforward queries
Few-Shot CoT
Exemplar reasoning traces
Domain-specific examples
Higher accuracy on complex tasks
Manga recommendation exemplars
Self-Consistency
Multiple reasoning paths
Majority vote on final answer
Temperature sampling diversity
Parallel Bedrock invocations
Automatic CoT
Auto-select exemplars
Cluster-based diversity
Reduced prompt engineering
Step Functions Orchestration
Express Workflows
Sub-millisecond transitions
Synchronous invocation
Max 5-minute duration
Pay per execution
Standard Workflows
Durable execution history
Up to 1-year duration
Exactly-once processing
Higher per-transition cost
State Types
Task states for Lambda/Bedrock
Choice states for branching
Parallel states for fan-out
Map states for iteration
Wait states for throttle control
Error Handling
Retry with backoff
Catch and fallback
Timeout per state
Heartbeat monitoring
ReAct with Step Functions
Reason-Act-Observe loop
Thought state invokes Bedrock
Action state invokes tools
Observation state evaluates
Choice state routes next step
Bounded iteration
Max 5 loops for latency
Budget-aware early exit
Best-effort synthesis
Tool orchestration
OpenSearch vector search
DynamoDB product lookup
ElastiCache rating cache
Bedrock genre classifier
Tree-of-Thought
Branching strategies
Breadth-first exploration
Depth-first with pruning
Best-first with scoring
Parallel evaluation
Step Functions Parallel state
Concurrent branch scoring
Branch pruning via Choice
Cost control
Max branch factor
Max depth limit
Early termination score
Production Concerns
Latency Budget
3-second total target
500ms per reasoning step
200ms network overhead
Express Workflows required
Cost at Scale
Model routing Sonnet/Haiku
Token budget per query
Daily cost projections
Cache reasoning patterns
Observability
Reasoning trace capture
CloudWatch metrics
X-Ray distributed tracing
Step Functions execution history
Section 1: ReAct Pattern Implementation with Step Functions
The ReAct (Reason + Act) pattern interleaves LLM reasoning with tool execution in a controlled loop. Step Functions provides the orchestration backbone -- managing state transitions, enforcing timeouts, handling errors, and enabling parallel tool execution.
ReAct Loop Architecture
flowchart TD
START["User Query Arrives"] --> INIT["InitializeReasoningContext<br/>(Task State)"]
INIT --> THOUGHT["ThoughtStep<br/>(Task State: Bedrock Invoke)"]
THOUGHT --> EVAL{"EvaluateThought<br/>(Choice State)"}
EVAL -->|"decision = FINAL_ANSWER"| FORMAT["FormatFinalAnswer<br/>(Task State)"]
EVAL -->|"iterations >= max"| BESTEFF["BestEffortSynthesis<br/>(Task State)"]
EVAL -->|"decision = CONTINUE"| ACTION["ActionStep<br/>(Task State: Tool Execution)"]
EVAL -->|"decision = CLARIFY"| CLARIFY["FormatClarification<br/>(Task State)"]
ACTION --> OBSERVE["ObservationStep<br/>(Task State: Bedrock Haiku)"]
OBSERVE --> UPDATE["UpdateContext<br/>(Task State: Lambda)"]
UPDATE --> THOUGHT
FORMAT --> LOG["LogReasoningTrace<br/>(Task State)"]
BESTEFF --> LOG
CLARIFY --> LOG
LOG --> DONE["ReturnResponse<br/>(Succeed State)"]
ACTION -->|"ToolExecutionError"| TOOLERR["HandleToolError<br/>(Task State)"]
ACTION -->|"States.Timeout"| TOOLTO["HandleToolTimeout<br/>(Pass State)"]
TOOLERR --> OBSERVE
TOOLTO --> OBSERVE
THOUGHT -->|"States.Timeout"| FALLBACK["FallbackResponse<br/>(Task State)"]
FALLBACK --> DONE
style START fill:#e1f5fe
style DONE fill:#c8e6c9
style THOUGHT fill:#fff3e0
style ACTION fill:#e8eaf6
style OBSERVE fill:#fce4ec
style EVAL fill:#f3e5f5
style FALLBACK fill:#ffcdd2
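The EvaluateThought Choice state needs a structured contract from the Thought step. A minimal sketch of that contract and the routing rule it implies -- the field names and max-iteration default are assumptions, not part of the ASL shown later:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ThoughtOutput:
    """Structured output the ThoughtStep must emit for the Choice state (assumed schema)."""
    decision: str                      # "FINAL_ANSWER" | "CONTINUE" | "CLARIFY"
    iterations: int                    # how many loop iterations have run
    next_action: Optional[str] = None  # tool name when decision == "CONTINUE"

def route(thought: ThoughtOutput, max_iterations: int = 5) -> str:
    """Plain-Python mirror of the EvaluateThought Choice state's rule order."""
    if thought.decision == "FINAL_ANSWER":
        return "FormatFinalAnswer"
    if thought.iterations >= max_iterations:   # bounded iteration guard
        return "BestEffortSynthesis"
    if thought.decision == "CLARIFY":
        return "FormatClarification"
    return "ActionStep"                        # default: keep reasoning
```

Ordering matters: the iteration guard sits before the CONTINUE branch, so a runaway loop is cut off even when the model keeps asking for more tools.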
Why Express Workflows for ReAct
| Dimension | Express Workflow | Standard Workflow |
|---|---|---|
| State transition latency | Sub-millisecond | ~200ms per transition |
| Max duration | 5 minutes | 1 year |
| Execution model | Synchronous (StartSyncExecution) or asynchronous | Asynchronous only (poll for result) |
| Pricing | Per execution + duration | Per state transition |
| Throughput | Very high default quota (scales automatically) | 800-1,300 start executions/sec (region-dependent soft limit) |
| Execution history | CloudWatch Logs only | Full visual history in console |
| MangaAssist fit | Primary -- sub-3s latency required | Fallback -- long-running analysis jobs |
At 1M messages/day with an average of 3 ReAct iterations per query (~9 state transitions), Standard Workflows would add roughly 1.8 seconds of pure transition overhead to every execution -- most of the 3-second budget gone before any model call. Express Workflows reduce this to under 10ms total.
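The overhead arithmetic can be made explicit. A small sketch using the per-transition figures from the table above (the Express figure is sub-millisecond; it is rounded up to 1ms here to stay conservative):

```python
def transition_overhead_ms(iterations: int, transitions_per_iteration: int,
                           per_transition_ms: float) -> float:
    """Pure state-transition overhead for one ReAct execution."""
    return iterations * transitions_per_iteration * per_transition_ms

# Figures from the table: ~200ms per Standard transition, ~1ms for Express.
standard_ms = transition_overhead_ms(3, 3, 200.0)  # 1800.0 -- most of the 3s budget
express_ms = transition_overhead_ms(3, 3, 1.0)     # 9.0
```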
Section 2: Chain-of-Thought Prompting Strategies
Chain-of-Thought (CoT) prompting guides the FM through structured, step-by-step reasoning within a single invocation. In MangaAssist, CoT prompts are embedded inside the Thought step of the ReAct loop, structuring how Claude reasons about each intermediate decision.
2.1 Zero-Shot Chain-of-Thought
Zero-shot CoT appends a trigger phrase like "Let's think step by step" without providing exemplar reasoning traces. This is the lowest-cost approach -- no exemplars consume input tokens.
"""
Zero-Shot CoT Engine for MangaAssist.
Uses trigger phrases to activate step-by-step reasoning without exemplars.
Best for straightforward queries where model knowledge is sufficient.
"""
import json
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
import boto3
from botocore.config import Config
logger = logging.getLogger("cot_engine")
BEDROCK_CONFIG = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=5,
connect_timeout=2,
)
@dataclass
class CoTResult:
"""Result from a chain-of-thought reasoning invocation."""
reasoning_steps: list[str]
final_answer: str
confidence: float
model_id: str
input_tokens: int
output_tokens: int
latency_ms: float
strategy: str # "zero_shot", "few_shot", "self_consistency"
class ChainOfThoughtEngine:
"""
Chain-of-Thought reasoning engine for MangaAssist.
Supports three CoT strategies:
- Zero-shot: Trigger phrase only, no exemplars
- Few-shot: Domain-specific exemplar traces
- Self-consistency: Multiple reasoning paths with majority vote
Each strategy trades off cost vs. accuracy differently at 1M msg/day.
"""
ZERO_SHOT_TRIGGERS = {
"default": "Let's think through this step by step.",
"recommendation": (
"Let's analyze this recommendation request step by step, "
"considering genre, themes, tone, and the user's specific constraints."
),
"comparison": (
"Let's compare these manga titles systematically, "
"evaluating each dimension one at a time."
),
"troubleshooting": (
"Let's diagnose this issue step by step, "
"starting with the symptoms and working toward the root cause."
),
}
def __init__(self, bedrock_client=None):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", config=BEDROCK_CONFIG
)
# ──────────────────────────────────────────────
# Zero-Shot CoT
# ──────────────────────────────────────────────
async def zero_shot_cot(
self,
query: str,
context: str = "",
query_type: str = "default",
model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
max_tokens: int = 800,
) -> CoTResult:
"""
Execute zero-shot chain-of-thought reasoning.
Appends a trigger phrase to activate step-by-step reasoning.
No exemplars needed -- lowest token cost.
Cost at 1M messages/day (Haiku):
Avg input: 500 tokens x 1M msgs = 500M tokens -> $125/day
Avg output: 400 tokens x 1M msgs = 400M tokens -> $500/day
Total: ~$625/day if all 1M queries used zero-shot CoT on Haiku
"""
trigger = self.ZERO_SHOT_TRIGGERS.get(query_type, self.ZERO_SHOT_TRIGGERS["default"])
prompt = f"""You are MangaAssist, a helpful JP manga store chatbot.
{f"CONTEXT: {context}" if context else ""}
USER QUERY: {query}
{trigger}
Format your response as:
STEP 1: [first reasoning step]
STEP 2: [second reasoning step]
...
ANSWER: [your final answer to the user]
CONFIDENCE: [0.0-1.0 how confident you are]"""
start = time.monotonic()
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
}),
)
latency_ms = (time.monotonic() - start) * 1000
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
usage = result.get("usage", {})
steps, answer, confidence = self._parse_cot_response(text)
return CoTResult(
reasoning_steps=steps,
final_answer=answer,
confidence=confidence,
model_id=model_id,
input_tokens=usage.get("input_tokens", 0),
output_tokens=usage.get("output_tokens", 0),
latency_ms=latency_ms,
strategy="zero_shot",
)
# ──────────────────────────────────────────────
# Few-Shot CoT
# ──────────────────────────────────────────────
async def few_shot_cot(
self,
query: str,
context: str = "",
query_type: str = "recommendation",
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens: int = 1000,
num_exemplars: int = 2,
) -> CoTResult:
"""
Execute few-shot chain-of-thought reasoning with domain exemplars.
Provides exemplar reasoning traces that demonstrate the expected
step-by-step approach. Higher accuracy than zero-shot but costs
more due to exemplar tokens in the input.
Cost at 1M messages/day (Sonnet, 30% of traffic = 300K):
Avg input: 1500 tokens (with exemplars) x 300K msgs = 450M tokens -> $1,350/day
Avg output: 600 tokens x 300K msgs = 180M tokens -> $2,700/day
Total: ~$4,050/day for few-shot CoT queries
"""
exemplars = self._get_exemplars(query_type, num_exemplars)
exemplar_text = self._format_exemplars(exemplars)
prompt = f"""You are MangaAssist, a helpful JP manga store chatbot.
Here are examples of how to reason through similar queries:
{exemplar_text}
Now apply the same reasoning approach to this query:
{f"CONTEXT: {context}" if context else ""}
USER QUERY: {query}
Think through this step by step following the same pattern as the examples.
Format your response as:
STEP 1: [first reasoning step]
STEP 2: [second reasoning step]
...
ANSWER: [your final answer to the user]
CONFIDENCE: [0.0-1.0]"""
start = time.monotonic()
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
}),
)
latency_ms = (time.monotonic() - start) * 1000
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
usage = result.get("usage", {})
steps, answer, confidence = self._parse_cot_response(text)
return CoTResult(
reasoning_steps=steps,
final_answer=answer,
confidence=confidence,
model_id=model_id,
input_tokens=usage.get("input_tokens", 0),
output_tokens=usage.get("output_tokens", 0),
latency_ms=latency_ms,
strategy="few_shot",
)
# ──────────────────────────────────────────────
# Self-Consistency CoT
# ──────────────────────────────────────────────
async def self_consistency_cot(
self,
query: str,
context: str = "",
query_type: str = "recommendation",
model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
max_tokens: int = 800,
num_paths: int = 3,
temperature: float = 0.7,
) -> CoTResult:
"""
Execute self-consistency chain-of-thought reasoning.
Generates multiple reasoning paths at higher temperature, then
selects the most consistent answer via majority vote. Trades
higher cost for improved reliability on ambiguous queries.
Cost at 1M messages/day (Haiku, 5% of traffic = 50K, 3 paths each):
Avg input: 500 tokens x 3 paths x 50K msgs = 75M tokens -> $18.75/day
Avg output: 400 tokens x 3 paths x 50K msgs = 60M tokens -> $75/day
Plus voting calls: roughly $20/day across input and output
Total: ~$115/day for self-consistency queries
"""
import asyncio  # local import; consider moving to module-level imports
# Generate multiple reasoning paths concurrently. NOTE: boto3 clients are
# synchronous, so for true parallelism wrap invoke_model in
# asyncio.to_thread() or use an async client (e.g. aioboto3); otherwise
# gather() runs the paths back to back.
tasks = [
self._generate_single_path(
query, context, query_type, model_id, max_tokens, temperature
)
for _ in range(num_paths)
]
paths = await asyncio.gather(*tasks)
# Extract answers from each path
answers = []
all_steps = []
total_input_tokens = 0
total_output_tokens = 0
total_latency = 0.0
for path in paths:
answers.append(path["answer"])
all_steps.append(path["steps"])
total_input_tokens += path["input_tokens"]
total_output_tokens += path["output_tokens"]
total_latency = max(total_latency, path["latency_ms"]) # parallel
# Majority vote using another LLM call
voted_answer, voted_confidence = await self._majority_vote(
query, answers, model_id
)
return CoTResult(
reasoning_steps=[
f"Path {i+1}: {'; '.join(steps)}"
for i, steps in enumerate(all_steps)
],
final_answer=voted_answer,
confidence=voted_confidence,
model_id=model_id,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
latency_ms=total_latency,
strategy="self_consistency",
)
async def _generate_single_path(
self,
query: str,
context: str,
query_type: str,
model_id: str,
max_tokens: int,
temperature: float,
) -> dict:
"""Generate one reasoning path for self-consistency."""
trigger = self.ZERO_SHOT_TRIGGERS.get(query_type, self.ZERO_SHOT_TRIGGERS["default"])
prompt = f"""You are MangaAssist. Answer this query with step-by-step reasoning.
{f"CONTEXT: {context}" if context else ""}
USER QUERY: {query}
{trigger}
STEP 1: [reasoning]
...
ANSWER: [final answer]"""
start = time.monotonic()
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": temperature,
"messages": [{"role": "user", "content": prompt}],
}),
)
latency_ms = (time.monotonic() - start) * 1000
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
usage = result.get("usage", {})
steps, answer, _ = self._parse_cot_response(text)
return {
"steps": steps,
"answer": answer,
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"latency_ms": latency_ms,
}
async def _majority_vote(
self, query: str, answers: list[str], model_id: str
) -> tuple[str, float]:
"""Select the most consistent answer from multiple reasoning paths."""
prompt = f"""Given these {len(answers)} answers to the same query, determine the consensus.
QUERY: {query}
ANSWERS:
{chr(10).join(f"Answer {i+1}: {a}" for i, a in enumerate(answers))}
Which answer best represents the consensus? If answers agree, pick the most
complete version. If they disagree, pick the answer that appears most often.
Respond in JSON:
{{"consensus_answer": "the best answer", "agreement_ratio": 0.0-1.0}}"""
response = self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"temperature": 0.0,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
try:
parsed = json.loads(result["content"][0]["text"])
return parsed["consensus_answer"], parsed.get("agreement_ratio", 0.5)
except (json.JSONDecodeError, KeyError):
# The model occasionally wraps its JSON in prose; fall back to the
# first path's answer with low confidence rather than failing.
return answers[0], 0.5
# ──────────────────────────────────────────────
# Exemplar Management
# ──────────────────────────────────────────────
def _get_exemplars(self, query_type: str, count: int) -> list[dict]:
"""Retrieve domain-specific exemplars for few-shot CoT."""
exemplar_bank = {
"recommendation": [
{
"query": "Suggest a manga similar to One Piece but shorter",
"steps": [
"STEP 1: Identify One Piece traits -- shounen, adventure, pirates, "
"humor, large cast, world-building, 100+ volumes",
"STEP 2: 'Shorter' constraint means under 30 volumes, ideally "
"completed series for a definitive ending",
"STEP 3: Match genres -- adventure + action + humor with strong "
"world-building but condensed into fewer volumes",
"STEP 4: Candidates -- Fullmetal Alchemist (27 vols, adventure, "
"world-building), Magi (37 vols, adventure, magic), "
"Promised Neverland (20 vols, thriller + adventure)",
"STEP 5: Rank by similarity to One Piece tone -- FMA has the "
"best mix of humor, action, and emotional depth in a shorter format",
],
"answer": "I'd recommend Fullmetal Alchemist (27 volumes, completed). "
"Like One Piece, it has a rich world, a lovable main duo on a grand "
"quest, great humor mixed with emotional moments, and a satisfying "
"conclusion. If you want something even shorter, The Promised "
"Neverland (20 volumes) has incredible world-building in a more "
"thriller-oriented package.",
},
{
"query": "I loved Death Note, what else has that cat-and-mouse feel?",
"steps": [
"STEP 1: Death Note core appeal -- psychological thriller, "
"cat-and-mouse between genius rivals, mind games, moral ambiguity",
"STEP 2: Key dimensions to match -- intellectual conflict (not "
"physical), strategic planning, tension, dark tone",
"STEP 3: Candidates -- Liar Game (pure mind games), Kaiji "
"(gambling + strategy), Monster (detective vs serial killer), "
"Code Geass (strategy + morality)",
"STEP 4: Monster has the strongest cat-and-mouse dynamic with "
"a detective hunting a serial killer across Europe",
],
"answer": "Try Monster by Naoki Urasawa (18 volumes). It has the "
"same cat-and-mouse tension as Death Note but in a more grounded, "
"realistic thriller setting. A brilliant doctor hunts a former patient "
"who turned out to be a serial killer. For pure strategic mind games, "
"Liar Game is also excellent.",
},
],
"comparison": [
{
"query": "What is the difference between shounen and seinen manga?",
"steps": [
"STEP 1: Define demographics -- shounen targets boys 12-18, "
"seinen targets men 18+",
"STEP 2: Common shounen traits -- action-driven, friendship "
"themes, power progression, optimistic tone",
"STEP 3: Common seinen traits -- complex themes, moral ambiguity, "
"slower pacing, realistic consequences",
"STEP 4: Provide concrete examples -- Naruto (shounen) vs "
"Vinland Saga (seinen) both feature warriors but differ in tone",
],
"answer": "Shounen manga (like Naruto, My Hero Academia) targets "
"teen boys and emphasizes action, friendship, and growth. Seinen "
"manga (like Berserk, Vinland Saga) targets adult men and explores "
"more complex themes with moral ambiguity.",
},
],
}
exemplars = exemplar_bank.get(query_type, exemplar_bank["recommendation"])
return exemplars[:count]
def _format_exemplars(self, exemplars: list[dict]) -> str:
"""Format exemplars into prompt text."""
parts = []
for i, ex in enumerate(exemplars):
steps_text = "\n".join(ex["steps"])
parts.append(
f"--- Example {i+1} ---\n"
f"QUERY: {ex['query']}\n{steps_text}\n"
f"ANSWER: {ex['answer']}\n"
)
return "\n".join(parts)
def _parse_cot_response(self, text: str) -> tuple[list[str], str, float]:
"""Parse a CoT-formatted response into steps, answer, and confidence."""
steps = []
answer = ""
confidence = 0.5
lines = text.strip().split("\n")
for line in lines:
line = line.strip()
if line.startswith("STEP"):
steps.append(line)
elif line.startswith("ANSWER:"):
answer = line.replace("ANSWER:", "").strip()
elif line.startswith("CONFIDENCE:"):
try:
confidence = float(line.replace("CONFIDENCE:", "").strip())
except ValueError:
confidence = 0.5
# If ANSWER was not on a single line, collect remaining text
if not answer:
answer_idx = text.find("ANSWER:")
if answer_idx >= 0:
answer = text[answer_idx + 7:].strip()
conf_idx = answer.find("CONFIDENCE:")
if conf_idx >= 0:
answer = answer[:conf_idx].strip()
return steps, answer, confidence
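The per-strategy daily spend follows directly from the Bedrock prices quoted in the context line. A self-contained sketch -- the token counts and traffic splits are the assumed values from the docstrings above, not measured figures:

```python
def daily_cost_usd(messages: int, in_tokens: int, out_tokens: int,
                   in_price_per_m: float, out_price_per_m: float) -> float:
    """Daily Bedrock spend: (tokens/day / 1M) x price per 1M tokens."""
    return (messages * in_tokens / 1e6) * in_price_per_m + \
           (messages * out_tokens / 1e6) * out_price_per_m

# Haiku at $0.25/$1.25 and Sonnet at $3/$15 per 1M input/output tokens.
zero_shot = daily_cost_usd(1_000_000, 500, 400, 0.25, 1.25)          # 625.0
few_shot = daily_cost_usd(300_000, 1_500, 600, 3.0, 15.0)            # 4050.0
self_consistency = daily_cost_usd(50_000, 1_500, 1_200, 0.25, 1.25)  # 93.75
```

Running the numbers this way makes the routing argument concrete: few-shot Sonnet on 30% of traffic dominates the bill, which is why the complexity classifier gates access to it.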
Section 3: Step Functions State Machine Design for Multi-Step Reasoning
3.1 ASL Definition: Chain-of-Thought Reasoning Workflow
This state machine orchestrates a multi-step CoT reasoning pipeline where each step builds on the previous, with conditional branching based on intermediate confidence scores. One caveat: the FullReActBranch uses the states:startExecution.sync:2 service integration, which cannot be called from an Express Workflow -- so this router must either run as a Standard Workflow (accepting per-transition latency) or replace the nested sync call with a Lambda task that drives the Express ReAct child via StartSyncExecution.
{
"Comment": "MangaAssist CoT Reasoning Workflow - Skill 2.1.2",
"StartAt": "ClassifyQueryComplexity",
"States": {
"ClassifyQueryComplexity": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-classify-complexity",
"Parameters": {
"user_query.$": "$.user_query",
"session_id.$": "$.session_id"
},
"ResultPath": "$.classification",
"TimeoutSeconds": 2,
"Next": "SelectReasoningStrategy"
},
"SelectReasoningStrategy": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.classification.complexity",
"StringEquals": "simple",
"Next": "ZeroShotCoTBranch"
},
{
"Variable": "$.classification.complexity",
"StringEquals": "moderate",
"Next": "FewShotCoTBranch"
},
{
"Variable": "$.classification.complexity",
"StringEquals": "complex",
"Next": "FullReActBranch"
},
{
"Variable": "$.classification.complexity",
"StringEquals": "ambiguous",
"Next": "SelfConsistencyBranch"
}
],
"Default": "ZeroShotCoTBranch"
},
"ZeroShotCoTBranch": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId": "anthropic.claude-3-haiku-20240307-v1:0",
"ContentType": "application/json",
"Accept": "application/json",
"Body": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 800,
"temperature": 0.1,
"messages": [
{
"role": "user",
"content.$": "States.Format('You are MangaAssist. Answer this query step by step.\n\nUSER QUERY: {}\n\nLet\\'s think through this step by step.\n\nSTEP 1: [reasoning]\n...\nANSWER: [answer]\nCONFIDENCE: [0.0-1.0]', $.user_query)"
}
]
}
},
"ResultPath": "$.cot_result",
"TimeoutSeconds": 5,
"Retry": [
{
"ErrorEquals": ["Bedrock.ThrottlingException"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Next": "ParseAndRespond"
},
"FewShotCoTBranch": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-few-shot-cot",
"Parameters": {
"user_query.$": "$.user_query",
"query_type.$": "$.classification.query_type",
"num_exemplars": 2,
"model_id": "anthropic.claude-3-sonnet-20240229-v1:0"
},
"ResultPath": "$.cot_result",
"TimeoutSeconds": 5,
"Retry": [
{
"ErrorEquals": ["Bedrock.ThrottlingException"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.Timeout"],
"ResultPath": "$.error",
"Next": "ZeroShotCoTBranch"
}
],
"Next": "ParseAndRespond"
},
"SelfConsistencyBranch": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Path1",
"States": {
"Path1": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
"Parameters": {
"user_query.$": "$.user_query",
"temperature": 0.7,
"path_id": 1
},
"End": true
}
}
},
{
"StartAt": "Path2",
"States": {
"Path2": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
"Parameters": {
"user_query.$": "$.user_query",
"temperature": 0.7,
"path_id": 2
},
"End": true
}
}
},
{
"StartAt": "Path3",
"States": {
"Path3": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
"Parameters": {
"user_query.$": "$.user_query",
"temperature": 0.7,
"path_id": 3
},
"End": true
}
}
}
],
"ResultPath": "$.parallel_paths",
"TimeoutSeconds": 5,
"Next": "MajorityVote"
},
"MajorityVote": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-majority-vote",
"Parameters": {
"paths.$": "$.parallel_paths",
"user_query.$": "$.user_query"
},
"ResultPath": "$.cot_result",
"TimeoutSeconds": 3,
"Next": "ParseAndRespond"
},
"FullReActBranch": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-react-loop",
"Input": {
"user_query.$": "$.user_query",
"session_id.$": "$.session_id",
"user_id.$": "$.user_id"
}
},
"ResultPath": "$.react_result",
"TimeoutSeconds": 5,
"Catch": [
{
"ErrorEquals": ["States.Timeout"],
"ResultPath": "$.error",
"Next": "FewShotCoTBranch"
}
],
"Next": "ParseAndRespond"
},
"ParseAndRespond": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-parse-respond",
"Parameters": {
"cot_result.$": "$.cot_result",
"session_id.$": "$.session_id",
"strategy.$": "$.classification.complexity"
},
"ResultPath": "$.final_response",
"Next": "EmitMetrics"
},
"EmitMetrics": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-emit-cot-metrics",
"Parameters": {
"session_id.$": "$.session_id",
"strategy.$": "$.classification.complexity",
"response.$": "$.final_response"
},
"ResultPath": null,
"Next": "Succeed"
},
"Succeed": {
"Type": "Succeed",
"OutputPath": "$.final_response"
}
}
}
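Deployed as an Express state machine, the workflow above can be invoked synchronously from the Fargate orchestrator. A sketch -- the state machine ARN is a placeholder, and the client is injectable so the flow can be exercised offline:

```python
import json

def invoke_cot_workflow(user_query: str, session_id: str, sfn=None) -> dict:
    """Synchronously run the Express CoT workflow and return its output.

    `sfn` is injectable for testing; defaults to a real Step Functions client.
    """
    if sfn is None:
        import boto3  # deferred so the sketch imports without boto3 installed
        sfn = boto3.client("stepfunctions")
    # StartSyncExecution is only supported for Express Workflows.
    resp = sfn.start_sync_execution(
        stateMachineArn=(
            "arn:aws:states:us-east-1:123456789012:"
            "stateMachine:manga-cot-reasoning"  # placeholder ARN
        ),
        input=json.dumps({"user_query": user_query, "session_id": session_id}),
    )
    if resp["status"] != "SUCCEEDED":
        # Express failures are reported in the response, not raised as exceptions.
        raise RuntimeError(f"Workflow failed: {resp.get('error')}: {resp.get('cause')}")
    return json.loads(resp["output"])
```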
3.2 ASL Definition: ReAct Loop with Parallel Tool Execution
This extended ASL demonstrates how the Action step can fan out to multiple tools concurrently when the Thought step requests parallel data gathering.
{
"Comment": "MangaAssist ReAct with Parallel Tool Execution",
"StartAt": "CheckParallelActions",
"States": {
"CheckParallelActions": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.thought_result.parallel_actions",
"IsPresent": true,
"Next": "ParallelToolExecution"
}
],
"Default": "SingleToolExecution"
},
"ParallelToolExecution": {
"Type": "Map",
"ItemsPath": "$.thought_result.parallel_actions",
"MaxConcurrency": 4,
"Parameters": {
"action.$": "$$.Map.Item.Value",
"reasoning_context.$": "$.reasoning_context"
},
"Iterator": {
"StartAt": "ExecuteTool",
"States": {
"ExecuteTool": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-action",
"Parameters": {
"thought": {
"next_action.$": "$.action.tool_name",
"action_params.$": "$.action.params"
},
"reasoning_context.$": "$.reasoning_context"
},
"TimeoutSeconds": 2,
"Retry": [
{
"ErrorEquals": ["Lambda.TooManyRequestsException"],
"IntervalSeconds": 0,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "ToolFailed"
}
],
"End": true
},
"ToolFailed": {
"Type": "Pass",
"Result": {
"success": false,
"error": "Tool execution failed"
},
"End": true
}
}
},
"ResultPath": "$.parallel_results",
"Next": "MergeResults"
},
"SingleToolExecution": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-action",
"Parameters": {
"thought.$": "$.thought_result",
"reasoning_context.$": "$.reasoning_context"
},
"ResultPath": "$.action_result",
"TimeoutSeconds": 2,
"Next": "ContinueReAct"
},
"MergeResults": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-merge-parallel-results",
"Parameters": {
"parallel_results.$": "$.parallel_results",
"reasoning_context.$": "$.reasoning_context"
},
"ResultPath": "$.action_result",
"Next": "ContinueReAct"
},
"ContinueReAct": {
"Type": "Pass",
"End": true
}
}
}
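The manga-merge-parallel-results Lambda referenced above is not shown in this section. A plausible handler sketch that folds the Map state's result array back into a single observation payload for the next Thought step -- all field names here are assumptions about the tool-result shape:

```python
def lambda_handler(event, context):
    """Merge Map-state tool results into one observation for the next Thought.

    Assumed input shape: event["parallel_results"] is the Map state's result
    array; failed branches carry {"success": false} from the ToolFailed state.
    """
    merged = {"observations": [], "failures": 0}
    for result in event["parallel_results"]:
        if result.get("success", True):
            merged["observations"].append({
                "tool": result.get("tool_name", "unknown"),
                "output": result.get("output"),
            })
        else:
            # ToolFailed Pass-state results are counted, not propagated,
            # so one slow or broken tool cannot sink the whole iteration.
            merged["failures"] += 1
    merged["reasoning_context"] = event["reasoning_context"]
    return merged
```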
Section 4: Tree-of-Thought for Complex Manga Queries
Tree-of-Thought (ToT) extends CoT by exploring multiple reasoning branches simultaneously, evaluating each branch, and pruning unpromising paths. This is most valuable for open-ended queries where the "right" answer depends on exploration.
4.1 When to Use ToT in MangaAssist
| Query Pattern | CoT Sufficient? | ToT Benefit |
|---|---|---|
| "What genre is Attack on Titan?" | Yes -- single factual answer | None -- unnecessary overhead |
| "Recommend a manga like Naruto but darker" | Yes -- constraints narrow the space | Minimal -- CoT within ReAct handles this |
| "What should I read next?" (no constraints) | No -- too many valid paths | High -- explore genre branches, mood branches, length branches |
| "Plan my reading list for the next 6 months" | No -- complex multi-factor optimization | High -- balance variety, length, difficulty, cost |
| "Compare Vinland Saga, Berserk, and Vagabond" | Partially -- linear comparison works but misses cross-dimensions | Medium -- explore comparison dimensions in parallel |
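The routing in this table can be approximated with a cheap pre-classification heuristic before any tokens are spent. A sketch -- the markers and the constraint threshold are illustrative, not a production classifier:

```python
def should_use_tot(query: str, num_constraints: int) -> bool:
    """Heuristic: ToT pays off only for open-ended, weakly constrained queries.

    `num_constraints` counts concrete filters already extracted from the
    query (genre, length, similar-to title, budget). Markers are illustrative.
    """
    open_ended_markers = ("what should i", "plan my", "anything good", "surprise me")
    q = query.lower()
    is_open_ended = any(marker in q for marker in open_ended_markers)
    # Concrete constraints narrow the search space enough that CoT inside
    # the ReAct loop is sufficient -- skip ToT's extra cost and latency.
    return is_open_ended and num_constraints <= 1
```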
4.2 Tree-of-Thought Implementation
"""
Tree-of-Thought reasoning for open-ended manga queries.
Explores multiple reasoning branches in parallel using Step Functions,
scores each branch, prunes low-scoring paths, and synthesizes the
best answer from the surviving branches.
"""
import json
import time
import logging
import asyncio
from dataclasses import dataclass, field
from typing import Optional
import boto3
from botocore.config import Config
logger = logging.getLogger("tot_engine")
BEDROCK_CONFIG = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=5,
connect_timeout=2,
)
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
@dataclass
class ThoughtNode:
"""A single node in the thought tree."""
node_id: str
parent_id: Optional[str]
depth: int
thought: str
score: float # 0.0 - 1.0, evaluated by the scoring function
children: list["ThoughtNode"] = field(default_factory=list)
is_terminal: bool = False
answer: Optional[str] = None
tokens_used: int = 0
@dataclass
class TreeOfThoughtResult:
"""Result from a Tree-of-Thought reasoning session."""
best_path: list[str] # sequence of thoughts from root to best leaf
best_answer: str
best_score: float
total_nodes_explored: int
total_nodes_pruned: int
total_tokens: int
total_latency_ms: float
tree_depth: int
branching_factor: int
class TreeOfThoughtEngine:
"""
Tree-of-Thought engine for MangaAssist.
Explores multiple reasoning paths in parallel, scores them,
prunes low-scoring branches, and returns the best answer.
Architecture:
- Each level of the tree is a "wave" of parallel Bedrock calls
- After each wave, a scoring pass evaluates and prunes branches
- The best surviving branch at max depth produces the final answer
Cost control:
- Max branching factor (default 3): limits breadth
- Max depth (default 3): limits depth
- Pruning threshold (default 0.4): aggressively cuts weak branches
- Total token budget: hard cap on cumulative token usage
"""
def __init__(
self,
bedrock_client=None,
max_branching_factor: int = 3,
max_depth: int = 3,
pruning_threshold: float = 0.4,
token_budget: int = 10000,
):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", config=BEDROCK_CONFIG
)
self.max_branching_factor = max_branching_factor
self.max_depth = max_depth
self.pruning_threshold = pruning_threshold
self.token_budget = token_budget
async def solve(
self,
query: str,
context: str = "",
) -> TreeOfThoughtResult:
"""
Execute Tree-of-Thought reasoning for an open-ended query.
Algorithm:
1. Generate initial thought branches from the root
2. Score each branch
3. Prune branches below threshold
4. For surviving branches, generate next-level thoughts
5. Repeat until max depth or all branches are terminal
6. Return the highest-scoring terminal branch
"""
start_time = time.monotonic()
total_tokens = 0
nodes_explored = 0
nodes_pruned = 0
# Root node
root = ThoughtNode(
node_id="root",
parent_id=None,
depth=0,
thought=f"Query: {query}",
score=1.0,
)
current_frontier = [root]
for depth in range(self.max_depth):
if not current_frontier:
break
if total_tokens >= self.token_budget:
logger.warning("Token budget exhausted at depth %d", depth)
break
# Generate children for all frontier nodes in parallel
expansion_tasks = []
for node in current_frontier:
if node.is_terminal:
continue
expansion_tasks.append(
self._expand_node(query, context, node, depth + 1)
)
if not expansion_tasks:
break
expanded_results = await asyncio.gather(*expansion_tasks)
# Collect all new children
next_frontier = []
for node, children in zip(current_frontier, expanded_results):
for child in children:
total_tokens += child.tokens_used
nodes_explored += 1
node.children.append(child)
if child.score >= self.pruning_threshold:
next_frontier.append(child)
else:
nodes_pruned += 1
logger.debug(
"Pruned node %s (score=%.2f < threshold=%.2f)",
child.node_id, child.score, self.pruning_threshold,
)
current_frontier = next_frontier
# Find the best terminal node
best_node = self._find_best_leaf(root)
best_path = self._trace_path(root, best_node.node_id)
total_latency = (time.monotonic() - start_time) * 1000
return TreeOfThoughtResult(
best_path=[n.thought for n in best_path],
best_answer=best_node.answer or best_node.thought,
best_score=best_node.score,
total_nodes_explored=nodes_explored,
total_nodes_pruned=nodes_pruned,
total_tokens=total_tokens,
total_latency_ms=total_latency,
tree_depth=self.max_depth,
branching_factor=self.max_branching_factor,
)
async def _expand_node(
self,
query: str,
context: str,
parent: ThoughtNode,
depth: int,
) -> list[ThoughtNode]:
"""Generate child thought nodes from a parent node."""
prompt = f"""You are MangaAssist's reasoning engine exploring different approaches.
QUERY: {query}
{f"CONTEXT: {context}" if context else ""}
CURRENT REASONING PATH: {parent.thought}
DEPTH: {depth}/{self.max_depth}
Generate exactly {self.max_branching_factor} different next reasoning steps.
Each should explore a DIFFERENT angle or approach.
{"This is the final depth -- each step should conclude with a specific answer." if depth == self.max_depth else "Each step should advance the reasoning toward an answer."}
Respond in JSON:
[
{{
"thought": "reasoning step exploring angle A",
"score": 0.0-1.0,
"is_terminal": {"true" if depth == self.max_depth else "true/false"},
"answer": "final answer if terminal, null otherwise"
}},
...
]"""
        # boto3 calls are blocking; run in a thread so asyncio.gather in
        # solve() actually expands sibling nodes in parallel
        response = await asyncio.to_thread(
            self.bedrock.invoke_model,
            modelId=HAIKU_MODEL_ID,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "temperature": 0.7,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
usage = result.get("usage", {})
tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
try:
children_data = json.loads(text)
except json.JSONDecodeError:
return []
children = []
for i, child_data in enumerate(children_data[:self.max_branching_factor]):
child = ThoughtNode(
node_id=f"{parent.node_id}_d{depth}_b{i}",
parent_id=parent.node_id,
depth=depth,
thought=child_data.get("thought", ""),
score=child_data.get("score", 0.5),
is_terminal=child_data.get("is_terminal", depth == self.max_depth),
answer=child_data.get("answer"),
tokens_used=tokens // max(len(children_data), 1),
)
children.append(child)
return children
    def _find_best_leaf(self, root: ThoughtNode) -> ThoughtNode:
        """Find the highest-scoring leaf node in the tree."""
        # Compare leaves against each other only: seeding best with the
        # root (score 1.0) would prevent any real leaf from ever winning.
        best = None
        stack = [root]
        while stack:
            node = stack.pop()
            if not node.children:
                if best is None or node.score > best.score:
                    best = node
            stack.extend(node.children)
        return best or root
def _trace_path(
self, root: ThoughtNode, target_id: str
) -> list[ThoughtNode]:
"""Trace the path from root to a target node."""
path = []
def dfs(node: ThoughtNode) -> bool:
path.append(node)
if node.node_id == target_id:
return True
for child in node.children:
if dfs(child):
return True
path.pop()
return False
dfs(root)
return path
Section 5: Production Orchestrator -- StepFunctionReasoningWorkflow
This class wraps the Step Functions API to start, monitor, and retrieve results from reasoning workflows.
"""
Step Functions Reasoning Workflow orchestrator.
Manages the lifecycle of reasoning state machine executions:
- Start execution with proper input formatting
- Poll for completion within latency budget
- Retrieve and parse results
- Handle timeouts and errors gracefully
"""
import json
import time
import uuid
import logging
from typing import Optional
from dataclasses import dataclass
import boto3
from botocore.config import Config
logger = logging.getLogger("sf_reasoning")
SFN_CONFIG = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=5,
connect_timeout=2,
)
@dataclass
class WorkflowResult:
"""Result from a Step Functions reasoning workflow execution."""
execution_arn: str
status: str # "SUCCEEDED", "FAILED", "TIMED_OUT"
output: Optional[dict]
error: Optional[str]
duration_ms: float
strategy: str
class StepFunctionReasoningWorkflow:
"""
Orchestrates Step Functions Express Workflow executions for reasoning.
Provides a unified interface to start reasoning workflows
(CoT, ReAct, ToT) and retrieve results within the 3-second
latency budget.
Express Workflows are invoked synchronously -- the StartSyncExecution
API blocks until the workflow completes or times out.
"""
# State machine ARNs by strategy
STATE_MACHINES = {
"cot": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-cot-reasoning",
"react": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-react-loop",
"tot": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-tot-reasoning",
"router": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-reasoning-router",
}
def __init__(self, sfn_client=None, region: str = "us-east-1"):
self.sfn = sfn_client or boto3.client(
"stepfunctions", config=SFN_CONFIG, region_name=region
)
async def execute_reasoning(
self,
strategy: str,
user_query: str,
session_id: str,
user_id: str = "",
context: dict = None,
timeout_ms: int = 2800,
) -> WorkflowResult:
"""
Execute a reasoning workflow synchronously.
Uses Express Workflow's StartSyncExecution API which returns
the result directly without polling.
Args:
strategy: "cot", "react", "tot", or "router" (auto-selects)
user_query: The user's question
session_id: Session identifier for tracing
user_id: Optional user ID for personalization
context: Optional additional context
timeout_ms: Max wait time (default 2800ms for 3s budget)
"""
state_machine_arn = self.STATE_MACHINES.get(strategy)
if not state_machine_arn:
return WorkflowResult(
execution_arn="",
status="FAILED",
output=None,
error=f"Unknown strategy: {strategy}",
duration_ms=0,
strategy=strategy,
)
execution_input = {
"user_query": user_query,
"session_id": session_id,
"user_id": user_id,
"context": context or {},
"timeout_ms": timeout_ms,
}
execution_name = f"manga-{strategy}-{uuid.uuid4().hex[:12]}"
start_time = time.monotonic()
try:
response = self.sfn.start_sync_execution(
stateMachineArn=state_machine_arn,
name=execution_name,
input=json.dumps(execution_input),
)
duration_ms = (time.monotonic() - start_time) * 1000
status = response.get("status", "FAILED")
if status == "SUCCEEDED":
output = json.loads(response.get("output", "{}"))
return WorkflowResult(
execution_arn=response.get("executionArn", ""),
status="SUCCEEDED",
output=output,
error=None,
duration_ms=duration_ms,
strategy=strategy,
)
            elif status == "TIMED_OUT":
                # StartSyncExecution reports timeouts via the response status
                # field, not a client-side exception
                logger.warning(
                    "Reasoning workflow timed out: strategy=%s duration=%.0fms",
                    strategy, duration_ms,
                )
                return WorkflowResult(
                    execution_arn=response.get("executionArn", ""),
                    status="TIMED_OUT",
                    output=None,
                    error="Execution timed out",
                    duration_ms=duration_ms,
                    strategy=strategy,
                )
            else:
                error_msg = response.get("error", "Unknown error")
                cause = response.get("cause", "")
                return WorkflowResult(
                    execution_arn=response.get("executionArn", ""),
                    status="FAILED",
                    output=None,
                    error=f"{error_msg}: {cause}",
                    duration_ms=duration_ms,
                    strategy=strategy,
                )
except Exception as e:
duration_ms = (time.monotonic() - start_time) * 1000
logger.error("Workflow execution failed: %s", str(e))
return WorkflowResult(
execution_arn="",
status="FAILED",
output=None,
error=str(e),
duration_ms=duration_ms,
strategy=strategy,
)
async def execute_with_fallback(
self,
user_query: str,
session_id: str,
user_id: str = "",
context: dict = None,
) -> WorkflowResult:
"""
Execute reasoning with automatic fallback chain.
Strategy: router -> react -> few-shot CoT -> zero-shot CoT
If the router workflow times out, fall back to simpler strategies.
Each fallback gets the remaining time budget.
"""
total_start = time.monotonic()
total_budget_ms = 2800
# Try the routing workflow first (auto-selects strategy)
result = await self.execute_reasoning(
strategy="router",
user_query=user_query,
session_id=session_id,
user_id=user_id,
context=context,
timeout_ms=total_budget_ms,
)
if result.status == "SUCCEEDED":
return result
# Fallback chain
elapsed = (time.monotonic() - total_start) * 1000
remaining = total_budget_ms - elapsed
if remaining > 500:
logger.info("Router failed, falling back to CoT (%.0fms remaining)", remaining)
result = await self.execute_reasoning(
strategy="cot",
user_query=user_query,
session_id=session_id,
user_id=user_id,
context=context,
timeout_ms=int(remaining),
)
if result.status == "SUCCEEDED":
return result
# Final fallback: return a graceful degradation response
elapsed = (time.monotonic() - total_start) * 1000
return WorkflowResult(
execution_arn="",
status="DEGRADED",
output={
"answer": "I'm having trouble processing your request right now. "
"Could you try rephrasing your question or asking something more specific?",
"is_fallback": True,
},
error="All reasoning strategies exhausted",
duration_ms=elapsed,
strategy="fallback",
)
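The fallback pattern above generalizes to any ordered list of strategies sharing one time budget. Here is a minimal, self-contained sketch of that ladder; the strategy callables, the 500ms minimum-attempt threshold, and the sample answer are illustrative, not taken from the production code:

```python
import asyncio
import time

async def run_with_fallback(strategies, budget_ms: float, min_attempt_ms: float = 500):
    """Try each (name, attempt) pair in order until one returns a result,
    or the remaining budget is too small for a useful attempt."""
    start = time.monotonic()
    for name, attempt in strategies:
        remaining = budget_ms - (time.monotonic() - start) * 1000
        if remaining < min_attempt_ms:
            break
        result = await attempt(remaining)
        if result is not None:
            return name, result
    return "degraded", None

async def failing_router(_budget_ms):
    """Stand-in for the router workflow: always fails."""
    return None

async def working_cot(_budget_ms):
    """Stand-in for the CoT workflow: always succeeds."""
    return {"answer": "Try 'Yotsuba&!' for a light slice-of-life pick."}

async def main():
    return await run_with_fallback(
        [("router", failing_router), ("cot", working_cot)], budget_ms=2800
    )

name, result = asyncio.run(main())
print(name)  # cot
```

Passing the remaining budget into each attempt is the key design choice: a fallback strategy should never be given the full original budget, or the chain as a whole can blow past the SLA.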
Section 6: Comparison Tables for Reasoning Approaches
CoT Strategy Comparison
| Dimension | Zero-Shot CoT | Few-Shot CoT | Self-Consistency CoT |
|---|---|---|---|
| Mechanism | Trigger phrase ("think step by step") | Exemplar reasoning traces in prompt | Multiple paths + majority vote |
| Exemplars required | None | 2-3 domain-specific examples | None (uses temperature diversity) |
| Input token cost | Low (~500 tokens) | Medium (~1500 tokens with exemplars) | Low per path, but N paths total |
| Output token cost | Medium (~400 tokens) | Medium-High (~600 tokens) | N x Medium (~400 x N tokens) |
| Accuracy (recommendation) | Good (70-75%) | Very good (80-85%) | Best (85-90%) |
| Latency | ~500ms (single Haiku call) | ~800ms (single Sonnet call) | ~600ms (parallel Haiku calls) |
| Best model | Haiku (cost-efficient) | Sonnet (needs exemplar comprehension) | Haiku (parallelized) |
| MangaAssist use case | Simple FAQ, genre lookups | Recommendations, comparisons | Ambiguous/open-ended queries |
| Daily cost (1M msg) | ~$125 | ~$4,050 (30% of traffic) | ~$94 (5% of traffic) |
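Self-consistency's aggregation step is nothing more than a majority vote over the sampled final answers. A minimal sketch (a hypothetical helper, not the chapter's implementation):

```python
from collections import Counter

def majority_vote(answers: list[str]) -> str:
    """Return the most common answer; ties resolve to the earliest-seen one."""
    return Counter(answers).most_common(1)[0][0]

# Three temperature-sampled reasoning paths, two agreeing
paths = ["One Piece", "Berserk", "One Piece"]
print(majority_vote(paths))  # One Piece
```

In practice the sampled answers should be normalized first (lowercased, mapped to canonical product IDs) so that superficially different strings vote for the same candidate.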
Reasoning Approach Selection Matrix
| Query Type | % of Traffic | Primary Strategy | Fallback Strategy | Avg Latency | Avg Cost/Query |
|---|---|---|---|---|---|
| Simple factual ("What genre is X?") | 20% | Zero-Shot CoT | Direct model response | 300-500ms | $0.0003 |
| Recommendation with constraints | 45% | ReAct + Few-Shot CoT | Few-Shot CoT only | 1.5-2.5s | $0.0085 |
| Open-ended recommendation | 10% | ReAct + Self-Consistency | ReAct + Zero-Shot | 2.0-2.8s | $0.012 |
| Comparison queries | 10% | Few-Shot CoT | Zero-Shot CoT | 600-900ms | $0.004 |
| Order/account management | 10% | Plan-and-Execute | ReAct | 1.0-2.0s | $0.005 |
| Ambiguous/exploration | 5% | Tree-of-Thought | Self-Consistency | 2.0-3.0s | $0.018 |
Latency Breakdown per Strategy
| Component | Zero-Shot CoT | Few-Shot CoT | ReAct (3 iterations) | Tree-of-Thought (depth=3) |
|---|---|---|---|---|
| Query classification | 0ms (not needed) | 50ms | 50ms | 50ms |
| Prompt construction | 5ms | 15ms | 5ms per iteration | 10ms per level |
| Bedrock inference | 400ms (Haiku) | 700ms (Sonnet) | 400ms x 3 (Haiku) + 700ms (Sonnet) | 400ms x 3 (parallel Haiku) |
| Tool execution | 0ms | 0ms | 200ms x 3 | 0ms |
| State transitions | 1ms (Express) | 1ms | 5ms (5 transitions) | 8ms (8 transitions) |
| Response formatting | 10ms | 10ms | 20ms | 20ms |
| Total | ~416ms | ~776ms | ~2,590ms | ~1,308ms |
Section 7: Latency and Cost Analysis at Scale
Daily Cost Projections (1M messages/day)
| Strategy | % Traffic | Messages/Day | Avg Input Tokens | Avg Output Tokens | Model | Daily Input Cost | Daily Output Cost | Total Daily |
|---|---|---|---|---|---|---|---|---|
| Zero-Shot CoT | 20% | 200,000 | 500 | 400 | Haiku | $25 | $100 | $125 |
| Few-Shot CoT | 25% | 250,000 | 1,500 | 600 | Sonnet | $1,125 | $2,250 | $3,375 |
| ReAct (Thought) | 45% | 450,000 | 1,000 x 3 iter | 500 x 3 iter | Mixed | $2,025 | $4,556 | $6,581 |
| ReAct (Observe) | 45% | 450,000 | 600 x 3 iter | 300 x 3 iter | Haiku | $203 | $506 | $709 |
| Self-Consistency | 5% | 50,000 | 500 x 3 paths | 400 x 3 paths | Haiku | $19 | $75 | $94 |
| Tree-of-Thought | 5% | 50,000 | 800 x 9 nodes | 500 x 9 nodes | Haiku | $90 | $281 | $371 |
| Total | 100% | 1,000,000 | -- | -- | -- | $3,487 | $7,768 | $11,255 |
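The single-model rows above can be reproduced directly from the posted per-token prices; the Mixed ReAct (Thought) row is taken as given since its Haiku/Sonnet split is not broken out. A quick sanity-check sketch:

```python
# $ per 1M tokens (input, output), from the Bedrock pricing in this chapter
PRICE = {"haiku": (0.25, 1.25), "sonnet": (3.00, 15.00)}

# (messages/day, input tokens/msg, output tokens/msg, model)
ROWS = {
    "zero_shot_cot":    (200_000,   500,   400, "haiku"),
    "few_shot_cot":     (250_000, 1_500,   600, "sonnet"),
    "react_observe":    (450_000, 1_800,   900, "haiku"),  # 600/300 x 3 iterations
    "self_consistency": ( 50_000, 1_500, 1_200, "haiku"),  # 500/400 x 3 paths
    "tree_of_thought":  ( 50_000, 7_200, 4_500, "haiku"),  # 800/500 x 9 nodes
}

def daily_cost(msgs, tok_in, tok_out, model):
    """Daily input/output dollars for one strategy row."""
    p_in, p_out = PRICE[model]
    return msgs * tok_in / 1e6 * p_in, msgs * tok_out / 1e6 * p_out

# Seed the totals with the Mixed ReAct (Thought) row, taken as given
total_in, total_out = 2_025.00, 4_556.25
for row in ROWS.values():
    c_in, c_out = daily_cost(*row)
    total_in, total_out = total_in + c_in, total_out + c_out

print(f"input ${total_in:,.0f}  output ${total_out:,.0f}  total ${total_in + total_out:,.0f}")
```

The computed totals land within a dollar of the table; component-level rounding accounts for the difference.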
Monthly Cost Summary
| Item | Monthly Cost |
|---|---|
| Bedrock inference (CoT + ReAct + ToT) | ~$337,650 |
| Step Functions Express (9M executions avg) | ~$45.00 |
| Lambda invocations (tool executions) | ~$28.00 |
| DynamoDB (sessions + traces) | ~$50.00 |
| OpenSearch Serverless (vector search) | ~$350.00 |
| ElastiCache Redis (caching) | ~$120.00 |
| CloudWatch (logs + metrics) | ~$35.00 |
| Total monthly (reasoning infrastructure) | ~$338,278 |
Latency Budget Management
"""
Latency budget manager for reasoning workflows.
Ensures the 3-second SLA by tracking elapsed time and dynamically
adjusting strategy complexity based on remaining budget.
"""
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger("latency_budget")
@dataclass
class LatencyBudget:
"""Tracks remaining latency budget for a reasoning session."""
total_budget_ms: float
start_time: float
network_overhead_ms: float = 200.0 # Reserved for WebSocket + API Gateway
@property
def elapsed_ms(self) -> float:
return (time.monotonic() - self.start_time) * 1000
@property
def remaining_ms(self) -> float:
return self.total_budget_ms - self.network_overhead_ms - self.elapsed_ms
@property
def usable_remaining_ms(self) -> float:
"""Remaining budget minus a safety margin for response formatting."""
return max(0, self.remaining_ms - 100)
def can_afford(self, estimated_ms: float) -> bool:
"""Check if we can afford an operation within the remaining budget."""
return self.usable_remaining_ms >= estimated_ms
def select_strategy(self) -> str:
"""Dynamically select reasoning strategy based on remaining budget."""
remaining = self.usable_remaining_ms
if remaining >= 2500:
return "react" # Full ReAct with 3+ iterations
elif remaining >= 1500:
return "react_limited" # ReAct with max 2 iterations
elif remaining >= 800:
return "few_shot_cot" # Single Sonnet call with exemplars
elif remaining >= 400:
return "zero_shot_cot" # Single Haiku call
else:
return "cached_response" # Return a cached/template response
def suggest_model(self) -> str:
"""Suggest the appropriate model based on remaining budget."""
remaining = self.usable_remaining_ms
if remaining >= 1000:
return "anthropic.claude-3-sonnet-20240229-v1:0"
else:
return "anthropic.claude-3-haiku-20240307-v1:0"
def log_checkpoint(self, label: str):
"""Log a latency checkpoint for debugging."""
logger.info(
"Latency checkpoint [%s]: elapsed=%.0fms remaining=%.0fms",
label, self.elapsed_ms, self.remaining_ms,
)
class ReActBudgetManager:
"""
Manages per-iteration budgets within a ReAct loop.
Divides the remaining time budget across expected iterations,
leaving enough for best-effort synthesis if budget runs out.
"""
SYNTHESIS_RESERVE_MS = 500 # Reserve for best-effort answer generation
def __init__(self, budget: LatencyBudget, max_iterations: int = 5):
self.budget = budget
self.max_iterations = max_iterations
def per_iteration_budget_ms(self, current_iteration: int) -> float:
"""Calculate the budget for the current iteration."""
remaining = self.budget.usable_remaining_ms - self.SYNTHESIS_RESERVE_MS
iterations_left = self.max_iterations - current_iteration
if iterations_left <= 0:
return 0
return remaining / iterations_left
def should_continue(self, current_iteration: int) -> bool:
"""Determine if another iteration is worthwhile."""
per_iter = self.per_iteration_budget_ms(current_iteration)
# Need at least 400ms for a useful Thought + Action + Observation
return per_iter >= 400
def should_use_sonnet(self, current_iteration: int) -> bool:
"""Determine if Sonnet is affordable for this iteration."""
per_iter = self.per_iteration_budget_ms(current_iteration)
# Sonnet typically takes 600-800ms, Haiku 200-400ms
return per_iter >= 800
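The degradation ladder in `select_strategy` is easiest to test when the policy is a pure function of the remaining budget, with no clock involved. A condensed, self-contained variant (thresholds match the class above; the standalone form is illustrative):

```python
def select_strategy(remaining_ms: float) -> str:
    """Map remaining usable budget to the richest strategy that still fits."""
    if remaining_ms >= 2500:
        return "react"            # full ReAct with 3+ iterations
    if remaining_ms >= 1500:
        return "react_limited"    # ReAct capped at 2 iterations
    if remaining_ms >= 800:
        return "few_shot_cot"     # single Sonnet call with exemplars
    if remaining_ms >= 400:
        return "zero_shot_cot"    # single Haiku call
    return "cached_response"      # cached/template fallback

# Walk the ladder as the budget drains
for ms in (2800, 1600, 900, 450, 100):
    print(ms, "->", select_strategy(ms))
```

Keeping the threshold logic pure means the SLA behavior can be unit-tested exhaustively, while the clock-reading lives only in the `LatencyBudget` properties.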
Key Takeaways
- Chain-of-Thought is a spectrum, not a single technique -- zero-shot CoT is by far the cheapest option (~$125/day for 200K queries on Haiku) and works for simple factual questions. Few-shot CoT provides the best accuracy for recommendation queries at moderate cost. Self-consistency provides the highest reliability for ambiguous queries by sampling multiple reasoning paths. Choose the right strategy per query type using the classification state.
- Step Functions Express Workflows are mandatory for real-time reasoning -- with sub-millisecond state transitions and synchronous invocation via StartSyncExecution, they add negligible overhead to the 3-second latency budget. Standard Workflows are invoked asynchronously and add per-transition latency and per-transition billing, which makes them a poor fit for the hot path. The trade-off is that Express Workflows provide less execution visibility (CloudWatch Logs only, no visual history in the console).
- Tree-of-Thought is powerful but expensive -- gate it carefully -- at 9+ Bedrock invocations per query, ToT costs several times more than standard ReAct. Reserve it for the ~5% of queries that are genuinely ambiguous and where exploring multiple angles produces meaningfully better answers. Use the query classifier to route only appropriate queries to ToT.
- The ReAct loop and CoT are complementary, not competing -- CoT structures the reasoning within each Thought step (what to think about), while ReAct structures the overall problem-solving process (when to think vs. when to act). Every Thought step in a ReAct loop uses a CoT prompt internally. The ASL state machine orchestrates the outer loop; the prompt templates orchestrate the inner reasoning.
- Dynamic budget management is critical at scale -- the `LatencyBudget` and `ReActBudgetManager` classes keep every query within the 3-second SLA even when individual Bedrock calls are slow. By dynamically downgrading strategy complexity (ReAct to CoT to cached response) based on remaining time, the system degrades gracefully rather than timing out.
- Monthly cost for the reasoning layer is roughly $338,000, dominated by Bedrock inference at ~$337,650/month -- everything else (OpenSearch ~$350, Redis ~$120, Step Functions + Lambda ~$73) is noise by comparison. The biggest optimization lever is model routing: using Haiku ($0.25/$1.25 per 1M tokens) for observation evaluation and simple queries saves roughly 60% compared to using Sonnet for everything.