LOCAL PREVIEW View on GitHub

Real-Time AI Interaction Architecture

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Dimension Value
Certification AWS Certified AI Practitioner — Specialty (AIP-C01)
Task 2.4 — Select and implement FM API integration patterns
Skill 2.4.2 — Real-Time AI Interaction
This File 01 — Real-Time Interaction Architecture (streaming architecture, WebSocket connection management, time-to-first-token)

Skill Scope

Skill 2.4.2 focuses on real-time, streaming interactions between users and Foundation Models. While Skill 2.4.1 covers synchronous request/response where the user waits for the complete answer, this skill addresses streaming delivery where tokens appear progressively as the model generates them. For MangaAssist, streaming transforms the user experience from "wait 2-3 seconds for full answer" to "see the first word in 200-400ms and read along as the model types." This file covers the overall streaming architecture, WebSocket connection lifecycle, and time-to-first-token (TTFT) optimization.


Mind Map

mindmap
  root((Skill 2.4.2<br/>Real-Time AI<br/>Interaction))
    Streaming Architecture
      Bedrock InvokeModelWithResponseStream
      Server-Sent Events (SSE)
      Chunked transfer encoding
      Token-by-token delivery
      Stream multiplexing
    WebSocket Management
      API Gateway WebSocket API
      Connection lifecycle ($connect/$disconnect)
      Route selection ($default)
      Connection table (DynamoDB)
      Idle timeout handling
    Time-to-First-Token
      TTFT measurement
      Prompt caching (Bedrock)
      Pre-warming strategies
      Connection keep-alive
      Latency waterfall
    Backpressure Handling
      Client consumption rate
      Buffer management
      Flow control signals
      Memory-bounded queues
      Graceful degradation
    MangaAssist Streaming
      Progressive JP text display
      Character-boundary chunking
      Partial response caching
      Stream recovery on drop
      Typing indicator UX

1. Streaming Architecture Overview

1.1 End-to-End Streaming Flow

sequenceDiagram
    participant User as Manga Reader<br/>(Browser)
    participant WS as API Gateway<br/>WebSocket
    participant Lambda as Lambda<br/>$default Route
    participant ECS as ECS Fargate<br/>Orchestrator
    participant Bedrock as Amazon Bedrock<br/>Claude 3 Streaming

    User->>WS: WebSocket frame:<br/>{"action":"chat","message":"おすすめのマンガは?"}
    WS->>Lambda: Route to $default handler
    Lambda->>ECS: HTTP POST /stream-chat
    ECS->>Bedrock: InvokeModelWithResponseStream

    loop Token-by-token streaming
        Bedrock-->>ECS: Stream chunk (token)
        ECS-->>Lambda: Chunked HTTP response
        Lambda->>WS: PostToConnection (chunk)
        WS-->>User: WebSocket frame (chunk)
    end

    Note over User,Bedrock: User sees tokens appear progressively<br/>TTFT: ~300ms | Full response: ~2500ms

    Bedrock-->>ECS: Stream end (stop_reason)
    ECS-->>Lambda: Final chunk + metadata
    Lambda->>WS: PostToConnection (done + usage)
    WS-->>User: WebSocket frame (complete)

1.2 Streaming vs Non-Streaming Comparison

Metric Non-Streaming (InvokeModel) Streaming (InvokeModelWithResponseStream)
Time to first visible token 2000-3000ms 200-400ms
Perceived latency High (blank → full text) Low (progressive reveal)
Total completion time ~2500ms ~2700ms (slightly longer)
Network overhead 1 response payload N small frames
Error handling Simple (one response) Complex (mid-stream errors)
Client complexity Low Medium (buffer management)
Cost Same Same (per-token pricing)

1.3 Bedrock Streaming Client

"""
Bedrock streaming client for MangaAssist real-time chat.
Handles InvokeModelWithResponseStream with proper event parsing.
"""
import json
import time
import logging
from typing import Generator, Dict, Any, Optional
from dataclasses import dataclass, field

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EventStreamError

logger = logging.getLogger(__name__)


