
PO-07: Orchestrator Concurrency and Throughput

User Story

As a platform engineer, I want to scale the orchestrator layer to handle 10,000+ concurrent chat sessions with sub-100ms orchestration overhead, so that the chatbot remains responsive and stable under peak traffic without a degraded user experience.

Acceptance Criteria

  • Orchestrator supports 10,000+ concurrent chat sessions across the ECS cluster.
  • Orchestration overhead (excluding downstream calls) is under 100ms p95.
  • Auto-scaling responds to traffic spikes within 60 seconds.
  • Circuit breakers prevent cascading failures when a downstream service is degraded.
  • Graceful degradation serves cached or fallback responses when downstream services fail.
  • Connection pool exhaustion rate is below 0.01%.

High-Level Design

Orchestrator Architecture

graph TD
    subgraph "Traffic Ingestion"
        A[CloudFront] --> B[ALB]
        B --> C[ECS Fargate Cluster<br>Auto-scaling: CPU + Request Count]
    end

    subgraph "Orchestrator (per task)"
        C --> D[Request Router]
        D --> E[Async Fan-Out Controller]
        E --> F[Intent Classifier<br>SageMaker]
        E --> G[Context Assembly<br>DynamoDB + Redis]
        E --> H[RAG Pipeline<br>OpenSearch]
        F --> I[Response Generator<br>Bedrock]
        G --> I
        H --> I
    end

    subgraph "Resilience"
        J[Circuit Breaker<br>per downstream]
        K[Retry with Backoff<br>per call]
        L[Fallback Handler<br>cached / default responses]
    end

    E --> J
    J --> K
    K --> L

Scaling Strategy

gantt
    title Scaling Response Timeline
    dateFormat  X
    axisFormat  %ss

    section Detection
    CloudWatch metric breach     :0, 5
    Scaling policy evaluates     :5, 10

    section ECS Scaling
    Task provisioning starts     :10, 20
    Container image pull         :20, 35
    Health check passes          :35, 45
    Task receives traffic        :45, 50

    section Total
    End-to-end scaling           :0, 50
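
The stage durations in the timeline can be sanity-checked against the 60-second target from the acceptance criteria with a few lines (durations read off the chart above):

```python
# Stage durations in seconds, read from the scaling timeline above.
stages = {
    "metric_breach_detection": 5,
    "policy_evaluation": 5,
    "task_provisioning": 10,
    "image_pull": 15,
    "health_check": 10,
    "traffic_shift": 5,
}

total_seconds = sum(stages.values())
# The acceptance criteria require auto-scaling to respond within 60 seconds.
assert total_seconds <= 60, f"{total_seconds}s exceeds the 60s budget"
print(f"End-to-end scaling: {total_seconds}s")
```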

Low-Level Design

1. Async Fan-Out Controller

The orchestrator runs independent downstream calls concurrently using asyncio.gather, reducing total latency from the sum of the sequential calls to roughly the latency of the slowest parallel branch.

sequenceDiagram
    participant Client
    participant Orchestrator
    participant IntentSvc as Intent Classifier
    participant MemorySvc as Conversation Memory
    participant CacheSvc as Cache Layer

    Client->>Orchestrator: User message

    par Parallel Fan-Out
        Orchestrator->>IntentSvc: Classify intent
        Orchestrator->>MemorySvc: Fetch conversation history
        Orchestrator->>CacheSvc: Check response cache
    end

    IntentSvc-->>Orchestrator: Intent result (~35ms)
    MemorySvc-->>Orchestrator: History (~10ms)
    CacheSvc-->>Orchestrator: Cache miss (~2ms)

    Note over Orchestrator: Total parallel = max(35, 10, 2) = 35ms<br>vs sequential = 47ms

    Orchestrator->>Orchestrator: Assemble context + route

Code Example: Async Fan-Out Controller

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional

logger = logging.getLogger(__name__)


@dataclass
class FanOutResult:
    intent: Optional[dict] = None
    conversation_history: Optional[list] = None
    cache_hit: Optional[dict] = None
    rag_results: Optional[list] = None
    errors: dict = None

    def __post_init__(self):
        if self.errors is None:
            self.errors = {}


class AsyncFanOutController:
    """Executes independent downstream calls concurrently."""

    def __init__(
        self,
        intent_service,
        memory_service,
        cache_service,
        rag_service,
    ):
        self.intent_service = intent_service
        self.memory_service = memory_service
        self.cache_service = cache_service
        self.rag_service = rag_service

    async def execute(
        self, session_id: str, user_message: str
    ) -> FanOutResult:
        """Run all independent lookups in parallel."""
        start = time.monotonic()
        result = FanOutResult()

        # Define parallel tasks
        tasks = {
            "intent": self._safe_call(
                "intent",
                self.intent_service.classify(user_message),
            ),
            "history": self._safe_call(
                "history",
                self.memory_service.get_recent(session_id, limit=10),
            ),
            "cache": self._safe_call(
                "cache",
                self.cache_service.get_response(user_message),
            ),
        }

        # Execute all in parallel
        outcomes = await asyncio.gather(
            *tasks.values(), return_exceptions=False
        )

        task_names = list(tasks.keys())
        for i, outcome in enumerate(outcomes):
            name = task_names[i]
            if isinstance(outcome, Exception):
                result.errors[name] = str(outcome)
                logger.warning(f"Fan-out task '{name}' failed: {outcome}")
            elif name == "intent":
                result.intent = outcome
            elif name == "history":
                result.conversation_history = outcome
            elif name == "cache":
                result.cache_hit = outcome

        # Phase 2: RAG depends on intent (sequential after fan-out)
        if result.intent and result.cache_hit is None:
            rag_outcome = await self._safe_call(
                "rag",
                self.rag_service.retrieve(
                    user_message, intent=result.intent.get("intent")
                ),
            )
            if isinstance(rag_outcome, Exception):
                result.errors["rag"] = str(rag_outcome)
            else:
                result.rag_results = rag_outcome

        elapsed = (time.monotonic() - start) * 1000
        logger.info(f"Fan-out completed in {elapsed:.1f}ms")
        return result

    async def _safe_call(
        self, name: str, coro
    ) -> Any:
        """Execute a coroutine with error isolation."""
        try:
            return await asyncio.wait_for(coro, timeout=5.0)
        except asyncio.TimeoutError:
            logger.error(f"Task '{name}' timed out")
            return Exception(f"{name} timed out")
        except Exception as e:
            logger.error(f"Task '{name}' failed: {e}")
            return e

2. Circuit Breaker

Prevent cascading failures by tracking downstream error rates and opening the circuit when a threshold is breached.

stateDiagram-v2
    [*] --> Closed
    Closed --> Open : Failure rate >= 50%<br>over a 20-call window
    Open --> HalfOpen : After 30s cooldown
    HalfOpen --> Closed : Probes succeed<br>(3 consecutive)
    HalfOpen --> Open : Any probe fails

    state Closed {
        [*] --> TrackingFailures
        TrackingFailures --> TrackingFailures : Record success/failure
    }

    state Open {
        [*] --> RejectAll
        RejectAll --> RejectAll : Return fallback immediately
    }

    state HalfOpen {
        [*] --> Probing
        Probing --> Probing : Allow probe requests through
    }

Code Example: Circuit Breaker

import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitStats:
    successes: int = 0
    failures: int = 0
    last_failure_time: float = 0.0
    consecutive_successes_in_half_open: int = 0

    @property
    def total(self) -> int:
        return self.successes + self.failures

    @property
    def failure_rate(self) -> float:
        if self.total == 0:
            return 0.0
        return self.failures / self.total


class CircuitBreaker:
    """Circuit breaker with configurable thresholds and fallback."""

    def __init__(
        self,
        name: str,
        failure_threshold: float = 0.5,
        window_size: int = 20,
        cooldown_seconds: float = 30.0,
        half_open_max_probes: int = 3,
        fallback_fn: Callable | None = None,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.window_size = window_size
        self.cooldown_seconds = cooldown_seconds
        self.half_open_max_probes = half_open_max_probes
        self.fallback_fn = fallback_fn

        self._state = CircuitState.CLOSED
        self._stats = CircuitStats()
        self._opened_at: float = 0.0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._opened_at >= self.cooldown_seconds:
                self._state = CircuitState.HALF_OPEN
                self._stats.consecutive_successes_in_half_open = 0
        return self._state

    async def call(self, coro) -> Any:
        """Execute a coroutine through the circuit breaker."""
        current_state = self.state

        if current_state == CircuitState.OPEN:
            coro.close()  # avoid "coroutine was never awaited" warnings
            if self.fallback_fn:
                return await self.fallback_fn()
            raise CircuitOpenError(
                f"Circuit '{self.name}' is open"
            )

        try:
            result = await coro
            await self._record_success()
            return result
        except Exception as e:
            await self._record_failure()
            if self.fallback_fn and self.state == CircuitState.OPEN:
                return await self.fallback_fn()
            raise

    async def _record_success(self) -> None:
        async with self._lock:
            self._stats.successes += 1
            if self._state == CircuitState.HALF_OPEN:
                self._stats.consecutive_successes_in_half_open += 1
                if (
                    self._stats.consecutive_successes_in_half_open
                    >= self.half_open_max_probes
                ):
                    self._state = CircuitState.CLOSED
                    self._stats = CircuitStats()
            elif self._stats.total >= self.window_size:
                # Tumbling window: once the window fills without tripping,
                # start fresh so old successes don't mask a new failure burst.
                self._stats = CircuitStats()

    async def _record_failure(self) -> None:
        async with self._lock:
            self._stats.failures += 1
            self._stats.last_failure_time = time.monotonic()

            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()
            elif self._stats.total >= self.window_size:
                if self._stats.failure_rate >= self.failure_threshold:
                    self._state = CircuitState.OPEN
                    self._opened_at = time.monotonic()
                # Either way, this window is complete; start a new one.
                self._stats = CircuitStats()


class CircuitOpenError(Exception):
    pass
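
The opening condition can be traced with a self-contained sketch of the same counting logic (window size and failure threshold copied from the defaults above):

```python
def should_open(successes: int, failures: int,
                window_size: int = 20, threshold: float = 0.5) -> bool:
    """Mirror of the breaker's trip condition: a full window whose
    failure rate meets the threshold opens the circuit."""
    total = successes + failures
    if total < window_size:
        return False  # not enough samples yet
    return failures / total >= threshold

# 20 calls, 11 failures -> 55% failure rate -> circuit opens.
print(should_open(successes=9, failures=11))
```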

3. ECS Auto-Scaling Configuration

graph TD
    subgraph "Scaling Policies"
        A[Target Tracking<br>CPU Utilization: 65%]
        B[Target Tracking<br>Request Count / Target: 500]
        C[Step Scaling<br>Active WebSocket > 8K]
    end

    subgraph "Scale Limits"
        D[Min: 3 tasks<br>Desired: 5 tasks<br>Max: 50 tasks]
    end

    subgraph "Scheduled Scaling"
        E[Mon-Fri 6AM: Min 10]
        F[Mon-Fri 10PM: Min 3]
        G[Events / Sales: Min 20]
    end

    A --> H[ECS Service<br>Auto-scaling]
    B --> H
    C --> H
    D --> H
    E --> H
    F --> H
    G --> H

Code Example: Auto-Scaling Configuration (CDK-style)

"""ECS Auto-scaling configuration expressed as infrastructure-as-code."""


def configure_ecs_autoscaling(cluster_name: str) -> dict:
    """Return auto-scaling policies for the orchestrator ECS service."""
    return {
        "service": {
            "cluster": cluster_name,
            "service_name": "mangaassist-orchestrator",
            "desired_count": 5,
            "deployment_configuration": {
                "minimum_healthy_percent": 100,
                "maximum_percent": 200,
            },
        },
        "scaling": {
            "min_capacity": 3,
            "max_capacity": 50,
            "policies": [
                {
                    "name": "cpu-target-tracking",
                    "type": "TargetTrackingScaling",
                    "metric": "ECSServiceAverageCPUUtilization",
                    "target_value": 65.0,
                    "scale_in_cooldown": 300,
                    "scale_out_cooldown": 60,
                },
                {
                    "name": "request-count-target-tracking",
                    "type": "TargetTrackingScaling",
                    "metric": "ALBRequestCountPerTarget",
                    "target_value": 500,
                    "scale_in_cooldown": 300,
                    "scale_out_cooldown": 60,
                },
                {
                    "name": "websocket-connections-step",
                    "type": "StepScaling",
                    "metric": "ActiveWebSocketConnections",
                    "steps": [
                        {"lower": 8000, "upper": 9000, "adjustment": 2},
                        {"lower": 9000, "upper": 10000, "adjustment": 4},
                        {"lower": 10000, "adjustment": 6},
                    ],
                },
            ],
            "scheduled_actions": [
                {
                    "name": "weekday-peak",
                    "schedule": "cron(0 6 ? * MON-FRI *)",
                    "min_capacity": 10,
                },
                {
                    "name": "weekday-off-peak",
                    "schedule": "cron(0 22 ? * MON-FRI *)",
                    "min_capacity": 3,
                },
            ],
        },
    }
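
As a quick illustration of how the step-scaling policy maps a metric value to a capacity adjustment, here is a self-contained sketch using the same step boundaries (a simplified model; the actual evaluation is done by Application Auto Scaling):

```python
# Step boundaries copied from the websocket-connections-step policy above.
STEPS = [
    {"lower": 8000, "upper": 9000, "adjustment": 2},
    {"lower": 9000, "upper": 10000, "adjustment": 4},
    {"lower": 10000, "upper": None, "adjustment": 6},
]

def step_adjustment(metric_value: float) -> int:
    """Return the task-count adjustment for a metric value (0 below all steps)."""
    for step in STEPS:
        upper = step["upper"]
        if step["lower"] <= metric_value and (upper is None or metric_value < upper):
            return step["adjustment"]
    return 0

print(step_adjustment(9500))  # in the 9000-10000 band -> +4 tasks
```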

4. Connection Pool Manager

Prevent connection exhaustion across downstream HTTP and database clients.

graph TD
    subgraph "Orchestrator Task"
        A[Request Handler] --> B[Connection Pool Manager]
        B --> C[HTTP Pool: SageMaker<br>max=50, idle_timeout=120s]
        B --> D[HTTP Pool: Bedrock<br>max=100, idle_timeout=120s]
        B --> E[DynamoDB Pool<br>max=50, via boto3 session]
        B --> F[Redis Pool<br>max=100, min_idle=10]
        B --> G[OpenSearch Pool<br>max=30]
    end

    subgraph "Health"
        H[Pool Utilization<br>CloudWatch metric]
        I{> 80% used?}
        H --> I
        I -->|Yes| J[Warn + Pre-scale]
    end

Code Example: Connection Pool Manager

import logging
from dataclasses import dataclass

import aiohttp
import redis.asyncio as aioredis

logger = logging.getLogger(__name__)


@dataclass
class PoolConfig:
    name: str
    max_connections: int
    min_idle: int = 0
    idle_timeout_seconds: int = 120
    connect_timeout_seconds: float = 5.0


class ConnectionPoolManager:
    """Centralized connection pool management for all downstream services."""

    def __init__(self):
        self._http_sessions: dict[str, aiohttp.ClientSession] = {}
        self._redis_pool: aioredis.ConnectionPool | None = None
        self._initialized = False

    async def initialize(
        self,
        http_pools: list[PoolConfig],
        redis_config: PoolConfig,
    ) -> None:
        """Initialize all connection pools at startup."""
        for pool_cfg in http_pools:
            connector = aiohttp.TCPConnector(
                limit=pool_cfg.max_connections,
                keepalive_timeout=pool_cfg.idle_timeout_seconds,
                enable_cleanup_closed=True,
            )
            timeout = aiohttp.ClientTimeout(
                total=30,
                connect=pool_cfg.connect_timeout_seconds,
            )
            session = aiohttp.ClientSession(
                connector=connector, timeout=timeout
            )
            self._http_sessions[pool_cfg.name] = session
            logger.info(
                f"HTTP pool '{pool_cfg.name}' initialized: "
                f"max={pool_cfg.max_connections}"
            )

        self._redis_pool = aioredis.ConnectionPool(
            max_connections=redis_config.max_connections,
            socket_timeout=redis_config.connect_timeout_seconds,
            socket_connect_timeout=redis_config.connect_timeout_seconds,
        )
        logger.info(
            f"Redis pool initialized: max={redis_config.max_connections}"
        )
        self._initialized = True

    def get_http_session(self, name: str) -> aiohttp.ClientSession:
        """Get the HTTP session for a named downstream service."""
        session = self._http_sessions.get(name)
        if session is None:
            raise ValueError(f"No HTTP pool configured for '{name}'")
        return session

    def get_redis_pool(self) -> aioredis.ConnectionPool:
        """Get the shared Redis connection pool."""
        if self._redis_pool is None:
            raise RuntimeError("Redis pool not initialized")
        return self._redis_pool

    async def get_pool_stats(self) -> dict:
        """Report connection pool utilization for monitoring."""
        stats = {}
        for name, session in self._http_sessions.items():
            connector = session.connector
            if isinstance(connector, aiohttp.TCPConnector):
                # NOTE: _acquired is a private aiohttp attribute used here to
                # count in-use connections; it may change between versions.
                active = len(connector._acquired)
                stats[name] = {
                    "limit": connector.limit,
                    "active": active,
                    "utilization_pct": (
                        active / connector.limit * 100
                        if connector.limit > 0
                        else 0
                    ),
                }
        return stats

    async def shutdown(self) -> None:
        """Gracefully close all pools."""
        for name, session in self._http_sessions.items():
            await session.close()
            logger.info(f"HTTP pool '{name}' closed")
        if self._redis_pool:
            await self._redis_pool.disconnect()
            logger.info("Redis pool closed")

Metrics and Monitoring

Metric                                  Target                    Alarm Threshold
orchestrator.overhead_ms                p95 < 100ms               p95 > 200ms
orchestrator.concurrent_sessions        Monitor trend             > capacity × 0.85
orchestrator.fanout_parallel_ms         p95 < 50ms                p95 > 100ms
circuit_breaker.open_count              0 in steady state         Any circuit open > 2 min
circuit_breaker.fallback_invocations    Monitor trend             > 100/min
connection_pool.utilization_pct         < 70% per pool            > 85% per pool
ecs.scaling_response_time_s             < 60s                     > 120s
ecs.task_count                          Within scheduled bounds   > max_capacity × 0.9

graph LR
    subgraph "Orchestrator Dashboard"
        A[Fan-Out Time]
        B[Circuit Breaker<br>State per service]
        C[Pool Utilization %]
        D[Active ECS Tasks]
    end

    A --> E{> 100ms?}
    E -->|Yes| F[Check downstream<br>latency]
    B --> G{Any Open?}
    G -->|Yes| H[Page on-call +<br>serve fallback]
    C --> I{> 85%?}
    I -->|Yes| J[Increase pool<br>or scale tasks]
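
A hedged sketch of emitting the orchestrator.overhead_ms metric from the table above: the function builds the entry that would be passed to boto3's CloudWatch put_metric_data; the dimension name and namespace choice are assumptions, not confirmed by this design.

```python
from datetime import datetime, timezone

def overhead_metric_entry(overhead_ms: float, cluster: str) -> dict:
    """Build one CloudWatch MetricData entry for orchestration overhead.

    Metric and dimension names here are illustrative assumptions; the dict
    is shaped for boto3:
    cloudwatch.put_metric_data(Namespace=..., MetricData=[entry]).
    """
    return {
        "MetricName": "orchestrator.overhead_ms",
        "Dimensions": [{"Name": "Cluster", "Value": cluster}],
        "Timestamp": datetime.now(timezone.utc),
        "Value": overhead_ms,
        "Unit": "Milliseconds",
    }

entry = overhead_metric_entry(42.5, "mangaassist-orchestrator")
print(entry["MetricName"], entry["Unit"])
```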