
Problem Decomposition Strategies for Advanced Problem-Solving Systems

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Detail |
| --- | --- |
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.2 — Create advanced problem-solving systems to give FMs the ability to break down and solve complex problems by following structured reasoning steps (e.g., Step Functions for ReAct patterns, chain-of-thought reasoning) |
| This File | Deep-dive into multi-step problem decomposition, parallel sub-task execution, result synthesis, and Step Functions orchestration patterns |

Skill Scope Statement

This file extends the ReAct reasoning architecture (covered in File 01) with advanced decomposition strategies for complex, multi-constraint queries. It covers: recursive query decomposition into atomic sub-tasks, parallel execution waves via Step Functions, result synthesis from multiple tool outputs, conflict resolution when parallel sub-tasks return contradictory results, and cost-optimized model routing within decomposition trees. MangaAssist uses these patterns for queries like "Find a completed seinen manga under 20 volumes similar to Berserk but with less graphic content, available in English, under $15 per volume."


Mind Map — Problem Decomposition Strategies

mindmap
  root((Problem<br/>Decomposition))
    Decomposition Patterns
      Recursive Decomposition
        Complex Query → Atomic Sub-Queries
        Dependency Graph Construction
        Base Case Detection
      Parallel Fan-Out
        Independent Sub-Tasks
        Step Functions Parallel State
        Concurrent Tool Invocation
      Sequential Refinement
        Each Step Narrows Solution Space
        Progressive Filtering
        Iterative Constraint Tightening
      Hierarchical Planning
        High-Level Plan Generation
        Detailed Sub-Plan Expansion
        Plan Validation Before Execution
    Execution Strategies
      Wave-Based Execution
        Dependency-Ordered Waves
        Maximum Parallelism Per Wave
        Budget Allocation Per Wave
      Step Functions Orchestration
        Express Workflows (sub-3s)
        Parallel State with MaxConcurrency
        Map State for Dynamic Fan-Out
        Choice State for Conditional Paths
      Timeout-Aware Scheduling
        Per-Wave Time Budgets
        Early Termination on Budget Exhaustion
        Best-Effort Synthesis
    Result Synthesis
      Aggregation Strategies
        Union (combine all results)
        Intersection (common across sub-tasks)
        Ranked Merge (score-weighted)
      Conflict Resolution
        Majority Vote
        Confidence-Weighted Selection
        Human Escalation
      Quality Scoring
        Completeness Check
        Consistency Validation
        Relevance to Original Query
    Cost Optimization
      Model Routing in Decomposition
        Sonnet for Planning ($3/1M)
        Haiku for Sub-Task Execution ($0.25/1M)
        Haiku for Result Evaluation ($0.25/1M)
      Token Budget Distribution
        Proportional to Sub-Task Complexity
        Reserve Budget for Synthesis
        Early Exit on High Confidence

Architecture — Decomposition Pipeline

flowchart TB
    subgraph Input["User Query"]
        Q["Find completed seinen manga<br/>under 20 volumes, similar to Berserk,<br/>less graphic, English, under $15/vol"]
    end

    subgraph Planner["Decomposition Planner (Sonnet — $3/1M)"]
        PLAN["Generate Sub-Task Plan"]
        VALIDATE["Validate Plan Feasibility"]
    end

    subgraph Wave1["Wave 1 — Independent Sub-Tasks (Parallel)"]
        T1["Sub-Task 1:<br/>Classify Berserk<br/>(genre/themes/tone)<br/>Tool: genre_classifier"]
        T2["Sub-Task 2:<br/>Search seinen manga<br/>< 20 volumes, completed<br/>Tool: opensearch_search"]
        T3["Sub-Task 3:<br/>Check English availability<br/>filter<br/>Tool: inventory_check"]
    end

    subgraph Wave2["Wave 2 — Dependent Sub-Tasks"]
        T4["Sub-Task 4:<br/>Filter by similarity to Berserk<br/>but less graphic<br/>Uses: T1 + T2 results"]
        T5["Sub-Task 5:<br/>Price check < $15/vol<br/>Uses: T2 + T3 results"]
    end

    subgraph Wave3["Wave 3 — Synthesis"]
        T6["Sub-Task 6:<br/>Intersect T4 and T5<br/>Rank by overall match"]
    end

    subgraph Synthesis["Result Synthesizer (Haiku — $0.25/1M)"]
        MERGE["Merge and Rank Results"]
        FORMAT["Format User Response"]
    end

    Q --> PLAN
    PLAN --> VALIDATE
    VALIDATE --> T1 & T2 & T3
    T1 --> T4
    T2 --> T4 & T5
    T3 --> T5
    T4 --> T6
    T5 --> T6
    T6 --> MERGE
    MERGE --> FORMAT

1. Recursive Query Decomposition

1.1 The Decomposition Engine

"""
MangaAssist Query Decomposition Engine — breaks complex user queries
into atomic sub-tasks organized as a directed acyclic graph (DAG).
Uses Sonnet for the initial planning step and Haiku for sub-task execution.
"""

import json
import time
import logging
from typing import Any, Optional
from dataclasses import dataclass, field
from enum import Enum

import boto3
from botocore.config import Config

logger = logging.getLogger("manga_decomposer")

BEDROCK_CONFIG = Config(
    retries={"max_attempts": 2, "mode": "adaptive"},
    read_timeout=5,
    connect_timeout=2,
)

bedrock = boto3.client("bedrock-runtime", config=BEDROCK_CONFIG)

SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"


class SubTaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class SubTask:
    """An atomic sub-task in the decomposition DAG."""
    task_id: str
    description: str
    tool_name: str
    parameters: dict = field(default_factory=dict)
    depends_on: list[str] = field(default_factory=list)
    priority: int = 1
    status: SubTaskStatus = SubTaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    latency_ms: float = 0.0
    model_for_eval: str = HAIKU_MODEL_ID  # Default to cheap model

    @property
    def is_terminal(self) -> bool:
        return self.status in (
            SubTaskStatus.COMPLETED,
            SubTaskStatus.FAILED,
            SubTaskStatus.SKIPPED,
        )


@dataclass
class DecompositionPlan:
    """Complete decomposition plan for a user query."""
    query: str
    sub_tasks: list[SubTask] = field(default_factory=list)
    waves: list[list[str]] = field(default_factory=list)  # Task ID groups
    total_estimated_latency_ms: float = 0.0
    planning_latency_ms: float = 0.0
    planning_tokens: dict = field(default_factory=dict)


class QueryDecompositionEngine:
    """
    Decomposes complex manga queries into sub-task DAGs.
    Uses a two-phase approach:
      Phase 1: LLM-based planning (Sonnet) to generate the sub-task list
      Phase 2: Topological sort to organize into execution waves
    """

    DECOMPOSITION_PROMPT = """You are the query planner for MangaAssist.
Decompose this user query into atomic sub-tasks that can be executed by tools.

USER QUERY: {query}

AVAILABLE TOOLS:
- opensearch_vector_search: Semantic search over manga catalog (params: query, filters)
- dynamodb_product_lookup: Get specific manga details (params: manga_id or title)
- elasticache_rating_cache: Get ratings for manga IDs (params: manga_ids)
- genre_theme_classifier: Classify manga genre/theme/tone (params: description or title)
- inventory_check: Check stock and pricing (params: title_ids)

RULES:
1. Each sub-task must use exactly ONE tool.
2. Mark dependencies: if sub-task B needs output from A, list A as a dependency.
3. Independent sub-tasks should have NO dependencies (they run in parallel).
4. Keep total sub-tasks between 3 and 7 for the latency budget.
5. Priority 1 = most critical, 5 = nice-to-have.

Respond with a JSON array:
[
  {{
    "task_id": "t1",
    "description": "What this sub-task accomplishes",
    "tool_name": "tool_name",
    "parameters": {{}},
    "depends_on": [],
    "priority": 1
  }}
]"""

    async def decompose(self, query: str) -> DecompositionPlan:
        """
        Generate a decomposition plan for a complex query.
        Returns a plan with sub-tasks organized into execution waves.
        """
        start = time.monotonic()

        # Phase 1: LLM-based planning.
        # Note: invoke_model is a blocking call; in a fully async service,
        # run it in a worker thread (e.g., asyncio.to_thread) so it does
        # not stall the event loop.
        response = bedrock.invoke_model(
            modelId=SONNET_MODEL_ID,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "temperature": 0.0,
                "messages": [{
                    "role": "user",
                    "content": self.DECOMPOSITION_PROMPT.format(query=query),
                }],
            }),
        )

        result = json.loads(response["body"].read())
        usage = result.get("usage", {})
        text = result["content"][0]["text"]
        tasks_json = json.loads(text)

        planning_ms = (time.monotonic() - start) * 1000

        # Build sub-tasks
        sub_tasks = []
        for t in tasks_json:
            sub_tasks.append(SubTask(
                task_id=t["task_id"],
                description=t["description"],
                tool_name=t["tool_name"],
                parameters=t.get("parameters", {}),
                depends_on=t.get("depends_on", []),
                priority=t.get("priority", 3),
            ))

        # Phase 2: Organize into execution waves
        waves = self._build_waves(sub_tasks)

        plan = DecompositionPlan(
            query=query,
            sub_tasks=sub_tasks,
            waves=waves,
            planning_latency_ms=planning_ms,
            planning_tokens={
                "input": usage.get("input_tokens", 0),
                "output": usage.get("output_tokens", 0),
            },
        )

        logger.info(
            "Decomposed query into %d sub-tasks across %d waves (%.0fms)",
            len(sub_tasks), len(waves), planning_ms,
        )
        return plan

    def _build_waves(self, sub_tasks: list[SubTask]) -> list[list[str]]:
        """
        Topological sort of sub-tasks into parallel execution waves.
        Tasks in the same wave have no dependencies on each other.
        """
        completed = set()
        waves = []
        remaining = {t.task_id: t for t in sub_tasks}

        while remaining:
            wave = []
            for task_id, task in remaining.items():
                if all(dep in completed for dep in task.depends_on):
                    wave.append(task_id)

            if not wave:
                # Circular dependency — break by taking highest priority
                wave = [min(remaining.keys(), key=lambda tid: remaining[tid].priority)]
                logger.warning("Circular dependency detected — breaking with %s", wave[0])

            for task_id in wave:
                del remaining[task_id]
                completed.add(task_id)

            waves.append(wave)

        return waves
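
The wave-building step can be checked with a standalone sketch. The task ids and dependencies below are hypothetical, mirroring the Wave 1-3 architecture diagram above:

```python
# Minimal standalone version of the wave-building topological sort.
deps = {
    "t1": [],            # classify reference title
    "t2": [],            # catalog search
    "t3": [],            # availability check
    "t4": ["t1", "t2"],  # similarity filter
    "t5": ["t2", "t3"],  # price filter
    "t6": ["t4", "t5"],  # intersect and rank
}

def build_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    completed, waves = set(), []
    remaining = dict(deps)
    while remaining:
        # A task is ready once every dependency has run in an earlier wave
        wave = [t for t, d in remaining.items() if all(x in completed for x in d)]
        if not wave:  # circular dependency: force progress with one task
            wave = [next(iter(remaining))]
        for t in wave:
            del remaining[t]
            completed.add(t)
        waves.append(wave)
    return waves

print(build_waves(deps))  # → [['t1', 't2', 't3'], ['t4', 't5'], ['t6']]
```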

1.2 Complexity-Based Decomposition Routing

"""
Routes queries to either direct execution or decomposition based on
estimated complexity. Simple queries skip the planning step entirely.
"""


class ComplexityRouter:
    """
    Estimates query complexity and routes to the appropriate handler.

    Simple queries (single-constraint, direct lookup) → Direct execution
    Complex queries (multi-constraint, comparison, similarity) → Decomposition
    """

    COMPLEXITY_SIGNALS = {
        "high": [
            "similar to", "like", "but", "except", "compare",
            "between", "under", "over", "recommend", "best",
            "completed", "ongoing", "volumes", "and also",
        ],
        "medium": [
            "find", "search", "available", "price", "in stock",
            "genre", "author", "rating",
        ],
        "low": [
            "order", "track", "status", "delivery", "MNG-",
            "what is", "who is", "when",
        ],
    }

    def estimate_complexity(self, query: str) -> str:
        """
        Return 'high', 'medium', or 'low' complexity estimate.
        High → decompose into sub-tasks
        Medium → single ReAct loop
        Low → direct tool call
        """
        query_lower = query.lower()
        high_count = sum(
            1 for signal in self.COMPLEXITY_SIGNALS["high"]
            if signal in query_lower
        )
        if high_count >= 2:
            return "high"

        medium_count = sum(
            1 for signal in self.COMPLEXITY_SIGNALS["medium"]
            if signal in query_lower
        )
        if medium_count >= 2 or high_count >= 1:
            return "medium"

        return "low"

    def should_decompose(self, query: str) -> bool:
        """Determine if a query should go through the decomposition pipeline."""
        return self.estimate_complexity(query) == "high"
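
The heuristic can be sanity-checked with a trimmed-down reproduction (signal lists abbreviated; the example queries are illustrative):

```python
# Abbreviated reproduction of the signal-counting heuristic above.
HIGH = ["similar to", "like", "but", "under", "recommend", "completed", "volumes"]
MEDIUM = ["find", "search", "price", "genre"]

def estimate_complexity(query: str) -> str:
    q = query.lower()
    high = sum(s in q for s in HIGH)
    if high >= 2:
        return "high"
    medium = sum(s in q for s in MEDIUM)
    return "medium" if medium >= 2 or high >= 1 else "low"

assert estimate_complexity(
    "Find a completed seinen manga under 20 volumes similar to Berserk"
) == "high"
assert estimate_complexity("Find manga by genre fantasy with price info") == "medium"
assert estimate_complexity("Where is my order MNG-1234?") == "low"
```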

2. Parallel Sub-Task Execution

2.1 Wave Executor with Step Functions

"""
MangaAssist Wave Executor — runs sub-task waves in parallel using
asyncio and Step Functions Map state for production deployment.
"""

import asyncio
import time
import logging
from typing import Any

logger = logging.getLogger("manga_wave_executor")


class WaveExecutor:
    """
    Executes decomposition waves, running tasks within each wave in parallel
    and waves sequentially. Respects the latency budget.
    """

    def __init__(self, tool_dispatcher, budget_ms: float = 2500.0):
        self._dispatcher = tool_dispatcher
        self._budget_ms = budget_ms

    async def execute_plan(self, plan: DecompositionPlan) -> dict:
        """
        Execute all waves in the decomposition plan.
        Returns aggregated results from all sub-tasks.
        """
        start = time.monotonic()
        all_results = {}
        task_lookup = {t.task_id: t for t in plan.sub_tasks}

        for wave_idx, wave_task_ids in enumerate(plan.waves):
            elapsed_ms = (time.monotonic() - start) * 1000
            remaining_ms = self._budget_ms - elapsed_ms

            if remaining_ms <= 200:
                logger.warning(
                    "Budget exhausted before wave %d — skipping %d tasks",
                    wave_idx, len(wave_task_ids),
                )
                for tid in wave_task_ids:
                    task_lookup[tid].status = SubTaskStatus.SKIPPED
                continue

            # Calculate per-task timeout for this wave
            waves_remaining = len(plan.waves) - wave_idx
            wave_budget_ms = remaining_ms / waves_remaining
            per_task_timeout = wave_budget_ms / max(len(wave_task_ids), 1)

            logger.info(
                "Wave %d: executing %d tasks (budget: %.0fms, per-task: %.0fms)",
                wave_idx, len(wave_task_ids), wave_budget_ms, per_task_timeout,
            )

            # Execute all tasks in this wave concurrently
            wave_tasks = []
            for task_id in wave_task_ids:
                task = task_lookup[task_id]
                # Inject results from dependencies
                dep_results = {
                    dep_id: all_results.get(dep_id)
                    for dep_id in task.depends_on
                    if dep_id in all_results
                }
                wave_tasks.append(
                    self._execute_task(task, dep_results, per_task_timeout)
                )

            wave_results = await asyncio.gather(*wave_tasks, return_exceptions=True)

            # Collect results
            for task_id, result in zip(wave_task_ids, wave_results):
                task = task_lookup[task_id]
                if isinstance(result, Exception):
                    task.status = SubTaskStatus.FAILED
                    task.error = str(result)
                    all_results[task_id] = {"error": str(result)}
                else:
                    task.status = SubTaskStatus.COMPLETED
                    task.result = result
                    all_results[task_id] = result

        total_ms = (time.monotonic() - start) * 1000
        logger.info(
            "Plan executed in %.0fms: %d completed, %d failed, %d skipped",
            total_ms,
            sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED),
            sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.FAILED),
            sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.SKIPPED),
        )

        return all_results

    async def _execute_task(
        self,
        task: SubTask,
        dependency_results: dict,
        timeout_ms: float,
    ) -> Any:
        """Execute a single sub-task with timeout enforcement."""
        task.status = SubTaskStatus.RUNNING
        start = time.monotonic()

        # Enrich parameters with dependency results
        enriched_params = dict(task.parameters)
        if dependency_results:
            enriched_params["_dependency_results"] = dependency_results

        try:
            result = await asyncio.wait_for(
                self._dispatcher.dispatch(task.tool_name, enriched_params),
                timeout=timeout_ms / 1000,
            )
            task.latency_ms = (time.monotonic() - start) * 1000
            return result
        except asyncio.TimeoutError:
            task.latency_ms = (time.monotonic() - start) * 1000
            raise TimeoutError(
                f"Sub-task {task.task_id} ({task.tool_name}) timed out "
                f"after {task.latency_ms:.0f}ms"
            )
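
The budget split above can be traced in isolation. This sketch assumes each wave consumes exactly its allotted share, whereas the real executor recomputes the remaining budget from actual elapsed time after every wave:

```python
# Standalone sketch of the per-wave budget split in execute_plan.
def wave_timeouts(budget_ms: float, wave_sizes: list[int]) -> list[float]:
    timeouts, remaining = [], budget_ms
    for i, size in enumerate(wave_sizes):
        wave_budget = remaining / (len(wave_sizes) - i)  # even split of what's left
        timeouts.append(wave_budget / max(size, 1))      # per-task slice
        remaining -= wave_budget                         # assume full consumption
    return timeouts

# 2.5s budget, waves of 3 / 2 / 1 tasks (as in the architecture diagram):
print([round(t) for t in wave_timeouts(2500, [3, 2, 1])])  # → [278, 417, 833]
```

Later waves with fewer tasks get looser per-task timeouts, which matches the pipeline shape: synthesis-stage tasks are fewer but depend on everything before them.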

2.2 Step Functions ASL for Parallel Decomposition

{
  "Comment": "MangaAssist Parallel Decomposition — Express Workflow",
  "StartAt": "DecomposeQuery",
  "States": {
    "DecomposeQuery": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-decompose",
      "TimeoutSeconds": 3,
      "ResultPath": "$.plan",
      "Next": "ExecuteWave1"
    },
    "ExecuteWave1": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ClassifyReference",
          "States": {
            "ClassifyReference": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-classify",
              "TimeoutSeconds": 2,
              "End": true
            }
          }
        },
        {
          "StartAt": "SearchCatalog",
          "States": {
            "SearchCatalog": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-search",
              "TimeoutSeconds": 2,
              "End": true
            }
          }
        },
        {
          "StartAt": "CheckAvailability",
          "States": {
            "CheckAvailability": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-inventory",
              "TimeoutSeconds": 2,
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.wave1Results",
      "Next": "ExecuteWave2",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.wave1Error",
          "Next": "SynthesizeBestEffort"
        }
      ]
    },
    "ExecuteWave2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-filter-rank",
      "Parameters": {
        "wave1Results.$": "$.wave1Results",
        "originalQuery.$": "$.plan.query"
      },
      "TimeoutSeconds": 2,
      "ResultPath": "$.wave2Results",
      "Next": "SynthesizeResponse"
    },
    "SynthesizeResponse": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-synthesize",
      "TimeoutSeconds": 2,
      "End": true
    },
    "SynthesizeBestEffort": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-best-effort",
      "TimeoutSeconds": 2,
      "End": true
    }
  }
}
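
From the orchestrator, an Express workflow like the one above can be invoked synchronously. This is a sketch: the state-machine ARN is a placeholder, and `StartSyncExecution` exists only for Express Workflows (Standard Workflows use `StartExecution`):

```python
# Sketch: synchronous invocation of the Express decomposition workflow.
import json

def run_decomposition(query: str, state_machine_arn: str) -> dict:
    import boto3  # deferred so the sketch imports cleanly without AWS config
    sfn = boto3.client("stepfunctions")
    resp = sfn.start_sync_execution(
        stateMachineArn=state_machine_arn,  # placeholder ARN supplied by caller
        input=json.dumps({"query": query}),
    )
    if resp["status"] != "SUCCEEDED":
        raise RuntimeError(f"Workflow {resp['status']}: {resp.get('error')}")
    return json.loads(resp["output"])
```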

3. Result Synthesis and Conflict Resolution

3.1 Result Synthesizer

"""
MangaAssist Result Synthesizer — merges outputs from multiple sub-tasks
into a coherent response. Handles conflicts, missing data, and ranking.
"""

import json
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger("manga_synthesizer")


@dataclass
class SynthesisResult:
    """Final synthesized result from the decomposition pipeline."""
    answer: str
    confidence: float
    sources: list[str]
    completeness: float  # 0.0 - 1.0
    conflicts_resolved: int = 0
    sub_tasks_used: int = 0
    sub_tasks_failed: int = 0


class ResultSynthesizer:
    """
    Merges results from parallel sub-tasks into a final answer.

    Strategies:
    - UNION: Combine all results (for search tasks)
    - INTERSECTION: Keep only items appearing in all results (for filter tasks)
    - RANKED_MERGE: Score-weighted combination (for recommendation tasks)
    """

    def synthesize(
        self,
        query: str,
        sub_task_results: dict[str, Any],
        plan: DecompositionPlan,
    ) -> SynthesisResult:
        """
        Synthesize a final answer from sub-task results.
        """
        # Collect all manga candidates from search/classification tasks
        candidates = self._extract_candidates(sub_task_results)

        # Apply filters from filter tasks
        filtered = self._apply_filters(candidates, sub_task_results)

        # Resolve conflicts between sub-task results
        resolved, conflict_count = self._resolve_conflicts(filtered, sub_task_results)

        # Rank the final results
        ranked = self._rank_results(resolved, query)

        # Count task status
        completed = sum(
            1 for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED
        )
        failed = sum(
            1 for t in plan.sub_tasks if t.status == SubTaskStatus.FAILED
        )
        total = len(plan.sub_tasks)
        completeness = completed / total if total > 0 else 0.0

        # Generate the response text
        answer = self._format_answer(ranked, query, completeness)

        return SynthesisResult(
            answer=answer,
            confidence=min(completeness, 0.9 if conflict_count == 0 else 0.7),
            sources=[t.tool_name for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED],
            completeness=completeness,
            conflicts_resolved=conflict_count,
            sub_tasks_used=completed,
            sub_tasks_failed=failed,
        )

    def _extract_candidates(self, results: dict[str, Any]) -> list[dict]:
        """Extract manga candidates from search-type sub-task results."""
        candidates = []
        for task_id, result in results.items():
            if isinstance(result, dict):
                # Handle search results
                for item in result.get("matches", []):
                    item["_source_task"] = task_id
                    candidates.append(item)
                for item in result.get("results", []):
                    item["_source_task"] = task_id
                    candidates.append(item)
        return candidates

    def _apply_filters(
        self, candidates: list[dict], results: dict[str, Any]
    ) -> list[dict]:
        """Apply filter constraints from filter-type sub-tasks."""
        filtered = list(candidates)

        for task_id, result in results.items():
            if isinstance(result, dict) and "available_ids" in result:
                available = set(result["available_ids"])
                filtered = [c for c in filtered if c.get("title_id") in available]

            if isinstance(result, dict) and "price_data" in result:
                price_data = result["price_data"]
                filtered = [
                    c for c in filtered
                    if c.get("title_id") in price_data
                    and price_data[c["title_id"]].get("price_usd", 999) <= 15.0
                ]

        return filtered

    def _resolve_conflicts(
        self, candidates: list[dict], results: dict[str, Any]
    ) -> tuple[list[dict], int]:
        """
        Resolve conflicts when sub-tasks disagree.
        Example: search says title X is relevant, but classifier says
        its genre does not match.
        """
        conflicts = 0
        resolved = []

        # Assumes the planner assigned the task_id "classification" to the
        # genre_theme_classifier sub-task; adjust this lookup if your planner
        # emits generic ids like "t1".
        classification = results.get("classification", {})

        for candidate in candidates:
            # Check whether the classification result contradicts the search result
            title_id = candidate.get("title_id", "")

            if classification and title_id:
                classified_tone = classification.get(title_id, {}).get("tone", "")
                candidate_tone = candidate.get("tone", "")
                if classified_tone and candidate_tone and classified_tone != candidate_tone:
                    conflicts += 1
                    # Prefer the classification result (more specific)
                    candidate["tone"] = classified_tone

            resolved.append(candidate)

        if conflicts > 0:
            logger.info("Resolved %d conflicts between sub-task results", conflicts)

        return resolved, conflicts

    def _rank_results(self, candidates: list[dict], query: str) -> list[dict]:
        """Rank candidates by relevance score."""
        for c in candidates:
            # Compute composite score
            search_score = c.get("score", 0.0)
            rating = c.get("rating", 0.0) / 5.0  # Normalize to 0-1
            in_stock = 1.0 if c.get("in_stock", False) else 0.3
            c["_composite_score"] = (
                0.5 * search_score + 0.3 * rating + 0.2 * in_stock
            )

        candidates.sort(key=lambda c: c.get("_composite_score", 0), reverse=True)
        return candidates[:5]  # Top 5

    def _format_answer(
        self, ranked: list[dict], query: str, completeness: float
    ) -> str:
        """Format the ranked results into a user-friendly response."""
        if not ranked:
            return (
                "I searched our catalog but could not find manga matching all "
                "your criteria. Try relaxing some constraints (e.g., remove the "
                "volume limit or price cap) and I will search again."
            )

        lines = ["Here are my top recommendations based on your criteria:\n"]
        for i, manga in enumerate(ranked, 1):
            title = manga.get("title", "Unknown")
            author = manga.get("author", "Unknown")
            volumes = manga.get("volumes", "?")
            price = manga.get("price_usd", "?")
            rating = manga.get("rating", "?")
            lines.append(
                f"{i}. **{title}** by {author}\n"
                f"   - {volumes} volumes | ${price}/vol | Rating: {rating}/5\n"
                f"   - {manga.get('synopsis', 'No synopsis available')[:150]}..."
            )

        if completeness < 0.8:
            lines.append(
                "\n*Note: Some search results may be incomplete due to "
                "temporary service delays. Try again for more complete results.*"
            )

        return "\n".join(lines)
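
The composite score in `_rank_results` can be traced by hand. The titles and numbers below are illustrative, not catalog data:

```python
# Hand-traceable version of the composite ranking weights (0.5/0.3/0.2).
def composite(search_score: float, rating: float, in_stock: bool) -> float:
    return 0.5 * search_score + 0.3 * (rating / 5.0) + 0.2 * (1.0 if in_stock else 0.3)

titles = [
    ("Vinland Saga", 0.82, 4.7, True),
    ("Vagabond",     0.90, 4.8, False),  # best semantic match, but out of stock
    ("Dorohedoro",   0.65, 4.3, True),
]
ranked = sorted(titles, key=lambda t: composite(*t[1:]), reverse=True)
print([t[0] for t in ranked])  # → ['Vinland Saga', 'Vagabond', 'Dorohedoro']
```

Note how the stock penalty (0.3 instead of 1.0) demotes the highest-scoring search match below an in-stock alternative.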

4. Cost-Optimized Model Routing in Decomposition

4.1 Per-Phase Model Selection

"""
MangaAssist cost-optimized model routing — assigns the cheapest
capable model to each phase of the decomposition pipeline.

Cost breakdown at 1M messages/day (30% go through decomposition),
input tokens only (output tokens add to these figures):
  - Planning (Sonnet): 300K * ~500 input tokens * $3/1M = $450/day
  - Sub-tasks (Haiku): 300K * 5 tasks * ~200 input tokens * $0.25/1M = $75/day
  - Synthesis (Haiku): 300K * ~300 input tokens * $0.25/1M = $22.50/day
  - Total decomposition (input side): ~$547.50/day

vs. All-Sonnet: 300K * 6 calls * ~400 input tokens * $3/1M = $2,160/day
Savings: ~$1,612.50/day (75% reduction). Output-token savings are larger
still, since Haiku output ($1.25/1M) is 12x cheaper than Sonnet's ($15/1M).
"""


class DecompositionModelRouter:
    """
    Assigns models to each phase of the decomposition pipeline.
    Planning uses Sonnet (better at complex plan generation).
    Execution and synthesis use Haiku (cheaper, adequate quality).
    """

    MODEL_ASSIGNMENTS = {
        "planning": {
            "model_id": SONNET_MODEL_ID,
            "max_tokens": 1000,
            "temperature": 0.0,
            "rationale": "Complex plan generation needs stronger reasoning",
        },
        "sub_task_execution": {
            "model_id": HAIKU_MODEL_ID,
            "max_tokens": 400,
            "temperature": 0.0,
            "rationale": "Sub-tasks are atomic and well-scoped; Haiku suffices",
        },
        "observation_eval": {
            "model_id": HAIKU_MODEL_ID,
            "max_tokens": 300,
            "temperature": 0.0,
            "rationale": "Evaluating tool output is straightforward classification",
        },
        "synthesis": {
            "model_id": HAIKU_MODEL_ID,
            "max_tokens": 600,
            "temperature": 0.3,
            "rationale": "Merging results and generating response; Haiku is adequate",
        },
        "conflict_resolution": {
            "model_id": SONNET_MODEL_ID,
            "max_tokens": 500,
            "temperature": 0.0,
            "rationale": "Resolving contradictions needs stronger reasoning",
        },
    }

    def get_config(self, phase: str) -> dict:
        """Get the model configuration for a decomposition phase."""
        return self.MODEL_ASSIGNMENTS.get(phase, self.MODEL_ASSIGNMENTS["sub_task_execution"])

    def estimate_cost(self, plan: DecompositionPlan) -> dict:
        """
        Estimate the token cost for executing a decomposition plan.
        """
        planning_config = self.get_config("planning")
        exec_config = self.get_config("sub_task_execution")
        synth_config = self.get_config("synthesis")

        # Estimate token usage per phase
        planning_input = 500  # System prompt + query + tool descriptions
        planning_output = plan.planning_tokens.get("output", 300)

        exec_input_per_task = 200
        exec_output_per_task = 150
        num_tasks = len(plan.sub_tasks)

        synth_input = 300 + (num_tasks * 100)  # Grows with number of results
        synth_output = 400

        # Calculate costs
        sonnet_input_rate = 3.0 / 1_000_000
        sonnet_output_rate = 15.0 / 1_000_000
        haiku_input_rate = 0.25 / 1_000_000
        haiku_output_rate = 1.25 / 1_000_000

        planning_cost = (
            planning_input * sonnet_input_rate +
            planning_output * sonnet_output_rate
        )
        exec_cost = num_tasks * (
            exec_input_per_task * haiku_input_rate +
            exec_output_per_task * haiku_output_rate
        )
        synth_cost = (
            synth_input * haiku_input_rate +
            synth_output * haiku_output_rate
        )

        return {
            "planning_cost_usd": round(planning_cost, 6),
            "execution_cost_usd": round(exec_cost, 6),
            "synthesis_cost_usd": round(synth_cost, 6),
            "total_cost_usd": round(planning_cost + exec_cost + synth_cost, 6),
            "num_tasks": num_tasks,
        }
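
Plugging the router's default token estimates into the rates gives a worked per-query cost for a typical 5-sub-task plan. Unlike the input-only daily figures in the module docstring, this includes output tokens:

```python
# Worked per-query cost using the same token estimates as estimate_cost,
# for a 5-sub-task plan. Rates are $/token.
SONNET_IN, SONNET_OUT = 3.0 / 1_000_000, 15.0 / 1_000_000
HAIKU_IN, HAIKU_OUT = 0.25 / 1_000_000, 1.25 / 1_000_000

num_tasks = 5
planning = 500 * SONNET_IN + 300 * SONNET_OUT                     # $0.0060
execution = num_tasks * (200 * HAIKU_IN + 150 * HAIKU_OUT)        # $0.0011875
synthesis = (300 + num_tasks * 100) * HAIKU_IN + 400 * HAIKU_OUT  # $0.0007
total = planning + execution + synthesis
print(f"${total:.7f} per decomposed query")  # ≈ $0.0078875
```

Even with output tokens counted, Sonnet planning dominates the per-query cost, which is why the planning phase is the first place to optimize prompts.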

5. Comparison — Decomposition Strategies

| Dimension | Recursive Decomposition | Parallel Fan-Out | Sequential Refinement | Hierarchical Planning |
| --- | --- | --- | --- | --- |
| Core Idea | Break query into nested sub-queries recursively | Run all independent sub-tasks simultaneously | Each step narrows the solution space | Generate a plan first, then execute |
| Best For | Multi-constraint queries with nested logic | Independent data lookups from multiple sources | Progressively refining search results | Complex workflows with known patterns |
| Latency | Medium — depth determines latency | Low — bounded by slowest parallel task | High — sequential steps add up | Medium — planning cost + execution |
| Token Cost | High — planning at each recursion level | Low — single planning step, parallel execution | Medium — each step uses tokens | Medium — one planning call + execution |
| MangaAssist Example | "Find seinen manga similar to Berserk but less graphic, completed, under 20 volumes" | "Get ratings + inventory + prices for 10 titles" | "Search broadly → filter by genre → filter by price → rank" | "Create reading list: pick genre, find 5 titles, check stock, format list" |
| Error Recovery | Good — skip failed sub-tree, proceed with siblings | Good — partial results from successful tasks | Poor — failure in early step blocks later steps | Moderate — re-plan on failure |
| Max Parallelism | Limited by dependency depth | Maximum — all tasks run at once | None — strictly sequential | Defined by plan structure |
| AWS Implementation | Step Functions with nested workflows | Step Functions Parallel state | Step Functions sequential Task states | Step Functions with planning Lambda + Map state |
| When to Use | >3 constraints with logical dependencies | >3 independent data sources needed | Clear "funnel" pattern in data | Well-understood multi-step workflows |

6. Key Takeaways

  1. Decomposition saves latency through parallelism — a 6-step sequential pipeline takes ~3s, but decomposed into 3 waves of 2 parallel tasks, it takes ~1.5s. For MangaAssist's 3-second budget, this is the difference between success and timeout.

  2. Use Sonnet for planning, Haiku for everything else — the planning step needs strong reasoning to generate a good sub-task DAG. Execution, evaluation, and synthesis are simpler tasks where Haiku performs adequately. This cuts decomposition costs by ~75%.

  3. Wave-based execution with budget tracking is essential — each wave gets a proportional share of the remaining latency budget. If Wave 1 is slow, Wave 2 gets a tighter timeout, and Wave 3 may be skipped entirely with a best-effort synthesis.

  4. Result synthesis must handle missing data gracefully — at least one sub-task will fail or time out in ~5% of decompositions. The synthesizer should produce a useful answer from whatever data is available, with a completeness disclaimer if needed.

  5. Conflict resolution between parallel sub-tasks is a real problem — when a search tool says a manga is relevant but a classifier says its genre does not match, someone must decide. Default to the more specific tool (classifier over search).

  6. Complexity routing avoids unnecessary overhead — only ~30% of MangaAssist queries need decomposition. Simple queries ("Where is my order?") should go straight to a single tool call, saving the ~200ms planning step and its Sonnet token cost.

  7. Step Functions Express Workflows are the production orchestrator — Standard Workflows charge per state transition and add enough latency to threaten the 3-second budget, while Express Workflows are billed on request count and duration, support the Parallel state for wave execution, and keep transition overhead minimal.


Next file: 03-scenarios-and-runbooks.md — Five production scenarios for problem-solving system failures.