@dataclass
class StreamChunk:
    """Represents a single chunk from the Bedrock stream."""
    chunk_type: str           # "content_block_delta", "message_start", "message_stop", etc.
    text: str = ""            # Generated text (for content deltas)
    index: int = 0            # Chunk sequence number
    input_tokens: int = 0     # Token count (populated at stream end)
    output_tokens: int = 0
    stop_reason: str = ""     # Why generation stopped
    latency_ms: float = 0.0   # Time since stream start


@dataclass
class StreamMetrics:
    """Metrics collected during a streaming invocation."""
    ttft_ms: float = 0.0          # Time to first token
    total_ms: float = 0.0         # Total stream duration
    chunks_received: int = 0
    total_text_length: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    stop_reason: str = ""


class BedrockStreamingClient:
    """
    Manages streaming invocations to Amazon Bedrock.
    Yields parsed chunks for real-time relay to WebSocket clients.
    """

    def __init__(self, region: str = "us-east-1"):
        self._client = boto3.client(
            "bedrock-runtime",
            region_name=region,
            config=Config(
                retries={"max_attempts": 2, "mode": "adaptive"},
                connect_timeout=5,
                read_timeout=60,  # Longer for streaming
                max_pool_connections=25,
            ),
        )

    def stream_invoke(
        self,
        model_id: str,
        body: dict,
    ) -> Generator[StreamChunk, None, StreamMetrics]:
        """
        Invoke Bedrock with streaming and yield chunks as they arrive.

        Usage:
            client = BedrockStreamingClient()
            gen = client.stream_invoke("anthropic.claude-3-sonnet-...", body)
            for chunk in gen:
                send_to_websocket(chunk.text)
            # After iteration, gen.value contains StreamMetrics

        Yields:
            StreamChunk for each content delta

        Returns:
            StreamMetrics (accessible via generator .value after StopIteration)
        """
        stream_start = time.time()
        first_token_time = None
        metrics = StreamMetrics()
        chunk_index = 0

        try:
            response = self._client.invoke_model_with_response_stream(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=json.dumps(body),
            )

            event_stream = response["body"]

            for event in event_stream:
                chunk_data = event.get("chunk")
                if not chunk_data:
                    continue

                payload = json.loads(chunk_data["bytes"].decode("utf-8"))
                event_type = payload.get("type", "")

                if event_type == "content_block_delta":
                    delta = payload.get("delta", {})
                    text = delta.get("text", "")

                    if text and first_token_time is None:
                        first_token_time = time.time()
                        metrics.ttft_ms = (first_token_time - stream_start) * 1000

                    metrics.chunks_received += 1
                    metrics.total_text_length += len(text)

                    yield StreamChunk(
                        chunk_type="content_block_delta",
                        text=text,
                        index=chunk_index,
                        latency_ms=(time.time() - stream_start) * 1000,
                    )
                    chunk_index += 1

                elif event_type == "message_delta":
                    usage = payload.get("usage", {})
                    metrics.output_tokens = usage.get("output_tokens", 0)
                    metrics.stop_reason = payload.get("delta", {}).get("stop_reason", "")

                elif event_type == "message_start":
                    msg = payload.get("message", {})
                    usage = msg.get("usage", {})
                    metrics.input_tokens = usage.get("input_tokens", 0)

                elif event_type == "message_stop":
                    pass  # Stream complete

        except EventStreamError as exc:
            logger.error("Stream event error: %s", exc)
            yield StreamChunk(
                chunk_type="error",
                text=f"Stream error: {str(exc)}",
                index=chunk_index,
            )

        except ClientError as exc:
            error_code = exc.response["Error"]["Code"]
            logger.error("Bedrock client error during stream: %s", error_code)
            yield StreamChunk(
                chunk_type="error",
                text=f"Model error: {error_code}",
                index=chunk_index,
            )

        metrics.total_ms = (time.time() - stream_start) * 1000

        logger.info(
            "Stream complete | ttft=%.0fms | total=%.0fms | chunks=%d | "
            "tokens=%d/%d | stop=%s",
            metrics.ttft_ms, metrics.total_ms, metrics.chunks_received,
            metrics.input_tokens, metrics.output_tokens, metrics.stop_reason,
        )

        return metrics

2. WebSocket Connection Management

2.1 Connection Lifecycle

