PO-07: Orchestrator Concurrency and Throughput
User Story
As a platform engineer, I want to scale the orchestrator layer to handle 10,000+ concurrent chat sessions with sub-100ms orchestration overhead, so that the chatbot remains responsive and stable under peak traffic without a degraded user experience.
Acceptance Criteria
- Orchestrator supports 10,000+ concurrent chat sessions across the ECS cluster.
- Orchestration overhead (excluding downstream calls) is under 100ms p95.
- Auto-scaling responds to traffic spikes within 60 seconds.
- Circuit breakers prevent cascade failures when a downstream service is degraded.
- Graceful degradation serves cached or fallback responses when downstream services fail.
- Connection pool exhaustion rate is below 0.01%.
High-Level Design
Orchestrator Architecture
graph TD
subgraph "Traffic Ingestion"
A[CloudFront] --> B[ALB]
B --> C[ECS Fargate Cluster<br>Auto-scaling: CPU + Request Count]
end
subgraph "Orchestrator (per task)"
C --> D[Request Router]
D --> E[Async Fan-Out Controller]
E --> F[Intent Classifier<br>SageMaker]
E --> G[Context Assembly<br>DynamoDB + Redis]
E --> H[RAG Pipeline<br>OpenSearch]
F --> I[Response Generator<br>Bedrock]
G --> I
H --> I
end
subgraph "Resilience"
J[Circuit Breaker<br>per downstream]
K[Retry with Backoff<br>per call]
L[Fallback Handler<br>cached / default responses]
end
E --> J
J --> K
K --> L
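The Retry with Backoff stage in the resilience subgraph above can be sketched as a small asyncio helper. This is a minimal sketch: the function name, attempt count, and delay parameters are illustrative defaults, not part of the design.

```python
import asyncio
import random


async def retry_with_backoff(
    call,                      # zero-arg callable returning a coroutine
    max_attempts: int = 3,
    base_delay: float = 0.1,   # seconds; doubled each attempt
    max_delay: float = 2.0,
):
    """Retry a downstream call with exponential backoff and full jitter."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return await call()
        except Exception as exc:
            last_exc = exc
            if attempt == max_attempts - 1:
                break
            # Full jitter: sleep a random fraction of the capped backoff,
            # which spreads retries out and avoids thundering herds.
            delay = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(random.uniform(0, delay))
    raise last_exc
```

On final failure the last exception propagates, which is where the Fallback Handler in the diagram would take over.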
Scaling Strategy
gantt
title Scaling Response Timeline
dateFormat X
axisFormat %ss
section Detection
CloudWatch metric breach :0, 5
Scaling policy evaluates :5, 10
section ECS Scaling
Task provisioning starts :10, 20
Container image pull :20, 35
Health check passes :35, 45
Task receives traffic :45, 50
section Total
End-to-end scaling :0, 50
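The task counts behind this timeline follow from simple capacity math. The sketch below assumes a per-task capacity of 250 sessions and an 85% utilization ceiling; both figures are illustrative assumptions, not measured values.

```python
import math


def tasks_needed(
    concurrent_sessions: int,
    sessions_per_task: int = 250,   # assumed per-task capacity
    headroom: float = 0.85,         # utilization ceiling per task
) -> int:
    """Tasks required to hold the session count below the utilization ceiling."""
    # Keep each task below `headroom` of its capacity so a spike
    # does not immediately saturate the fleet.
    usable_per_task = sessions_per_task * headroom
    return math.ceil(concurrent_sessions / usable_per_task)
```

Under these assumptions the 10,000-session target needs 48 tasks, which fits inside the 50-task maximum configured below.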
Low-Level Design
1. Async Fan-Out Controller
The orchestrator runs independent downstream calls concurrently using asyncio.gather, reducing total latency from the sum of the sequential calls to the maximum of the parallel branches.
sequenceDiagram
participant Client
participant Orchestrator
participant IntentSvc as Intent Classifier
participant MemorySvc as Conversation Memory
participant CacheSvc as Cache Layer
Client->>Orchestrator: User message
par Parallel Fan-Out
Orchestrator->>IntentSvc: Classify intent
Orchestrator->>MemorySvc: Fetch conversation history
Orchestrator->>CacheSvc: Check response cache
end
IntentSvc-->>Orchestrator: Intent result (~35ms)
MemorySvc-->>Orchestrator: History (~10ms)
CacheSvc-->>Orchestrator: Cache miss (~2ms)
Note over Orchestrator: Total parallel = max(35, 10, 2) = 35ms<br>vs sequential = 47ms
Orchestrator->>Orchestrator: Assemble context + route
Code Example: Async Fan-Out Controller
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional
logger = logging.getLogger(__name__)
@dataclass
class FanOutResult:
    intent: Optional[dict] = None
    conversation_history: Optional[list] = None
    cache_hit: Optional[dict] = None
    rag_results: Optional[list] = None
    errors: Optional[dict] = None

    def __post_init__(self):
        # Avoid a shared mutable default across instances.
        if self.errors is None:
            self.errors = {}


class AsyncFanOutController:
    """Executes independent downstream calls concurrently."""

    def __init__(
        self,
        intent_service,
        memory_service,
        cache_service,
        rag_service,
    ):
        self.intent_service = intent_service
        self.memory_service = memory_service
        self.cache_service = cache_service
        self.rag_service = rag_service

    async def execute(
        self, session_id: str, user_message: str
    ) -> FanOutResult:
        """Run all independent lookups in parallel."""
        start = time.monotonic()
        result = FanOutResult()

        # Define parallel tasks
        tasks = {
            "intent": self._safe_call(
                "intent",
                self.intent_service.classify(user_message),
            ),
            "history": self._safe_call(
                "history",
                self.memory_service.get_recent(session_id, limit=10),
            ),
            "cache": self._safe_call(
                "cache",
                self.cache_service.get_response(user_message),
            ),
        }

        # Execute all in parallel. _safe_call never raises, so
        # return_exceptions=False is safe here.
        outcomes = await asyncio.gather(
            *tasks.values(), return_exceptions=False
        )
        for name, outcome in zip(tasks.keys(), outcomes):
            if isinstance(outcome, Exception):
                result.errors[name] = str(outcome)
                logger.warning(f"Fan-out task '{name}' failed: {outcome}")
            elif name == "intent":
                result.intent = outcome
            elif name == "history":
                result.conversation_history = outcome
            elif name == "cache":
                result.cache_hit = outcome

        # Phase 2: RAG depends on intent (sequential after fan-out)
        if result.intent and result.cache_hit is None:
            rag_outcome = await self._safe_call(
                "rag",
                self.rag_service.retrieve(
                    user_message, intent=result.intent.get("intent")
                ),
            )
            if isinstance(rag_outcome, Exception):
                result.errors["rag"] = str(rag_outcome)
            else:
                result.rag_results = rag_outcome

        elapsed = (time.monotonic() - start) * 1000
        logger.info(f"Fan-out completed in {elapsed:.1f}ms")
        return result

    async def _safe_call(self, name: str, coro) -> Any:
        """Execute a coroutine with error isolation."""
        try:
            return await asyncio.wait_for(coro, timeout=5.0)
        except asyncio.TimeoutError:
            logger.error(f"Task '{name}' timed out")
            return Exception(f"{name} timed out")
        except Exception as e:
            logger.error(f"Task '{name}' failed: {e}")
            return e
2. Circuit Breaker
Prevent cascading failures by tracking downstream error rates and opening the circuit when a threshold is breached.
stateDiagram-v2
[*] --> Closed
Closed --> Open : Failure rate > 50%<br>in 10s window
Open --> HalfOpen : After 30s cooldown
HalfOpen --> Closed : Probe succeeds
HalfOpen --> Open : Probe fails
state Closed {
[*] --> TrackingFailures
TrackingFailures --> TrackingFailures : Record success/failure
}
state Open {
[*] --> RejectAll
RejectAll --> RejectAll : Return fallback immediately
}
state HalfOpen {
[*] --> SingleProbe
SingleProbe --> SingleProbe : Allow 1 request through
}
Code Example: Circuit Breaker
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable
class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitStats:
    successes: int = 0
    failures: int = 0
    last_failure_time: float = 0.0
    consecutive_successes_in_half_open: int = 0

    @property
    def total(self) -> int:
        return self.successes + self.failures

    @property
    def failure_rate(self) -> float:
        if self.total == 0:
            return 0.0
        return self.failures / self.total


class CircuitBreaker:
    """Circuit breaker with configurable thresholds and fallback."""

    def __init__(
        self,
        name: str,
        failure_threshold: float = 0.5,
        window_size: int = 20,
        cooldown_seconds: float = 30.0,
        half_open_max_probes: int = 3,
        fallback_fn: Callable | None = None,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.window_size = window_size
        self.cooldown_seconds = cooldown_seconds
        self.half_open_max_probes = half_open_max_probes
        self.fallback_fn = fallback_fn
        self._state = CircuitState.CLOSED
        self._stats = CircuitStats()
        self._opened_at: float = 0.0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        # Lazily transition OPEN -> HALF_OPEN once the cooldown elapses.
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._opened_at >= self.cooldown_seconds:
                self._state = CircuitState.HALF_OPEN
                self._stats.consecutive_successes_in_half_open = 0
        return self._state

    async def call(self, coro) -> Any:
        """Execute a coroutine through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            # Close the unawaited coroutine to avoid a RuntimeWarning.
            coro.close()
            if self.fallback_fn:
                return await self.fallback_fn()
            raise CircuitOpenError(f"Circuit '{self.name}' is open")
        try:
            result = await coro
            await self._record_success()
            return result
        except Exception:
            await self._record_failure()
            if self.fallback_fn and self.state == CircuitState.OPEN:
                return await self.fallback_fn()
            raise

    async def _record_success(self) -> None:
        async with self._lock:
            self._stats.successes += 1
            if self._state == CircuitState.HALF_OPEN:
                self._stats.consecutive_successes_in_half_open += 1
                if (
                    self._stats.consecutive_successes_in_half_open
                    >= self.half_open_max_probes
                ):
                    self._state = CircuitState.CLOSED
                    self._stats = CircuitStats()

    async def _record_failure(self) -> None:
        async with self._lock:
            self._stats.failures += 1
            self._stats.last_failure_time = time.monotonic()
            if self._state == CircuitState.HALF_OPEN:
                # A failed probe reopens the circuit immediately.
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()
            elif (
                self._stats.total >= self.window_size
                and self._stats.failure_rate >= self.failure_threshold
            ):
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()
                self._stats = CircuitStats()


class CircuitOpenError(Exception):
    pass
3. ECS Auto-Scaling Configuration
graph TD
subgraph "Scaling Policies"
A[Target Tracking<br>CPU Utilization: 65%]
B[Target Tracking<br>Request Count / Target: 500]
C[Step Scaling<br>Active WebSocket > 8K]
end
subgraph "Scale Limits"
D[Min: 3 tasks<br>Desired: 5 tasks<br>Max: 50 tasks]
end
subgraph "Scheduled Scaling"
E[Mon-Fri 6AM: Min 10]
F[Mon-Fri 10PM: Min 3]
G[Events / Sales: Min 20]
end
A --> H[ECS Service<br>Auto-scaling]
B --> H
C --> H
D --> H
E --> H
F --> H
G --> H
Code Example: Auto-Scaling Configuration (CDK-style)
"""ECS Auto-scaling configuration expressed as infrastructure-as-code."""
def configure_ecs_autoscaling(service, cluster_name: str) -> dict:
    """Return auto-scaling policies for the orchestrator ECS service."""
    return {
        "service": {
            "cluster": cluster_name,
            "service_name": "mangaassist-orchestrator",
            "desired_count": 5,
            "deployment_configuration": {
                "minimum_healthy_percent": 100,
                "maximum_percent": 200,
            },
        },
        "scaling": {
            "min_capacity": 3,
            "max_capacity": 50,
            "policies": [
                {
                    "name": "cpu-target-tracking",
                    "type": "TargetTrackingScaling",
                    "metric": "ECSServiceAverageCPUUtilization",
                    "target_value": 65.0,
                    "scale_in_cooldown": 300,
                    "scale_out_cooldown": 60,
                },
                {
                    "name": "request-count-target-tracking",
                    "type": "TargetTrackingScaling",
                    "metric": "ALBRequestCountPerTarget",
                    "target_value": 500,
                    "scale_in_cooldown": 300,
                    "scale_out_cooldown": 60,
                },
                {
                    "name": "websocket-connections-step",
                    "type": "StepScaling",
                    "metric": "ActiveWebSocketConnections",
                    "steps": [
                        {"lower": 8000, "upper": 9000, "adjustment": 2},
                        {"lower": 9000, "upper": 10000, "adjustment": 4},
                        {"lower": 10000, "adjustment": 6},
                    ],
                },
            ],
            "scheduled_actions": [
                {
                    "name": "weekday-peak",
                    "schedule": "cron(0 6 ? * MON-FRI *)",
                    "min_capacity": 10,
                },
                {
                    "name": "weekday-off-peak",
                    "schedule": "cron(0 22 ? * MON-FRI *)",
                    "min_capacity": 3,
                },
            ],
        },
    }
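The step-scaling policy maps the WebSocket metric into a capacity adjustment by bucket. The evaluator below is a pure-Python sketch of that mapping, using the same `steps` shape as the config above (an omitted `upper` means unbounded); it mirrors, not replaces, what Application Auto Scaling does natively.

```python
def step_adjustment(metric_value: float, steps: list[dict]) -> int:
    """Return the capacity adjustment for a metric value under step scaling.

    Each step has a `lower` bound, an optional `upper` bound, and an
    `adjustment` (tasks to add). Values below every step return 0.
    """
    for step in steps:
        lower = step["lower"]
        upper = step.get("upper", float("inf"))
        if lower <= metric_value < upper:
            return step["adjustment"]
    return 0


# Same steps as the websocket-connections-step policy above.
websocket_steps = [
    {"lower": 8000, "upper": 9000, "adjustment": 2},
    {"lower": 9000, "upper": 10000, "adjustment": 4},
    {"lower": 10000, "adjustment": 6},
]
```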
4. Connection Pool Manager
Prevent connection exhaustion across downstream HTTP and database clients.
graph TD
subgraph "Orchestrator Task"
A[Request Handler] --> B[Connection Pool Manager]
B --> C[HTTP Pool: SageMaker<br>max=50, idle_timeout=120s]
B --> D[HTTP Pool: Bedrock<br>max=100, idle_timeout=120s]
B --> E[DynamoDB Pool<br>max=50, via boto3 session]
B --> F[Redis Pool<br>max=100, min_idle=10]
B --> G[OpenSearch Pool<br>max=30]
end
subgraph "Health"
H[Pool Utilization<br>CloudWatch metric]
I{> 80% used?}
H --> I
I -->|Yes| J[Warn + Pre-scale]
end
Code Example: Connection Pool Manager
import asyncio
import logging
from dataclasses import dataclass
import aiohttp
import aioboto3
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
@dataclass
class PoolConfig:
    name: str
    max_connections: int
    min_idle: int = 0
    idle_timeout_seconds: int = 120
    connect_timeout_seconds: float = 5.0


class ConnectionPoolManager:
    """Centralized connection pool management for all downstream services."""

    def __init__(self):
        self._http_sessions: dict[str, aiohttp.ClientSession] = {}
        self._redis_pool: aioredis.ConnectionPool | None = None
        self._initialized = False

    async def initialize(
        self,
        http_pools: list[PoolConfig],
        redis_config: PoolConfig,
    ) -> None:
        """Initialize all connection pools at startup."""
        for pool_cfg in http_pools:
            connector = aiohttp.TCPConnector(
                limit=pool_cfg.max_connections,
                keepalive_timeout=pool_cfg.idle_timeout_seconds,
                enable_cleanup_closed=True,
            )
            timeout = aiohttp.ClientTimeout(
                total=30,
                connect=pool_cfg.connect_timeout_seconds,
            )
            session = aiohttp.ClientSession(
                connector=connector, timeout=timeout
            )
            self._http_sessions[pool_cfg.name] = session
            logger.info(
                f"HTTP pool '{pool_cfg.name}' initialized: "
                f"max={pool_cfg.max_connections}"
            )
        self._redis_pool = aioredis.ConnectionPool(
            max_connections=redis_config.max_connections,
            socket_timeout=redis_config.connect_timeout_seconds,
            socket_connect_timeout=redis_config.connect_timeout_seconds,
        )
        logger.info(
            f"Redis pool initialized: max={redis_config.max_connections}"
        )
        self._initialized = True

    def get_http_session(self, name: str) -> aiohttp.ClientSession:
        """Get the HTTP session for a named downstream service."""
        session = self._http_sessions.get(name)
        if session is None:
            raise ValueError(f"No HTTP pool configured for '{name}'")
        return session

    def get_redis_pool(self) -> aioredis.ConnectionPool:
        """Get the shared Redis connection pool."""
        if self._redis_pool is None:
            raise RuntimeError("Redis pool not initialized")
        return self._redis_pool

    async def get_pool_stats(self) -> dict:
        """Report connection pool utilization for monitoring."""
        stats = {}
        for name, session in self._http_sessions.items():
            connector = session.connector
            if isinstance(connector, aiohttp.TCPConnector):
                # _acquired is a private aiohttp attribute; acceptable for
                # metrics, but it may change between aiohttp versions.
                active = len(connector._acquired)
                stats[name] = {
                    "limit": connector.limit,
                    "active": active,
                    "utilization_pct": (
                        active / connector.limit * 100
                        if connector.limit > 0
                        else 0
                    ),
                }
        return stats

    async def shutdown(self) -> None:
        """Gracefully close all pools."""
        for name, session in self._http_sessions.items():
            await session.close()
            logger.info(f"HTTP pool '{name}' closed")
        if self._redis_pool:
            await self._redis_pool.disconnect()
            logger.info("Redis pool closed")
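The health subgraph above alarms when a pool crosses 85% utilization. A small helper over the stats dict (the shape returned by get_pool_stats) can drive that check; the function name and threshold default are illustrative.

```python
def pools_over_threshold(stats: dict, threshold_pct: float = 85.0) -> list[str]:
    """Return the names of pools whose utilization exceeds the alarm threshold.

    `stats` maps pool name -> {"limit", "active", "utilization_pct"},
    matching the shape emitted by the pool manager's stats method.
    """
    return sorted(
        name
        for name, pool in stats.items()
        if pool["utilization_pct"] > threshold_pct
    )
```

A periodic task could publish this list as a CloudWatch metric and trigger the Warn + Pre-scale path shown in the diagram.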
Metrics and Monitoring
| Metric | Target | Alarm Threshold |
|---|---|---|
| orchestrator.overhead_ms | p95 < 100ms | p95 > 200ms |
| orchestrator.concurrent_sessions | Monitor trend | > capacity × 0.85 |
| orchestrator.fanout_parallel_ms | p95 < 50ms | p95 > 100ms |
| circuit_breaker.open_count | 0 in steady state | Any circuit open > 2 min |
| circuit_breaker.fallback_invocations | Monitor trend | > 100/min |
| connection_pool.utilization_pct | < 70% per pool | > 85% per pool |
| ecs.scaling_response_time_s | < 60s | > 120s |
| ecs.task_count | Within scheduled bounds | > max_capacity × 0.9 |
graph LR
subgraph "Orchestrator Dashboard"
A[Fan-Out Time]
B[Circuit Breaker<br>State per service]
C[Pool Utilization %]
D[Active ECS Tasks]
end
A --> E{> 100ms?}
E -->|Yes| F[Check downstream<br>latency]
B --> G{Any Open?}
G -->|Yes| H[Page on-call +<br>serve fallback]
C --> I{> 85%?}
I -->|Yes| J[Increase pool<br>or scale tasks]