Problem Decomposition Strategies for Advanced Problem-Solving Systems
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Detail |
|---|---|
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.2 — Create advanced problem-solving systems to give FMs the ability to break down and solve complex problems by following structured reasoning steps (e.g., Step Functions for ReAct patterns, chain-of-thought reasoning) |
| This File | Deep-dive into multi-step problem decomposition, parallel sub-task execution, result synthesis, and Step Functions orchestration patterns |
Skill Scope Statement
This file extends the ReAct reasoning architecture (covered in File 01) with advanced decomposition strategies for complex, multi-constraint queries. It covers: recursive query decomposition into atomic sub-tasks, parallel execution waves via Step Functions, result synthesis from multiple tool outputs, conflict resolution when parallel sub-tasks return contradictory results, and cost-optimized model routing within decomposition trees. MangaAssist uses these patterns for queries like "Find a completed seinen manga under 20 volumes similar to Berserk but with less graphic content, available in English, under $15 per volume."
Mind Map — Problem Decomposition Strategies
mindmap
root((Problem<br/>Decomposition))
Decomposition Patterns
Recursive Decomposition
Complex Query → Atomic Sub-Queries
Dependency Graph Construction
Base Case Detection
Parallel Fan-Out
Independent Sub-Tasks
Step Functions Parallel State
Concurrent Tool Invocation
Sequential Refinement
Each Step Narrows Solution Space
Progressive Filtering
Iterative Constraint Tightening
Hierarchical Planning
High-Level Plan Generation
Detailed Sub-Plan Expansion
Plan Validation Before Execution
Execution Strategies
Wave-Based Execution
Dependency-Ordered Waves
Maximum Parallelism Per Wave
Budget Allocation Per Wave
Step Functions Orchestration
Express Workflows (sub-3s)
Parallel State with MaxConcurrency
Map State for Dynamic Fan-Out
Choice State for Conditional Paths
Timeout-Aware Scheduling
Per-Wave Time Budgets
Early Termination on Budget Exhaustion
Best-Effort Synthesis
Result Synthesis
Aggregation Strategies
Union (combine all results)
Intersection (common across sub-tasks)
Ranked Merge (score-weighted)
Conflict Resolution
Majority Vote
Confidence-Weighted Selection
Human Escalation
Quality Scoring
Completeness Check
Consistency Validation
Relevance to Original Query
Cost Optimization
Model Routing in Decomposition
Sonnet for Planning ($3/1M)
Haiku for Sub-Task Execution ($0.25/1M)
Haiku for Result Evaluation ($0.25/1M)
Token Budget Distribution
Proportional to Sub-Task Complexity
Reserve Budget for Synthesis
Early Exit on High Confidence
Architecture — Decomposition Pipeline
flowchart TB
subgraph Input["User Query"]
Q["Find completed seinen manga<br/>under 20 volumes, similar to Berserk,<br/>less graphic, English, under $15/vol"]
end
subgraph Planner["Decomposition Planner (Sonnet — $3/1M)"]
PLAN["Generate Sub-Task Plan"]
VALIDATE["Validate Plan Feasibility"]
end
subgraph Wave1["Wave 1 — Independent Sub-Tasks (Parallel)"]
T1["Sub-Task 1:<br/>Classify Berserk<br/>(genre/themes/tone)<br/>Tool: genre_classifier"]
T2["Sub-Task 2:<br/>Search seinen manga<br/>< 20 volumes, completed<br/>Tool: opensearch_search"]
T3["Sub-Task 3:<br/>Check English availability<br/>filter<br/>Tool: inventory_check"]
end
subgraph Wave2["Wave 2 — Dependent Sub-Tasks"]
T4["Sub-Task 4:<br/>Filter by similarity to Berserk<br/>but less graphic<br/>Uses: T1 + T2 results"]
T5["Sub-Task 5:<br/>Price check < $15/vol<br/>Uses: T2 + T3 results"]
end
subgraph Wave3["Wave 3 — Synthesis"]
T6["Sub-Task 6:<br/>Intersect T4 and T5<br/>Rank by overall match"]
end
subgraph Synthesis["Result Synthesizer (Haiku — $0.25/1M)"]
MERGE["Merge and Rank Results"]
FORMAT["Format User Response"]
end
Q --> PLAN
PLAN --> VALIDATE
VALIDATE --> T1 & T2 & T3
T1 --> T4
T2 --> T4 & T5
T3 --> T5
T4 --> T6
T5 --> T6
T6 --> MERGE
MERGE --> FORMAT
1. Recursive Query Decomposition
1.1 The Decomposition Engine
"""
MangaAssist Query Decomposition Engine — breaks complex user queries
into atomic sub-tasks organized as a directed acyclic graph (DAG).
Uses Sonnet for the initial planning step and Haiku for sub-task execution.
"""
import json
import time
import logging
from typing import Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import boto3
from botocore.config import Config
logger = logging.getLogger("manga_decomposer")
BEDROCK_CONFIG = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=5,
connect_timeout=2,
)
bedrock = boto3.client("bedrock-runtime", config=BEDROCK_CONFIG)
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
class SubTaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class SubTask:
"""An atomic sub-task in the decomposition DAG."""
task_id: str
description: str
tool_name: str
parameters: dict = field(default_factory=dict)
depends_on: list[str] = field(default_factory=list)
priority: int = 1
status: SubTaskStatus = SubTaskStatus.PENDING
result: Any = None
error: Optional[str] = None
latency_ms: float = 0.0
model_for_eval: str = HAIKU_MODEL_ID # Default to cheap model
@property
def is_terminal(self) -> bool:
return self.status in (
SubTaskStatus.COMPLETED,
SubTaskStatus.FAILED,
SubTaskStatus.SKIPPED,
)
@dataclass
class DecompositionPlan:
"""Complete decomposition plan for a user query."""
query: str
sub_tasks: list[SubTask] = field(default_factory=list)
waves: list[list[str]] = field(default_factory=list) # Task ID groups
total_estimated_latency_ms: float = 0.0
planning_latency_ms: float = 0.0
planning_tokens: dict = field(default_factory=dict)
class QueryDecompositionEngine:
"""
Decomposes complex manga queries into sub-task DAGs.
Uses a two-phase approach:
Phase 1: LLM-based planning (Sonnet) to generate the sub-task list
Phase 2: Topological sort to organize into execution waves
"""
DECOMPOSITION_PROMPT = """You are the query planner for MangaAssist.
Decompose this user query into atomic sub-tasks that can be executed by tools.
USER QUERY: {query}
AVAILABLE TOOLS:
- opensearch_vector_search: Semantic search over manga catalog (params: query, filters)
- dynamodb_product_lookup: Get specific manga details (params: manga_id or title)
- elasticache_rating_cache: Get ratings for manga IDs (params: manga_ids)
- genre_theme_classifier: Classify manga genre/theme/tone (params: description or title)
- inventory_check: Check stock and pricing (params: title_ids)
RULES:
1. Each sub-task must use exactly ONE tool.
2. Mark dependencies: if sub-task B needs output from A, list A as a dependency.
3. Independent sub-tasks should have NO dependencies (they run in parallel).
4. Keep total sub-tasks between 3 and 7 for the latency budget.
5. Priority 1 = most critical, 5 = nice-to-have.
Respond with a JSON array:
[
{{
"task_id": "t1",
"description": "What this sub-task accomplishes",
"tool_name": "tool_name",
"parameters": {{}},
"depends_on": [],
"priority": 1
}}
]"""
async def decompose(self, query: str) -> DecompositionPlan:
"""
Generate a decomposition plan for a complex query.
Returns a plan with sub-tasks organized into execution waves.
"""
        start = time.monotonic()
        # Phase 1: LLM-based planning. boto3 clients are synchronous, so run
        # the Bedrock call in a worker thread to keep the event loop free.
        import asyncio  # local import so this coroutine stays self-contained
        response = await asyncio.to_thread(
            bedrock.invoke_model,
            modelId=SONNET_MODEL_ID,
            contentType="application/json",
            accept="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "temperature": 0.0,
                "messages": [{
                    "role": "user",
                    "content": self.DECOMPOSITION_PROMPT.format(query=query),
                }],
            }),
        )
        result = json.loads(response["body"].read())
        usage = result.get("usage", {})
        text = result["content"][0]["text"]
        # The model occasionally wraps the JSON array in prose; trim to the
        # outermost brackets before parsing.
        tasks_json = json.loads(text[text.find("[") : text.rfind("]") + 1])
        planning_ms = (time.monotonic() - start) * 1000
# Build sub-tasks
sub_tasks = []
for t in tasks_json:
sub_tasks.append(SubTask(
task_id=t["task_id"],
description=t["description"],
tool_name=t["tool_name"],
parameters=t.get("parameters", {}),
depends_on=t.get("depends_on", []),
priority=t.get("priority", 3),
))
# Phase 2: Organize into execution waves
waves = self._build_waves(sub_tasks)
plan = DecompositionPlan(
query=query,
sub_tasks=sub_tasks,
waves=waves,
planning_latency_ms=planning_ms,
planning_tokens={
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
},
)
logger.info(
"Decomposed query into %d sub-tasks across %d waves (%.0fms)",
len(sub_tasks), len(waves), planning_ms,
)
return plan
def _build_waves(self, sub_tasks: list[SubTask]) -> list[list[str]]:
"""
Topological sort of sub-tasks into parallel execution waves.
Tasks in the same wave have no dependencies on each other.
"""
completed = set()
waves = []
remaining = {t.task_id: t for t in sub_tasks}
while remaining:
wave = []
for task_id, task in remaining.items():
if all(dep in completed for dep in task.depends_on):
wave.append(task_id)
if not wave:
# Circular dependency — break by taking highest priority
wave = [min(remaining.keys(), key=lambda tid: remaining[tid].priority)]
logger.warning("Circular dependency detected — breaking with %s", wave[0])
for task_id in wave:
del remaining[task_id]
completed.add(task_id)
waves.append(wave)
return waves
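The wave-grouping step can be exercised in isolation. Below is a minimal, self-contained sketch (a toy mapping of task IDs to dependency lists rather than the full SubTask dataclass, and without the cycle-breaking branch), applied to the sub-task DAG from the architecture diagram:

```python
def build_waves(tasks: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves: a task is ready once all its deps completed."""
    completed: set[str] = set()
    waves: list[list[str]] = []
    remaining = dict(tasks)
    while remaining:
        # Sorted for deterministic output in this sketch.
        wave = sorted(
            tid for tid, deps in remaining.items()
            if all(d in completed for d in deps)
        )
        if not wave:
            raise ValueError("circular dependency")
        for tid in wave:
            del remaining[tid]
            completed.add(tid)
        waves.append(wave)
    return waves

# The Berserk query from the diagram: t1-t3 are independent,
# t4/t5 consume Wave 1 outputs, t6 intersects t4 and t5.
deps = {
    "t1": [], "t2": [], "t3": [],
    "t4": ["t1", "t2"], "t5": ["t2", "t3"],
    "t6": ["t4", "t5"],
}
```

The result is three waves matching the diagram: the independent lookups run first, the dependent filters second, the final intersection last.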
1.2 Complexity-Based Decomposition Routing
"""
Routes queries to either direct execution or decomposition based on
estimated complexity. Simple queries skip the planning step entirely.
"""
class ComplexityRouter:
"""
Estimates query complexity and routes to the appropriate handler.
Simple queries (single-constraint, direct lookup) → Direct execution
Complex queries (multi-constraint, comparison, similarity) → Decomposition
"""
COMPLEXITY_SIGNALS = {
"high": [
"similar to", "like", "but", "except", "compare",
"between", "under", "over", "recommend", "best",
"completed", "ongoing", "volumes", "and also",
],
"medium": [
"find", "search", "available", "price", "in stock",
"genre", "author", "rating",
],
"low": [
            "order", "track", "status", "delivery", "mng-",  # lowercase: matched against query.lower()
"what is", "who is", "when",
],
}
def estimate_complexity(self, query: str) -> str:
"""
Return 'high', 'medium', or 'low' complexity estimate.
High → decompose into sub-tasks
Medium → single ReAct loop
Low → direct tool call
"""
query_lower = query.lower()
high_count = sum(
1 for signal in self.COMPLEXITY_SIGNALS["high"]
if signal in query_lower
)
if high_count >= 2:
return "high"
medium_count = sum(
1 for signal in self.COMPLEXITY_SIGNALS["medium"]
if signal in query_lower
)
if medium_count >= 2 or high_count >= 1:
return "medium"
return "low"
def should_decompose(self, query: str) -> bool:
"""Determine if a query should go through the decomposition pipeline."""
return self.estimate_complexity(query) == "high"
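As a quick sanity check, the signal-counting heuristic can be exercised standalone (signal lists abridged from COMPLEXITY_SIGNALS above; thresholds unchanged):

```python
# Abridged signal lists for illustration.
HIGH = ["similar to", "like", "but", "under", "completed", "volumes"]
MEDIUM = ["find", "search", "available", "price", "genre"]

def estimate_complexity(query: str) -> str:
    """Return 'high', 'medium', or 'low' using substring signal counts."""
    q = query.lower()
    high = sum(s in q for s in HIGH)
    if high >= 2:
        return "high"
    medium = sum(s in q for s in MEDIUM)
    if medium >= 2 or high >= 1:
        return "medium"
    return "low"
```

Multi-constraint similarity queries trip several high signals at once, while order-status questions match nothing and fall through to direct execution.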
2. Parallel Sub-Task Execution
2.1 Wave Executor with Step Functions
"""
MangaAssist Wave Executor — runs sub-task waves in parallel using
asyncio and Step Functions Map state for production deployment.
"""
import asyncio
import time
import logging
from typing import Any
logger = logging.getLogger("manga_wave_executor")
class WaveExecutor:
"""
Executes decomposition waves, running tasks within each wave in parallel
and waves sequentially. Respects the latency budget.
"""
def __init__(self, tool_dispatcher, budget_ms: float = 2500.0):
self._dispatcher = tool_dispatcher
self._budget_ms = budget_ms
async def execute_plan(self, plan: DecompositionPlan) -> dict:
"""
Execute all waves in the decomposition plan.
Returns aggregated results from all sub-tasks.
"""
start = time.monotonic()
all_results = {}
task_lookup = {t.task_id: t for t in plan.sub_tasks}
for wave_idx, wave_task_ids in enumerate(plan.waves):
elapsed_ms = (time.monotonic() - start) * 1000
remaining_ms = self._budget_ms - elapsed_ms
if remaining_ms <= 200:
logger.warning(
"Budget exhausted before wave %d — skipping %d tasks",
wave_idx, len(wave_task_ids),
)
for tid in wave_task_ids:
task_lookup[tid].status = SubTaskStatus.SKIPPED
continue
            # Tasks within a wave run concurrently, so each task may use the
            # full wave budget; dividing the budget by the task count would
            # starve parallel tasks of wall-clock time they actually have.
            waves_remaining = len(plan.waves) - wave_idx
            wave_budget_ms = remaining_ms / waves_remaining
            per_task_timeout = wave_budget_ms
            logger.info(
                "Wave %d: executing %d tasks (budget: %.0fms each, concurrent)",
                wave_idx, len(wave_task_ids), per_task_timeout,
            )
# Execute all tasks in this wave concurrently
wave_tasks = []
for task_id in wave_task_ids:
task = task_lookup[task_id]
# Inject results from dependencies
dep_results = {
dep_id: all_results.get(dep_id)
for dep_id in task.depends_on
if dep_id in all_results
}
wave_tasks.append(
self._execute_task(task, dep_results, per_task_timeout)
)
wave_results = await asyncio.gather(*wave_tasks, return_exceptions=True)
# Collect results
for task_id, result in zip(wave_task_ids, wave_results):
task = task_lookup[task_id]
if isinstance(result, Exception):
task.status = SubTaskStatus.FAILED
task.error = str(result)
all_results[task_id] = {"error": str(result)}
else:
task.status = SubTaskStatus.COMPLETED
task.result = result
all_results[task_id] = result
total_ms = (time.monotonic() - start) * 1000
logger.info(
"Plan executed in %.0fms: %d completed, %d failed, %d skipped",
total_ms,
sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED),
sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.FAILED),
sum(1 for t in plan.sub_tasks if t.status == SubTaskStatus.SKIPPED),
)
return all_results
async def _execute_task(
self,
task: SubTask,
dependency_results: dict,
timeout_ms: float,
) -> Any:
"""Execute a single sub-task with timeout enforcement."""
task.status = SubTaskStatus.RUNNING
start = time.monotonic()
# Enrich parameters with dependency results
enriched_params = dict(task.parameters)
if dependency_results:
enriched_params["_dependency_results"] = dependency_results
try:
result = await asyncio.wait_for(
self._dispatcher.dispatch(task.tool_name, enriched_params),
timeout=timeout_ms / 1000,
)
task.latency_ms = (time.monotonic() - start) * 1000
return result
except asyncio.TimeoutError:
task.latency_ms = (time.monotonic() - start) * 1000
raise TimeoutError(
f"Sub-task {task.task_id} ({task.tool_name}) timed out "
f"after {task.latency_ms:.0f}ms"
)
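The core timeout pattern above — `asyncio.wait_for` around each task plus `gather(..., return_exceptions=True)` for the wave — can be demonstrated standalone. The two stub tools here are illustrative stand-ins, not real MangaAssist tools:

```python
import asyncio

async def run_wave(coros, per_task_timeout_s: float) -> list:
    # wait_for bounds each task; return_exceptions=True converts failures
    # into values so the synthesizer can do best-effort work on the rest.
    wrapped = [asyncio.wait_for(c, timeout=per_task_timeout_s) for c in coros]
    return await asyncio.gather(*wrapped, return_exceptions=True)

async def fast_tool():
    await asyncio.sleep(0.01)
    return {"matches": ["vagabond"]}

async def slow_tool():
    await asyncio.sleep(1.0)  # exceeds the toy 0.1s budget below
    return {"matches": ["never returned"]}

results = asyncio.run(run_wave([fast_tool(), slow_tool()], per_task_timeout_s=0.1))
```

The slow task comes back as a `TimeoutError` instance rather than crashing the wave, which is exactly what the best-effort synthesis path relies on.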
2.2 Step Functions ASL for Parallel Decomposition
{
"Comment": "MangaAssist Parallel Decomposition — Express Workflow",
"StartAt": "DecomposeQuery",
"States": {
"DecomposeQuery": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-decompose",
"TimeoutSeconds": 3,
"ResultPath": "$.plan",
"Next": "ExecuteWave1"
},
"ExecuteWave1": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "ClassifyReference",
"States": {
"ClassifyReference": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-classify",
"TimeoutSeconds": 2,
"End": true
}
}
},
{
"StartAt": "SearchCatalog",
"States": {
"SearchCatalog": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-search",
"TimeoutSeconds": 2,
"End": true
}
}
},
{
"StartAt": "CheckAvailability",
"States": {
"CheckAvailability": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-inventory",
"TimeoutSeconds": 2,
"End": true
}
}
}
],
"ResultPath": "$.wave1Results",
"Next": "ExecuteWave2",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.wave1Error",
"Next": "SynthesizeBestEffort"
}
]
},
"ExecuteWave2": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-filter-rank",
"Parameters": {
"wave1Results.$": "$.wave1Results",
"originalQuery.$": "$.plan.query"
},
"TimeoutSeconds": 2,
"ResultPath": "$.wave2Results",
"Next": "SynthesizeResponse"
},
"SynthesizeResponse": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-synthesize",
"TimeoutSeconds": 2,
"End": true
},
"SynthesizeBestEffort": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-best-effort",
"TimeoutSeconds": 2,
"End": true
}
}
}
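Express Workflows can be invoked synchronously via the `StartSyncExecution` API, which fits the chatbot's request/response path. A hedged sketch of the client side — the state machine ARN and the `sessionId` field are illustrative assumptions, not values defined by the ASL above:

```python
import json

# Hypothetical ARN — substitute your deployed Express workflow.
STATE_MACHINE_ARN = (
    "arn:aws:states:us-east-1:123456789012:stateMachine:manga-decomposition"
)

def build_execution_input(query: str, session_id: str) -> str:
    """Serialize the workflow input read by the DecomposeQuery state."""
    return json.dumps({"query": query, "sessionId": session_id})

# In production (requires AWS credentials; Express workflows only):
# import boto3
# sfn = boto3.client("stepfunctions")
# response = sfn.start_sync_execution(
#     stateMachineArn=STATE_MACHINE_ARN,
#     input=build_execution_input("seinen like Berserk", "sess-1"),
# )
# output = json.loads(response["output"])
```

`StartSyncExecution` blocks until the workflow finishes, so the orchestrator gets the synthesized answer in a single round trip instead of polling an execution ARN.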
3. Result Synthesis and Conflict Resolution
3.1 Result Synthesizer
"""
MangaAssist Result Synthesizer — merges outputs from multiple sub-tasks
into a coherent response. Handles conflicts, missing data, and ranking.
"""
import json
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger("manga_synthesizer")
@dataclass
class SynthesisResult:
"""Final synthesized result from the decomposition pipeline."""
answer: str
confidence: float
sources: list[str]
completeness: float # 0.0 - 1.0
conflicts_resolved: int = 0
sub_tasks_used: int = 0
sub_tasks_failed: int = 0
class ResultSynthesizer:
"""
Merges results from parallel sub-tasks into a final answer.
Strategies:
- UNION: Combine all results (for search tasks)
- INTERSECTION: Keep only items appearing in all results (for filter tasks)
- RANKED_MERGE: Score-weighted combination (for recommendation tasks)
"""
def synthesize(
self,
query: str,
sub_task_results: dict[str, Any],
plan: DecompositionPlan,
) -> SynthesisResult:
"""
Synthesize a final answer from sub-task results.
"""
# Collect all manga candidates from search/classification tasks
candidates = self._extract_candidates(sub_task_results)
# Apply filters from filter tasks
filtered = self._apply_filters(candidates, sub_task_results)
# Resolve conflicts between sub-task results
resolved, conflict_count = self._resolve_conflicts(filtered, sub_task_results)
# Rank the final results
ranked = self._rank_results(resolved, query)
# Count task status
completed = sum(
1 for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED
)
failed = sum(
1 for t in plan.sub_tasks if t.status == SubTaskStatus.FAILED
)
total = len(plan.sub_tasks)
completeness = completed / total if total > 0 else 0.0
# Generate the response text
answer = self._format_answer(ranked, query, completeness)
return SynthesisResult(
answer=answer,
confidence=min(completeness, 0.9 if conflict_count == 0 else 0.7),
sources=[t.tool_name for t in plan.sub_tasks if t.status == SubTaskStatus.COMPLETED],
completeness=completeness,
conflicts_resolved=conflict_count,
sub_tasks_used=completed,
sub_tasks_failed=failed,
)
def _extract_candidates(self, results: dict[str, Any]) -> list[dict]:
"""Extract manga candidates from search-type sub-task results."""
candidates = []
for task_id, result in results.items():
if isinstance(result, dict):
# Handle search results
for item in result.get("matches", []):
item["_source_task"] = task_id
candidates.append(item)
for item in result.get("results", []):
item["_source_task"] = task_id
candidates.append(item)
return candidates
def _apply_filters(
self, candidates: list[dict], results: dict[str, Any]
) -> list[dict]:
"""Apply filter constraints from filter-type sub-tasks."""
filtered = list(candidates)
for task_id, result in results.items():
if isinstance(result, dict) and "available_ids" in result:
available = set(result["available_ids"])
filtered = [c for c in filtered if c.get("title_id") in available]
if isinstance(result, dict) and "price_data" in result:
price_data = result["price_data"]
filtered = [
c for c in filtered
if c.get("title_id") in price_data
and price_data[c["title_id"]].get("price_usd", 999) <= 15.0
]
return filtered
def _resolve_conflicts(
self, candidates: list[dict], results: dict[str, Any]
) -> tuple[list[dict], int]:
"""
Resolve conflicts when sub-tasks disagree.
Example: search says title X is relevant, but classifier says
its genre does not match.
"""
conflicts = 0
resolved = []
for candidate in candidates:
            # Check whether the classifier's output contradicts the search
            # result. This assumes the planner names the classifier sub-task
            # "classification"; with generic IDs (e.g. "t1") the lookup below
            # is empty and no conflict check is applied.
title_id = candidate.get("title_id", "")
classification = results.get("classification", {})
if classification and title_id:
classified_tone = classification.get(title_id, {}).get("tone", "")
candidate_tone = candidate.get("tone", "")
if classified_tone and candidate_tone and classified_tone != candidate_tone:
conflicts += 1
# Prefer the classification result (more specific)
candidate["tone"] = classified_tone
resolved.append(candidate)
if conflicts > 0:
logger.info("Resolved %d conflicts between sub-task results", conflicts)
return resolved, conflicts
def _rank_results(self, candidates: list[dict], query: str) -> list[dict]:
"""Rank candidates by relevance score."""
for c in candidates:
# Compute composite score
search_score = c.get("score", 0.0)
rating = c.get("rating", 0.0) / 5.0 # Normalize to 0-1
in_stock = 1.0 if c.get("in_stock", False) else 0.3
c["_composite_score"] = (
0.5 * search_score + 0.3 * rating + 0.2 * in_stock
)
candidates.sort(key=lambda c: c.get("_composite_score", 0), reverse=True)
return candidates[:5] # Top 5
def _format_answer(
self, ranked: list[dict], query: str, completeness: float
) -> str:
"""Format the ranked results into a user-friendly response."""
if not ranked:
return (
"I searched our catalog but could not find manga matching all "
"your criteria. Try relaxing some constraints (e.g., remove the "
"volume limit or price cap) and I will search again."
)
        lines = ["Here are my top recommendations based on your criteria:\n"]
for i, manga in enumerate(ranked, 1):
title = manga.get("title", "Unknown")
author = manga.get("author", "Unknown")
volumes = manga.get("volumes", "?")
price = manga.get("price_usd", "?")
rating = manga.get("rating", "?")
lines.append(
f"{i}. **{title}** by {author}\n"
f" - {volumes} volumes | ${price}/vol | Rating: {rating}/5\n"
f" - {manga.get('synopsis', 'No synopsis available')[:150]}..."
)
if completeness < 0.8:
lines.append(
"\n*Note: Some search results may be incomplete due to "
"temporary service delays. Try again for more complete results.*"
)
return "\n".join(lines)
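The composite scoring inside `_rank_results` can be checked in isolation. A self-contained sketch with illustrative candidates, showing how the stock penalty can outweigh a higher raw search score:

```python
def composite_score(candidate: dict) -> float:
    # Same weighting as _rank_results: 50% search relevance,
    # 30% normalized rating, 20% stock availability.
    search = candidate.get("score", 0.0)
    rating = candidate.get("rating", 0.0) / 5.0
    stock = 1.0 if candidate.get("in_stock", False) else 0.3
    return 0.5 * search + 0.3 * rating + 0.2 * stock

candidates = [
    {"title": "Vinland Saga", "score": 0.8, "rating": 4.5, "in_stock": True},
    {"title": "Out-of-stock pick", "score": 0.9, "rating": 5.0, "in_stock": False},
]
ranked = sorted(candidates, key=composite_score, reverse=True)
```

The out-of-stock title scores higher on pure relevance (0.9 vs 0.8) but loses the top slot once the availability weight is applied, which is the behavior a storefront chatbot wants.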
4. Cost-Optimized Model Routing in Decomposition
4.1 Per-Phase Model Selection
"""
MangaAssist cost-optimized model routing — assigns the cheapest
capable model to each phase of the decomposition pipeline.
Cost breakdown at 1M messages/day (30% go through decomposition),
counting input tokens only — output tokens (Sonnet $15/1M, Haiku $1.25/1M)
add to both sides, so treat these as lower-bound figures:
- Planning (Sonnet): 300K * ~500 input tokens * $3/1M = $450/day
- Sub-tasks (Haiku): 300K * 5 tasks * ~200 tokens * $0.25/1M = $75/day
- Synthesis (Haiku): 300K * ~300 tokens * $0.25/1M = $22.50/day
- Total decomposition: ~$547.50/day
vs. All-Sonnet: 300K * 6 calls * ~400 tokens * $3/1M = $2,160/day
Savings: ~$1,612.50/day (~75% reduction on input-token spend)
"""
class DecompositionModelRouter:
"""
Assigns models to each phase of the decomposition pipeline.
Planning uses Sonnet (better at complex plan generation).
Execution and synthesis use Haiku (cheaper, adequate quality).
"""
MODEL_ASSIGNMENTS = {
"planning": {
"model_id": SONNET_MODEL_ID,
"max_tokens": 1000,
"temperature": 0.0,
"rationale": "Complex plan generation needs stronger reasoning",
},
"sub_task_execution": {
"model_id": HAIKU_MODEL_ID,
"max_tokens": 400,
"temperature": 0.0,
"rationale": "Sub-tasks are atomic and well-scoped; Haiku suffices",
},
"observation_eval": {
"model_id": HAIKU_MODEL_ID,
"max_tokens": 300,
"temperature": 0.0,
"rationale": "Evaluating tool output is straightforward classification",
},
"synthesis": {
"model_id": HAIKU_MODEL_ID,
"max_tokens": 600,
"temperature": 0.3,
"rationale": "Merging results and generating response; Haiku is adequate",
},
"conflict_resolution": {
"model_id": SONNET_MODEL_ID,
"max_tokens": 500,
"temperature": 0.0,
"rationale": "Resolving contradictions needs stronger reasoning",
},
}
def get_config(self, phase: str) -> dict:
"""Get the model configuration for a decomposition phase."""
return self.MODEL_ASSIGNMENTS.get(phase, self.MODEL_ASSIGNMENTS["sub_task_execution"])
def estimate_cost(self, plan: DecompositionPlan) -> dict:
"""
Estimate the token cost for executing a decomposition plan.
"""
planning_config = self.get_config("planning")
exec_config = self.get_config("sub_task_execution")
synth_config = self.get_config("synthesis")
# Estimate token usage per phase
planning_input = 500 # System prompt + query + tool descriptions
planning_output = plan.planning_tokens.get("output", 300)
exec_input_per_task = 200
exec_output_per_task = 150
num_tasks = len(plan.sub_tasks)
synth_input = 300 + (num_tasks * 100) # Grows with number of results
synth_output = 400
# Calculate costs
sonnet_input_rate = 3.0 / 1_000_000
sonnet_output_rate = 15.0 / 1_000_000
haiku_input_rate = 0.25 / 1_000_000
haiku_output_rate = 1.25 / 1_000_000
planning_cost = (
planning_input * sonnet_input_rate +
planning_output * sonnet_output_rate
)
exec_cost = num_tasks * (
exec_input_per_task * haiku_input_rate +
exec_output_per_task * haiku_output_rate
)
synth_cost = (
synth_input * haiku_input_rate +
synth_output * haiku_output_rate
)
return {
"planning_cost_usd": round(planning_cost, 6),
"execution_cost_usd": round(exec_cost, 6),
"synthesis_cost_usd": round(synth_cost, 6),
"total_cost_usd": round(planning_cost + exec_cost + synth_cost, 6),
"num_tasks": num_tasks,
}
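Plugging the same per-phase token estimates into the rate table gives a per-query figure. Note this sketch, unlike the docstring's daily breakdown, includes output tokens in every phase:

```python
# Token-price table (USD per token), matching the rate constants above.
RATES = {
    "sonnet": {"in": 3.0 / 1e6, "out": 15.0 / 1e6},
    "haiku": {"in": 0.25 / 1e6, "out": 1.25 / 1e6},
}

def plan_cost(num_tasks: int) -> float:
    """Per-query cost estimate using the same token assumptions as estimate_cost."""
    planning = 500 * RATES["sonnet"]["in"] + 300 * RATES["sonnet"]["out"]
    execution = num_tasks * (
        200 * RATES["haiku"]["in"] + 150 * RATES["haiku"]["out"]
    )
    synthesis = (
        (300 + num_tasks * 100) * RATES["haiku"]["in"]
        + 400 * RATES["haiku"]["out"]
    )
    return planning + execution + synthesis
```

For a typical 5-task plan this lands under a cent per query, with Sonnet planning output tokens ($15/1M) as the single largest component — which is why the planner gets the tight 1000-token cap.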
5. Comparison — Decomposition Strategies
| Dimension | Recursive Decomposition | Parallel Fan-Out | Sequential Refinement | Hierarchical Planning |
|---|---|---|---|---|
| Core Idea | Break query into nested sub-queries recursively | Run all independent sub-tasks simultaneously | Each step narrows the solution space | Generate a plan first, then execute |
| Best For | Multi-constraint queries with nested logic | Independent data lookups from multiple sources | Progressively refining search results | Complex workflows with known patterns |
| Latency | Medium — depth determines latency | Low — bounded by slowest parallel task | High — sequential steps add up | Medium — planning cost + execution |
| Token Cost | High — planning at each recursion level | Low — single planning step, parallel execution | Medium — each step uses tokens | Medium — one planning call + execution |
| MangaAssist Example | "Find seinen manga similar to Berserk but less graphic, completed, under 20 volumes" | "Get ratings + inventory + prices for 10 titles" | "Search broadly → filter by genre → filter by price → rank" | "Create reading list: pick genre, find 5 titles, check stock, format list" |
| Error Recovery | Good — skip failed sub-tree, proceed with siblings | Good — partial results from successful tasks | Poor — failure in early step blocks later steps | Moderate — re-plan on failure |
| Max Parallelism | Limited by dependency depth | Maximum — all tasks run at once | None — strictly sequential | Defined by plan structure |
| AWS Implementation | Step Functions with nested workflows | Step Functions Parallel state | Step Functions sequential Task states | Step Functions with planning Lambda + Map state |
| When to Use | >3 constraints with logical dependencies | >3 independent data sources needed | Clear "funnel" pattern in data | Well-understood multi-step workflows |
6. Key Takeaways
- Decomposition saves latency through parallelism — a 6-step sequential pipeline takes ~3s, but decomposed into 3 waves of 2 parallel tasks it takes ~1.5s. For MangaAssist's 3-second budget, that is the difference between success and timeout.
- Use Sonnet for planning, Haiku for everything else — the planning step needs strong reasoning to generate a good sub-task DAG. Execution, evaluation, and synthesis are simpler tasks where Haiku performs adequately, cutting decomposition costs by roughly 75%.
- Wave-based execution with budget tracking is essential — each wave gets a proportional share of the remaining latency budget. If Wave 1 runs slow, Wave 2 gets a tighter timeout, and Wave 3 may be skipped entirely in favor of a best-effort synthesis.
- Result synthesis must handle missing data gracefully — at least one sub-task will fail or time out in roughly 5% of decompositions. The synthesizer should produce a useful answer from whatever data is available, with a completeness disclaimer when needed.
- Conflict resolution between parallel sub-tasks is a real problem — when a search tool says a manga is relevant but a classifier says its genre does not match, something must decide. Default to the more specific tool (classifier over search).
- Complexity routing avoids unnecessary overhead — only ~30% of MangaAssist queries need decomposition. Simple queries ("Where is my order?") should go straight to a single tool call, saving the ~200ms planning step and its Sonnet token cost.
- Step Functions Express Workflows are the production orchestrator — Standard Workflows bill per state transition and add latency that does not fit the 3-second budget, while Express Workflows bill by duration and memory, support the Parallel state for wave execution, and keep per-transition overhead minimal.
Next file: 03-scenarios-and-runbooks.md — Five production scenarios for problem-solving system failures.