statechart-v2
flowchart TD
    subgraph "Connection Lifecycle"
        A[Client initiates<br/>WebSocket handshake] --> B[$connect Route<br/>Lambda Handler]
        B --> C{Authenticate<br/>& Authorize}
        C -->|Valid| D[Store connection<br/>in DynamoDB]
        C -->|Invalid| E[Return 403<br/>Reject connection]

        D --> F[Connection ACTIVE]

        F --> G{Message received}
        G -->|chat message| H[$default Route<br/>Lambda Handler]
        G -->|ping| I[Heartbeat<br/>Response]
        G -->|no activity 10min| J[Idle timeout<br/>API GW disconnects]

        H --> K[Process & Stream<br/>Response]
        K --> F

        J --> L[$disconnect Route<br/>Lambda Handler]
        L --> M[Remove from<br/>DynamoDB]
        M --> N[Connection CLOSED]

        F -->|Client closes| L
    end

2.2 Connection Registry

"""
WebSocket connection registry backed by DynamoDB.
Tracks active connections, supports multi-device sessions,
and handles connection lifecycle events.
"""
import time
import logging
from typing import Optional, List, Dict
from dataclasses import dataclass

import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class ConnectionRecord:
    """Represents an active WebSocket connection."""
    connection_id: str
    user_id: str
    session_id: str
    connected_at: int          # epoch ms
    last_active_at: int        # epoch ms
    client_info: Dict = None
    ttl: int = 0               # DynamoDB TTL (epoch seconds)

    def __post_init__(self):
        self.client_info = self.client_info or {}
        # Auto-expire connections after 2 hours
        if not self.ttl:
            self.ttl = int(time.time()) + 7200


class ConnectionRegistry:
    """
    Manages WebSocket connections in DynamoDB.

    Table schema:
      PK: connectionId (String)
      GSI: userId-index (userId -> connectionId)
      TTL: ttl (Number) — auto-cleanup of stale connections
    """

    def __init__(self, table_name: str = "manga-ws-connections"):
        self.table = boto3.resource("dynamodb").Table(table_name)

    def register(self, record: ConnectionRecord) -> None:
        """Register a new WebSocket connection."""
        try:
            self.table.put_item(
                Item={
                    "connectionId": record.connection_id,
                    "userId": record.user_id,
                    "sessionId": record.session_id,
                    "connectedAt": record.connected_at,
                    "lastActiveAt": record.last_active_at,
                    "clientInfo": record.client_info,
                    "ttl": record.ttl,
                },
                ConditionExpression="attribute_not_exists(connectionId)",
            )
            logger.info(
                "Connection registered | connId=%s | userId=%s",
                record.connection_id, record.user_id,
            )
        except ClientError as exc:
            if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
                logger.warning("Connection already registered: %s", record.connection_id)
            else:
                raise

    def unregister(self, connection_id: str) -> None:
        """Remove a connection on disconnect."""
        try:
            self.table.delete_item(Key={"connectionId": connection_id})
            logger.info("Connection unregistered | connId=%s", connection_id)
        except Exception as exc:
            logger.error("Failed to unregister: %s | error=%s", connection_id, exc)

    def get_connection(self, connection_id: str) -> Optional[ConnectionRecord]:
        """Retrieve a connection record."""
        response = self.table.get_item(Key={"connectionId": connection_id})
        item = response.get("Item")
        if not item:
            return None
        return ConnectionRecord(
            connection_id=item["connectionId"],
            user_id=item["userId"],
            session_id=item["sessionId"],
            connected_at=item["connectedAt"],
            last_active_at=item["lastActiveAt"],
            client_info=item.get("clientInfo", {}),
        )

    def update_activity(self, connection_id: str) -> None:
        """Update last active timestamp (prevents premature TTL expiry)."""
        now = int(time.time() * 1000)
        self.table.update_item(
            Key={"connectionId": connection_id},
            UpdateExpression="SET lastActiveAt = :ts, #ttl = :ttl",
            ExpressionAttributeNames={"#ttl": "ttl"},
            ExpressionAttributeValues={
                ":ts": now,
                ":ttl": int(time.time()) + 7200,
            },
        )

    def get_user_connections(self, user_id: str) -> List[str]:
        """Get all active connections for a user (multi-device support)."""
        response = self.table.query(
            IndexName="userId-index",
            KeyConditionExpression=Key("userId").eq(user_id),
        )
        return [item["connectionId"] for item in response.get("Items", [])]

