
Chain-of-Thought Reasoning and Step Functions Orchestration

MangaAssist context: JP Manga store chatbot on AWS -- Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Field | Value |
|---|---|
| Domain | 2 -- Implementation and Integration of Foundation Models |
| Task | 2.1 -- Implement Agentic AI Solutions and Tool Integrations |
| Skill | 2.1.2 -- Create advanced problem-solving systems to give FMs the ability to break down and solve complex problems by following structured reasoning steps (e.g., Step Functions for ReAct patterns, chain-of-thought reasoning) |
| Applied Context | MangaAssist -- JP Manga store chatbot serving 1M messages/day |

Mind Map: Chain-of-Thought and Step Functions Orchestration

mindmap
  root((CoT & Step Functions<br/>Orchestration))
    Chain-of-Thought Techniques
      Zero-Shot CoT
        "Let's think step by step"
        No exemplars needed
        Lower token cost
        Good for straightforward queries
      Few-Shot CoT
        Exemplar reasoning traces
        Domain-specific examples
        Higher accuracy on complex tasks
        Manga recommendation exemplars
      Self-Consistency
        Multiple reasoning paths
        Majority vote on final answer
        Temperature sampling diversity
        Parallel Bedrock invocations
      Automatic CoT
        Auto-select exemplars
        Cluster-based diversity
        Reduced prompt engineering
    Step Functions Orchestration
      Express Workflows
        Sub-millisecond transitions
        Synchronous invocation
        Max 5-minute duration
        Pay per execution
      Standard Workflows
        Durable execution history
        Up to 1-year duration
        Exactly-once processing
        Higher per-transition cost
      State Types
        Task states for Lambda/Bedrock
        Choice states for branching
        Parallel states for fan-out
        Map states for iteration
        Wait states for throttle control
      Error Handling
        Retry with backoff
        Catch and fallback
        Timeout per state
        Heartbeat monitoring
    ReAct with Step Functions
      Reason-Act-Observe loop
        Thought state invokes Bedrock
        Action state invokes tools
        Observation state evaluates
        Choice state routes next step
      Bounded iteration
        Max 5 loops for latency
        Budget-aware early exit
        Best-effort synthesis
      Tool orchestration
        OpenSearch vector search
        DynamoDB product lookup
        ElastiCache rating cache
        Bedrock genre classifier
    Tree-of-Thought
      Branching strategies
        Breadth-first exploration
        Depth-first with pruning
        Best-first with scoring
      Parallel evaluation
        Step Functions Parallel state
        Concurrent branch scoring
        Branch pruning via Choice
      Cost control
        Max branch factor
        Max depth limit
        Early termination score
    Production Concerns
      Latency Budget
        3-second total target
        500ms per reasoning step
        200ms network overhead
        Express Workflows required
      Cost at Scale
        Model routing Sonnet/Haiku
        Token budget per query
        Daily cost projections
        Cache reasoning patterns
      Observability
        Reasoning trace capture
        CloudWatch metrics
        X-Ray distributed tracing
        Step Functions execution history

Section 1: ReAct Pattern Implementation with Step Functions

The ReAct (Reason + Act) pattern interleaves LLM reasoning with tool execution in a controlled loop. Step Functions provides the orchestration backbone -- managing state transitions, enforcing timeouts, handling errors, and enabling parallel tool execution.

ReAct Loop Architecture

flowchart TD
    START["User Query Arrives"] --> INIT["InitializeReasoningContext<br/>(Task State)"]
    INIT --> THOUGHT["ThoughtStep<br/>(Task State: Bedrock Invoke)"]
    THOUGHT --> EVAL{"EvaluateThought<br/>(Choice State)"}

    EVAL -->|"decision = FINAL_ANSWER"| FORMAT["FormatFinalAnswer<br/>(Task State)"]
    EVAL -->|"iterations >= max"| BESTEFF["BestEffortSynthesis<br/>(Task State)"]
    EVAL -->|"decision = CONTINUE"| ACTION["ActionStep<br/>(Task State: Tool Execution)"]
    EVAL -->|"decision = CLARIFY"| CLARIFY["FormatClarification<br/>(Task State)"]

    ACTION --> OBSERVE["ObservationStep<br/>(Task State: Bedrock Haiku)"]
    OBSERVE --> UPDATE["UpdateContext<br/>(Task State: Lambda)"]
    UPDATE --> THOUGHT

    FORMAT --> LOG["LogReasoningTrace<br/>(Task State)"]
    BESTEFF --> LOG
    CLARIFY --> LOG
    LOG --> DONE["ReturnResponse<br/>(Succeed State)"]

    ACTION -->|"ToolExecutionError"| TOOLERR["HandleToolError<br/>(Task State)"]
    ACTION -->|"States.Timeout"| TOOLTO["HandleToolTimeout<br/>(Pass State)"]
    TOOLERR --> OBSERVE
    TOOLTO --> OBSERVE

    THOUGHT -->|"States.Timeout"| FALLBACK["FallbackResponse<br/>(Task State)"]
    FALLBACK --> DONE

    style START fill:#e1f5fe
    style DONE fill:#c8e6c9
    style THOUGHT fill:#fff3e0
    style ACTION fill:#e8eaf6
    style OBSERVE fill:#fce4ec
    style EVAL fill:#f3e5f5
    style FALLBACK fill:#ffcdd2

Why Express Workflows for ReAct

| Dimension | Express Workflow | Standard Workflow |
|---|---|---|
| State transition latency | Sub-millisecond | ~200ms per transition |
| Max duration | 5 minutes | 1 year |
| Execution model | Synchronous (request/response) | Asynchronous (poll for result) |
| Pricing | Per execution + duration | Per state transition |
| Throughput | Near-unlimited (scales automatically) | 800 start executions/sec default |
| Execution history | CloudWatch Logs only | Full visual history in console |
| MangaAssist fit | Primary -- sub-3s latency required | Fallback -- long-running analysis jobs |

At 1M messages/day with an average of 3 iterations per ReAct loop (9 state transitions), Standard Workflows would add ~1.8 seconds of pure transition overhead. Express Workflows reduce this to under 10ms total.
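A quick back-of-envelope check on that claim -- a sketch in plain Python using the assumed per-transition latencies from the table above:

# Transition overhead for an average 3-iteration ReAct loop, using the
# (assumed) per-transition latencies from the comparison table above.
STANDARD_TRANSITION_MS = 200
EXPRESS_TRANSITION_MS = 1
ITERATIONS = 3                 # average ReAct iterations per query
TRANSITIONS_PER_ITERATION = 3  # Thought -> Action -> Observation

transitions = ITERATIONS * TRANSITIONS_PER_ITERATION            # 9
print("Standard:", transitions * STANDARD_TRANSITION_MS, "ms")  # 1800 ms
print("Express: ", transitions * EXPRESS_TRANSITION_MS, "ms")   # 9 ms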


Section 2: Chain-of-Thought Prompting Strategies

Chain-of-Thought (CoT) prompting guides the FM through structured, step-by-step reasoning within a single invocation. In MangaAssist, CoT prompts are embedded inside the Thought step of the ReAct loop, structuring how Claude reasons about each intermediate decision.

2.1 Zero-Shot Chain-of-Thought

Zero-shot CoT appends a trigger phrase like "Let's think step by step" without providing exemplar reasoning traces. This is the lowest-cost approach -- no exemplars consume input tokens.

"""
Zero-Shot CoT Engine for MangaAssist.

Uses trigger phrases to activate step-by-step reasoning without exemplars.
Best for straightforward queries where model knowledge is sufficient.
"""

import asyncio
import json
import time
import logging
from dataclasses import dataclass

import boto3
from botocore.config import Config

logger = logging.getLogger("cot_engine")

BEDROCK_CONFIG = Config(
    retries={"max_attempts": 2, "mode": "adaptive"},
    read_timeout=5,
    connect_timeout=2,
)


@dataclass
class CoTResult:
    """Result from a chain-of-thought reasoning invocation."""
    reasoning_steps: list[str]
    final_answer: str
    confidence: float
    model_id: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    strategy: str  # "zero_shot", "few_shot", "self_consistency"


class ChainOfThoughtEngine:
    """
    Chain-of-Thought reasoning engine for MangaAssist.

    Supports three CoT strategies:
    - Zero-shot: Trigger phrase only, no exemplars
    - Few-shot: Domain-specific exemplar traces
    - Self-consistency: Multiple reasoning paths with majority vote

    Each strategy trades off cost vs. accuracy differently at 1M msg/day.
    """

    ZERO_SHOT_TRIGGERS = {
        "default": "Let's think through this step by step.",
        "recommendation": (
            "Let's analyze this recommendation request step by step, "
            "considering genre, themes, tone, and the user's specific constraints."
        ),
        "comparison": (
            "Let's compare these manga titles systematically, "
            "evaluating each dimension one at a time."
        ),
        "troubleshooting": (
            "Let's diagnose this issue step by step, "
            "starting with the symptoms and working toward the root cause."
        ),
    }

    def __init__(self, bedrock_client=None):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", config=BEDROCK_CONFIG
        )

    # ──────────────────────────────────────────────
    # Zero-Shot CoT
    # ──────────────────────────────────────────────

    async def zero_shot_cot(
        self,
        query: str,
        context: str = "",
        query_type: str = "default",
        model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
        max_tokens: int = 800,
    ) -> CoTResult:
        """
        Execute zero-shot chain-of-thought reasoning.

        Appends a trigger phrase to activate step-by-step reasoning.
        No exemplars needed -- lowest token cost.

        Cost at 1M messages/day (Haiku):
          Avg input: 500 tokens -> ~$125/day
          Avg output: 400 tokens -> ~$500/day
          Total: ~$625/day if every query used zero-shot CoT
        """
        trigger = self.ZERO_SHOT_TRIGGERS.get(query_type, self.ZERO_SHOT_TRIGGERS["default"])

        prompt = f"""You are MangaAssist, a helpful JP manga store chatbot.

{f"CONTEXT: {context}" if context else ""}

USER QUERY: {query}

{trigger}

Format your response as:
STEP 1: [first reasoning step]
STEP 2: [second reasoning step]
...
ANSWER: [your final answer to the user]
CONFIDENCE: [0.0-1.0 how confident you are]"""

        start = time.monotonic()
        # boto3 is blocking; run it in a worker thread so the event loop stays free
        response = await asyncio.to_thread(
            self.bedrock.invoke_model,
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "temperature": 0.1,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
        latency_ms = (time.monotonic() - start) * 1000

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]
        usage = result.get("usage", {})

        steps, answer, confidence = self._parse_cot_response(text)

        return CoTResult(
            reasoning_steps=steps,
            final_answer=answer,
            confidence=confidence,
            model_id=model_id,
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
            latency_ms=latency_ms,
            strategy="zero_shot",
        )

    # ──────────────────────────────────────────────
    # Few-Shot CoT
    # ──────────────────────────────────────────────

    async def few_shot_cot(
        self,
        query: str,
        context: str = "",
        query_type: str = "recommendation",
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens: int = 1000,
        num_exemplars: int = 2,
    ) -> CoTResult:
        """
        Execute few-shot chain-of-thought reasoning with domain exemplars.

        Provides exemplar reasoning traces that demonstrate the expected
        step-by-step approach. Higher accuracy than zero-shot but costs
        more due to exemplar tokens in the input.

        Cost at 1M messages/day (Sonnet, 30% of traffic = 300K):
          Avg input: 1500 tokens (with exemplars) -> ~$1,350/day
          Avg output: 600 tokens -> ~$2,700/day
          Total: ~$4,050/day for few-shot CoT queries
        """
        exemplars = self._get_exemplars(query_type, num_exemplars)
        exemplar_text = self._format_exemplars(exemplars)

        prompt = f"""You are MangaAssist, a helpful JP manga store chatbot.

Here are examples of how to reason through similar queries:

{exemplar_text}

Now apply the same reasoning approach to this query:

{f"CONTEXT: {context}" if context else ""}

USER QUERY: {query}

Think through this step by step following the same pattern as the examples.

Format your response as:
STEP 1: [first reasoning step]
STEP 2: [second reasoning step]
...
ANSWER: [your final answer to the user]
CONFIDENCE: [0.0-1.0]"""

        start = time.monotonic()
        # boto3 is blocking; run it in a worker thread so the event loop stays free
        response = await asyncio.to_thread(
            self.bedrock.invoke_model,
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "temperature": 0.1,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
        latency_ms = (time.monotonic() - start) * 1000

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]
        usage = result.get("usage", {})

        steps, answer, confidence = self._parse_cot_response(text)

        return CoTResult(
            reasoning_steps=steps,
            final_answer=answer,
            confidence=confidence,
            model_id=model_id,
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
            latency_ms=latency_ms,
            strategy="few_shot",
        )

    # ──────────────────────────────────────────────
    # Self-Consistency CoT
    # ──────────────────────────────────────────────

    async def self_consistency_cot(
        self,
        query: str,
        context: str = "",
        query_type: str = "recommendation",
        model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
        max_tokens: int = 800,
        num_paths: int = 3,
        temperature: float = 0.7,
    ) -> CoTResult:
        """
        Execute self-consistency chain-of-thought reasoning.

        Generates multiple reasoning paths at higher temperature, then
        selects the most consistent answer via majority vote. Trades
        higher cost for improved reliability on ambiguous queries.

        Cost at 1M messages/day (Haiku, 5% of traffic = 50K, 3 paths each):
          Avg input: 500 tokens x 3 = 1500 tokens -> ~$18.75/day
          Avg output: 400 tokens x 3 = 1200 tokens -> ~$75/day
          Plus voting call: ~200 tokens -> negligible
          Total: ~$94/day for self-consistency queries
        """
        # boto3 calls block, so run each path in a worker thread; awaiting
        # blocking coroutines with asyncio.gather would execute them serially
        tasks = [
            asyncio.to_thread(
                self._generate_single_path,
                query, context, query_type, model_id, max_tokens, temperature,
            )
            for _ in range(num_paths)
        ]
        paths = await asyncio.gather(*tasks)

        # Extract answers from each path
        answers = []
        all_steps = []
        total_input_tokens = 0
        total_output_tokens = 0
        total_latency = 0.0

        for path in paths:
            answers.append(path["answer"])
            all_steps.append(path["steps"])
            total_input_tokens += path["input_tokens"]
            total_output_tokens += path["output_tokens"]
            total_latency = max(total_latency, path["latency_ms"])  # parallel

        # Majority vote using another LLM call
        voted_answer, voted_confidence = await self._majority_vote(
            query, answers, model_id
        )

        return CoTResult(
            reasoning_steps=[
                f"Path {i+1}: {'; '.join(steps)}"
                for i, steps in enumerate(all_steps)
            ],
            final_answer=voted_answer,
            confidence=voted_confidence,
            model_id=model_id,
            input_tokens=total_input_tokens,
            output_tokens=total_output_tokens,
            latency_ms=total_latency,
            strategy="self_consistency",
        )

    def _generate_single_path(
        self,
        query: str,
        context: str,
        query_type: str,
        model_id: str,
        max_tokens: int,
        temperature: float,
    ) -> dict:
        """Generate one reasoning path for self-consistency (runs in a worker thread)."""
        trigger = self.ZERO_SHOT_TRIGGERS.get(query_type, self.ZERO_SHOT_TRIGGERS["default"])

        prompt = f"""You are MangaAssist. Answer this query with step-by-step reasoning.

{f"CONTEXT: {context}" if context else ""}
USER QUERY: {query}

{trigger}

STEP 1: [reasoning]
...
ANSWER: [final answer]"""

        start = time.monotonic()
        response = self.bedrock.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "temperature": temperature,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
        latency_ms = (time.monotonic() - start) * 1000

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]
        usage = result.get("usage", {})
        steps, answer, _ = self._parse_cot_response(text)

        return {
            "steps": steps,
            "answer": answer,
            "input_tokens": usage.get("input_tokens", 0),
            "output_tokens": usage.get("output_tokens", 0),
            "latency_ms": latency_ms,
        }

    async def _majority_vote(
        self, query: str, answers: list[str], model_id: str
    ) -> tuple[str, float]:
        """Select the most consistent answer from multiple reasoning paths."""
        prompt = f"""Given these {len(answers)} answers to the same query, determine the consensus.

QUERY: {query}

ANSWERS:
{chr(10).join(f"Answer {i+1}: {a}" for i, a in enumerate(answers))}

Which answer best represents the consensus? If answers agree, pick the most
complete version. If they disagree, pick the answer that appears most often.

Respond in JSON:
{{"consensus_answer": "the best answer", "agreement_ratio": 0.0-1.0}}"""

        # Voting is a single blocking call; thread it off like the path calls
        response = await asyncio.to_thread(
            self.bedrock.invoke_model,
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 500,
                "temperature": 0.0,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )

        result = json.loads(response["body"].read())
        try:
            parsed = json.loads(result["content"][0]["text"])
        except json.JSONDecodeError:
            # Fall back to the first sampled answer if the vote is not valid JSON
            return answers[0], 0.5
        return parsed["consensus_answer"], parsed.get("agreement_ratio", 0.5)

    # ──────────────────────────────────────────────
    # Exemplar Management
    # ──────────────────────────────────────────────

    def _get_exemplars(self, query_type: str, count: int) -> list[dict]:
        """Retrieve domain-specific exemplars for few-shot CoT."""
        exemplar_bank = {
            "recommendation": [
                {
                    "query": "Suggest a manga similar to One Piece but shorter",
                    "steps": [
                        "STEP 1: Identify One Piece traits -- shounen, adventure, pirates, "
                        "humor, large cast, world-building, 100+ volumes",
                        "STEP 2: 'Shorter' constraint means under 30 volumes, ideally "
                        "completed series for a definitive ending",
                        "STEP 3: Match genres -- adventure + action + humor with strong "
                        "world-building but condensed into fewer volumes",
                        "STEP 4: Candidates -- Fullmetal Alchemist (27 vols, adventure, "
                        "world-building), Magi (37 vols, adventure, magic), "
                        "Promised Neverland (20 vols, thriller + adventure)",
                        "STEP 5: Rank by similarity to One Piece tone -- FMA has the "
                        "best mix of humor, action, and emotional depth in a shorter format",
                    ],
                    "answer": "I'd recommend Fullmetal Alchemist (27 volumes, completed). "
                    "Like One Piece, it has a rich world, a lovable main duo on a grand "
                    "quest, great humor mixed with emotional moments, and a satisfying "
                    "conclusion. If you want something even shorter, The Promised "
                    "Neverland (20 volumes) has incredible world-building in a more "
                    "thriller-oriented package.",
                },
                {
                    "query": "I loved Death Note, what else has that cat-and-mouse feel?",
                    "steps": [
                        "STEP 1: Death Note core appeal -- psychological thriller, "
                        "cat-and-mouse between genius rivals, mind games, moral ambiguity",
                        "STEP 2: Key dimensions to match -- intellectual conflict (not "
                        "physical), strategic planning, tension, dark tone",
                        "STEP 3: Candidates -- Liar Game (pure mind games), Kaiji "
                        "(gambling + strategy), Monster (detective vs serial killer), "
                        "Code Geass (strategy + morality)",
                        "STEP 4: Monster has the strongest cat-and-mouse dynamic with "
                        "a detective hunting a serial killer across Europe",
                    ],
                    "answer": "Try Monster by Naoki Urasawa (18 volumes). It has the "
                    "same cat-and-mouse tension as Death Note but in a more grounded, "
                    "realistic thriller setting. A brilliant doctor hunts a former patient "
                    "who turned out to be a serial killer. For pure strategic mind games, "
                    "Liar Game is also excellent.",
                },
            ],
            "comparison": [
                {
                    "query": "What is the difference between shounen and seinen manga?",
                    "steps": [
                        "STEP 1: Define demographics -- shounen targets boys 12-18, "
                        "seinen targets men 18+",
                        "STEP 2: Common shounen traits -- action-driven, friendship "
                        "themes, power progression, optimistic tone",
                        "STEP 3: Common seinen traits -- complex themes, moral ambiguity, "
                        "slower pacing, realistic consequences",
                        "STEP 4: Provide concrete examples -- Naruto (shounen) vs "
                        "Vinland Saga (seinen) both feature warriors but differ in tone",
                    ],
                    "answer": "Shounen manga (like Naruto, My Hero Academia) targets "
                    "teen boys and emphasizes action, friendship, and growth. Seinen "
                    "manga (like Berserk, Vinland Saga) targets adult men and explores "
                    "more complex themes with moral ambiguity.",
                },
            ],
        }

        exemplars = exemplar_bank.get(query_type, exemplar_bank["recommendation"])
        return exemplars[:count]

    def _format_exemplars(self, exemplars: list[dict]) -> str:
        """Format exemplars into prompt text."""
        parts = []
        for i, ex in enumerate(exemplars):
            steps_text = "\n".join(ex["steps"])
            parts.append(
                f"--- Example {i+1} ---\n"
                f"QUERY: {ex['query']}\n{steps_text}\n"
                f"ANSWER: {ex['answer']}\n"
            )
        return "\n".join(parts)

    def _parse_cot_response(self, text: str) -> tuple[list[str], str, float]:
        """Parse a CoT-formatted response into steps, answer, and confidence."""
        steps = []
        answer = ""
        confidence = 0.5

        lines = text.strip().split("\n")
        for line in lines:
            line = line.strip()
            if line.startswith("STEP"):
                steps.append(line)
            elif line.startswith("ANSWER:"):
                answer = line.replace("ANSWER:", "").strip()
            elif line.startswith("CONFIDENCE:"):
                try:
                    confidence = float(line.replace("CONFIDENCE:", "").strip())
                except ValueError:
                    confidence = 0.5

        # If ANSWER was not on a single line, collect remaining text
        if not answer:
            answer_idx = text.find("ANSWER:")
            if answer_idx >= 0:
                answer = text[answer_idx + 7:].strip()
                conf_idx = answer.find("CONFIDENCE:")
                if conf_idx >= 0:
                    answer = answer[:conf_idx].strip()

        return steps, answer, confidence
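A minimal usage sketch for the engine above -- it assumes AWS credentials and Bedrock model access are already configured; the method names and defaults come from the class itself:

import asyncio

async def demo():
    engine = ChainOfThoughtEngine()

    # Cheap path: single Haiku call with a trigger phrase
    simple = await engine.zero_shot_cot(
        query="What genre is Attack on Titan?",
    )
    print(simple.strategy, simple.final_answer, simple.confidence)

    # Ambiguous path: three sampled reasoning chains + majority vote
    voted = await engine.self_consistency_cot(
        query="What should I read next?",
        num_paths=3,
    )
    print(voted.strategy, voted.final_answer)

asyncio.run(demo())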

Section 3: Step Functions State Machine Design for Multi-Step Reasoning

3.1 ASL Definition: Chain-of-Thought Reasoning Workflow

This state machine orchestrates a multi-step CoT reasoning pipeline where each step builds on the previous, with conditional branching based on intermediate confidence scores.

{
  "Comment": "MangaAssist CoT Reasoning Workflow - Skill 2.1.2",
  "StartAt": "ClassifyQueryComplexity",
  "States": {
    "ClassifyQueryComplexity": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-classify-complexity",
      "Parameters": {
        "user_query.$": "$.user_query",
        "session_id.$": "$.session_id"
      },
      "ResultPath": "$.classification",
      "TimeoutSeconds": 2,
      "Next": "SelectReasoningStrategy"
    },

    "SelectReasoningStrategy": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.classification.complexity",
          "StringEquals": "simple",
          "Next": "ZeroShotCoTBranch"
        },
        {
          "Variable": "$.classification.complexity",
          "StringEquals": "moderate",
          "Next": "FewShotCoTBranch"
        },
        {
          "Variable": "$.classification.complexity",
          "StringEquals": "complex",
          "Next": "FullReActBranch"
        },
        {
          "Variable": "$.classification.complexity",
          "StringEquals": "ambiguous",
          "Next": "SelfConsistencyBranch"
        }
      ],
      "Default": "ZeroShotCoTBranch"
    },

    "ZeroShotCoTBranch": {
      "Type": "Task",
      "Resource": "arn:aws:states:::bedrock:invokeModel",
      "Parameters": {
        "ModelId": "anthropic.claude-3-haiku-20240307-v1:0",
        "ContentType": "application/json",
        "Accept": "application/json",
        "Body": {
          "anthropic_version": "bedrock-2023-05-31",
          "max_tokens": 800,
          "temperature": 0.1,
          "messages": [
            {
              "role": "user",
              "content.$": "States.Format('You are MangaAssist. Answer this query step by step.\n\nUSER QUERY: {}\n\nLet''s think through this step by step.\n\nSTEP 1: [reasoning]\n...\nANSWER: [answer]\nCONFIDENCE: [0.0-1.0]', $.user_query)"
            }
          ]
        }
      },
      "ResultPath": "$.cot_result",
      "TimeoutSeconds": 5,
      "Retry": [
        {
          "ErrorEquals": ["Bedrock.ThrottlingException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Next": "ParseAndRespond"
    },

    "FewShotCoTBranch": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-few-shot-cot",
      "Parameters": {
        "user_query.$": "$.user_query",
        "query_type.$": "$.classification.query_type",
        "num_exemplars": 2,
        "model_id": "anthropic.claude-3-sonnet-20240229-v1:0"
      },
      "ResultPath": "$.cot_result",
      "TimeoutSeconds": 5,
      "Retry": [
        {
          "ErrorEquals": ["Bedrock.ThrottlingException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.Timeout"],
          "ResultPath": "$.error",
          "Next": "ZeroShotCoTBranch"
        }
      ],
      "Next": "ParseAndRespond"
    },

    "SelfConsistencyBranch": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Path1",
          "States": {
            "Path1": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
              "Parameters": {
                "user_query.$": "$.user_query",
                "temperature": 0.7,
                "path_id": 1
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Path2",
          "States": {
            "Path2": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
              "Parameters": {
                "user_query.$": "$.user_query",
                "temperature": 0.7,
                "path_id": 2
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Path3",
          "States": {
            "Path3": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-single-path",
              "Parameters": {
                "user_query.$": "$.user_query",
                "temperature": 0.7,
                "path_id": 3
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.parallel_paths",
      "TimeoutSeconds": 5,
      "Next": "MajorityVote"
    },

    "MajorityVote": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-cot-majority-vote",
      "Parameters": {
        "paths.$": "$.parallel_paths",
        "user_query.$": "$.user_query"
      },
      "ResultPath": "$.cot_result",
      "TimeoutSeconds": 3,
      "Next": "ParseAndRespond"
    },

    "FullReActBranch": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-react-loop",
        "Input": {
          "user_query.$": "$.user_query",
          "session_id.$": "$.session_id",
          "user_id.$": "$.user_id"
        }
      },
      "ResultPath": "$.react_result",
      "TimeoutSeconds": 5,
      "Catch": [
        {
          "ErrorEquals": ["States.Timeout"],
          "ResultPath": "$.error",
          "Next": "FewShotCoTBranch"
        }
      ],
      "Next": "ParseAndRespond"
    },

    "ParseAndRespond": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-parse-respond",
      "Parameters": {
        "cot_result.$": "$.cot_result",
        "session_id.$": "$.session_id",
        "strategy.$": "$.classification.complexity"
      },
      "ResultPath": "$.final_response",
      "Next": "EmitMetrics"
    },

    "EmitMetrics": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-emit-cot-metrics",
      "Parameters": {
        "session_id.$": "$.session_id",
        "strategy.$": "$.classification.complexity",
        "response.$": "$.final_response"
      },
      "ResultPath": null,
      "Next": "Succeed"
    },

    "Succeed": {
      "Type": "Succeed",
      "OutputPath": "$.final_response"
    }
  }
}
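The ClassifyQueryComplexity state above delegates to a manga-classify-complexity Lambda that is not defined in this section. A minimal heuristic sketch of what that handler might look like -- the keyword lists and routing rules are illustrative assumptions, not the production classifier (which could equally be a fast Haiku classification call):

def lambda_handler(event, context):
    """Hypothetical heuristic classifier for the ClassifyQueryComplexity state.

    Returns the `complexity` and `query_type` fields the Choice state and
    the FewShotCoTBranch expect under $.classification.
    """
    query = event["user_query"].lower()

    factual_markers = ("what genre", "who wrote", "how many volumes", "when did")
    open_ended_markers = ("what should i read", "plan my", "surprise me")
    comparison_markers = ("compare", " vs ", "difference between")

    if any(m in query for m in factual_markers):
        complexity, query_type = "simple", "default"
    elif any(m in query for m in open_ended_markers):
        complexity, query_type = "ambiguous", "recommendation"
    elif any(m in query for m in comparison_markers):
        complexity, query_type = "moderate", "comparison"
    elif "recommend" in query or "similar to" in query:
        complexity, query_type = "complex", "recommendation"
    else:
        complexity, query_type = "simple", "default"

    return {"complexity": complexity, "query_type": query_type}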

3.2 ASL Definition: ReAct Loop with Parallel Tool Execution

This extended ASL demonstrates how the Action step can fan out to multiple tools concurrently when the Thought step requests parallel data gathering.

{
  "Comment": "MangaAssist ReAct with Parallel Tool Execution",
  "StartAt": "CheckParallelActions",
  "States": {
    "CheckParallelActions": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.thought_result.parallel_actions",
          "IsPresent": true,
          "Next": "ParallelToolExecution"
        }
      ],
      "Default": "SingleToolExecution"
    },

    "ParallelToolExecution": {
      "Type": "Map",
      "ItemsPath": "$.thought_result.parallel_actions",
      "MaxConcurrency": 4,
      "Parameters": {
        "action.$": "$$.Map.Item.Value",
        "reasoning_context.$": "$.reasoning_context"
      },
      "Iterator": {
        "StartAt": "ExecuteTool",
        "States": {
          "ExecuteTool": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-action",
            "Parameters": {
              "thought": {
                "next_action.$": "$.action.tool_name",
                "action_params.$": "$.action.params"
              },
              "reasoning_context.$": "$.reasoning_context"
            },
            "TimeoutSeconds": 2,
            "Retry": [
              {
                "ErrorEquals": ["Lambda.TooManyRequestsException"],
                "IntervalSeconds": 0,
                "MaxAttempts": 2,
                "BackoffRate": 2.0
              }
            ],
            "Catch": [
              {
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.error",
                "Next": "ToolFailed"
              }
            ],
            "End": true
          },
          "ToolFailed": {
            "Type": "Pass",
            "Result": {
              "success": false,
              "error": "Tool execution failed"
            },
            "End": true
          }
        }
      },
      "ResultPath": "$.parallel_results",
      "Next": "MergeResults"
    },

    "SingleToolExecution": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-react-action",
      "Parameters": {
        "thought.$": "$.thought_result",
        "reasoning_context.$": "$.reasoning_context"
      },
      "ResultPath": "$.action_result",
      "TimeoutSeconds": 2,
      "Next": "ContinueReAct"
    },

    "MergeResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-merge-parallel-results",
      "Parameters": {
        "parallel_results.$": "$.parallel_results",
        "reasoning_context.$": "$.reasoning_context"
      },
      "ResultPath": "$.action_result",
      "Next": "ContinueReAct"
    },

    "ContinueReAct": {
      "Type": "Pass",
      "End": true
    }
  }
}
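The manga-merge-parallel-results function referenced by the MergeResults state is likewise not shown. A minimal sketch consistent with the Map state's output shape above; how failures are folded into a single observation payload is an assumption:

def lambda_handler(event, context):
    """Hypothetical merge step for ParallelToolExecution results.

    The Map state emits one result object per tool invocation; this collapses
    them into a single payload for the next Observation/Thought step.
    """
    merged = {"observations": [], "failed_tools": []}

    for result in event["parallel_results"]:
        if result.get("success", True):
            merged["observations"].append(result)
        else:
            # ToolFailed Pass states arrive as {"success": false, "error": ...}
            merged["failed_tools"].append(result.get("error", "unknown"))

    merged["reasoning_context"] = event["reasoning_context"]
    return merged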

Section 4: Tree-of-Thought for Complex Manga Queries

Tree-of-Thought (ToT) extends CoT by exploring multiple reasoning branches simultaneously, evaluating each branch, and pruning unpromising paths. This is most valuable for open-ended queries where the "right" answer depends on exploration.

4.1 When to Use ToT in MangaAssist

| Query Pattern | CoT Sufficient? | ToT Benefit |
|---|---|---|
| "What genre is Attack on Titan?" | Yes -- single factual answer | None -- unnecessary overhead |
| "Recommend a manga like Naruto but darker" | Yes -- constraints narrow the space | Minimal -- CoT within ReAct handles this |
| "What should I read next?" (no constraints) | No -- too many valid paths | High -- explore genre, mood, and length branches |
| "Plan my reading list for the next 6 months" | No -- complex multi-factor optimization | High -- balance variety, length, difficulty, cost |
| "Compare Vinland Saga, Berserk, and Vagabond" | Partially -- linear comparison works but misses cross-dimensions | Medium -- explore comparison dimensions in parallel |

4.2 Tree-of-Thought Implementation

"""
Tree-of-Thought reasoning for open-ended manga queries.

Explores multiple reasoning branches in parallel (in production, each
wave of expansions maps onto a Step Functions Parallel state), scores
each branch, prunes low-scoring paths, and returns the best answer
from the surviving branches.
"""

import json
import time
import logging
import asyncio
from dataclasses import dataclass, field
from typing import Optional

import boto3
from botocore.config import Config

logger = logging.getLogger("tot_engine")

BEDROCK_CONFIG = Config(
    retries={"max_attempts": 2, "mode": "adaptive"},
    read_timeout=5,
    connect_timeout=2,
)

HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"


@dataclass
class ThoughtNode:
    """A single node in the thought tree."""
    node_id: str
    parent_id: Optional[str]
    depth: int
    thought: str
    score: float  # 0.0 - 1.0, evaluated by the scoring function
    children: list["ThoughtNode"] = field(default_factory=list)
    is_terminal: bool = False
    answer: Optional[str] = None
    tokens_used: int = 0


@dataclass
class TreeOfThoughtResult:
    """Result from a Tree-of-Thought reasoning session."""
    best_path: list[str]  # sequence of thoughts from root to best leaf
    best_answer: str
    best_score: float
    total_nodes_explored: int
    total_nodes_pruned: int
    total_tokens: int
    total_latency_ms: float
    tree_depth: int
    branching_factor: int


class TreeOfThoughtEngine:
    """
    Tree-of-Thought engine for MangaAssist.

    Explores multiple reasoning paths in parallel, scores them,
    prunes low-scoring branches, and returns the best answer.

    Architecture:
    - Each level of the tree is a "wave" of parallel Bedrock calls
    - After each wave, a scoring pass evaluates and prunes branches
    - The best surviving branch at max depth produces the final answer

    Cost control:
    - Max branching factor (default 3): limits breadth
    - Max depth (default 3): limits depth
    - Pruning threshold (default 0.4): aggressively cuts weak branches
    - Total token budget: hard cap on cumulative token usage
    """

    def __init__(
        self,
        bedrock_client=None,
        max_branching_factor: int = 3,
        max_depth: int = 3,
        pruning_threshold: float = 0.4,
        token_budget: int = 10000,
    ):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", config=BEDROCK_CONFIG
        )
        self.max_branching_factor = max_branching_factor
        self.max_depth = max_depth
        self.pruning_threshold = pruning_threshold
        self.token_budget = token_budget

    async def solve(
        self,
        query: str,
        context: str = "",
    ) -> TreeOfThoughtResult:
        """
        Execute Tree-of-Thought reasoning for an open-ended query.

        Algorithm:
        1. Generate initial thought branches from the root
        2. Score each branch
        3. Prune branches below threshold
        4. For surviving branches, generate next-level thoughts
        5. Repeat until max depth or all branches are terminal
        6. Return the highest-scoring terminal branch
        """
        start_time = time.monotonic()
        total_tokens = 0
        nodes_explored = 0
        nodes_pruned = 0

        # Root node
        root = ThoughtNode(
            node_id="root",
            parent_id=None,
            depth=0,
            thought=f"Query: {query}",
            score=1.0,
        )

        current_frontier = [root]

        for depth in range(self.max_depth):
            if not current_frontier:
                break
            if total_tokens >= self.token_budget:
                logger.warning("Token budget exhausted at depth %d", depth)
                break

            # Expand all non-terminal frontier nodes in parallel; boto3 is
            # blocking, so each expansion runs in a worker thread
            expandable = [n for n in current_frontier if not n.is_terminal]
            expansion_tasks = [
                asyncio.to_thread(self._expand_node, query, context, node, depth + 1)
                for node in expandable
            ]

            if not expansion_tasks:
                break

            expanded_results = await asyncio.gather(*expansion_tasks)

            # Collect all new children; zip against the expandable subset so
            # parents and child lists stay aligned when some nodes are terminal
            next_frontier = []
            for node, children in zip(expandable, expanded_results):
                for child in children:
                    total_tokens += child.tokens_used
                    nodes_explored += 1
                    node.children.append(child)

                    if child.score >= self.pruning_threshold:
                        next_frontier.append(child)
                    else:
                        nodes_pruned += 1
                        logger.debug(
                            "Pruned node %s (score=%.2f < threshold=%.2f)",
                            child.node_id, child.score, self.pruning_threshold,
                        )

            current_frontier = next_frontier

        # Find the best terminal node
        best_node = self._find_best_leaf(root)
        best_path = self._trace_path(root, best_node.node_id)

        total_latency = (time.monotonic() - start_time) * 1000

        return TreeOfThoughtResult(
            best_path=[n.thought for n in best_path],
            best_answer=best_node.answer or best_node.thought,
            best_score=best_node.score,
            total_nodes_explored=nodes_explored,
            total_nodes_pruned=nodes_pruned,
            total_tokens=total_tokens,
            total_latency_ms=total_latency,
            tree_depth=self.max_depth,
            branching_factor=self.max_branching_factor,
        )

    def _expand_node(
        self,
        query: str,
        context: str,
        parent: ThoughtNode,
        depth: int,
    ) -> list[ThoughtNode]:
        """Generate child thought nodes from a parent node (runs in a worker thread)."""
        prompt = f"""You are MangaAssist's reasoning engine exploring different approaches.

QUERY: {query}
{f"CONTEXT: {context}" if context else ""}
CURRENT REASONING PATH: {parent.thought}
DEPTH: {depth}/{self.max_depth}

Generate exactly {self.max_branching_factor} different next reasoning steps.
Each should explore a DIFFERENT angle or approach.

{"This is the final depth -- each step should conclude with a specific answer." if depth == self.max_depth else "Each step should advance the reasoning toward an answer."}

Respond in JSON:
[
  {{
    "thought": "reasoning step exploring angle A",
    "score": 0.0-1.0,
    "is_terminal": {"true" if depth == self.max_depth else "true/false"},
    "answer": "final answer if terminal, null otherwise"
  }},
  ...
]"""

        response = self.bedrock.invoke_model(
            modelId=HAIKU_MODEL_ID,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "temperature": 0.7,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )

        result = json.loads(response["body"].read())
        text = result["content"][0]["text"]
        usage = result.get("usage", {})
        tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)

        try:
            children_data = json.loads(text)
        except json.JSONDecodeError:
            return []

        children = []
        for i, child_data in enumerate(children_data[:self.max_branching_factor]):
            child = ThoughtNode(
                node_id=f"{parent.node_id}_d{depth}_b{i}",
                parent_id=parent.node_id,
                depth=depth,
                thought=child_data.get("thought", ""),
                score=child_data.get("score", 0.5),
                is_terminal=child_data.get("is_terminal", depth == self.max_depth),
                answer=child_data.get("answer"),
                tokens_used=tokens // max(len(children_data), 1),
            )
            children.append(child)

        return children

    def _find_best_leaf(self, root: ThoughtNode) -> ThoughtNode:
        """Find the highest-scoring leaf node in the tree."""
        # Seeding `best` with the root (score 1.0) would mean no leaf could
        # ever win the comparison, so track the best *leaf* explicitly
        best: Optional[ThoughtNode] = None
        stack = [root]

        while stack:
            node = stack.pop()
            if not node.children and (best is None or node.score > best.score):
                best = node
            stack.extend(node.children)

        return best or root

    def _trace_path(
        self, root: ThoughtNode, target_id: str
    ) -> list[ThoughtNode]:
        """Trace the path from root to a target node."""
        path = []

        def dfs(node: ThoughtNode) -> bool:
            path.append(node)
            if node.node_id == target_id:
                return True
            for child in node.children:
                if dfs(child):
                    return True
            path.pop()
            return False

        dfs(root)
        return path
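A minimal driver for the engine above, under the same credential assumptions as the earlier sketches:

import asyncio

async def demo_tot():
    engine = TreeOfThoughtEngine(
        max_branching_factor=3,
        max_depth=3,
        pruning_threshold=0.4,
        token_budget=10_000,
    )
    result = await engine.solve("Plan my reading list for the next 6 months")
    print(f"Best answer (score {result.best_score:.2f}): {result.best_answer}")
    print(
        f"Explored {result.total_nodes_explored} nodes, "
        f"pruned {result.total_nodes_pruned}, "
        f"used {result.total_tokens} tokens "
        f"in {result.total_latency_ms:.0f}ms"
    )

asyncio.run(demo_tot())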

Section 5: Production Orchestrator -- StepFunctionReasoningWorkflow

This class wraps the Step Functions API to start, monitor, and retrieve results from reasoning workflows.

"""
Step Functions Reasoning Workflow orchestrator.

Manages the lifecycle of reasoning state machine executions:
- Start execution with proper input formatting
- Poll for completion within latency budget
- Retrieve and parse results
- Handle timeouts and errors gracefully
"""

import asyncio
import json
import time
import uuid
import logging
from typing import Optional
from dataclasses import dataclass

import boto3
from botocore.config import Config
from botocore.exceptions import ReadTimeoutError

logger = logging.getLogger("sf_reasoning")

SFN_CONFIG = Config(
    retries={"max_attempts": 2, "mode": "adaptive"},
    read_timeout=5,
    connect_timeout=2,
)


@dataclass
class WorkflowResult:
    """Result from a Step Functions reasoning workflow execution."""
    execution_arn: str
    status: str  # "SUCCEEDED", "FAILED", "TIMED_OUT", or "DEGRADED"
    output: Optional[dict]
    error: Optional[str]
    duration_ms: float
    strategy: str


class StepFunctionReasoningWorkflow:
    """
    Orchestrates Step Functions Express Workflow executions for reasoning.

    Provides a unified interface to start reasoning workflows
    (CoT, ReAct, ToT) and retrieve results within the 3-second
    latency budget.

    Express Workflows are invoked synchronously -- the StartSyncExecution
    API blocks until the workflow completes or times out.
    """

    # State machine ARNs by strategy
    STATE_MACHINES = {
        "cot": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-cot-reasoning",
        "react": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-react-loop",
        "tot": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-tot-reasoning",
        "router": "arn:aws:states:us-east-1:123456789012:stateMachine:manga-reasoning-router",
    }

    def __init__(self, sfn_client=None, region: str = "us-east-1"):
        self.sfn = sfn_client or boto3.client(
            "stepfunctions", config=SFN_CONFIG, region_name=region
        )

    async def execute_reasoning(
        self,
        strategy: str,
        user_query: str,
        session_id: str,
        user_id: str = "",
        context: dict = None,
        timeout_ms: int = 2800,
    ) -> WorkflowResult:
        """
        Execute a reasoning workflow synchronously.

        Uses Express Workflow's StartSyncExecution API which returns
        the result directly without polling.

        Args:
            strategy: "cot", "react", "tot", or "router" (auto-selects)
            user_query: The user's question
            session_id: Session identifier for tracing
            user_id: Optional user ID for personalization
            context: Optional additional context
            timeout_ms: Max wait time (default 2800ms for 3s budget)
        """
        state_machine_arn = self.STATE_MACHINES.get(strategy)
        if not state_machine_arn:
            return WorkflowResult(
                execution_arn="",
                status="FAILED",
                output=None,
                error=f"Unknown strategy: {strategy}",
                duration_ms=0,
                strategy=strategy,
            )

        execution_input = {
            "user_query": user_query,
            "session_id": session_id,
            "user_id": user_id,
            "context": context or {},
            "timeout_ms": timeout_ms,
        }

        execution_name = f"manga-{strategy}-{uuid.uuid4().hex[:12]}"
        start_time = time.monotonic()

        try:
            # start_sync_execution blocks until the Express workflow finishes,
            # so run it in a worker thread to keep the event loop responsive
            response = await asyncio.to_thread(
                self.sfn.start_sync_execution,
                stateMachineArn=state_machine_arn,
                name=execution_name,
                input=json.dumps(execution_input),
            )

            duration_ms = (time.monotonic() - start_time) * 1000
            status = response.get("status", "FAILED")

            if status == "SUCCEEDED":
                output = json.loads(response.get("output", "{}"))
                return WorkflowResult(
                    execution_arn=response.get("executionArn", ""),
                    status="SUCCEEDED",
                    output=output,
                    error=None,
                    duration_ms=duration_ms,
                    strategy=strategy,
                )
            elif status == "TIMED_OUT":
                # StartSyncExecution reports workflow timeouts in the response
                # status field, not as a client exception
                logger.warning(
                    "Reasoning workflow timed out: strategy=%s duration=%.0fms",
                    strategy, duration_ms,
                )
                return WorkflowResult(
                    execution_arn=response.get("executionArn", ""),
                    status="TIMED_OUT",
                    output=None,
                    error="Execution timed out",
                    duration_ms=duration_ms,
                    strategy=strategy,
                )
            else:
                error_msg = response.get("error", "Unknown error")
                cause = response.get("cause", "")
                return WorkflowResult(
                    execution_arn=response.get("executionArn", ""),
                    status="FAILED",
                    output=None,
                    error=f"{error_msg}: {cause}",
                    duration_ms=duration_ms,
                    strategy=strategy,
                )

        except ReadTimeoutError:
            # Client-side socket timeout -- treat it like a workflow timeout
            duration_ms = (time.monotonic() - start_time) * 1000
            logger.warning(
                "Reasoning workflow timed out: strategy=%s duration=%.0fms",
                strategy, duration_ms,
            )
            return WorkflowResult(
                execution_arn="",
                status="TIMED_OUT",
                output=None,
                error="Execution timed out",
                duration_ms=duration_ms,
                strategy=strategy,
            )

        except Exception as e:
            duration_ms = (time.monotonic() - start_time) * 1000
            logger.error("Workflow execution failed: %s", str(e))
            return WorkflowResult(
                execution_arn="",
                status="FAILED",
                output=None,
                error=str(e),
                duration_ms=duration_ms,
                strategy=strategy,
            )

    async def execute_with_fallback(
        self,
        user_query: str,
        session_id: str,
        user_id: str = "",
        context: dict = None,
    ) -> WorkflowResult:
        """
        Execute reasoning with automatic fallback chain.

        Strategy: router -> react -> few-shot CoT -> zero-shot CoT

        If the router workflow times out, fall back to simpler strategies.
        Each fallback gets the remaining time budget.
        """
        total_start = time.monotonic()
        total_budget_ms = 2800

        # Try the routing workflow first (auto-selects strategy)
        result = await self.execute_reasoning(
            strategy="router",
            user_query=user_query,
            session_id=session_id,
            user_id=user_id,
            context=context,
            timeout_ms=total_budget_ms,
        )

        if result.status == "SUCCEEDED":
            return result

        # Fallback chain
        elapsed = (time.monotonic() - total_start) * 1000
        remaining = total_budget_ms - elapsed

        if remaining > 500:
            logger.info("Router failed, falling back to CoT (%.0fms remaining)", remaining)
            result = await self.execute_reasoning(
                strategy="cot",
                user_query=user_query,
                session_id=session_id,
                user_id=user_id,
                context=context,
                timeout_ms=int(remaining),
            )

            if result.status == "SUCCEEDED":
                return result

        # Final fallback: return a graceful degradation response
        elapsed = (time.monotonic() - total_start) * 1000
        return WorkflowResult(
            execution_arn="",
            status="DEGRADED",
            output={
                "answer": "I'm having trouble processing your request right now. "
                "Could you try rephrasing your question or asking something more specific?",
                "is_fallback": True,
            },
            error="All reasoning strategies exhausted",
            duration_ms=elapsed,
            strategy="fallback",
        )
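A usage sketch -- the ARNs in STATE_MACHINES are placeholders, and handle_message is a hypothetical WebSocket entry point rather than part of the original class:

import asyncio

async def handle_message(user_query: str, session_id: str) -> dict:
    """Hypothetical entry point called once per WebSocket message."""
    workflow = StepFunctionReasoningWorkflow()
    result = await workflow.execute_with_fallback(
        user_query=user_query,
        session_id=session_id,
    )
    # Even DEGRADED results carry a user-facing answer payload
    return result.output or {"answer": "Something went wrong.", "is_fallback": True}

print(asyncio.run(handle_message("Recommend a manga like Naruto but darker", "sess-123")))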

Section 6: Comparison Tables for Reasoning Approaches

CoT Strategy Comparison

| Dimension | Zero-Shot CoT | Few-Shot CoT | Self-Consistency CoT |
|---|---|---|---|
| Mechanism | Trigger phrase ("think step by step") | Exemplar reasoning traces in prompt | Multiple paths + majority vote |
| Exemplars required | None | 2-3 domain-specific examples | None (uses temperature diversity) |
| Input token cost | Low (~500 tokens) | Medium (~1500 tokens with exemplars) | Low per path, but N paths total |
| Output token cost | Medium (~400 tokens) | Medium-high (~600 tokens) | N x medium (~400 x N tokens) |
| Accuracy (recommendation) | Good (70-75%) | Very good (80-85%) | Best (85-90%) |
| Latency | ~500ms (single Haiku call) | ~800ms (single Sonnet call) | ~600ms (parallel Haiku calls) |
| Best model | Haiku (cost-efficient) | Sonnet (needs exemplar comprehension) | Haiku (parallelized) |
| MangaAssist use case | Simple FAQ, genre lookups | Recommendations, comparisons | Ambiguous/open-ended queries |
| Daily cost (1M msg/day) | ~$625 (if all traffic) | ~$4,050 (30% of traffic) | ~$94 (5% of traffic) |

Reasoning Approach Selection Matrix

| Query Type | % of Traffic | Primary Strategy | Fallback Strategy | Avg Latency | Avg Cost/Query |
|---|---|---|---|---|---|
| Simple factual ("What genre is X?") | 20% | Zero-Shot CoT | Direct model response | 300-500ms | $0.0003 |
| Recommendation with constraints | 45% | ReAct + Few-Shot CoT | Few-Shot CoT only | 1.5-2.5s | $0.0085 |
| Open-ended recommendation | 10% | ReAct + Self-Consistency | ReAct + Zero-Shot | 2.0-2.8s | $0.012 |
| Comparison queries | 10% | Few-Shot CoT | Zero-Shot CoT | 600-900ms | $0.004 |
| Order/account management | 10% | Plan-and-Execute | ReAct | 1.0-2.0s | $0.005 |
| Ambiguous/exploration | 5% | Tree-of-Thought | Self-Consistency | 2.0-3.0s | $0.018 |

Latency Breakdown per Strategy

| Component | Zero-Shot CoT | Few-Shot CoT | ReAct (3 iterations) | Tree-of-Thought (depth=3) |
|---|---|---|---|---|
| Query classification | 0ms (not needed) | 50ms | 50ms | 50ms |
| Prompt construction | 5ms | 15ms | 5ms per iteration (15ms) | 10ms per level (30ms) |
| Bedrock inference | 400ms (Haiku) | 700ms (Sonnet) | 400ms x 3 (Haiku) + 700ms (Sonnet) | 400ms x 3 levels (parallel Haiku per level) |
| Tool execution | 0ms | 0ms | 200ms x 3 | 0ms |
| State transitions | 1ms (Express) | 1ms | 5ms (5 transitions) | 8ms (8 transitions) |
| Response formatting | 10ms | 10ms | 20ms | 20ms |
| Total | ~416ms | ~776ms | ~2,590ms | ~1,308ms |

Section 7: Latency and Cost Analysis at Scale

Daily Cost Projections (1M messages/day)

| Strategy | % Traffic | Messages/Day | Avg Input Tokens | Avg Output Tokens | Model | Daily Input Cost | Daily Output Cost | Total Daily |
|---|---|---|---|---|---|---|---|---|
| Zero-Shot CoT | 20% | 200,000 | 500 | 400 | Haiku | $25 | $100 | $125 |
| Few-Shot CoT | 25% | 250,000 | 1,500 | 600 | Sonnet | $1,125 | $2,250 | $3,375 |
| ReAct (Thought) | 45% | 450,000 | 1,000 x 3 iter | 500 x 3 iter | Mixed | $2,025 | $4,556 | $6,581 |
| ReAct (Observe) | 45% | 450,000 | 600 x 3 iter | 300 x 3 iter | Haiku | $203 | $506 | $709 |
| Self-Consistency | 5% | 50,000 | 500 x 3 paths | 400 x 3 paths | Haiku | $19 | $75 | $94 |
| Tree-of-Thought | 5% | 50,000 | 800 x 9 nodes | 500 x 9 nodes | Haiku | $90 | $281 | $371 |
| Total | 100% | 1,000,000 | | | | $3,487 | $7,768 | $11,255 |
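These figures follow directly from the per-1M-token prices in the context line (Haiku $0.25/$1.25, Sonnet $3/$15 input/output). A small sketch that reproduces the arithmetic for three of the rows:

PRICES = {  # USD per 1M tokens: (input, output)
    "haiku": (0.25, 1.25),
    "sonnet": (3.00, 15.00),
}

def daily_cost(messages: int, in_tokens: int, out_tokens: int, model: str) -> float:
    p_in, p_out = PRICES[model]
    return (messages * in_tokens * p_in + messages * out_tokens * p_out) / 1_000_000

print(daily_cost(200_000, 500, 400, "haiku"))         # Zero-Shot CoT:    ~$125
print(daily_cost(250_000, 1500, 600, "sonnet"))       # Few-Shot CoT:     ~$3,375
print(daily_cost(50_000, 500 * 3, 400 * 3, "haiku"))  # Self-Consistency: ~$93.75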

Monthly Cost Summary

| Item | Monthly Cost |
|---|---|
| Bedrock inference (CoT + ReAct + ToT) | ~$337,650 |
| Step Functions Express (~30M executions) | ~$45 |
| Lambda invocations (tool executions) | ~$28 |
| DynamoDB (sessions + traces) | ~$50 |
| OpenSearch Serverless (vector search) | ~$350 |
| ElastiCache Redis (caching) | ~$120 |
| CloudWatch (logs + metrics) | ~$35 |
| Total monthly (reasoning infrastructure) | ~$338,278 |

Latency Budget Management

"""
Latency budget manager for reasoning workflows.

Ensures the 3-second SLA by tracking elapsed time and dynamically
adjusting strategy complexity based on remaining budget.
"""

import time
import logging
from dataclasses import dataclass

logger = logging.getLogger("latency_budget")


@dataclass
class LatencyBudget:
    """Tracks remaining latency budget for a reasoning session."""
    total_budget_ms: float
    start_time: float
    network_overhead_ms: float = 200.0  # Reserved for WebSocket + API Gateway

    @property
    def elapsed_ms(self) -> float:
        return (time.monotonic() - self.start_time) * 1000

    @property
    def remaining_ms(self) -> float:
        return self.total_budget_ms - self.network_overhead_ms - self.elapsed_ms

    @property
    def usable_remaining_ms(self) -> float:
        """Remaining budget minus a safety margin for response formatting."""
        return max(0, self.remaining_ms - 100)

    def can_afford(self, estimated_ms: float) -> bool:
        """Check if we can afford an operation within the remaining budget."""
        return self.usable_remaining_ms >= estimated_ms

    def select_strategy(self) -> str:
        """Dynamically select reasoning strategy based on remaining budget."""
        remaining = self.usable_remaining_ms

        if remaining >= 2500:
            return "react"           # Full ReAct with 3+ iterations
        elif remaining >= 1500:
            return "react_limited"   # ReAct with max 2 iterations
        elif remaining >= 800:
            return "few_shot_cot"    # Single Sonnet call with exemplars
        elif remaining >= 400:
            return "zero_shot_cot"   # Single Haiku call
        else:
            return "cached_response" # Return a cached/template response

    def suggest_model(self) -> str:
        """Suggest the appropriate model based on remaining budget."""
        remaining = self.usable_remaining_ms

        if remaining >= 1000:
            return "anthropic.claude-3-sonnet-20240229-v1:0"
        else:
            return "anthropic.claude-3-haiku-20240307-v1:0"

    def log_checkpoint(self, label: str):
        """Log a latency checkpoint for debugging."""
        logger.info(
            "Latency checkpoint [%s]: elapsed=%.0fms remaining=%.0fms",
            label, self.elapsed_ms, self.remaining_ms,
        )


class ReActBudgetManager:
    """
    Manages per-iteration budgets within a ReAct loop.

    Divides the remaining time budget across expected iterations,
    leaving enough for best-effort synthesis if budget runs out.
    """

    SYNTHESIS_RESERVE_MS = 500  # Reserve for best-effort answer generation

    def __init__(self, budget: LatencyBudget, max_iterations: int = 5):
        self.budget = budget
        self.max_iterations = max_iterations

    def per_iteration_budget_ms(self, current_iteration: int) -> float:
        """Calculate the budget for the current iteration."""
        remaining = self.budget.usable_remaining_ms - self.SYNTHESIS_RESERVE_MS
        iterations_left = self.max_iterations - current_iteration

        if iterations_left <= 0:
            return 0

        return remaining / iterations_left

    def should_continue(self, current_iteration: int) -> bool:
        """Determine if another iteration is worthwhile."""
        per_iter = self.per_iteration_budget_ms(current_iteration)

        # Need at least 400ms for a useful Thought + Action + Observation
        return per_iter >= 400

    def should_use_sonnet(self, current_iteration: int) -> bool:
        """Determine if Sonnet is affordable for this iteration."""
        per_iter = self.per_iteration_budget_ms(current_iteration)
        # Sonnet typically takes 600-800ms, Haiku 200-400ms
        return per_iter >= 800
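A short sketch tying the two classes together inside a hypothetical ReAct driver loop; the Thought/Action/Observation round itself is elided:

import time

budget = LatencyBudget(total_budget_ms=3000.0, start_time=time.monotonic())
manager = ReActBudgetManager(budget, max_iterations=5)

iteration = 0
while manager.should_continue(iteration):
    budget.log_checkpoint(f"iteration-{iteration}")
    model = (
        "anthropic.claude-3-sonnet-20240229-v1:0"
        if manager.should_use_sonnet(iteration)
        else "anthropic.claude-3-haiku-20240307-v1:0"
    )
    # ... run one Thought -> Action -> Observation round with `model` ...
    iteration += 1

# If the loop exits early, the remaining budget dictates the fallback
print("fallback strategy:", budget.select_strategy())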

Key Takeaways

  1. Chain-of-Thought is a spectrum, not a single technique -- zero-shot CoT costs comparatively little (~$125/day at 200K queries, roughly 1% of total inference spend) and works for simple factual questions. Few-shot CoT provides the best accuracy for recommendation queries at moderate cost. Self-consistency provides the highest reliability for ambiguous queries by sampling multiple reasoning paths. Choose the right strategy per query type using the classification state.

  2. Step Functions Express Workflows are mandatory for real-time reasoning -- with sub-millisecond state transitions and synchronous execution, they add negligible overhead to the 3-second latency budget. Standard Workflows would consume over half the budget in transition latency alone. The trade-off is that Express Workflows provide less execution visibility (CloudWatch Logs only, no visual history in console).

  3. Tree-of-Thought is powerful but expensive -- gate it carefully -- at 9+ Bedrock invocations per query, ToT consumes roughly an order of magnitude more tokens than a single-pass CoT call. Reserve it for the 5% of queries that are genuinely ambiguous and where exploring multiple angles produces meaningfully better answers. Use the query classifier to route only appropriate queries to ToT.

  4. The ReAct loop and CoT are complementary, not competing -- CoT structures the reasoning within each Thought step (what to think about), while ReAct structures the overall problem-solving process (when to think vs. when to act). Every Thought step in a ReAct loop uses a CoT prompt internally. The ASL state machine orchestrates the outer loop; the prompt templates orchestrate the inner reasoning.

  5. Dynamic budget management is critical at scale -- the LatencyBudget and ReActBudgetManager classes ensure every query stays within the 3-second SLA even when individual Bedrock calls are slow. By dynamically downgrading strategy complexity (ReAct to CoT to cached response) based on remaining time, the system degrades gracefully rather than timing out.

  6. Monthly infrastructure cost is approximately $338,000 for the reasoning layer -- Bedrock inference dominates at ~$337,650/month, dwarfing OpenSearch at ~$350/month. The reasoning orchestration itself (Step Functions + Lambda) costs only ~$73/month. The biggest optimization lever is model routing: using Haiku ($0.25/$1.25 per 1M tokens) for observation evaluation and simple queries saves roughly 60% compared to using Sonnet for everything.