Real-Time AI Interaction Architecture
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design and implement interaction mechanisms for FM-based applications |
| Skill | 2.4.2 — Develop real-time AI interaction systems to provide immediate feedback from FM |
| Focus Areas | Bedrock streaming APIs for incremental response delivery, WebSockets/SSE for real-time text generation, API Gateway for chunked transfer encoding |
Real-Time AI Interaction Mindmap
mindmap
root((Real-Time AI
Interaction))
Streaming APIs
Bedrock InvokeModelWithResponseStream
Chunked response body
Server-side token generation
Event stream format
Partial JSON payloads
Response Stream Processing
Byte-level parsing
Token accumulation
Delta extraction
End-of-stream detection
Model-Specific Streaming
Claude 3 streaming format
Content block deltas
Stop reason signaling
Usage metrics in final chunk
WebSocket Transport
API Gateway WebSocket API
$connect route
$disconnect route
$default route
Custom routes
Connection Management
Connection ID tracking
DynamoDB connection store
Heartbeat / ping-pong
Idle timeout handling
Bidirectional Communication
Client-to-server messages
Server-to-client push
Binary frame support
Message fragmentation
Server-Sent Events
HTTP/1.1 SSE Protocol
text/event-stream content type
Event ID for resumption
Retry directive
Named event types
SSE vs WebSocket Tradeoffs
Unidirectional simplicity
Auto-reconnect built-in
HTTP/2 multiplexing
Proxy/CDN compatibility
EventSource API
Browser-native support
Custom event handling
Connection state management
Error recovery
Chunked Transfer Encoding
HTTP Chunked Responses
Transfer-Encoding chunked header
Chunk size + data format
Zero-length terminator
Trailer headers
API Gateway Integration
Lambda streaming response
Response stream flushing
Content-Type handling
Timeout considerations
Unicode Safety
Multi-byte character boundaries
UTF-8 continuation bytes
Buffer-and-flush strategy
Character boundary detection
Latency Optimization
Time-to-First-Token
Model warm-up
Connection reuse
Request pipelining
Regional endpoint selection
Buffering Strategies
Token batching
Word-boundary buffering
Sentence-boundary buffering
Adaptive buffer sizing
Infrastructure
ElastiCache for session pre-fetch
ECS Fargate placement
VPC endpoint for Bedrock
Connection pooling
Architecture Flowchart: MangaAssist Streaming Response Pipeline
flowchart TB
subgraph Client["Client Layer"]
Browser["Browser / Mobile App"]
WSClient["WebSocket Client"]
SSEClient["EventSource Client"]
end
subgraph APIGW["API Gateway"]
WSAPI["WebSocket API<br/>wss://manga.execute-api..."]
RESTAPI["REST API<br/>(SSE / Chunked)"]
ConnMgmt["Connection Management<br/>$connect / $disconnect"]
end
subgraph Orchestrator["ECS Fargate — Orchestrator"]
Router["Request Router"]
SessionMgr["Session Manager"]
RAGPipeline["RAG Pipeline"]
StreamOrch["Stream Orchestrator"]
end
subgraph Cache["ElastiCache Redis"]
SessionCache["Session Cache"]
ConnRegistry["Connection Registry"]
TokenBuffer["Token Buffer"]
end
subgraph Search["OpenSearch Serverless"]
VectorSearch["Vector Search<br/>(Manga Knowledge)"]
end
subgraph FM["Amazon Bedrock"]
InvokeStream["InvokeModelWithResponseStream"]
Claude3S["Claude 3 Sonnet"]
Claude3H["Claude 3 Haiku"]
end
subgraph DDB["DynamoDB"]
Sessions["Sessions Table"]
Products["Products Table"]
Connections["Connections Table"]
end
Browser -->|"1. WS Connect"| WSClient
Browser -->|"1. SSE Connect"| SSEClient
WSClient -->|"2. wss://"| WSAPI
SSEClient -->|"2. GET /stream"| RESTAPI
WSAPI -->|"3. Route"| ConnMgmt
ConnMgmt -->|"4. Register connectionId"| Connections
WSAPI -->|"5. Message"| Router
RESTAPI -->|"5. Request"| Router
Router -->|"6. Load session"| SessionMgr
SessionMgr -->|"6a. Cache hit?"| SessionCache
SessionMgr -->|"6b. Cache miss"| Sessions
Router -->|"7. Retrieve context"| RAGPipeline
RAGPipeline -->|"7a. Embed + search"| VectorSearch
RAGPipeline -->|"7b. Product lookup"| Products
Router -->|"8. Build prompt"| StreamOrch
StreamOrch -->|"9. InvokeModelWithResponseStream"| InvokeStream
InvokeStream -->|"10. Route to model"| Claude3S
InvokeStream -->|"10. Route to model"| Claude3H
Claude3S -.->|"11. Token stream"| InvokeStream
Claude3H -.->|"11. Token stream"| InvokeStream
InvokeStream -.->|"12. Chunked bytes"| StreamOrch
StreamOrch -.->|"13a. WS frames"| WSAPI
StreamOrch -.->|"13b. SSE events"| RESTAPI
StreamOrch -->|"13c. Buffer tokens"| TokenBuffer
WSAPI -.->|"14a. Push to client"| WSClient
RESTAPI -.->|"14b. Push to client"| SSEClient
StreamOrch -->|"15. Save final response"| Sessions
StreamOrch -->|"15. Update cache"| SessionCache
style Client fill:#e1f5fe
style APIGW fill:#fff3e0
style Orchestrator fill:#e8f5e9
style Cache fill:#fce4ec
style Search fill:#f3e5f5
style FM fill:#fff9c4
style DDB fill:#e0f2f1
1. Bedrock InvokeModelWithResponseStream API
1.1 How Streaming Works in Bedrock
Amazon Bedrock provides the InvokeModelWithResponseStream API as the streaming counterpart to InvokeModel. Instead of waiting for the entire response to be generated, this API returns a response stream that delivers tokens incrementally as the model produces them.
Key characteristics:
| Aspect | Non-Streaming (InvokeModel) | Streaming (InvokeModelWithResponseStream) |
|---|---|---|
| Response delivery | Single payload after full generation | Incremental chunks during generation |
| Time-to-first-byte | High (full generation time) | Low (first token ready) |
| Client perception | "Thinking..." then full answer | Text appearing word by word |
| Memory on server | Buffer entire response | Stream through, low memory |
| Error handling | Clear success/failure | Partial response possible on error |
| MangaAssist TTFB | 2-4 seconds (Sonnet) | 200-500ms (Sonnet first token) |
1.2 Stream Event Format for Claude 3
When streaming from Claude 3, the response body is an event stream where each event contains a chunk of the response. The events follow the application/vnd.amazon.eventstream binary format, but the SDK abstracts this into iterable chunks.
Claude 3 streaming event sequence:
message_start -> { model, usage: { input_tokens } }
content_block_start -> { index: 0, content_block: { type: "text", text: "" } }
content_block_delta -> { index: 0, delta: { type: "text_delta", text: "The" } }
content_block_delta -> { index: 0, delta: { type: "text_delta", text: " manga" } }
content_block_delta -> { index: 0, delta: { type: "text_delta", text: " you" } }
...
content_block_stop -> { index: 0 }
message_delta -> { stop_reason: "end_turn", usage: { output_tokens } }
message_stop -> {}
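The sketch below is the smallest useful consumer of this event sequence using boto3. The model ID and prompt are illustrative; retries, metrics, and error handling are deferred to the StreamingFMClient in Section 6.1.

```python
# Minimal sketch: consuming the Claude 3 event sequence above with boto3.
import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

response = bedrock.invoke_model_with_response_stream(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    contentType="application/json",
    accept="application/json",
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 256,
        "messages": [{"role": "user", "content": "Recommend a dark fantasy manga."}],
    }),
)

for event in response["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    if chunk["type"] == "content_block_delta":
        print(chunk["delta"]["text"], end="", flush=True)    # incremental text
    elif chunk["type"] == "message_delta":
        print("\nstop_reason:", chunk["delta"].get("stop_reason"))
```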
1.3 MangaAssist Token Generation Rate
| Model | Tokens/sec (streaming) | Avg response length | Full generation time | TTFB |
|---|---|---|---|---|
| Claude 3 Sonnet | ~80 tokens/sec | 150 tokens | ~1.9s | ~300ms |
| Claude 3 Haiku | ~150 tokens/sec | 100 tokens | ~0.7s | ~150ms |
For MangaAssist, Haiku handles quick lookups ("What volume is this?") while Sonnet handles nuanced recommendations ("Suggest similar dark fantasy manga").
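A minimal sketch of how the orchestrator could route between the two models; the keyword heuristic and function name are assumptions for illustration, not part of the MangaAssist specification.

```python
# Hypothetical model-routing heuristic (not part of the MangaAssist spec):
# nuanced recommendation requests go to Sonnet, everything else to Haiku.
RECOMMENDATION_HINTS = ("recommend", "suggest", "similar", "like", "おすすめ")

def select_model(user_message: str) -> str:
    """Return the friendly model name used by StreamingFMClient (Section 6.1)."""
    lowered = user_message.lower()
    if any(hint in lowered for hint in RECOMMENDATION_HINTS):
        return "sonnet"   # nuanced recommendations, ~80 tok/s, higher cost
    return "haiku"        # quick lookups, ~150 tok/s, far cheaper
```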
2. WebSocket Implementation with API Gateway
2.1 API Gateway WebSocket API Architecture
API Gateway WebSocket APIs maintain persistent, bidirectional connections. Each connection gets a unique connectionId that the backend uses to push messages back to the client.
Route configuration for MangaAssist:
| Route Key | Purpose | Integration Target |
|---|---|---|
| $connect | New connection auth + registration | Lambda (authorizer + DynamoDB write) |
| $disconnect | Cleanup connection record | Lambda (DynamoDB delete + Redis cleanup) |
| $default | Fallback for unmatched routes | Lambda (error response) |
| sendMessage | User sends chat message | ECS Fargate via VPC Link |
| ping | Client heartbeat | Lambda (pong response) |
2.2 Connection Lifecycle
Client API Gateway Lambda/ECS DynamoDB
| | | |
|--- WS Upgrade ---------->| | |
| |--- $connect ---------->| |
| | |--- Put connectionId->|
| | |--- Set Redis TTL --->|
|<-- 101 Switching ---------|<-- 200 OK ------------| |
| | | |
|--- sendMessage { ... } -->| | |
| |--- Route to ECS ------>| |
| | |--- Bedrock stream -->|
|<-- { chunk: "The" } ------|<-- POST @connections --| |
|<-- { chunk: " manga" } ---|<-- POST @connections --| |
|<-- { chunk: " ..." } -----|<-- POST @connections --| |
|<-- { done: true } --------|<-- POST @connections --| |
| | | |
|--- close ---------------->| | |
| |--- $disconnect ------->| |
| | |--- Delete record --->|
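A minimal sketch of the $connect and $disconnect Lambda handlers from the lifecycle above; the table name, TTL field, and the omitted authorizer check are assumptions for illustration.

```python
# Sketch: $connect / $disconnect Lambda handlers for the WebSocket lifecycle.
import os
import time
import boto3

dynamodb = boto3.resource("dynamodb")
connections = dynamodb.Table(os.environ.get("CONNECTIONS_TABLE", "MangaAssistConnections"))

def on_connect(event, context):
    connection_id = event["requestContext"]["connectionId"]
    connections.put_item(Item={
        "connectionId": connection_id,
        "connectedAt": int(time.time()),
        "ttl": int(time.time()) + 7200,  # expire alongside the 2-hour connection limit
    })
    return {"statusCode": 200}

def on_disconnect(event, context):
    connection_id = event["requestContext"]["connectionId"]
    connections.delete_item(Key={"connectionId": connection_id})
    return {"statusCode": 200}
```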
2.3 Pushing Messages Back via @connections
The backend pushes streaming tokens to the client using the API Gateway Management API's @connections endpoint:
POST https://{api-id}.execute-api.{region}.amazonaws.com/{stage}/@connections/{connectionId}
Important limits:
| Limit | Value | MangaAssist Impact |
|---|---|---|
| Message payload size | 128 KB (32 KB per WebSocket frame) | Single-token pushes are far below this |
| Idle connection timeout | 10 minutes | Must implement heartbeat |
| Connection duration limit | 2 hours | Reconnect logic needed |
| Message send rate | 500 messages/sec per connection | ~80 tokens/sec is well within |
| New connections per second | 500 per account (default) | Request an increase to absorb reconnect spikes at 1M msgs/day |
3. Server-Sent Events Pattern
3.1 SSE Protocol Fundamentals
SSE provides a unidirectional, server-to-client push mechanism over standard HTTP. The client opens a long-lived GET request, and the server sends events as they become available.
HTTP response headers for SSE:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
Event format:
event: token
id: msg-001-chunk-42
data: {"text": "The manga", "index": 42}

event: token
id: msg-001-chunk-43
data: {"text": " series", "index": 43}

event: done
id: msg-001-final
data: {"finish_reason": "end_turn", "usage": {"output_tokens": 150}}
3.2 SSE vs WebSocket for MangaAssist
| Criteria | SSE | WebSocket |
|---|---|---|
| Direction | Server -> Client only | Bidirectional |
| Protocol | HTTP/1.1 or HTTP/2 | Upgrade from HTTP to WS |
| Reconnect | Built-in with Last-Event-ID | Manual implementation |
| Binary data | No (text only) | Yes |
| Proxy support | Excellent (it is just HTTP) | Varies (some proxies block) |
| Browser support | Native EventSource API | Native WebSocket API |
| Max connections per domain | 6 (HTTP/1.1), many (HTTP/2) | No hard limit |
| MangaAssist use case | Simple streaming read | Full chat with typing indicators |
MangaAssist recommendation: Use WebSocket as primary (bidirectional chat), fall back to SSE for clients behind restrictive proxies.
3.3 Auto-Reconnect with Last-Event-ID
SSE has a built-in reconnection mechanism. When the connection drops, the browser automatically reconnects and sends the Last-Event-ID header, allowing the server to resume from where it left off.
# Client reconnects after drop
GET /stream/manga-chat HTTP/1.1
Last-Event-ID: msg-001-chunk-42
# Server resumes from chunk 43
event: token
id: msg-001-chunk-43
data: {"text": " series", "index": 43}
4. Chunked Transfer Encoding
4.1 How Chunked Encoding Works
Chunked transfer encoding allows the server to send the response in pieces without knowing the total content length upfront. Each chunk is prefixed with its size in hexadecimal.
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: application/json
19\r\n
{"text":"The manga seri"}\r\n
17\r\n
{"text":"es is called"}\r\n
0\r\n
\r\n
4.2 Lambda Response Streaming
AWS Lambda supports response streaming via Lambda Function URLs and the InvokeWithResponseStream API (natively in the Node.js managed runtimes; other runtimes need a custom runtime or the Lambda Web Adapter). This pairs naturally with Bedrock streaming.
Lambda streaming flow for MangaAssist:
Client -> Lambda Function URL (bypasses API Gateway buffering) -> Bedrock stream -> chunked response back to client
| Feature | Standard Lambda | Streaming Lambda |
|---|---|---|
| Response delivery | After function completes | During function execution |
| Max response size | 6 MB | 20 MB (soft limit) |
| Billing | Full duration | Full duration |
| TTFB | Function duration | First write to stream |
| Content-Type | Set once | Set with metadata |
| MangaAssist fit | Short lookups | Long streaming answers |
4.3 API Gateway Chunked Response Handling
API Gateway REST APIs buffer the full response before sending to the client. For true chunked streaming, use:
- API Gateway WebSocket API (push model)
- Lambda Function URLs (bypass API Gateway buffering)
- ALB + ECS Fargate (direct chunked HTTP)
- CloudFront + Lambda@Edge (edge-streamed)
For MangaAssist, the ECS Fargate orchestrator can send chunked HTTP responses directly through an ALB, bypassing API Gateway's buffering limitation for the streaming path.
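A minimal sketch of that direct chunked path, assuming the orchestrator runs aiohttp behind the ALB; get_bedrock_stream() is again a hypothetical helper around the Section 6.1 client.

```python
# Sketch: chunked HTTP streaming from the ECS orchestrator through the ALB.
import json
from aiohttp import web

async def stream_answer(request: web.Request) -> web.StreamResponse:
    resp = web.StreamResponse(status=200, headers={"Content-Type": "application/x-ndjson"})
    resp.enable_chunked_encoding()          # sets Transfer-Encoding: chunked
    await resp.prepare(request)
    async for chunk in get_bedrock_stream(request.query.get("q", "")):
        # one JSON object per chunk; each write is flushed to the ALB immediately
        await resp.write(json.dumps({"text": chunk.text}).encode("utf-8") + b"\n")
    await resp.write_eof()
    return resp

app = web.Application()
app.router.add_get("/internal/stream", stream_answer)
# web.run_app(app, port=8080)
```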
5. Token-by-Token Delivery and Buffering Strategies
5.1 Buffering Strategy Comparison
| Strategy | Description | Latency | UX Quality | MangaAssist Use |
|---|---|---|---|---|
| No buffering | Send every token immediately | Lowest | Jittery, characters appear one-by-one | Not recommended |
| Token batching | Accumulate N tokens, then send | Low-medium | Smoother, words appear in groups | Default (batch of 3-5) |
| Word boundary | Buffer until whitespace detected | Medium | Natural word-by-word appearance | Chat responses |
| Sentence boundary | Buffer until punctuation | Higher | Full sentences appear at once | Manga synopsis |
| Adaptive | Adjust buffer size based on rate | Variable | Consistent perceived speed | Production recommendation |
| Time-based | Flush every N milliseconds | Consistent | Regular rhythm | Fallback strategy |
5.2 Adaptive Buffering Algorithm
The adaptive approach adjusts the buffer flush interval based on the token generation rate:
- Fast generation (>100 tokens/sec): Buffer for 100ms, send word groups
- Medium generation (50-100 tokens/sec): Buffer for 50ms, send partial words
- Slow generation (<50 tokens/sec): No buffering, send immediately
This ensures the user always perceives a smooth, consistent stream regardless of backend generation speed.
5.3 Japanese Text Buffering Considerations
MangaAssist serves Japanese content, which introduces special buffering considerations:
| Issue | Description | Solution |
|---|---|---|
| No word spaces | Japanese has no whitespace between words | Buffer by character count, not word boundaries |
| Multi-byte UTF-8 | Japanese chars are 3 bytes in UTF-8 | Never split mid-character (check continuation bytes) |
| Mixed scripts | Kanji + hiragana + katakana + romaji | Detect script transitions for natural breaks |
| Particle boundaries | は、が、を etc. are natural break points | Use particle detection for sentence-feel buffering |
| Manga titles | Often mix of Japanese and English | Buffer entire title strings together |
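One way to satisfy the "never split mid-character" rule when a transport hands the orchestrator raw bytes is an incremental UTF-8 decoder, which holds back trailing continuation bytes until the character is complete. The split point and the example title below are illustrative.

```python
# Incremental UTF-8 decoding: emit complete characters only; partial
# multi-byte sequences are buffered until the rest of the character arrives.
import codecs

decoder = codecs.getincrementaldecoder("utf-8")()

def safe_decode(chunk_bytes: bytes) -> str:
    """Return only whole characters; a partial multi-byte sequence is held back."""
    return decoder.decode(chunk_bytes)

raw = "鋼の錬金術師".encode("utf-8")          # 6 characters, 18 bytes
print(safe_decode(raw[:7]))                  # -> 鋼の  (the dangling 7th byte is buffered)
print(safe_decode(raw[7:]))                  # -> 錬金術師
```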
6. Code Implementations
6.1 StreamingFMClient
"""
StreamingFMClient — Handles Bedrock InvokeModelWithResponseStream
for MangaAssist real-time response generation.
"""
import json
import time
import logging
from typing import AsyncIterator, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EventStreamError
logger = logging.getLogger(__name__)
@dataclass
class StreamChunk:
"""Represents a single chunk from the Bedrock stream."""
text: str
index: int
is_first: bool = False
is_last: bool = False
stop_reason: Optional[str] = None
input_tokens: int = 0
output_tokens: int = 0
latency_ms: float = 0.0
@dataclass
class StreamMetrics:
"""Tracks streaming performance metrics."""
request_id: str = ""
model_id: str = ""
time_to_first_token_ms: float = 0.0
total_duration_ms: float = 0.0
total_tokens: int = 0
input_tokens: int = 0
output_tokens: int = 0
tokens_per_second: float = 0.0
chunks_sent: int = 0
errors: list = field(default_factory=list)
class StreamingFMClient:
"""
Client for streaming responses from Amazon Bedrock Foundation Models.
Wraps InvokeModelWithResponseStream with:
- Retry logic for transient failures
- Metrics collection (TTFT, throughput, token counts)
- Model-specific response parsing (Claude 3 format)
- Graceful error handling mid-stream
- Connection reuse via session management
Usage:
client = StreamingFMClient(region="us-east-1")
async for chunk in client.stream_response(prompt, model="sonnet"):
send_to_client(chunk.text)
"""
MODEL_MAP = {
"sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
"haiku": "anthropic.claude-3-haiku-20240307-v1:0",
}
# Cost per 1M tokens
COST_MAP = {
"sonnet": {"input": 3.00, "output": 15.00},
"haiku": {"input": 0.25, "output": 1.25},
}
def __init__(
self,
region: str = "us-east-1",
max_retries: int = 3,
connect_timeout: int = 5,
read_timeout: int = 60,
max_tokens: int = 1024,
):
self.region = region
self.max_retries = max_retries
self.max_tokens = max_tokens
config = Config(
region_name=region,
retries={"max_attempts": max_retries, "mode": "adaptive"},
connect_timeout=connect_timeout,
read_timeout=read_timeout,
)
self._client = boto3.client("bedrock-runtime", config=config)
logger.info(f"StreamingFMClient initialized for region={region}")
def _build_claude3_body(
self,
prompt: str,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
top_p: float = 0.9,
stop_sequences: Optional[list] = None,
) -> str:
"""Build the request body for Claude 3 Messages API format."""
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens or self.max_tokens,
"temperature": temperature,
"top_p": top_p,
"messages": [
{"role": "user", "content": prompt}
],
}
if system:
body["system"] = system
if stop_sequences:
body["stop_sequences"] = stop_sequences
return json.dumps(body)
def _resolve_model_id(self, model: str) -> str:
"""Resolve friendly model name to full Bedrock model ID."""
if model in self.MODEL_MAP:
return self.MODEL_MAP[model]
return model # Assume it is already a full model ID
async def stream_response(
self,
prompt: str,
model: str = "haiku",
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
on_metrics: Optional[Callable[[StreamMetrics], None]] = None,
) -> AsyncIterator[StreamChunk]:
"""
Stream a response from Bedrock, yielding StreamChunk objects.
Args:
prompt: User message text
model: "sonnet", "haiku", or full model ID
system: Optional system prompt
max_tokens: Override default max tokens
temperature: Sampling temperature
on_metrics: Callback invoked with final metrics
Yields:
StreamChunk objects with incremental text
"""
model_id = self._resolve_model_id(model)
body = self._build_claude3_body(prompt, system, max_tokens, temperature)
metrics = StreamMetrics(model_id=model_id)
start_time = time.monotonic()
first_token_time = None
chunk_index = 0
accumulated_text = ""
try:
response = self._client.invoke_model_with_response_stream(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=body,
)
metrics.request_id = response.get("ResponseMetadata", {}).get("RequestId", "")
stream = response.get("body")
if not stream:
raise RuntimeError("No response body stream returned from Bedrock")
for event in stream:
chunk_data = event.get("chunk")
if not chunk_data:
continue
chunk_bytes = chunk_data.get("bytes", b"")
chunk_json = json.loads(chunk_bytes.decode("utf-8"))
event_type = chunk_json.get("type", "")
# --- message_start: capture input token count ---
if event_type == "message_start":
message = chunk_json.get("message", {})
usage = message.get("usage", {})
metrics.input_tokens = usage.get("input_tokens", 0)
# --- content_block_delta: the actual text tokens ---
elif event_type == "content_block_delta":
delta = chunk_json.get("delta", {})
text = delta.get("text", "")
if not text:
continue
now = time.monotonic()
if first_token_time is None:
first_token_time = now
metrics.time_to_first_token_ms = (now - start_time) * 1000
accumulated_text += text
chunk = StreamChunk(
text=text,
index=chunk_index,
is_first=(chunk_index == 0),
latency_ms=(now - start_time) * 1000,
)
chunk_index += 1
yield chunk
# --- message_delta: stop reason and output tokens ---
elif event_type == "message_delta":
delta = chunk_json.get("delta", {})
usage = chunk_json.get("usage", {})
stop_reason = delta.get("stop_reason")
metrics.output_tokens = usage.get("output_tokens", 0)
final_chunk = StreamChunk(
text="",
index=chunk_index,
is_last=True,
stop_reason=stop_reason,
input_tokens=metrics.input_tokens,
output_tokens=metrics.output_tokens,
latency_ms=(time.monotonic() - start_time) * 1000,
)
yield final_chunk
# --- message_stop: stream complete ---
elif event_type == "message_stop":
break
except EventStreamError as e:
logger.error(f"Stream event error: {e}")
metrics.errors.append(str(e))
raise
except ClientError as e:
error_code = e.response["Error"]["Code"]
logger.error(f"Bedrock client error: {error_code} - {e}")
metrics.errors.append(f"{error_code}: {e}")
raise
finally:
end_time = time.monotonic()
metrics.total_duration_ms = (end_time - start_time) * 1000
metrics.total_tokens = metrics.input_tokens + metrics.output_tokens
metrics.chunks_sent = chunk_index
if metrics.total_duration_ms > 0:
metrics.tokens_per_second = (
metrics.output_tokens / (metrics.total_duration_ms / 1000)
)
if on_metrics:
on_metrics(metrics)
logger.info(
f"Stream complete: model={model_id}, "
f"TTFT={metrics.time_to_first_token_ms:.0f}ms, "
f"total={metrics.total_duration_ms:.0f}ms, "
f"tokens={metrics.output_tokens}, "
f"rate={metrics.tokens_per_second:.1f} tok/s"
)
def estimate_cost(self, metrics: StreamMetrics) -> Dict[str, float]:
"""Estimate the cost of a streaming request based on collected metrics."""
model_key = None
for key, model_id in self.MODEL_MAP.items():
if model_id == metrics.model_id:
model_key = key
break
if not model_key or model_key not in self.COST_MAP:
return {"input_cost": 0.0, "output_cost": 0.0, "total_cost": 0.0}
costs = self.COST_MAP[model_key]
input_cost = (metrics.input_tokens / 1_000_000) * costs["input"]
output_cost = (metrics.output_tokens / 1_000_000) * costs["output"]
return {
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": input_cost + output_cost,
}
6.2 WebSocketStreamHandler
"""
WebSocketStreamHandler — Bridges Bedrock streaming to API Gateway WebSocket
connections for MangaAssist real-time chat.
"""
import json
import time
import asyncio
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class WebSocketConfig:
"""Configuration for the WebSocket stream handler."""
api_id: str
stage: str
region: str = "us-east-1"
max_frame_size: int = 32_768 # 32 KB per frame (conservative under 128 KB limit)
send_rate_limit: int = 400 # Messages per second (under 500 limit)
heartbeat_interval_sec: int = 300 # 5 minutes (under 10-min idle timeout)
connection_ttl_sec: int = 7000 # ~1.9 hours (under 2-hour limit)
flush_interval_ms: int = 50 # Minimum ms between sends
class WebSocketStreamHandler:
"""
Handles pushing Bedrock streaming tokens to clients via API Gateway
WebSocket @connections endpoint.
Features:
- Token buffering with configurable flush intervals
- Automatic heartbeat to prevent idle timeouts
- Connection state validation before sends
- Graceful handling of stale/disconnected connections
- Rate limiting to stay within API Gateway limits
- Frame size enforcement
Usage:
handler = WebSocketStreamHandler(config)
async for chunk in bedrock_stream:
await handler.send_token(connection_id, chunk.text)
await handler.send_complete(connection_id)
"""
def __init__(self, config: WebSocketConfig):
self.config = config
self._endpoint = (
f"https://{config.api_id}.execute-api."
f"{config.region}.amazonaws.com/{config.stage}"
)
self._apigw_client = boto3.client(
"apigatewaymanagementapi",
endpoint_url=self._endpoint,
region_name=config.region,
)
self._send_timestamps: Dict[str, list] = {} # connection_id -> [timestamps]
self._active_connections: Dict[str, float] = {} # connection_id -> last_send_time
logger.info(f"WebSocketStreamHandler initialized: endpoint={self._endpoint}")
async def send_token(
self,
connection_id: str,
text: str,
chunk_index: int = 0,
is_first: bool = False,
) -> bool:
"""
Send a token chunk to a specific WebSocket connection.
Args:
connection_id: API Gateway WebSocket connection ID
text: Token text to send
chunk_index: Sequential index of this chunk
is_first: Whether this is the first chunk (triggers typing indicator)
Returns:
True if sent successfully, False if connection is stale
"""
payload = {
"action": "token",
"data": {
"text": text,
"index": chunk_index,
"timestamp": int(time.time() * 1000),
},
}
if is_first:
payload["data"]["is_first"] = True
return await self._post_to_connection(connection_id, payload)
async def send_complete(
self,
connection_id: str,
stop_reason: str = "end_turn",
usage: Optional[Dict[str, int]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""Send stream completion signal to the client."""
payload = {
"action": "stream_complete",
"data": {
"stop_reason": stop_reason,
"timestamp": int(time.time() * 1000),
},
}
if usage:
payload["data"]["usage"] = usage
if metadata:
payload["data"]["metadata"] = metadata
return await self._post_to_connection(connection_id, payload)
async def send_error(
self,
connection_id: str,
error_code: str,
message: str,
retryable: bool = False,
) -> bool:
"""Send error notification to the client."""
payload = {
"action": "error",
"data": {
"code": error_code,
"message": message,
"retryable": retryable,
"timestamp": int(time.time() * 1000),
},
}
return await self._post_to_connection(connection_id, payload)
async def send_heartbeat(self, connection_id: str) -> bool:
"""Send heartbeat to keep connection alive."""
payload = {"action": "heartbeat", "timestamp": int(time.time() * 1000)}
return await self._post_to_connection(connection_id, payload)
async def _post_to_connection(
self, connection_id: str, payload: Dict[str, Any]
) -> bool:
"""
Post data to a WebSocket connection via @connections API.
Handles stale connections, rate limiting, and frame size.
"""
data_bytes = json.dumps(payload).encode("utf-8")
# Enforce frame size limit
if len(data_bytes) > self.config.max_frame_size:
logger.warning(
f"Frame too large ({len(data_bytes)} bytes) for "
f"connection {connection_id}, truncating"
)
# Truncate text content, preserve structure
if "data" in payload and "text" in payload["data"]:
max_text = self.config.max_frame_size - 200 # headroom for JSON wrapper
payload["data"]["text"] = payload["data"]["text"][:max_text]
payload["data"]["truncated"] = True
data_bytes = json.dumps(payload).encode("utf-8")
# Rate limiting
await self._enforce_rate_limit(connection_id)
try:
self._apigw_client.post_to_connection(
ConnectionId=connection_id,
Data=data_bytes,
)
self._active_connections[connection_id] = time.time()
return True
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "GoneException":
logger.info(f"Connection {connection_id} is gone (client disconnected)")
self._cleanup_connection(connection_id)
return False
elif error_code == "LimitExceededException":
logger.warning(f"Rate limit exceeded for {connection_id}, backing off")
await asyncio.sleep(0.1)
return await self._post_to_connection(connection_id, payload)
else:
logger.error(f"Error posting to {connection_id}: {error_code} - {e}")
raise
async def _enforce_rate_limit(self, connection_id: str) -> None:
"""Enforce per-connection message rate limit."""
now = time.time()
if connection_id not in self._send_timestamps:
self._send_timestamps[connection_id] = []
timestamps = self._send_timestamps[connection_id]
# Remove timestamps older than 1 second
timestamps[:] = [t for t in timestamps if now - t < 1.0]
if len(timestamps) >= self.config.send_rate_limit:
sleep_time = 1.0 - (now - timestamps[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
timestamps.append(now)
def _cleanup_connection(self, connection_id: str) -> None:
"""Clean up tracking data for a disconnected connection."""
self._send_timestamps.pop(connection_id, None)
self._active_connections.pop(connection_id, None)
def is_connection_active(self, connection_id: str) -> bool:
"""Check if a connection is still considered active."""
if connection_id not in self._active_connections:
return False
last_send = self._active_connections[connection_id]
return (time.time() - last_send) < self.config.connection_ttl_sec
6.3 SSEResponseManager
"""
SSEResponseManager — Manages Server-Sent Events streaming for MangaAssist
as a fallback transport when WebSocket is unavailable.
"""
import json
import time
import asyncio
import logging
from typing import AsyncIterator, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger(__name__)
class SSEEventType(Enum):
"""Standard SSE event types for MangaAssist streaming."""
TOKEN = "token"
DONE = "done"
ERROR = "error"
HEARTBEAT = "heartbeat"
METADATA = "metadata"
RETRY = "retry"
@dataclass
class SSEEvent:
"""Represents a single Server-Sent Event."""
event_type: SSEEventType
data: Dict[str, Any]
event_id: Optional[str] = None
retry_ms: Optional[int] = None
def serialize(self) -> str:
"""Serialize this event to SSE wire format."""
lines = []
if self.event_type != SSEEventType.HEARTBEAT:
lines.append(f"event: {self.event_type.value}")
if self.event_id:
lines.append(f"id: {self.event_id}")
if self.retry_ms is not None:
lines.append(f"retry: {self.retry_ms}")
data_str = json.dumps(self.data)
lines.append(f"data: {data_str}")
lines.append("") # Blank line terminates the event
lines.append("")
return "\n".join(lines)
@dataclass
class SSESessionState:
"""Tracks state for an SSE connection for resume support."""
session_id: str
last_event_id: str = ""
chunk_index: int = 0
accumulated_text: str = ""
started_at: float = field(default_factory=time.time)
last_sent_at: float = 0.0
is_complete: bool = False
class SSEResponseManager:
"""
Manages SSE streaming responses for MangaAssist.
Features:
- Event ID generation for client resume (Last-Event-ID)
- Heartbeat events to prevent proxy/load-balancer timeouts
- Session state tracking for reconnection
- Retry directive management
- Clean connection closure
- Metrics collection
Usage:
manager = SSEResponseManager()
session = manager.create_session("msg-001")
async def generate():
headers = manager.get_sse_headers()
# first emit an SSE retry directive ("retry: 3000" followed by a blank line)
async for chunk in bedrock_stream:
event = manager.create_token_event(session, chunk.text)
yield event.serialize()
yield manager.create_done_event(session).serialize()
"""
def __init__(
self,
heartbeat_interval_sec: float = 15.0,
default_retry_ms: int = 3000,
max_reconnect_window_sec: int = 30,
):
self.heartbeat_interval_sec = heartbeat_interval_sec
self.default_retry_ms = default_retry_ms
self.max_reconnect_window_sec = max_reconnect_window_sec
self._sessions: Dict[str, SSESessionState] = {}
@staticmethod
def get_sse_headers() -> Dict[str, str]:
"""Return the required HTTP headers for an SSE response."""
return {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"Access-Control-Allow-Origin": "*",
"Transfer-Encoding": "chunked",
}
def create_session(self, session_id: str) -> SSESessionState:
"""Create a new SSE session for tracking."""
session = SSESessionState(session_id=session_id)
self._sessions[session_id] = session
logger.info(f"SSE session created: {session_id}")
return session
def get_session(self, session_id: str) -> Optional[SSESessionState]:
"""Retrieve an existing session."""
return self._sessions.get(session_id)
def create_token_event(
self, session: SSESessionState, text: str
) -> SSEEvent:
"""Create a token event from streaming text."""
event_id = f"{session.session_id}-{session.chunk_index}"
session.chunk_index += 1
session.accumulated_text += text
session.last_event_id = event_id
session.last_sent_at = time.time()
return SSEEvent(
event_type=SSEEventType.TOKEN,
data={"text": text, "index": session.chunk_index - 1},
event_id=event_id,
)
def create_done_event(
self,
session: SSESessionState,
stop_reason: str = "end_turn",
usage: Optional[Dict[str, int]] = None,
) -> SSEEvent:
"""Create the stream completion event."""
session.is_complete = True
event_id = f"{session.session_id}-done"
data = {
"stop_reason": stop_reason,
"total_chunks": session.chunk_index,
}
if usage:
data["usage"] = usage
return SSEEvent(
event_type=SSEEventType.DONE,
data=data,
event_id=event_id,
)
def create_error_event(
self,
session: SSESessionState,
error_code: str,
message: str,
retryable: bool = False,
) -> SSEEvent:
"""Create an error event."""
return SSEEvent(
event_type=SSEEventType.ERROR,
data={
"code": error_code,
"message": message,
"retryable": retryable,
},
event_id=f"{session.session_id}-error",
retry_ms=self.default_retry_ms if retryable else None,
)
def create_heartbeat_event(self) -> SSEEvent:
"""Create a heartbeat event (comment-style keepalive)."""
return SSEEvent(
event_type=SSEEventType.HEARTBEAT,
data={"ts": int(time.time() * 1000)},
)
def can_resume(
self, session_id: str, last_event_id: str
) -> Optional[int]:
"""
Check if a session can be resumed from the given Last-Event-ID.
Returns the chunk index to resume from, or None if not resumable.
"""
session = self._sessions.get(session_id)
if not session:
return None
if session.is_complete:
return None
elapsed = time.time() - session.last_sent_at
if elapsed > self.max_reconnect_window_sec:
logger.info(f"Session {session_id} reconnect window expired ({elapsed:.1f}s)")
return None
# Parse the chunk index from the last event ID
try:
parts = last_event_id.rsplit("-", 1)
resume_index = int(parts[-1]) + 1
return resume_index
except (ValueError, IndexError):
return None
async def heartbeat_loop(
self,
send_fn: Callable[[str], Any],
stop_event: asyncio.Event,
) -> None:
"""
Run a heartbeat loop that sends keepalive events.
Args:
send_fn: Async function that sends a string to the client
stop_event: Event to signal loop termination
"""
while not stop_event.is_set():
try:
await asyncio.wait_for(
stop_event.wait(),
timeout=self.heartbeat_interval_sec,
)
break # stop_event was set
except asyncio.TimeoutError:
heartbeat = self.create_heartbeat_event()
await send_fn(heartbeat.serialize())
def cleanup_session(self, session_id: str) -> None:
"""Remove session state after connection closes."""
self._sessions.pop(session_id, None)
logger.info(f"SSE session cleaned up: {session_id}")
6.4 ChunkBufferManager
"""
ChunkBufferManager — Implements intelligent token buffering strategies
for smooth real-time text delivery in MangaAssist.
"""
import time
import logging
import unicodedata
from typing import Optional, List, Callable, Awaitable
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger(__name__)
class BufferStrategy(Enum):
"""Available buffering strategies."""
NONE = "none" # Send every token immediately
TOKEN_BATCH = "token_batch" # Batch N tokens
WORD_BOUNDARY = "word_boundary" # Buffer to word boundaries
SENTENCE_BOUNDARY = "sentence" # Buffer to sentence boundaries
TIME_BASED = "time_based" # Flush every N milliseconds
ADAPTIVE = "adaptive" # Adjust based on generation rate
JAPANESE_AWARE = "japanese_aware" # Handles JP text without word spaces
@dataclass
class BufferConfig:
"""Configuration for the buffer manager."""
strategy: BufferStrategy = BufferStrategy.ADAPTIVE
token_batch_size: int = 4 # For TOKEN_BATCH strategy
time_flush_ms: int = 80 # For TIME_BASED strategy
adaptive_fast_threshold: float = 100.0 # tokens/sec
adaptive_slow_threshold: float = 50.0 # tokens/sec
max_buffer_size: int = 512 # Max chars before forced flush
jp_char_batch_size: int = 6 # Characters per flush for Japanese
@dataclass
class BufferState:
"""Internal state of the buffer."""
buffer: str = ""
token_count: int = 0
total_tokens: int = 0
first_token_time: float = 0.0
last_token_time: float = 0.0
last_flush_time: float = 0.0
flush_count: int = 0
current_rate: float = 0.0 # tokens per second
# Japanese punctuation and particles that make good break points
JP_BREAK_CHARS = set("。、!?」』)】〉》~…・\n")
JP_PARTICLES = {"は", "が", "を", "に", "で", "と", "の", "へ", "も", "や", "か", "ね", "よ", "ぞ"}
class ChunkBufferManager:
"""
Intelligent token buffering for smooth real-time text delivery.
Strategies:
- NONE: Zero buffering, immediate forward of every token
- TOKEN_BATCH: Accumulate N tokens then flush
- WORD_BOUNDARY: Buffer until whitespace (English/romaji text)
- SENTENCE_BOUNDARY: Buffer until sentence-ending punctuation
- TIME_BASED: Flush at fixed time intervals regardless of tokens
- ADAPTIVE: Dynamically switch strategy based on generation speed
- JAPANESE_AWARE: Special handling for Japanese text (no word spaces)
The ADAPTIVE strategy is recommended for production MangaAssist use.
It monitors the token generation rate and adjusts:
- Fast (>100 tok/s): Word-boundary buffering for smooth groups
- Medium (50-100 tok/s): Small token batches (2-3 tokens)
- Slow (<50 tok/s): No buffering, forward immediately
Usage:
buffer = ChunkBufferManager(BufferConfig(strategy=BufferStrategy.ADAPTIVE))
async for chunk in bedrock_stream:
flushed = await buffer.add_token(chunk.text, send_fn)
# send_fn is called when buffer flushes
await buffer.flush_remaining(send_fn)
"""
def __init__(self, config: Optional[BufferConfig] = None):
self.config = config or BufferConfig()
self._state = BufferState()
async def add_token(
self,
text: str,
send_fn: Callable[[str], Awaitable[None]],
) -> bool:
"""
Add a token to the buffer. May trigger a flush via send_fn.
Args:
text: The token text to buffer
send_fn: Async function to call when buffer flushes
Returns:
True if a flush occurred, False otherwise
"""
now = time.monotonic()
self._state.token_count += 1
self._state.total_tokens += 1
self._state.buffer += text
if self._state.first_token_time == 0:
self._state.first_token_time = now
self._state.last_flush_time = now
self._state.last_token_time = now
self._update_rate(now)
# Force flush if buffer exceeds max size
if len(self._state.buffer) >= self.config.max_buffer_size:
await self._flush(send_fn)
return True
strategy = self.config.strategy
if strategy == BufferStrategy.ADAPTIVE:
strategy = self._select_adaptive_strategy()
should_flush = self._check_flush(strategy, now)
if should_flush:
await self._flush(send_fn)
return True
return False
async def flush_remaining(
self, send_fn: Callable[[str], Awaitable[None]]
) -> None:
"""Flush any remaining buffered text (call at end of stream)."""
if self._state.buffer:
await self._flush(send_fn)
def get_stats(self) -> dict:
"""Return buffer performance statistics."""
elapsed = self._state.last_token_time - self._state.first_token_time
return {
"total_tokens": self._state.total_tokens,
"flush_count": self._state.flush_count,
"avg_tokens_per_flush": (
self._state.total_tokens / max(self._state.flush_count, 1)
),
"current_rate_tok_s": self._state.current_rate,
"elapsed_sec": elapsed,
"strategy": self.config.strategy.value,
}
def _update_rate(self, now: float) -> None:
"""Update the current token generation rate estimate."""
elapsed = now - self._state.first_token_time
if elapsed > 0.1: # Need at least 100ms of data
self._state.current_rate = self._state.total_tokens / elapsed
def _select_adaptive_strategy(self) -> BufferStrategy:
"""Select the best strategy based on current generation rate."""
rate = self._state.current_rate
# Check if content is primarily Japanese
if self._is_japanese_content():
return BufferStrategy.JAPANESE_AWARE
if rate > self.config.adaptive_fast_threshold:
return BufferStrategy.WORD_BOUNDARY
elif rate > self.config.adaptive_slow_threshold:
return BufferStrategy.TOKEN_BATCH
else:
return BufferStrategy.NONE
def _is_japanese_content(self) -> bool:
"""Check if the current buffer contains primarily Japanese text."""
if not self._state.buffer:
return False
jp_chars = sum(
1 for ch in self._state.buffer
if unicodedata.category(ch).startswith("Lo") # Letter, other (CJK)
or "\u3040" <= ch <= "\u309f" # Hiragana
or "\u30a0" <= ch <= "\u30ff" # Katakana
)
return jp_chars > len(self._state.buffer) * 0.3
def _check_flush(self, strategy: BufferStrategy, now: float) -> bool:
"""Determine if the buffer should be flushed given the current strategy."""
buf = self._state.buffer
if strategy == BufferStrategy.NONE:
return True
elif strategy == BufferStrategy.TOKEN_BATCH:
return self._state.token_count >= self.config.token_batch_size
elif strategy == BufferStrategy.WORD_BOUNDARY:
return len(buf) > 0 and buf[-1] in (" ", "\n", "\t")
elif strategy == BufferStrategy.SENTENCE_BOUNDARY:
if not buf:
return False
return buf[-1] in ".!?\n" or buf.endswith("...") or buf[-1] in JP_BREAK_CHARS
elif strategy == BufferStrategy.TIME_BASED:
elapsed_ms = (now - self._state.last_flush_time) * 1000
return elapsed_ms >= self.config.time_flush_ms
elif strategy == BufferStrategy.JAPANESE_AWARE:
return self._check_japanese_flush(buf)
return False
def _check_japanese_flush(self, buf: str) -> bool:
"""Check if buffer should flush for Japanese text."""
if not buf:
return False
# Flush on Japanese punctuation
if buf[-1] in JP_BREAK_CHARS:
return True
# Flush on particle boundaries (particle followed by next char)
if len(buf) >= 2 and buf[-2] in JP_PARTICLES:
return True
# Flush by character count (since no word boundaries)
jp_chars_since_flush = sum(
1 for ch in buf
if ord(ch) > 0x2FFF # Rough CJK range
)
return jp_chars_since_flush >= self.config.jp_char_batch_size
async def _flush(self, send_fn: Callable[[str], Awaitable[None]]) -> None:
"""Flush the buffer contents via send_fn."""
text = self._state.buffer
if not text:
return
await send_fn(text)
self._state.buffer = ""
self._state.token_count = 0
self._state.flush_count += 1
self._state.last_flush_time = time.monotonic()
def reset(self) -> None:
"""Reset buffer state for a new stream."""
self._state = BufferState()
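To show how these pieces compose, the hedged sketch below wires the Section 6.1 client, the Section 6.2 WebSocket handler, and the Section 6.4 buffer into one orchestrator coroutine; the API Gateway IDs and the index-tracking closure are illustrative.

```python
# Sketch: wiring StreamingFMClient (6.1), WebSocketStreamHandler (6.2), and
# ChunkBufferManager (6.4) together in the stream orchestrator.
async def handle_chat_message(connection_id: str, prompt: str, model: str) -> None:
    fm_client = StreamingFMClient(region="us-east-1")
    ws = WebSocketStreamHandler(WebSocketConfig(api_id="abc123", stage="prod"))
    buffer = ChunkBufferManager(BufferConfig(strategy=BufferStrategy.ADAPTIVE))

    sent = {"index": 0}

    async def push(text: str) -> None:
        await ws.send_token(
            connection_id, text,
            chunk_index=sent["index"],
            is_first=(sent["index"] == 0),
        )
        sent["index"] += 1

    async for chunk in fm_client.stream_response(prompt, model=model):
        if chunk.is_last:
            await buffer.flush_remaining(push)
            await ws.send_complete(
                connection_id,
                stop_reason=chunk.stop_reason or "end_turn",
                usage={
                    "input_tokens": chunk.input_tokens,
                    "output_tokens": chunk.output_tokens,
                },
            )
        else:
            await buffer.add_token(chunk.text, push)
```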
7. Latency Comparison Tables
7.1 Transport Mechanism Latency
| Transport | Connection Setup | TTFB Overhead | Per-Token Overhead | Reconnect Time | Best For |
|---|---|---|---|---|---|
| WebSocket (API GW) | ~100ms (upgrade) | ~5ms | ~2ms | ~150ms | Primary MangaAssist chat |
| SSE (REST API) | ~50ms (HTTP GET) | ~10ms | ~3ms | ~50ms (auto) | Fallback transport |
| Chunked HTTP (ALB) | ~30ms | ~8ms | ~1ms | ~30ms | High-throughput batch |
| Lambda Streaming | ~80ms (cold)/~10ms (warm) | ~15ms | ~2ms | N/A (new request) | Serverless streaming |
| Long Polling | ~30ms | ~full generation | ~0ms (single response) | ~30ms | Legacy clients |
7.2 End-to-End Latency Breakdown (MangaAssist)
| Stage | Sonnet | Haiku | Notes |
|---|---|---|---|
| WebSocket frame receipt | 5ms | 5ms | API Gateway processing |
| Auth + session lookup | 15ms | 15ms | Redis cache hit |
| RAG context retrieval | 120ms | 120ms | OpenSearch vector search |
| Prompt construction | 5ms | 5ms | Template + context assembly |
| Bedrock TTFT | 300ms | 150ms | Time to first token |
| Token streaming (avg response) | 1,875ms (150 tok) | 667ms (100 tok) | Model generation |
| Per-token delivery overhead | 2ms | 2ms | WS frame send |
| Total (first token visible) | ~445ms | ~295ms | User sees text start |
| Total (complete response) | ~2,320ms | ~960ms | Full answer delivered |
7.3 Buffering Strategy Impact on Perceived Latency
| Strategy | Tokens Per Flush | Perceived TTFB (added) | Smoothness (1-5) | Recommended For |
|---|---|---|---|---|
| None | 1 | +0ms | 2 (jittery) | Debug / testing |
| Token batch (3) | 3 | +30-40ms | 3 | General English |
| Token batch (5) | 5 | +50-65ms | 4 | Longer English answers |
| Word boundary | 1-4 | +20-60ms | 4 | English chat |
| Sentence boundary | 10-30 | +200-500ms | 5 | Synopsis/description |
| Japanese-aware (6 chars) | 2-6 | +30-80ms | 4 | Japanese content |
| Adaptive | Variable | +10-60ms | 5 | Production (recommended) |
7.4 Scale Projections
| Metric | Current Target | Infrastructure Impact |
|---|---|---|
| Messages per day | 1,000,000 | ~11.6 messages/sec average |
| Peak messages/sec (3x avg) | ~35 | ~35 concurrent Bedrock streams |
| Concurrent WebSocket connections | ~50,000 | New-connection quota: 500/sec default (request an increase if needed) |
| Bedrock stream throughput | ~35 parallel | On-demand quota sufficient |
| ElastiCache connection pool | ~200 per ECS task | r6g.large supports 65,000 connections |
| DynamoDB session writes/sec | ~70 (writes at start+end) | On-demand capacity handles this |
7.5 Cost Per Message (Streaming)
| Model | Avg Input Tokens | Avg Output Tokens | Input Cost | Output Cost | Total per Message |
|---|---|---|---|---|---|
| Claude 3 Haiku | 500 | 100 | $0.000125 | $0.000125 | $0.000250 |
| Claude 3 Sonnet | 500 | 150 | $0.001500 | $0.002250 | $0.003750 |
| Model | Daily Cost (1M msgs, 100% this model) | Monthly Cost |
|---|---|---|
| Haiku only | $250 | $7,500 |
| Sonnet only | $3,750 | $112,500 |
| 80% Haiku / 20% Sonnet | $950 | $28,500 |
8. Decision Matrix: Choosing the Right Transport
┌─────────────────────────────────────────────────┐
│ Does the client need to send │
│ messages during streaming? │
└──────────────┬──────────────┬───────────────────┘
│ │
Yes │ │ No
▼ ▼
┌──────────────────┐ ┌──────────────────────┐
│ WebSocket │ │ Is HTTP/2 available? │
│ (Primary) │ └─────┬───────┬────────┘
└──────────────────┘ │ │
Yes │ │ No
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ SSE │ │ Chunked HTTP │
│ (Preferred) │ │ (Fallback) │
└──────────────┘ └──────────────────┘
MangaAssist final architecture decision:
- Primary: WebSocket via API Gateway (bidirectional chat, typing indicators, heartbeats)
- Fallback: SSE over ALB (clients behind WebSocket-blocking proxies)
- Batch/Internal: Chunked HTTP via ALB (admin tools, analytics pipelines)
9. Summary
Real-time AI interaction in MangaAssist depends on four pillars working together:
- Bedrock InvokeModelWithResponseStream produces tokens incrementally from Claude 3, enabling sub-500ms time-to-first-token instead of waiting 2+ seconds for full generation.
- API Gateway WebSocket provides bidirectional, persistent connections for the primary chat experience, with connection management in DynamoDB and heartbeats via ElastiCache Redis.
- Server-Sent Events offer a simpler, unidirectional fallback with built-in reconnection and Last-Event-ID resume support for clients behind restrictive proxies.
- Chunked transfer encoding enables streaming HTTP responses for internal services and batch operations, with careful handling of UTF-8 multi-byte boundaries for Japanese content.
The adaptive buffering strategy ties it all together, dynamically adjusting token delivery based on generation speed and content language to ensure a smooth, responsive user experience at 1M messages/day scale.