3. Time-to-First-Token Optimization

3.1 TTFT Waterfall Analysis

gantt
    title Time-to-First-Token Waterfall (Target: < 400ms)
    dateFormat X
    axisFormat %Lms

    section Network
    WS frame parse        :a1, 0, 10
    Lambda cold start     :crit, a2, 10, 110
    Lambda → ECS HTTP     :a3, 110, 140

    section Preparation
    Session lookup (Redis) :b1, 140, 155
    Prompt assembly        :b2, 155, 165
    Bedrock TCP connect    :b3, 165, 185

    section Model
    Bedrock prefill        :c1, 185, 350
    First token generated  :milestone, m1, 350, 350

    section Delivery
    ECS → Lambda chunk     :d1, 350, 360
    Lambda → PostToConnection :d2, 360, 380
    WS frame to client     :d3, 380, 395

3.2 TTFT Optimization Techniques

"""
Time-to-first-token optimization strategies for MangaAssist.
Combines connection pre-warming, prompt caching, and pipeline parallelism.
"""
import time
import logging
import asyncio
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor

import boto3
import redis

logger = logging.getLogger(__name__)

# Shared resources — initialized once per Lambda container / ECS task
_bedrock_client = None
_redis_client = None
_ddb_resource = None


def get_bedrock_client():
    """Lazy singleton — avoids cold-start overhead on repeated invocations."""
    global _bedrock_client
    if _bedrock_client is None:
        from botocore.config import Config
        _bedrock_client = boto3.client(
            "bedrock-runtime",
            config=Config(
                connect_timeout=3,
                read_timeout=60,
                max_pool_connections=25,
                tcp_keepalive=True,
            ),
        )
    return _bedrock_client


def get_redis_client():
    """Lazy singleton Redis connection."""
    global _redis_client
    if _redis_client is None:
        import os
        _redis_client = redis.Redis(
            host=os.environ.get("REDIS_HOST", "localhost"),
            port=6379,
            socket_connect_timeout=2,
            socket_timeout=2,
            decode_responses=True,
        )
    return _redis_client


class TTFTOptimizer:
    """
    Optimizes time-to-first-token through parallelism and caching.

    Key strategies:
    1. Parallel session load + prompt assembly
    2. Connection keep-alive (TCP reuse)
    3. Minimal prompt for fast prefill
    4. Pre-computed system prompts
    """

    # Pre-computed system prompts — avoid re-encoding on every request
    SYSTEM_PROMPTS = {
        "ja": "あなたはMangaAssistです。日本のマンガ書店のチャットボットです。敬語を使用してください。",
        "en": "You are MangaAssist, a helpful JP manga store chatbot.",
    }

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=3)

    async def prepare_stream_request(
        self,
        connection_id: str,
        message: str,
        session_id: str,
        language: str = "ja",
    ) -> Dict[str, Any]:
        """
        Prepare a streaming request with parallel I/O operations.

        Runs session load and cache check concurrently to minimize
        the time before we can start the Bedrock stream.
        """
        loop = asyncio.get_event_loop()

        # Run session load and cache check in parallel
        session_future = loop.run_in_executor(
            self._executor,
            self._load_session_fast,
            session_id,
        )
        cache_future = loop.run_in_executor(
            self._executor,
            self._check_streaming_cache,
            session_id,
            message,
        )

        # Wait for both to complete
        history, cached_prefix = await asyncio.gather(session_future, cache_future)

        # Build minimal prompt for fastest prefill
        body = self._build_minimal_body(message, history, language)

        return {
            "body": body,
            "cached_prefix": cached_prefix,
            "model_id": self._select_streaming_model(message),
        }

    def _load_session_fast(self, session_id: str) -> list:
        """Load session from Redis first, DynamoDB as fallback."""
        r = get_redis_client()
        import json

        # Try Redis (< 5ms)
        cached = r.get(f"session:{session_id}")
        if cached:
            return json.loads(cached)

        # Fallback to DynamoDB (~50ms)
        ddb = boto3.resource("dynamodb").Table("manga-assist-sessions")
        from boto3.dynamodb.conditions import Key
        response = ddb.query(
            KeyConditionExpression=Key("sessionId").eq(session_id),
            ScanIndexForward=False,
            Limit=6,  # Only last 3 turns (6 messages = 3 user + 3 assistant)
        )
        items = list(reversed(response.get("Items", [])))

        # Cache in Redis for next request
        r.setex(f"session:{session_id}", 300, json.dumps(items, default=str))

        return items

    def _check_streaming_cache(self, session_id: str, message: str) -> Optional[str]:
        """Check if we have a cached partial response for this exact query."""
        import hashlib
        r = get_redis_client()
        key = f"stream_prefix:{hashlib.sha256(f'{session_id}:{message}'.encode()).hexdigest()[:12]}"
        return r.get(key)

    def _build_minimal_body(self, message: str, history: list, language: str) -> dict:
        """
        Build the smallest valid request body for fastest Bedrock prefill.

        Prefill time scales with input tokens — keep context minimal.
        Last 3 turns (6 messages) ≈ 500-1000 tokens → ~150ms prefill.
        Full history (20 messages) ≈ 3000-5000 tokens → ~400ms prefill.
        """
        messages = []
        for item in history[-6:]:  # Last 3 turns only for streaming
            messages.append({"role": item["role"], "content": item["content"]})
        messages.append({"role": "user", "content": message})

        return {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,  # Shorter max for faster streaming
            "temperature": 0.3,
            "system": self.SYSTEM_PROMPTS.get(language, self.SYSTEM_PROMPTS["en"]),
            "messages": messages,
        }

    def _select_streaming_model(self, message: str) -> str:
        """Select model for streaming — prefer Haiku for faster TTFT."""
        import re
        # Haiku: ~150ms TTFT, Sonnet: ~300ms TTFT
        # Use Haiku for simple queries, Sonnet for complex
        jp_chars = len(re.findall(r"[\u3000-\u9fff]", message))
        if len(message) < 100 and jp_chars < 30:
            return "anthropic.claude-3-haiku-20240307-v1:0"
        return "anthropic.claude-3-sonnet-20240229-v1:0"

