
Real-Time AI Interaction Architecture

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Field | Value |
| --- | --- |
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design and implement interaction mechanisms for FM-based applications |
| Skill | 2.4.2 — Develop real-time AI interaction systems to provide immediate feedback from FM |
| Focus Areas | Bedrock streaming APIs for incremental response delivery, WebSockets/SSE for real-time text generation, API Gateway for chunked transfer encoding |

Real-Time AI Interaction Mindmap

mindmap
  root((Real-Time AI
    Interaction))
    Streaming APIs
      Bedrock InvokeModelWithResponseStream
        Chunked response body
        Server-side token generation
        Event stream format
        Partial JSON payloads
      Response Stream Processing
        Byte-level parsing
        Token accumulation
        Delta extraction
        End-of-stream detection
      Model-Specific Streaming
        Claude 3 streaming format
        Content block deltas
        Stop reason signaling
        Usage metrics in final chunk
    WebSocket Transport
      API Gateway WebSocket API
        $connect route
        $disconnect route
        $default route
        Custom routes
      Connection Management
        Connection ID tracking
        DynamoDB connection store
        Heartbeat / ping-pong
        Idle timeout handling
      Bidirectional Communication
        Client-to-server messages
        Server-to-client push
        Binary frame support
        Message fragmentation
    Server-Sent Events
      HTTP/1.1 SSE Protocol
        text/event-stream content type
        Event ID for resumption
        Retry directive
        Named event types
      SSE vs WebSocket Tradeoffs
        Unidirectional simplicity
        Auto-reconnect built-in
        HTTP/2 multiplexing
        Proxy/CDN compatibility
      EventSource API
        Browser-native support
        Custom event handling
        Connection state management
        Error recovery
    Chunked Transfer Encoding
      HTTP Chunked Responses
        Transfer-Encoding chunked header
        Chunk size + data format
        Zero-length terminator
        Trailer headers
      API Gateway Integration
        Lambda streaming response
        Response stream flushing
        Content-Type handling
        Timeout considerations
      Unicode Safety
        Multi-byte character boundaries
        UTF-8 continuation bytes
        Buffer-and-flush strategy
        Character boundary detection
    Latency Optimization
      Time-to-First-Token
        Model warm-up
        Connection reuse
        Request pipelining
        Regional endpoint selection
      Buffering Strategies
        Token batching
        Word-boundary buffering
        Sentence-boundary buffering
        Adaptive buffer sizing
      Infrastructure
        ElastiCache for session pre-fetch
        ECS Fargate placement
        VPC endpoint for Bedrock
        Connection pooling

Architecture Flowchart: MangaAssist Streaming Response Pipeline

flowchart TB
    subgraph Client["Client Layer"]
        Browser["Browser / Mobile App"]
        WSClient["WebSocket Client"]
        SSEClient["EventSource Client"]
    end

    subgraph APIGW["API Gateway"]
        WSAPI["WebSocket API<br/>wss://manga.execute-api..."]
        RESTAPI["REST API<br/>(SSE / Chunked)"]
        ConnMgmt["Connection Management<br/>$connect / $disconnect"]
    end

    subgraph Orchestrator["ECS Fargate — Orchestrator"]
        Router["Request Router"]
        SessionMgr["Session Manager"]
        RAGPipeline["RAG Pipeline"]
        StreamOrch["Stream Orchestrator"]
    end

    subgraph Cache["ElastiCache Redis"]
        SessionCache["Session Cache"]
        ConnRegistry["Connection Registry"]
        TokenBuffer["Token Buffer"]
    end

    subgraph Search["OpenSearch Serverless"]
        VectorSearch["Vector Search<br/>(Manga Knowledge)"]
    end

    subgraph FM["Amazon Bedrock"]
        InvokeStream["InvokeModelWithResponseStream"]
        Claude3S["Claude 3 Sonnet"]
        Claude3H["Claude 3 Haiku"]
    end

    subgraph DDB["DynamoDB"]
        Sessions["Sessions Table"]
        Products["Products Table"]
        Connections["Connections Table"]
    end

    Browser -->|"1. WS Connect"| WSClient
    Browser -->|"1. SSE Connect"| SSEClient
    WSClient -->|"2. wss://"| WSAPI
    SSEClient -->|"2. GET /stream"| RESTAPI
    WSAPI -->|"3. Route"| ConnMgmt
    ConnMgmt -->|"4. Register connectionId"| Connections
    WSAPI -->|"5. Message"| Router
    RESTAPI -->|"5. Request"| Router

    Router -->|"6. Load session"| SessionMgr
    SessionMgr -->|"6a. Cache hit?"| SessionCache
    SessionMgr -->|"6b. Cache miss"| Sessions

    Router -->|"7. Retrieve context"| RAGPipeline
    RAGPipeline -->|"7a. Embed + search"| VectorSearch
    RAGPipeline -->|"7b. Product lookup"| Products

    Router -->|"8. Build prompt"| StreamOrch
    StreamOrch -->|"9. InvokeModelWithResponseStream"| InvokeStream
    InvokeStream -->|"10. Route to model"| Claude3S
    InvokeStream -->|"10. Route to model"| Claude3H

    Claude3S -.->|"11. Token stream"| InvokeStream
    Claude3H -.->|"11. Token stream"| InvokeStream
    InvokeStream -.->|"12. Chunked bytes"| StreamOrch

    StreamOrch -.->|"13a. WS frames"| WSAPI
    StreamOrch -.->|"13b. SSE events"| RESTAPI
    StreamOrch -->|"13c. Buffer tokens"| TokenBuffer

    WSAPI -.->|"14a. Push to client"| WSClient
    RESTAPI -.->|"14b. Push to client"| SSEClient

    StreamOrch -->|"15. Save final response"| Sessions
    StreamOrch -->|"15. Update cache"| SessionCache

    style Client fill:#e1f5fe
    style APIGW fill:#fff3e0
    style Orchestrator fill:#e8f5e9
    style Cache fill:#fce4ec
    style Search fill:#f3e5f5
    style FM fill:#fff9c4
    style DDB fill:#e0f2f1

1. Bedrock InvokeModelWithResponseStream API

1.1 How Streaming Works in Bedrock

Amazon Bedrock provides the InvokeModelWithResponseStream API as the streaming counterpart to InvokeModel. Instead of waiting for the entire response to be generated, this API returns a response stream that delivers tokens incrementally as the model produces them.

Key characteristics:

| Aspect | Non-Streaming (InvokeModel) | Streaming (InvokeModelWithResponseStream) |
| --- | --- | --- |
| Response delivery | Single payload after full generation | Incremental chunks during generation |
| Time-to-first-byte | High (full generation time) | Low (first token ready) |
| Client perception | "Thinking..." then full answer | Text appearing word by word |
| Memory on server | Buffer entire response | Stream through, low memory |
| Error handling | Clear success/failure | Partial response possible on error |
| MangaAssist TTFB | 2-4 seconds (Sonnet) | 200-500ms (Sonnet first token) |

1.2 Stream Event Format for Claude 3

When streaming from Claude 3, the response body is an event stream where each event contains a chunk of the response. The events follow the application/vnd.amazon.eventstream binary format, but the SDK abstracts this into iterable chunks.

Claude 3 streaming event sequence:

message_start        -> { model, usage: { input_tokens } }
content_block_start  -> { index: 0, content_block: { type: "text", text: "" } }
content_block_delta  -> { index: 0, delta: { type: "text_delta", text: "The" } }
content_block_delta  -> { index: 0, delta: { type: "text_delta", text: " manga" } }
content_block_delta  -> { index: 0, delta: { type: "text_delta", text: " you" } }
...
content_block_stop   -> { index: 0 }
message_delta        -> { stop_reason: "end_turn", usage: { output_tokens } }
message_stop         -> {}
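
A minimal sketch of consuming this event sequence with boto3 (the model ID and prompt are illustrative; the production client in section 6.1 adds retries, metrics, and error handling):

import json
import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

response = client.invoke_model_with_response_stream(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    contentType="application/json",
    accept="application/json",
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 256,
        "messages": [{"role": "user", "content": "What volume is this?"}],
    }),
)

for event in response["body"]:  # EventStream of chunk events
    chunk = json.loads(event["chunk"]["bytes"])
    if chunk["type"] == "content_block_delta":
        print(chunk["delta"]["text"], end="", flush=True)
    elif chunk["type"] == "message_stop":
        break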

1.3 MangaAssist Token Generation Rate

| Model | Tokens/sec (streaming) | Avg response length | Full generation time | TTFB |
| --- | --- | --- | --- | --- |
| Claude 3 Sonnet | ~80 tokens/sec | 150 tokens | ~1.9s | ~300ms |
| Claude 3 Haiku | ~150 tokens/sec | 100 tokens | ~0.7s | ~150ms |

For MangaAssist, Haiku handles quick lookups ("What volume is this?") while Sonnet handles nuanced recommendations ("Suggest similar dark fantasy manga").
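
A toy routing heuristic makes this split concrete (an assumption for illustration, not the production router):

def pick_model(query: str) -> str:
    # Short factual lookups -> Haiku; open-ended recommendation asks -> Sonnet
    recommend_words = ("recommend", "suggest", "similar", "like")
    if len(query) > 120 or any(w in query.lower() for w in recommend_words):
        return "sonnet"
    return "haiku"

assert pick_model("What volume is this?") == "haiku"
assert pick_model("Suggest similar dark fantasy manga") == "sonnet"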


2. WebSocket Implementation with API Gateway

2.1 API Gateway WebSocket API Architecture

API Gateway WebSocket APIs maintain persistent, bidirectional connections. Each connection gets a unique connectionId that the backend uses to push messages back to the client.

Route configuration for MangaAssist:

| Route Key | Purpose | Integration Target |
| --- | --- | --- |
| $connect | New connection auth + registration | Lambda (authorizer + DynamoDB write) |
| $disconnect | Cleanup connection record | Lambda (DynamoDB delete + Redis cleanup) |
| $default | Fallback for unmatched routes | Lambda (error response) |
| sendMessage | User sends chat message | ECS Fargate via VPC Link |
| ping | Client heartbeat | Lambda (pong response) |
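
A minimal $connect handler sketch (the table name, key schema, and TTL attribute are assumptions; production would also run the authorizer check here):

import os
import time
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("CONNECTIONS_TABLE", "Connections"))

def lambda_handler(event, context):
    connection_id = event["requestContext"]["connectionId"]
    table.put_item(Item={
        "connectionId": connection_id,
        "connectedAt": int(time.time()),
        # TTL attribute so DynamoDB expires records stranded by a missed $disconnect
        "expiresAt": int(time.time()) + 7200,  # matches the 2-hour duration limit
    })
    return {"statusCode": 200}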

2.2 Connection Lifecycle

Client                    API Gateway              Lambda/ECS              DynamoDB
  |                           |                        |                      |
  |--- WS Upgrade ---------->|                        |                      |
  |                           |--- $connect ---------->|                      |
  |                           |                        |--- Put connectionId->|
  |                           |                        |--- Set Redis TTL --->|
  |<-- 101 Switching ---------|<-- 200 OK ------------|                      |
  |                           |                        |                      |
  |--- sendMessage { ... } -->|                        |                      |
  |                           |--- Route to ECS ------>|                      |
  |                           |                        |--- Bedrock stream -->|
  |<-- { chunk: "The" } ------|<-- POST @connections --|                      |
  |<-- { chunk: " manga" } ---|<-- POST @connections --|                      |
  |<-- { chunk: " ..." } -----|<-- POST @connections --|                      |
  |<-- { done: true } --------|<-- POST @connections --|                      |
  |                           |                        |                      |
  |--- close ---------------->|                        |                      |
  |                           |--- $disconnect ------->|                      |
  |                           |                        |--- Delete record --->|

2.3 Pushing Messages Back via @connections

The backend pushes streaming tokens to the client using the API Gateway Management API's @connections endpoint:

POST https://{api-id}.execute-api.{region}.amazonaws.com/{stage}/@connections/{connectionId}
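
In boto3 this maps to the apigatewaymanagementapi client's post_to_connection call; a minimal sketch (endpoint placeholders as above; section 6.2 shows the full handler with rate limiting and stale-connection cleanup):

import json
import boto3

endpoint = "https://{api-id}.execute-api.{region}.amazonaws.com/{stage}"  # fill in from the deployed API
apigw = boto3.client("apigatewaymanagementapi", endpoint_url=endpoint)

def push_chunk(connection_id: str, text: str) -> None:
    # Raises GoneException if the client has already disconnected
    apigw.post_to_connection(
        ConnectionId=connection_id,
        Data=json.dumps({"action": "token", "text": text}).encode("utf-8"),
    )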

Important limits:

| Limit | Value | MangaAssist Impact |
| --- | --- | --- |
| Message payload size | 128 KB (32 KB per frame) | Far exceeds a single token push |
| Idle connection timeout | 10 minutes | Must implement heartbeat |
| Connection duration limit | 2 hours | Reconnect logic needed |
| Message send rate | 500 messages/sec per connection | ~80 tokens/sec is well within |
| New connection rate | 500/sec per account (default) | Request an increase to absorb reconnect bursts at 1M msgs/day |

3. Server-Sent Events Pattern

3.1 SSE Protocol Fundamentals

SSE provides a unidirectional, server-to-client push mechanism over standard HTTP. The client opens a long-lived GET request, and the server sends events as they become available.

HTTP response headers for SSE:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no

Event format:

event: token
id: msg-001-chunk-42
data: {"text": "The manga", "index": 42}

event: token
id: msg-001-chunk-43
data: {"text": " series", "index": 43}

event: done
id: msg-001-final
data: {"finish_reason": "end_turn", "usage": {"output_tokens": 150}}

3.2 SSE vs WebSocket for MangaAssist

| Criteria | SSE | WebSocket |
| --- | --- | --- |
| Direction | Server -> Client only | Bidirectional |
| Protocol | HTTP/1.1 or HTTP/2 | Upgrade from HTTP to WS |
| Reconnect | Built-in with Last-Event-ID | Manual implementation |
| Binary data | No (text only) | Yes |
| Proxy support | Excellent (it is just HTTP) | Varies (some proxies block) |
| Browser support | Native EventSource API | Native WebSocket API |
| Max connections per domain | 6 (HTTP/1.1), many (HTTP/2) | No hard limit |
| MangaAssist use case | Simple streaming read | Full chat with typing indicators |

MangaAssist recommendation: Use WebSocket as primary (bidirectional chat), fall back to SSE for clients behind restrictive proxies.

3.3 Auto-Reconnect with Last-Event-ID

SSE has a built-in reconnection mechanism. When the connection drops, the browser automatically reconnects and sends the Last-Event-ID header, allowing the server to resume from where it left off.

# Client reconnects after drop
GET /stream/manga-chat HTTP/1.1
Last-Event-ID: msg-001-chunk-42

# Server resumes from chunk 43
event: token
id: msg-001-chunk-43
data: {"text": " series", "index": 43}

4. Chunked Transfer Encoding

4.1 How Chunked Encoding Works

Chunked transfer encoding allows the server to send the response in pieces without knowing the total content length upfront. Each chunk is prefixed with its size in hexadecimal.

HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: application/json

19\r\n
{"text":"The manga seri"}\r\n
17\r\n
{"text":"es is called"}\r\n
0\r\n
\r\n
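
The framing is simple enough to sketch directly; note that the hex size counts only the chunk data bytes, not the size line or CRLFs:

def encode_chunk(payload: bytes) -> bytes:
    # RFC 9112 chunk: hex size, CRLF, data, CRLF
    return f"{len(payload):x}\r\n".encode("ascii") + payload + b"\r\n"

body = b'{"text":"The manga seri"}'
assert encode_chunk(body).startswith(b"19\r\n")  # 25 bytes -> 0x19
terminator = b"0\r\n\r\n"  # zero-length chunk ends the response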

4.2 Lambda Response Streaming

AWS Lambda supports response streaming via Lambda Function URLs and the InvokeWithResponseStream API. This pairs naturally with Bedrock streaming.

Lambda streaming flow for MangaAssist:

Client -> Lambda Function URL (streaming) -> Bedrock stream -> Chunked response back

| Feature | Standard Lambda | Streaming Lambda |
| --- | --- | --- |
| Response delivery | After function completes | During function execution |
| Max response size | 6 MB | 20 MB (soft limit) |
| Billing | Full duration | Full duration |
| TTFB | Function duration | First write to stream |
| Content-Type | Set once | Set with metadata |
| MangaAssist fit | Short lookups | Long streaming answers |
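
Consuming a streaming Lambda from Python is possible via boto3's invoke_with_response_stream, sketched below with a placeholder function name. (The handler side must run on a runtime that supports response streaming, such as Node.js; the managed Python runtime did not at the time of writing.)

import boto3

lam = boto3.client("lambda")
resp = lam.invoke_with_response_stream(FunctionName="manga-stream-fn")  # placeholder name

for event in resp["EventStream"]:
    if "PayloadChunk" in event:
        print(event["PayloadChunk"]["Payload"].decode("utf-8"), end="", flush=True)
    elif "InvokeComplete" in event:
        break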

4.3 API Gateway Chunked Response Handling

API Gateway REST APIs buffer the full response before sending to the client. For true chunked streaming, use:

  1. API Gateway WebSocket API (push model)
  2. Lambda Function URLs (bypass API Gateway buffering)
  3. ALB + ECS Fargate (direct chunked HTTP)
  4. CloudFront + Lambda@Edge (edge-streamed)

For MangaAssist, the ECS Fargate orchestrator can send chunked HTTP responses directly through an ALB, bypassing API Gateway's buffering limitation for the streaming path.


5. Token-by-Token Delivery and Buffering Strategies

5.1 Buffering Strategy Comparison

| Strategy | Description | Latency | UX Quality | MangaAssist Use |
| --- | --- | --- | --- | --- |
| No buffering | Send every token immediately | Lowest | Jittery, characters appear one-by-one | Not recommended |
| Token batching | Accumulate N tokens, then send | Low-medium | Smoother, words appear in groups | Default (batch of 3-5) |
| Word boundary | Buffer until whitespace detected | Medium | Natural word-by-word appearance | Chat responses |
| Sentence boundary | Buffer until punctuation | Higher | Full sentences appear at once | Manga synopsis |
| Adaptive | Adjust buffer size based on rate | Variable | Consistent perceived speed | Production recommendation |
| Time-based | Flush every N milliseconds | Consistent | Regular rhythm | Fallback strategy |

5.2 Adaptive Buffering Algorithm

The adaptive approach adjusts the buffer flush interval based on the token generation rate:

  • Fast generation (>100 tokens/sec): Buffer for 100ms, send word groups
  • Medium generation (50-100 tokens/sec): Buffer for 50ms, send partial words
  • Slow generation (<50 tokens/sec): No buffering, send immediately

This ensures the user always perceives a smooth, consistent stream regardless of backend generation speed.
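
Condensed to its core, the switch is a pair of thresholds (section 6.4 implements the stateful version):

def pick_strategy(tokens_per_sec: float) -> str:
    if tokens_per_sec > 100:
        return "word_boundary"  # >100 tok/s: buffer ~100ms, send word groups
    if tokens_per_sec > 50:
        return "token_batch"    # 50-100 tok/s: buffer ~50ms, send partial words
    return "none"               # <50 tok/s: send immediately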

5.3 Japanese Text Buffering Considerations

MangaAssist serves Japanese content, which introduces special buffering considerations:

| Issue | Description | Solution |
| --- | --- | --- |
| No word spaces | Japanese has no whitespace between words | Buffer by character count, not word boundaries |
| Multi-byte UTF-8 | Japanese chars are 3 bytes in UTF-8 | Never split mid-character (check continuation bytes) |
| Mixed scripts | Kanji + hiragana + katakana + romaji | Detect script transitions for natural breaks |
| Particle boundaries | は、が、を etc. are natural break points | Use particle detection for sentence-feel buffering |
| Manga titles | Often a mix of Japanese and English | Buffer entire title strings together |
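
The multi-byte row is the one most often gotten wrong; a small sketch of a boundary-safe byte split (UTF-8 continuation bytes match the bit pattern 0b10xxxxxx):

def split_at_char_boundary(buf: bytes, limit: int) -> tuple[bytes, bytes]:
    """Split buf at <= limit bytes without cutting a UTF-8 character."""
    if len(buf) <= limit:
        return buf, b""
    cut = limit
    while cut > 0 and (buf[cut] & 0xC0) == 0x80:  # cut would land on a continuation byte
        cut -= 1
    return buf[:cut], buf[cut:]

# "漫画" is 6 bytes in UTF-8; a naive 4-byte cut would split 画 mid-character.
head, rest = split_at_char_boundary("漫画".encode("utf-8"), 4)
assert head.decode("utf-8") == "漫"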

6. Code Implementations

6.1 StreamingFMClient

"""
StreamingFMClient — Handles Bedrock InvokeModelWithResponseStream
for MangaAssist real-time response generation.
"""

import json
import time
import logging
from typing import AsyncIterator, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EventStreamError

logger = logging.getLogger(__name__)


@dataclass
class StreamChunk:
    """Represents a single chunk from the Bedrock stream."""
    text: str
    index: int
    is_first: bool = False
    is_last: bool = False
    stop_reason: Optional[str] = None
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0.0


@dataclass
class StreamMetrics:
    """Tracks streaming performance metrics."""
    request_id: str = ""
    model_id: str = ""
    time_to_first_token_ms: float = 0.0
    total_duration_ms: float = 0.0
    total_tokens: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    tokens_per_second: float = 0.0
    chunks_sent: int = 0
    errors: list = field(default_factory=list)


class StreamingFMClient:
    """
    Client for streaming responses from Amazon Bedrock Foundation Models.

    Wraps InvokeModelWithResponseStream with:
    - Retry logic for transient failures
    - Metrics collection (TTFT, throughput, token counts)
    - Model-specific response parsing (Claude 3 format)
    - Graceful error handling mid-stream
    - Connection reuse via session management

    Usage:
        client = StreamingFMClient(region="us-east-1")
        async for chunk in client.stream_response(prompt, model="sonnet"):
            send_to_client(chunk.text)
    """

    MODEL_MAP = {
        "sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
        "haiku": "anthropic.claude-3-haiku-20240307-v1:0",
    }

    # Cost per 1M tokens
    COST_MAP = {
        "sonnet": {"input": 3.00, "output": 15.00},
        "haiku": {"input": 0.25, "output": 1.25},
    }

    def __init__(
        self,
        region: str = "us-east-1",
        max_retries: int = 3,
        connect_timeout: int = 5,
        read_timeout: int = 60,
        max_tokens: int = 1024,
    ):
        self.region = region
        self.max_retries = max_retries
        self.max_tokens = max_tokens

        config = Config(
            region_name=region,
            retries={"max_attempts": max_retries, "mode": "adaptive"},
            connect_timeout=connect_timeout,
            read_timeout=read_timeout,
        )
        self._client = boto3.client("bedrock-runtime", config=config)
        logger.info(f"StreamingFMClient initialized for region={region}")

    def _build_claude3_body(
        self,
        prompt: str,
        system: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        top_p: float = 0.9,
        stop_sequences: Optional[list] = None,
    ) -> str:
        """Build the request body for Claude 3 Messages API format."""
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens or self.max_tokens,
            "temperature": temperature,
            "top_p": top_p,
            "messages": [
                {"role": "user", "content": prompt}
            ],
        }
        if system:
            body["system"] = system
        if stop_sequences:
            body["stop_sequences"] = stop_sequences
        return json.dumps(body)

    def _resolve_model_id(self, model: str) -> str:
        """Resolve friendly model name to full Bedrock model ID."""
        if model in self.MODEL_MAP:
            return self.MODEL_MAP[model]
        return model  # Assume it is already a full model ID

    async def stream_response(
        self,
        prompt: str,
        model: str = "haiku",
        system: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        on_metrics: Optional[Callable[[StreamMetrics], None]] = None,
    ) -> AsyncIterator[StreamChunk]:
        """
        Stream a response from Bedrock, yielding StreamChunk objects.

        Args:
            prompt: User message text
            model: "sonnet", "haiku", or full model ID
            system: Optional system prompt
            max_tokens: Override default max tokens
            temperature: Sampling temperature
            on_metrics: Callback invoked with final metrics

        Yields:
            StreamChunk objects with incremental text
        """
        model_id = self._resolve_model_id(model)
        body = self._build_claude3_body(prompt, system, max_tokens, temperature)
        metrics = StreamMetrics(model_id=model_id)

        start_time = time.monotonic()
        first_token_time = None
        chunk_index = 0
        accumulated_text = ""

        try:
            response = self._client.invoke_model_with_response_stream(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=body,
            )
            metrics.request_id = response.get("ResponseMetadata", {}).get("RequestId", "")

            stream = response.get("body")
            if not stream:
                raise RuntimeError("No response body stream returned from Bedrock")

            for event in stream:
                chunk_data = event.get("chunk")
                if not chunk_data:
                    continue

                chunk_bytes = chunk_data.get("bytes", b"")
                chunk_json = json.loads(chunk_bytes.decode("utf-8"))
                event_type = chunk_json.get("type", "")

                # --- message_start: capture input token count ---
                if event_type == "message_start":
                    message = chunk_json.get("message", {})
                    usage = message.get("usage", {})
                    metrics.input_tokens = usage.get("input_tokens", 0)

                # --- content_block_delta: the actual text tokens ---
                elif event_type == "content_block_delta":
                    delta = chunk_json.get("delta", {})
                    text = delta.get("text", "")
                    if not text:
                        continue

                    now = time.monotonic()
                    if first_token_time is None:
                        first_token_time = now
                        metrics.time_to_first_token_ms = (now - start_time) * 1000

                    accumulated_text += text

                    chunk = StreamChunk(
                        text=text,
                        index=chunk_index,
                        is_first=(chunk_index == 0),
                        latency_ms=(now - start_time) * 1000,
                    )
                    chunk_index += 1
                    yield chunk

                # --- message_delta: stop reason and output tokens ---
                elif event_type == "message_delta":
                    delta = chunk_json.get("delta", {})
                    usage = chunk_json.get("usage", {})
                    stop_reason = delta.get("stop_reason")
                    metrics.output_tokens = usage.get("output_tokens", 0)

                    final_chunk = StreamChunk(
                        text="",
                        index=chunk_index,
                        is_last=True,
                        stop_reason=stop_reason,
                        input_tokens=metrics.input_tokens,
                        output_tokens=metrics.output_tokens,
                        latency_ms=(time.monotonic() - start_time) * 1000,
                    )
                    yield final_chunk

                # --- message_stop: stream complete ---
                elif event_type == "message_stop":
                    break

        except EventStreamError as e:
            logger.error(f"Stream event error: {e}")
            metrics.errors.append(str(e))
            raise
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            logger.error(f"Bedrock client error: {error_code} - {e}")
            metrics.errors.append(f"{error_code}: {e}")
            raise
        finally:
            end_time = time.monotonic()
            metrics.total_duration_ms = (end_time - start_time) * 1000
            metrics.total_tokens = metrics.input_tokens + metrics.output_tokens
            metrics.chunks_sent = chunk_index
            if metrics.total_duration_ms > 0:
                metrics.tokens_per_second = (
                    metrics.output_tokens / (metrics.total_duration_ms / 1000)
                )
            if on_metrics:
                on_metrics(metrics)
            logger.info(
                f"Stream complete: model={model_id}, "
                f"TTFT={metrics.time_to_first_token_ms:.0f}ms, "
                f"total={metrics.total_duration_ms:.0f}ms, "
                f"tokens={metrics.output_tokens}, "
                f"rate={metrics.tokens_per_second:.1f} tok/s"
            )

    def estimate_cost(self, metrics: StreamMetrics) -> Dict[str, float]:
        """Estimate the cost of a streaming request based on collected metrics."""
        model_key = None
        for key, model_id in self.MODEL_MAP.items():
            if model_id == metrics.model_id:
                model_key = key
                break

        if not model_key or model_key not in self.COST_MAP:
            return {"input_cost": 0.0, "output_cost": 0.0, "total_cost": 0.0}

        costs = self.COST_MAP[model_key]
        input_cost = (metrics.input_tokens / 1_000_000) * costs["input"]
        output_cost = (metrics.output_tokens / 1_000_000) * costs["output"]

        return {
            "input_cost": input_cost,
            "output_cost": output_cost,
            "total_cost": input_cost + output_cost,
        }

6.2 WebSocketStreamHandler

"""
WebSocketStreamHandler — Bridges Bedrock streaming to API Gateway WebSocket
connections for MangaAssist real-time chat.
"""

import json
import time
import asyncio
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class WebSocketConfig:
    """Configuration for the WebSocket stream handler."""
    api_id: str
    stage: str
    region: str = "us-east-1"
    max_frame_size: int = 32_768          # 32 KB (API Gateway WebSocket frame size limit)
    send_rate_limit: int = 400            # Messages per second (under 500 limit)
    heartbeat_interval_sec: int = 300     # 5 minutes (under 10-min idle timeout)
    connection_ttl_sec: int = 7000        # ~1.9 hours (under 2-hour limit)
    flush_interval_ms: int = 50           # Minimum ms between sends


class WebSocketStreamHandler:
    """
    Handles pushing Bedrock streaming tokens to clients via API Gateway
    WebSocket @connections endpoint.

    Features:
    - Token buffering with configurable flush intervals
    - Automatic heartbeat to prevent idle timeouts
    - Connection state validation before sends
    - Graceful handling of stale/disconnected connections
    - Rate limiting to stay within API Gateway limits
    - Frame size enforcement

    Usage:
        handler = WebSocketStreamHandler(config)
        async for chunk in bedrock_stream:
            await handler.send_token(connection_id, chunk.text)
        await handler.send_complete(connection_id)
    """

    def __init__(self, config: WebSocketConfig):
        self.config = config
        self._endpoint = (
            f"https://{config.api_id}.execute-api."
            f"{config.region}.amazonaws.com/{config.stage}"
        )
        self._apigw_client = boto3.client(
            "apigatewaymanagementapi",
            endpoint_url=self._endpoint,
            region_name=config.region,
        )
        self._send_timestamps: Dict[str, list] = {}  # connection_id -> [timestamps]
        self._active_connections: Dict[str, float] = {}  # connection_id -> last_send_time
        logger.info(f"WebSocketStreamHandler initialized: endpoint={self._endpoint}")

    async def send_token(
        self,
        connection_id: str,
        text: str,
        chunk_index: int = 0,
        is_first: bool = False,
    ) -> bool:
        """
        Send a token chunk to a specific WebSocket connection.

        Args:
            connection_id: API Gateway WebSocket connection ID
            text: Token text to send
            chunk_index: Sequential index of this chunk
            is_first: Whether this is the first chunk (triggers typing indicator)

        Returns:
            True if sent successfully, False if connection is stale
        """
        payload = {
            "action": "token",
            "data": {
                "text": text,
                "index": chunk_index,
                "timestamp": int(time.time() * 1000),
            },
        }
        if is_first:
            payload["data"]["is_first"] = True

        return await self._post_to_connection(connection_id, payload)

    async def send_complete(
        self,
        connection_id: str,
        stop_reason: str = "end_turn",
        usage: Optional[Dict[str, int]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Send stream completion signal to the client."""
        payload = {
            "action": "stream_complete",
            "data": {
                "stop_reason": stop_reason,
                "timestamp": int(time.time() * 1000),
            },
        }
        if usage:
            payload["data"]["usage"] = usage
        if metadata:
            payload["data"]["metadata"] = metadata

        return await self._post_to_connection(connection_id, payload)

    async def send_error(
        self,
        connection_id: str,
        error_code: str,
        message: str,
        retryable: bool = False,
    ) -> bool:
        """Send error notification to the client."""
        payload = {
            "action": "error",
            "data": {
                "code": error_code,
                "message": message,
                "retryable": retryable,
                "timestamp": int(time.time() * 1000),
            },
        }
        return await self._post_to_connection(connection_id, payload)

    async def send_heartbeat(self, connection_id: str) -> bool:
        """Send heartbeat to keep connection alive."""
        payload = {"action": "heartbeat", "timestamp": int(time.time() * 1000)}
        return await self._post_to_connection(connection_id, payload)

    async def _post_to_connection(
        self, connection_id: str, payload: Dict[str, Any]
    ) -> bool:
        """
        Post data to a WebSocket connection via @connections API.
        Handles stale connections, rate limiting, and frame size.
        """
        data_bytes = json.dumps(payload).encode("utf-8")

        # Enforce frame size limit
        if len(data_bytes) > self.config.max_frame_size:
            logger.warning(
                f"Frame too large ({len(data_bytes)} bytes) for "
                f"connection {connection_id}, truncating"
            )
            # Truncate text content, preserve structure
            if "data" in payload and "text" in payload["data"]:
                max_text = self.config.max_frame_size - 200  # headroom for JSON wrapper
                payload["data"]["text"] = payload["data"]["text"][:max_text]
                payload["data"]["truncated"] = True
                data_bytes = json.dumps(payload).encode("utf-8")

        # Rate limiting
        await self._enforce_rate_limit(connection_id)

        try:
            self._apigw_client.post_to_connection(
                ConnectionId=connection_id,
                Data=data_bytes,
            )
            self._active_connections[connection_id] = time.time()
            return True

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "GoneException":
                logger.info(f"Connection {connection_id} is gone (client disconnected)")
                self._cleanup_connection(connection_id)
                return False
            elif error_code == "LimitExceededException":
                logger.warning(f"Rate limit exceeded for {connection_id}, backing off")
                await asyncio.sleep(0.1)
                return await self._post_to_connection(connection_id, payload)
            else:
                logger.error(f"Error posting to {connection_id}: {error_code} - {e}")
                raise

    async def _enforce_rate_limit(self, connection_id: str) -> None:
        """Enforce per-connection message rate limit."""
        now = time.time()
        if connection_id not in self._send_timestamps:
            self._send_timestamps[connection_id] = []

        timestamps = self._send_timestamps[connection_id]
        # Remove timestamps older than 1 second
        timestamps[:] = [t for t in timestamps if now - t < 1.0]

        if len(timestamps) >= self.config.send_rate_limit:
            sleep_time = 1.0 - (now - timestamps[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        timestamps.append(now)

    def _cleanup_connection(self, connection_id: str) -> None:
        """Clean up tracking data for a disconnected connection."""
        self._send_timestamps.pop(connection_id, None)
        self._active_connections.pop(connection_id, None)

    def is_connection_active(self, connection_id: str) -> bool:
        """Check if a connection is still considered active."""
        if connection_id not in self._active_connections:
            return False
        last_send = self._active_connections[connection_id]
        return (time.time() - last_send) < self.config.connection_ttl_sec

6.3 SSEResponseManager

"""
SSEResponseManager — Manages Server-Sent Events streaming for MangaAssist
as a fallback transport when WebSocket is unavailable.
"""

import json
import time
import asyncio
import logging
from typing import AsyncIterator, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger(__name__)


class SSEEventType(Enum):
    """Standard SSE event types for MangaAssist streaming."""
    TOKEN = "token"
    DONE = "done"
    ERROR = "error"
    HEARTBEAT = "heartbeat"
    METADATA = "metadata"
    RETRY = "retry"


@dataclass
class SSEEvent:
    """Represents a single Server-Sent Event."""
    event_type: SSEEventType
    data: Dict[str, Any]
    event_id: Optional[str] = None
    retry_ms: Optional[int] = None

    def serialize(self) -> str:
        """Serialize this event to SSE wire format."""
        lines = []
        if self.event_type != SSEEventType.HEARTBEAT:
            lines.append(f"event: {self.event_type.value}")
        if self.event_id:
            lines.append(f"id: {self.event_id}")
        if self.retry_ms is not None:
            lines.append(f"retry: {self.retry_ms}")
        data_str = json.dumps(self.data)
        lines.append(f"data: {data_str}")
        lines.append("")  # Blank line terminates the event
        lines.append("")
        return "\n".join(lines)


@dataclass
class SSESessionState:
    """Tracks state for an SSE connection for resume support."""
    session_id: str
    last_event_id: str = ""
    chunk_index: int = 0
    accumulated_text: str = ""
    started_at: float = field(default_factory=time.time)
    last_sent_at: float = 0.0
    is_complete: bool = False


class SSEResponseManager:
    """
    Manages SSE streaming responses for MangaAssist.

    Features:
    - Event ID generation for client resume (Last-Event-ID)
    - Heartbeat events to prevent proxy/load-balancer timeouts
    - Session state tracking for reconnection
    - Retry directive management
    - Clean connection closure
    - Metrics collection

    Usage:
        manager = SSEResponseManager()
        session = manager.create_session("msg-001")

        async def generate():
            headers = manager.get_sse_headers()
            yield manager.format_retry_directive(3000)
            async for chunk in bedrock_stream:
                event = manager.create_token_event(session, chunk.text)
                yield event.serialize()
            yield manager.create_done_event(session).serialize()
    """

    def __init__(
        self,
        heartbeat_interval_sec: float = 15.0,
        default_retry_ms: int = 3000,
        max_reconnect_window_sec: int = 30,
    ):
        self.heartbeat_interval_sec = heartbeat_interval_sec
        self.default_retry_ms = default_retry_ms
        self.max_reconnect_window_sec = max_reconnect_window_sec
        self._sessions: Dict[str, SSESessionState] = {}

    @staticmethod
    def get_sse_headers() -> Dict[str, str]:
        """Return the required HTTP headers for an SSE response."""
        return {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "Access-Control-Allow-Origin": "*",
            "Transfer-Encoding": "chunked",
        }

    def create_session(self, session_id: str) -> SSESessionState:
        """Create a new SSE session for tracking."""
        session = SSESessionState(session_id=session_id)
        self._sessions[session_id] = session
        logger.info(f"SSE session created: {session_id}")
        return session

    def get_session(self, session_id: str) -> Optional[SSESessionState]:
        """Retrieve an existing session."""
        return self._sessions.get(session_id)

    def create_token_event(
        self, session: SSESessionState, text: str
    ) -> SSEEvent:
        """Create a token event from streaming text."""
        event_id = f"{session.session_id}-{session.chunk_index}"
        session.chunk_index += 1
        session.accumulated_text += text
        session.last_event_id = event_id
        session.last_sent_at = time.time()

        return SSEEvent(
            event_type=SSEEventType.TOKEN,
            data={"text": text, "index": session.chunk_index - 1},
            event_id=event_id,
        )

    def create_done_event(
        self,
        session: SSESessionState,
        stop_reason: str = "end_turn",
        usage: Optional[Dict[str, int]] = None,
    ) -> SSEEvent:
        """Create the stream completion event."""
        session.is_complete = True
        event_id = f"{session.session_id}-done"

        data = {
            "stop_reason": stop_reason,
            "total_chunks": session.chunk_index,
        }
        if usage:
            data["usage"] = usage

        return SSEEvent(
            event_type=SSEEventType.DONE,
            data=data,
            event_id=event_id,
        )

    def create_error_event(
        self,
        session: SSESessionState,
        error_code: str,
        message: str,
        retryable: bool = False,
    ) -> SSEEvent:
        """Create an error event."""
        return SSEEvent(
            event_type=SSEEventType.ERROR,
            data={
                "code": error_code,
                "message": message,
                "retryable": retryable,
            },
            event_id=f"{session.session_id}-error",
            retry_ms=self.default_retry_ms if retryable else None,
        )

    def create_heartbeat_event(self) -> SSEEvent:
        """Create a heartbeat event (comment-style keepalive)."""
        return SSEEvent(
            event_type=SSEEventType.HEARTBEAT,
            data={"ts": int(time.time() * 1000)},
        )

    def can_resume(
        self, session_id: str, last_event_id: str
    ) -> Optional[int]:
        """
        Check if a session can be resumed from the given Last-Event-ID.

        Returns the chunk index to resume from, or None if not resumable.
        """
        session = self._sessions.get(session_id)
        if not session:
            return None

        if session.is_complete:
            return None

        elapsed = time.time() - session.last_sent_at
        if elapsed > self.max_reconnect_window_sec:
            logger.info(f"Session {session_id} reconnect window expired ({elapsed:.1f}s)")
            return None

        # Parse the chunk index from the last event ID
        try:
            parts = last_event_id.rsplit("-", 1)
            resume_index = int(parts[-1]) + 1
            return resume_index
        except (ValueError, IndexError):
            return None

    async def heartbeat_loop(
        self,
        send_fn: Callable[[str], Any],
        stop_event: asyncio.Event,
    ) -> None:
        """
        Run a heartbeat loop that sends keepalive events.

        Args:
            send_fn: Async function that sends a string to the client
            stop_event: Event to signal loop termination
        """
        while not stop_event.is_set():
            try:
                await asyncio.wait_for(
                    stop_event.wait(),
                    timeout=self.heartbeat_interval_sec,
                )
                break  # stop_event was set
            except asyncio.TimeoutError:
                heartbeat = self.create_heartbeat_event()
                await send_fn(heartbeat.serialize())

    def cleanup_session(self, session_id: str) -> None:
        """Remove session state after connection closes."""
        self._sessions.pop(session_id, None)
        logger.info(f"SSE session cleaned up: {session_id}")

6.4 ChunkBufferManager

"""
ChunkBufferManager — Implements intelligent token buffering strategies
for smooth real-time text delivery in MangaAssist.
"""

import time
import logging
import unicodedata
from typing import Optional, List, Callable, Awaitable
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger(__name__)


class BufferStrategy(Enum):
    """Available buffering strategies."""
    NONE = "none"                      # Send every token immediately
    TOKEN_BATCH = "token_batch"        # Batch N tokens
    WORD_BOUNDARY = "word_boundary"    # Buffer to word boundaries
    SENTENCE_BOUNDARY = "sentence"     # Buffer to sentence boundaries
    TIME_BASED = "time_based"          # Flush every N milliseconds
    ADAPTIVE = "adaptive"             # Adjust based on generation rate
    JAPANESE_AWARE = "japanese_aware"  # Handles JP text without word spaces


@dataclass
class BufferConfig:
    """Configuration for the buffer manager."""
    strategy: BufferStrategy = BufferStrategy.ADAPTIVE
    token_batch_size: int = 4               # For TOKEN_BATCH strategy
    time_flush_ms: int = 80                 # For TIME_BASED strategy
    adaptive_fast_threshold: float = 100.0  # tokens/sec
    adaptive_slow_threshold: float = 50.0   # tokens/sec
    max_buffer_size: int = 512              # Max chars before forced flush
    jp_char_batch_size: int = 6             # Characters per flush for Japanese


@dataclass
class BufferState:
    """Internal state of the buffer."""
    buffer: str = ""
    token_count: int = 0
    total_tokens: int = 0
    first_token_time: float = 0.0
    last_token_time: float = 0.0
    last_flush_time: float = 0.0
    flush_count: int = 0
    current_rate: float = 0.0  # tokens per second


# Japanese punctuation and particles that make good break points
JP_BREAK_CHARS = set("。、!?」』)】〉》~…・\n")
JP_PARTICLES = {"は", "が", "を", "に", "で", "と", "の", "へ", "も", "や", "か", "ね", "よ", "ぞ"}


class ChunkBufferManager:
    """
    Intelligent token buffering for smooth real-time text delivery.

    Strategies:
    - NONE: Zero buffering, immediate forward of every token
    - TOKEN_BATCH: Accumulate N tokens then flush
    - WORD_BOUNDARY: Buffer until whitespace (English/romaji text)
    - SENTENCE_BOUNDARY: Buffer until sentence-ending punctuation
    - TIME_BASED: Flush at fixed time intervals regardless of tokens
    - ADAPTIVE: Dynamically switch strategy based on generation speed
    - JAPANESE_AWARE: Special handling for Japanese text (no word spaces)

    The ADAPTIVE strategy is recommended for production MangaAssist use.
    It monitors the token generation rate and adjusts:
    - Fast (>100 tok/s): Word-boundary buffering for smooth groups
    - Medium (50-100 tok/s): Small token batches (2-3 tokens)
    - Slow (<50 tok/s): No buffering, forward immediately

    Usage:
        buffer = ChunkBufferManager(BufferConfig(strategy=BufferStrategy.ADAPTIVE))

        async for chunk in bedrock_stream:
            flushed = await buffer.add_token(chunk.text, send_fn)
            # send_fn is called when buffer flushes

        await buffer.flush_remaining(send_fn)
    """

    def __init__(self, config: Optional[BufferConfig] = None):
        self.config = config or BufferConfig()
        self._state = BufferState()

    async def add_token(
        self,
        text: str,
        send_fn: Callable[[str], Awaitable[None]],
    ) -> bool:
        """
        Add a token to the buffer. May trigger a flush via send_fn.

        Args:
            text: The token text to buffer
            send_fn: Async function to call when buffer flushes

        Returns:
            True if a flush occurred, False otherwise
        """
        now = time.monotonic()
        self._state.token_count += 1
        self._state.total_tokens += 1
        self._state.buffer += text

        if self._state.first_token_time == 0:
            self._state.first_token_time = now
            self._state.last_flush_time = now

        self._state.last_token_time = now
        self._update_rate(now)

        # Force flush if buffer exceeds max size
        if len(self._state.buffer) >= self.config.max_buffer_size:
            await self._flush(send_fn)
            return True

        strategy = self.config.strategy
        if strategy == BufferStrategy.ADAPTIVE:
            strategy = self._select_adaptive_strategy()

        should_flush = self._check_flush(strategy, now)
        if should_flush:
            await self._flush(send_fn)
            return True

        return False

    async def flush_remaining(
        self, send_fn: Callable[[str], Awaitable[None]]
    ) -> None:
        """Flush any remaining buffered text (call at end of stream)."""
        if self._state.buffer:
            await self._flush(send_fn)

    def get_stats(self) -> dict:
        """Return buffer performance statistics."""
        elapsed = self._state.last_token_time - self._state.first_token_time
        return {
            "total_tokens": self._state.total_tokens,
            "flush_count": self._state.flush_count,
            "avg_tokens_per_flush": (
                self._state.total_tokens / max(self._state.flush_count, 1)
            ),
            "current_rate_tok_s": self._state.current_rate,
            "elapsed_sec": elapsed,
            "strategy": self.config.strategy.value,
        }

    def _update_rate(self, now: float) -> None:
        """Update the current token generation rate estimate."""
        elapsed = now - self._state.first_token_time
        if elapsed > 0.1:  # Need at least 100ms of data
            self._state.current_rate = self._state.total_tokens / elapsed

    def _select_adaptive_strategy(self) -> BufferStrategy:
        """Select the best strategy based on current generation rate."""
        rate = self._state.current_rate

        # Check if content is primarily Japanese
        if self._is_japanese_content():
            return BufferStrategy.JAPANESE_AWARE

        if rate > self.config.adaptive_fast_threshold:
            return BufferStrategy.WORD_BOUNDARY
        elif rate > self.config.adaptive_slow_threshold:
            return BufferStrategy.TOKEN_BATCH
        else:
            return BufferStrategy.NONE

    def _is_japanese_content(self) -> bool:
        """Check if the current buffer contains primarily Japanese text."""
        if not self._state.buffer:
            return False
        jp_chars = sum(
            1 for ch in self._state.buffer
            if unicodedata.category(ch).startswith("Lo")  # Letter, other (CJK)
            or "\u3040" <= ch <= "\u309f"   # Hiragana
            or "\u30a0" <= ch <= "\u30ff"   # Katakana
        )
        return jp_chars > len(self._state.buffer) * 0.3

    def _check_flush(self, strategy: BufferStrategy, now: float) -> bool:
        """Determine if the buffer should be flushed given the current strategy."""
        buf = self._state.buffer

        if strategy == BufferStrategy.NONE:
            return True

        elif strategy == BufferStrategy.TOKEN_BATCH:
            return self._state.token_count >= self.config.token_batch_size

        elif strategy == BufferStrategy.WORD_BOUNDARY:
            return len(buf) > 0 and buf[-1] in (" ", "\n", "\t")

        elif strategy == BufferStrategy.SENTENCE_BOUNDARY:
            if not buf:
                return False
            return buf[-1] in ".!?\n" or buf.endswith("...") or buf[-1] in JP_BREAK_CHARS

        elif strategy == BufferStrategy.TIME_BASED:
            elapsed_ms = (now - self._state.last_flush_time) * 1000
            return elapsed_ms >= self.config.time_flush_ms

        elif strategy == BufferStrategy.JAPANESE_AWARE:
            return self._check_japanese_flush(buf)

        return False

    def _check_japanese_flush(self, buf: str) -> bool:
        """Check if buffer should flush for Japanese text."""
        if not buf:
            return False

        # Flush on Japanese punctuation
        if buf[-1] in JP_BREAK_CHARS:
            return True

        # Flush on particle boundaries (particle followed by next char)
        if len(buf) >= 2 and buf[-2] in JP_PARTICLES:
            return True

        # Flush by character count (since no word boundaries)
        jp_chars_since_flush = sum(
            1 for ch in buf
            if ord(ch) > 0x2FFF  # Rough CJK range
        )
        return jp_chars_since_flush >= self.config.jp_char_batch_size

    async def _flush(self, send_fn: Callable[[str], Awaitable[None]]) -> None:
        """Flush the buffer contents via send_fn."""
        text = self._state.buffer
        if not text:
            return

        await send_fn(text)
        self._state.buffer = ""
        self._state.token_count = 0
        self._state.flush_count += 1
        self._state.last_flush_time = time.monotonic()

    def reset(self) -> None:
        """Reset buffer state for a new stream."""
        self._state = BufferState()

7. Latency Comparison Tables

7.1 Transport Mechanism Latency

| Transport | Connection Setup | TTFB Overhead | Per-Token Overhead | Reconnect Time | Best For |
| --- | --- | --- | --- | --- | --- |
| WebSocket (API GW) | ~100ms (upgrade) | ~5ms | ~2ms | ~150ms | Primary MangaAssist chat |
| SSE (REST API) | ~50ms (HTTP GET) | ~10ms | ~3ms | ~50ms (auto) | Fallback transport |
| Chunked HTTP (ALB) | ~30ms | ~8ms | ~1ms | ~30ms | High-throughput batch |
| Lambda Streaming | ~80ms (cold) / ~10ms (warm) | ~15ms | ~2ms | N/A (new request) | Serverless streaming |
| Long Polling | ~30ms | ~full generation | ~0ms (single response) | ~30ms | Legacy clients |

7.2 End-to-End Latency Breakdown (MangaAssist)

| Stage | Sonnet | Haiku | Notes |
| --- | --- | --- | --- |
| WebSocket frame receipt | 5ms | 5ms | API Gateway processing |
| Auth + session lookup | 15ms | 15ms | Redis cache hit |
| RAG context retrieval | 120ms | 120ms | OpenSearch vector search |
| Prompt construction | 5ms | 5ms | Template + context assembly |
| Bedrock TTFT | 300ms | 150ms | Time to first token |
| Token streaming (avg response) | 1,875ms (150 tok) | 667ms (100 tok) | Model generation |
| Per-token delivery overhead | 2ms | 2ms | WS frame send |
| Total (first token visible) | ~445ms | ~295ms | User sees text start |
| Total (complete response) | ~2,320ms | ~960ms | Full answer delivered |

7.3 Buffering Strategy Impact on Perceived Latency

| Strategy | Tokens Per Flush | Perceived TTFB (added) | Smoothness (1-5) | Recommended For |
| --- | --- | --- | --- | --- |
| None | 1 | +0ms | 2 (jittery) | Debug / testing |
| Token batch (3) | 3 | +30-40ms | 3 | General English |
| Token batch (5) | 5 | +50-65ms | 4 | Longer English answers |
| Word boundary | 1-4 | +20-60ms | 4 | English chat |
| Sentence boundary | 10-30 | +200-500ms | 5 | Synopsis/description |
| Japanese-aware (6 chars) | 2-6 | +30-80ms | 4 | Japanese content |
| Adaptive | Variable | +10-60ms | 5 | Production (recommended) |

7.4 Scale Projections

| Metric | Target | Infrastructure Impact |
| --- | --- | --- |
| Messages per day | 1,000,000 | ~11.6 messages/sec average |
| Peak messages/sec (3x avg) | ~35 | ~35 concurrent Bedrock streams |
| Concurrent WebSocket connections | ~50,000 | Default new-connection rate: 500/sec (request increase) |
| Bedrock stream throughput | ~35 parallel | On-demand quota sufficient |
| ElastiCache connection pool | ~200 per ECS task | r6g.large supports 65,000 connections |
| DynamoDB session writes/sec | ~70 (writes at start+end) | On-demand capacity handles this |

7.5 Cost Per Message (Streaming)

| Model | Avg Input Tokens | Avg Output Tokens | Input Cost | Output Cost | Total per Message |
| --- | --- | --- | --- | --- | --- |
| Claude 3 Haiku | 500 | 100 | $0.000125 | $0.000125 | $0.000250 |
| Claude 3 Sonnet | 500 | 150 | $0.001500 | $0.002250 | $0.003750 |

| Model Mix | Daily Cost (1M msgs/day) | Monthly Cost |
| --- | --- | --- |
| Haiku only | $250 | $7,500 |
| Sonnet only | $3,750 | $112,500 |
| 80% Haiku / 20% Sonnet | $950 | $28,500 |

8. Decision Matrix: Choosing the Right Transport

                    ┌─────────────────────────────────────────────────┐
                    │           Does the client need to send          │
                    │           messages during streaming?            │
                    └──────────────┬──────────────┬───────────────────┘
                                   │              │
                              Yes  │              │  No
                                   ▼              ▼
                    ┌──────────────────┐  ┌──────────────────────┐
                    │   WebSocket      │  │  Is HTTP/2 available? │
                    │   (Primary)      │  └─────┬───────┬────────┘
                    └──────────────────┘        │       │
                                           Yes  │       │  No
                                                ▼       ▼
                                 ┌──────────────┐ ┌──────────────────┐
                                 │     SSE      │ │ Chunked HTTP     │
                                 │ (Preferred)  │ │ (Fallback)       │
                                 └──────────────┘ └──────────────────┘

MangaAssist final architecture decision:

  • Primary: WebSocket via API Gateway (bidirectional chat, typing indicators, heartbeats)
  • Fallback: SSE over ALB (clients behind WebSocket-blocking proxies)
  • Batch/Internal: Chunked HTTP via ALB (admin tools, analytics pipelines)


9. Summary

Real-time AI interaction in MangaAssist depends on four pillars working together:

  1. Bedrock InvokeModelWithResponseStream produces tokens incrementally from Claude 3, enabling sub-500ms time-to-first-token instead of waiting 2+ seconds for full generation.

  2. API Gateway WebSocket provides bidirectional, persistent connections for the primary chat experience, with connection management in DynamoDB and heartbeats via ElastiCache Redis.

  3. Server-Sent Events offer a simpler, unidirectional fallback with built-in reconnection and Last-Event-ID resume support for clients behind restrictive proxies.

  4. Chunked transfer encoding enables streaming HTTP responses for internal services and batch operations, with careful handling of UTF-8 multi-byte boundaries for Japanese content.

The adaptive buffering strategy ties it all together, dynamically adjusting token delivery based on generation speed and content language to ensure a smooth, responsive user experience at 1M messages/day scale.