4. Stream Relay Architecture

4.1 ECS Stream Relay

The ECS orchestrator acts as a relay between Bedrock's streaming response and the WebSocket API. It reads chunks from Bedrock and pushes them to the client via API Gateway's PostToConnection API.

flowchart LR
    subgraph "Bedrock Stream"
        B1[message_start] --> B2[content_block_start]
        B2 --> B3[content_block_delta<br/>text: お]
        B3 --> B4[content_block_delta<br/>text: す]
        B4 --> B5[content_block_delta<br/>text: す]
        B5 --> B6[content_block_delta<br/>text: め]
        B6 --> B7[content_block_stop]
        B7 --> B8[message_delta<br/>usage + stop_reason]
        B8 --> B9[message_stop]
    end

    subgraph "Relay Processing"
        B3 --> R1[Buffer until<br/>char boundary]
        B4 --> R1
        B5 --> R1
        B6 --> R1
        R1 --> R2[Batch small<br/>chunks: おすすめ]
        R2 --> R3[PostToConnection]
    end

    subgraph "Client WebSocket"
        R3 --> C1[Frame: おすすめ]
        C1 --> C2[Render text<br/>progressively]
    end

4.2 Stream Relay Implementation

"""
WebSocket stream relay — bridges Bedrock streaming to API Gateway WebSocket.
Handles chunk batching, character-boundary alignment, and heartbeats.
"""
import json
import time
import logging
import threading
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class WebSocketStreamRelay:
    """
    Relays Bedrock streaming responses to WebSocket clients.

    Features:
    - Batches small chunks to reduce PostToConnection calls
    - Aligns on character boundaries (important for JP text)
    - Sends periodic heartbeats during long prefill
    - Handles client disconnection mid-stream
    """

    def __init__(
        self,
        api_endpoint: str,
        region: str = "us-east-1",
        batch_interval_ms: int = 50,
        heartbeat_interval_ms: int = 5000,
    ):
        self.apigw = boto3.client(
            "apigatewaymanagementapi",
            endpoint_url=api_endpoint,
            region_name=region,
        )
        self.batch_interval_ms = batch_interval_ms
        self.heartbeat_interval_ms = heartbeat_interval_ms

    def relay_stream(
        self,
        connection_id: str,
        stream_generator,
        request_id: str,
    ) -> dict:
        """
        Relay a Bedrock stream to a WebSocket connection.

        Args:
            connection_id: API Gateway WebSocket connection ID
            stream_generator: Generator from BedrockStreamingClient.stream_invoke
            request_id: Unique request identifier for correlation

        Returns:
            Dict with relay metrics
        """
        relay_start = time.time()
        text_buffer = ""
        last_send_time = time.time()
        chunks_sent = 0
        total_text = ""
        client_connected = True

        # Start heartbeat thread for long prefill periods
        heartbeat_stop = threading.Event()
        heartbeat_thread = threading.Thread(
            target=self._heartbeat_loop,
            args=(connection_id, request_id, heartbeat_stop),
            daemon=True,
        )
        heartbeat_thread.start()

        try:
            for chunk in stream_generator:
                if not client_connected:
                    break

                if chunk.chunk_type == "error":
                    self._send_to_client(connection_id, {
                        "type": "error",
                        "requestId": request_id,
                        "error": chunk.text,
                    })
                    break

                if chunk.chunk_type == "content_block_delta" and chunk.text:
                    # Stop heartbeat once content starts flowing
                    heartbeat_stop.set()

                    text_buffer += chunk.text
                    total_text += chunk.text

                    # Send when buffer is large enough or time threshold passed
                    now = time.time()
                    elapsed_ms = (now - last_send_time) * 1000

                    should_send = (
                        len(text_buffer) >= 10  # Enough text to send
                        or elapsed_ms >= self.batch_interval_ms  # Time threshold
                    )

                    if should_send and text_buffer:
                        success = self._send_to_client(connection_id, {
                            "type": "chunk",
                            "requestId": request_id,
                            "text": text_buffer,
                            "index": chunks_sent,
                        })
                        if not success:
                            client_connected = False
                            break
                        chunks_sent += 1
                        text_buffer = ""
                        last_send_time = now

            # Flush remaining buffer
            if text_buffer and client_connected:
                self._send_to_client(connection_id, {
                    "type": "chunk",
                    "requestId": request_id,
                    "text": text_buffer,
                    "index": chunks_sent,
                })
                chunks_sent += 1

            # Send completion message
            if client_connected:
                self._send_to_client(connection_id, {
                    "type": "done",
                    "requestId": request_id,
                    "totalLength": len(total_text),
                    "chunksDelivered": chunks_sent,
                })

        finally:
            heartbeat_stop.set()
            heartbeat_thread.join(timeout=1)

        relay_ms = (time.time() - relay_start) * 1000
        logger.info(
            "Stream relay complete | conn=%s | chunks=%d | text=%d chars | duration=%.0fms",
            connection_id, chunks_sent, len(total_text), relay_ms,
        )

        return {
            "chunks_sent": chunks_sent,
            "total_text_length": len(total_text),
            "relay_duration_ms": relay_ms,
            "client_connected": client_connected,
        }

    def _send_to_client(self, connection_id: str, payload: dict) -> bool:
        """Send a message to a WebSocket client. Returns False if disconnected."""
        try:
            self.apigw.post_to_connection(
                ConnectionId=connection_id,
                Data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
            )
            return True
        except ClientError as exc:
            error_code = exc.response["Error"]["Code"]
            if error_code == "GoneException":
                logger.info("Client disconnected mid-stream | conn=%s", connection_id)
                return False
            logger.error(
                "PostToConnection failed | conn=%s | error=%s",
                connection_id, error_code,
            )
            return False

    def _heartbeat_loop(
        self,
        connection_id: str,
        request_id: str,
        stop_event: threading.Event,
    ):
        """Send heartbeat pings during Bedrock prefill period."""
        while not stop_event.is_set():
            stop_event.wait(self.heartbeat_interval_ms / 1000)
            if stop_event.is_set():
                break
            try:
                self.apigw.post_to_connection(
                    ConnectionId=connection_id,
                    Data=json.dumps({
                        "type": "heartbeat",
                        "requestId": request_id,
                        "timestamp": int(time.time() * 1000),
                    }).encode("utf-8"),
                )
            except ClientError:
                break  # Client disconnected

5. Connection Health and Metrics

5.1 Connection Monitoring

flowchart TD
    subgraph "Health Signals"
        H1[WebSocket ping/pong]
        H2[Heartbeat messages]
        H3[PostToConnection success]
        H4[DynamoDB TTL check]
    end

    subgraph "Metrics"
        M1[Active connections<br/>gauge]
        M2[TTFT histogram<br/>p50/p95/p99]
        M3[Stream completion<br/>rate]
        M4[Chunks per stream<br/>histogram]
        M5[Client disconnect<br/>during stream rate]
    end

    H1 --> M1
    H2 --> M1
    H3 --> M3
    H3 --> M5
    H4 --> M1

    M1 --> CW[CloudWatch<br/>Dashboard]
    M2 --> CW
    M3 --> CW
    M4 --> CW
    M5 --> CW
"""
Streaming metrics collector for MangaAssist.
Publishes TTFT, stream completion, and connection health metrics.
"""
import time
import logging
from typing import Optional
from dataclasses import dataclass
import boto3

logger = logging.getLogger(__name__)


@dataclass
class StreamingMetrics:
    """Collected metrics for a single streaming interaction."""
    connection_id: str
    ttft_ms: float
    total_duration_ms: float
    chunks_sent: int
    text_length: int
    model_id: str
    completed: bool
    disconnect_during_stream: bool = False


class StreamMetricsPublisher:
    """Publishes streaming metrics to CloudWatch."""

    NAMESPACE = "MangaAssist/Streaming"

    def __init__(self, environment: str = "prod"):
        self.cw = boto3.client("cloudwatch")
        self.env = environment

    def publish(self, metrics: StreamingMetrics) -> None:
        """Publish a set of streaming metrics."""
        from datetime import datetime

        model_short = "sonnet" if "sonnet" in metrics.model_id else "haiku"

        metric_data = [
            {
                "MetricName": "TimeToFirstToken",
                "Value": metrics.ttft_ms,
                "Unit": "Milliseconds",
                "Dimensions": [
                    {"Name": "Environment", "Value": self.env},
                    {"Name": "Model", "Value": model_short},
                ],
                "Timestamp": datetime.utcnow(),
            },
            {
                "MetricName": "StreamDuration",
                "Value": metrics.total_duration_ms,
                "Unit": "Milliseconds",
                "Dimensions": [
                    {"Name": "Environment", "Value": self.env},
                    {"Name": "Model", "Value": model_short},
                ],
                "Timestamp": datetime.utcnow(),
            },
            {
                "MetricName": "ChunksPerStream",
                "Value": metrics.chunks_sent,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Environment", "Value": self.env},
                ],
                "Timestamp": datetime.utcnow(),
            },
            {
                "MetricName": "StreamCompleted",
                "Value": 1 if metrics.completed else 0,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Environment", "Value": self.env},
                ],
                "Timestamp": datetime.utcnow(),
            },
        ]

        if metrics.disconnect_during_stream:
            metric_data.append({
                "MetricName": "MidStreamDisconnect",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Environment", "Value": self.env},
                ],
                "Timestamp": datetime.utcnow(),
            })

        try:
            self.cw.put_metric_data(
                Namespace=self.NAMESPACE,
                MetricData=metric_data,
            )
        except Exception as exc:
            logger.error("Failed to publish streaming metrics: %s", exc)

Key Takeaways

# Takeaway
1 Streaming transforms UX — TTFT of 200-400ms vs 2-3s for full response. Users perceive the bot as faster even though total time is slightly longer.
2 InvokeModelWithResponseStream yields content_block_delta events with incremental text — parse the event stream and relay chunks to the WebSocket.
3 WebSocket connection registry in DynamoDB with TTL auto-cleans stale connections. Use a GSI on userId for multi-device support.
4 Chunk batching (every 50ms or 10+ characters) reduces PostToConnection API calls while keeping perceived latency low.
5 Heartbeat during prefill — send periodic {"type":"heartbeat"} frames while Bedrock processes the prompt, preventing client-side timeout assumptions.
6 GoneException from PostToConnection means the client disconnected — stop the Bedrock stream to save cost (you still pay for generated tokens).
7 TTFT optimization — keep conversation context to last 3 turns for streaming; fewer input tokens means faster Bedrock prefill.
8 Lazy singleton clients — initialize boto3 clients outside the handler function to reuse TCP connections across invocations, reducing TTFT by 50-100ms.