Real-Time AI Interaction Architecture
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Task | 2.4 — Select and implement FM API integration patterns |
| Skill | 2.4.2 — Real-Time AI Interaction |
| This File | 01 — Real-Time Interaction Architecture (streaming architecture, WebSocket connection management, time-to-first-token) |
Skill Scope
Skill 2.4.2 focuses on real-time, streaming interactions between users and Foundation Models. While Skill 2.4.1 covers synchronous request/response where the user waits for the complete answer, this skill addresses streaming delivery where tokens appear progressively as the model generates them. For MangaAssist, streaming transforms the user experience from "wait 2-3 seconds for full answer" to "see the first word in 200-400ms and read along as the model types." This file covers the overall streaming architecture, WebSocket connection lifecycle, and time-to-first-token (TTFT) optimization.
Mind Map
mindmap
root((Skill 2.4.2<br/>Real-Time AI<br/>Interaction))
Streaming Architecture
Bedrock InvokeModelWithResponseStream
Server-Sent Events (SSE)
Chunked transfer encoding
Token-by-token delivery
Stream multiplexing
WebSocket Management
API Gateway WebSocket API
Connection lifecycle ($connect/$disconnect)
Route selection ($default)
Connection table (DynamoDB)
Idle timeout handling
Time-to-First-Token
TTFT measurement
Prompt caching (Bedrock)
Pre-warming strategies
Connection keep-alive
Latency waterfall
Backpressure Handling
Client consumption rate
Buffer management
Flow control signals
Memory-bounded queues
Graceful degradation
MangaAssist Streaming
Progressive JP text display
Character-boundary chunking
Partial response caching
Stream recovery on drop
Typing indicator UX
1. Streaming Architecture Overview
1.1 End-to-End Streaming Flow
sequenceDiagram
participant User as Manga Reader<br/>(Browser)
participant WS as API Gateway<br/>WebSocket
participant Lambda as Lambda<br/>$default Route
participant ECS as ECS Fargate<br/>Orchestrator
participant Bedrock as Amazon Bedrock<br/>Claude 3 Streaming
User->>WS: WebSocket frame:<br/>{"action":"chat","message":"おすすめのマンガは?"}
WS->>Lambda: Route to $default handler
Lambda->>ECS: HTTP POST /stream-chat
ECS->>Bedrock: InvokeModelWithResponseStream
loop Token-by-token streaming
Bedrock-->>ECS: Stream chunk (token)
ECS-->>Lambda: Chunked HTTP response
Lambda->>WS: PostToConnection (chunk)
WS-->>User: WebSocket frame (chunk)
end
Note over User,Bedrock: User sees tokens appear progressively<br/>TTFT: ~300ms | Full response: ~2500ms
Bedrock-->>ECS: Stream end (stop_reason)
ECS-->>Lambda: Final chunk + metadata
Lambda->>WS: PostToConnection (done + usage)
WS-->>User: WebSocket frame (complete)
1.2 Streaming vs Non-Streaming Comparison
| Metric | Non-Streaming (InvokeModel) | Streaming (InvokeModelWithResponseStream) |
|---|---|---|
| Time to first visible token | 2000-3000ms | 200-400ms |
| Perceived latency | High (blank → full text) | Low (progressive reveal) |
| Total completion time | ~2500ms | ~2700ms (slightly longer) |
| Network overhead | 1 response payload | N small frames |
| Error handling | Simple (one response) | Complex (mid-stream errors) |
| Client complexity | Low | Medium (buffer management) |
| Cost | Same | Same (per-token pricing) |
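To make the comparison concrete, here is a minimal sketch of both call styles against the same Anthropic Messages payload (the model ID and prompt are illustrative; the production streaming path is built out in 1.3 below):
"""
Sketch: non-streaming vs streaming invocation — illustrative only.
"""
import json
import boto3

bedrock = boto3.client("bedrock-runtime")
MODEL = "anthropic.claude-3-haiku-20240307-v1:0"
body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 512,
    "messages": [{"role": "user", "content": "おすすめのマンガは?"}],
})

# Non-streaming: one blocking call; nothing is visible until the full answer lands
resp = bedrock.invoke_model(modelId=MODEL, body=body)
full_text = json.loads(resp["body"].read())["content"][0]["text"]

# Streaming: events arrive as the model generates; first delta typically in 200-400ms
resp = bedrock.invoke_model_with_response_stream(modelId=MODEL, body=body)
for event in resp["body"]:
    if "chunk" in event:
        payload = json.loads(event["chunk"]["bytes"])
        if payload.get("type") == "content_block_delta":
            print(payload["delta"].get("text", ""), end="", flush=True)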
1.3 Bedrock Streaming Client
"""
Bedrock streaming client for MangaAssist real-time chat.
Handles InvokeModelWithResponseStream with proper event parsing.
"""
import json
import time
import logging
from typing import Generator, Dict, Any, Optional
from dataclasses import dataclass, field
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EventStreamError
logger = logging.getLogger(__name__)
@dataclass
class StreamChunk:
"""Represents a single chunk from the Bedrock stream."""
chunk_type: str # "content_block_delta", "message_start", "message_stop", etc.
text: str = "" # Generated text (for content deltas)
index: int = 0 # Chunk sequence number
input_tokens: int = 0 # Token count (populated at stream end)
output_tokens: int = 0
stop_reason: str = "" # Why generation stopped
latency_ms: float = 0.0 # Time since stream start
@dataclass
class StreamMetrics:
"""Metrics collected during a streaming invocation."""
ttft_ms: float = 0.0 # Time to first token
total_ms: float = 0.0 # Total stream duration
chunks_received: int = 0
total_text_length: int = 0
input_tokens: int = 0
output_tokens: int = 0
stop_reason: str = ""
class BedrockStreamingClient:
"""
Manages streaming invocations to Amazon Bedrock.
Yields parsed chunks for real-time relay to WebSocket clients.
"""
def __init__(self, region: str = "us-east-1"):
self._client = boto3.client(
"bedrock-runtime",
region_name=region,
config=Config(
retries={"max_attempts": 2, "mode": "adaptive"},
connect_timeout=5,
read_timeout=60, # Longer for streaming
max_pool_connections=25,
),
)
def stream_invoke(
self,
model_id: str,
body: dict,
) -> Generator[StreamChunk, None, StreamMetrics]:
"""
Invoke Bedrock with streaming and yield chunks as they arrive.
Usage:
client = BedrockStreamingClient()
gen = client.stream_invoke("anthropic.claude-3-sonnet-...", body)
for chunk in gen:
send_to_websocket(chunk.text)
# After iteration, gen.value contains StreamMetrics
Yields:
StreamChunk for each content delta
Returns:
StreamMetrics (accessible via generator .value after StopIteration)
"""
stream_start = time.time()
first_token_time = None
metrics = StreamMetrics()
chunk_index = 0
try:
response = self._client.invoke_model_with_response_stream(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps(body),
)
event_stream = response["body"]
for event in event_stream:
chunk_data = event.get("chunk")
if not chunk_data:
continue
payload = json.loads(chunk_data["bytes"].decode("utf-8"))
event_type = payload.get("type", "")
if event_type == "content_block_delta":
delta = payload.get("delta", {})
text = delta.get("text", "")
if text and first_token_time is None:
first_token_time = time.time()
metrics.ttft_ms = (first_token_time - stream_start) * 1000
metrics.chunks_received += 1
metrics.total_text_length += len(text)
yield StreamChunk(
chunk_type="content_block_delta",
text=text,
index=chunk_index,
latency_ms=(time.time() - stream_start) * 1000,
)
chunk_index += 1
elif event_type == "message_delta":
usage = payload.get("usage", {})
metrics.output_tokens = usage.get("output_tokens", 0)
metrics.stop_reason = payload.get("delta", {}).get("stop_reason", "")
elif event_type == "message_start":
msg = payload.get("message", {})
usage = msg.get("usage", {})
metrics.input_tokens = usage.get("input_tokens", 0)
elif event_type == "message_stop":
pass # Stream complete
except EventStreamError as exc:
logger.error("Stream event error: %s", exc)
yield StreamChunk(
chunk_type="error",
text=f"Stream error: {str(exc)}",
index=chunk_index,
)
except ClientError as exc:
error_code = exc.response["Error"]["Code"]
logger.error("Bedrock client error during stream: %s", error_code)
yield StreamChunk(
chunk_type="error",
text=f"Model error: {error_code}",
index=chunk_index,
)
metrics.total_ms = (time.time() - stream_start) * 1000
logger.info(
"Stream complete | ttft=%.0fms | total=%.0fms | chunks=%d | "
"tokens=%d/%d | stop=%s",
metrics.ttft_ms, metrics.total_ms, metrics.chunks_received,
metrics.input_tokens, metrics.output_tokens, metrics.stop_reason,
)
return metrics
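Because the StreamMetrics object is the generator's return value, a plain for loop never sees it. A short sketch of driving the generator by hand to capture it (`body` is an Anthropic Messages payload as built in 3.2; `send_to_websocket` is a placeholder for the relay in section 4):
# Sketch: consume the stream and capture the StreamMetrics return value.
client = BedrockStreamingClient()
gen = client.stream_invoke("anthropic.claude-3-haiku-20240307-v1:0", body)
metrics = None
while True:
    try:
        chunk = next(gen)
    except StopIteration as stop:
        metrics = stop.value  # StreamMetrics returned by the generator
        break
    send_to_websocket(chunk.text)  # placeholder for the section 4 relay
print(f"TTFT={metrics.ttft_ms:.0f}ms, chunks={metrics.chunks_received}")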
2. WebSocket Connection Management
2.1 Connection Lifecycle
flowchart TD
subgraph "Connection Lifecycle"
A[Client initiates<br/>WebSocket handshake] --> B[$connect Route<br/>Lambda Handler]
B --> C{Authenticate<br/>& Authorize}
C -->|Valid| D[Store connection<br/>in DynamoDB]
C -->|Invalid| E[Return 403<br/>Reject connection]
D --> F[Connection ACTIVE]
F --> G{Message received}
G -->|chat message| H[$default Route<br/>Lambda Handler]
G -->|ping| I[Heartbeat<br/>Response]
G -->|no activity 10min| J[Idle timeout<br/>API GW disconnects]
H --> K[Process & Stream<br/>Response]
K --> F
J --> L[$disconnect Route<br/>Lambda Handler]
L --> M[Remove from<br/>DynamoDB]
M --> N[Connection CLOSED]
F -->|Client closes| L
end
2.2 Connection Registry
"""
WebSocket connection registry backed by DynamoDB.
Tracks active connections, supports multi-device sessions,
and handles connection lifecycle events.
"""
import time
import logging
from typing import Optional, List, Dict
from dataclasses import dataclass
import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class ConnectionRecord:
"""Represents an active WebSocket connection."""
connection_id: str
user_id: str
session_id: str
connected_at: int # epoch ms
last_active_at: int # epoch ms
    client_info: Optional[Dict] = None
ttl: int = 0 # DynamoDB TTL (epoch seconds)
def __post_init__(self):
self.client_info = self.client_info or {}
# Auto-expire connections after 2 hours
if not self.ttl:
self.ttl = int(time.time()) + 7200
class ConnectionRegistry:
"""
Manages WebSocket connections in DynamoDB.
Table schema:
PK: connectionId (String)
GSI: userId-index (userId -> connectionId)
TTL: ttl (Number) — auto-cleanup of stale connections
"""
def __init__(self, table_name: str = "manga-ws-connections"):
self.table = boto3.resource("dynamodb").Table(table_name)
def register(self, record: ConnectionRecord) -> None:
"""Register a new WebSocket connection."""
try:
self.table.put_item(
Item={
"connectionId": record.connection_id,
"userId": record.user_id,
"sessionId": record.session_id,
"connectedAt": record.connected_at,
"lastActiveAt": record.last_active_at,
"clientInfo": record.client_info,
"ttl": record.ttl,
},
ConditionExpression="attribute_not_exists(connectionId)",
)
logger.info(
"Connection registered | connId=%s | userId=%s",
record.connection_id, record.user_id,
)
except ClientError as exc:
if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
logger.warning("Connection already registered: %s", record.connection_id)
else:
raise
def unregister(self, connection_id: str) -> None:
"""Remove a connection on disconnect."""
try:
self.table.delete_item(Key={"connectionId": connection_id})
logger.info("Connection unregistered | connId=%s", connection_id)
except Exception as exc:
logger.error("Failed to unregister: %s | error=%s", connection_id, exc)
def get_connection(self, connection_id: str) -> Optional[ConnectionRecord]:
"""Retrieve a connection record."""
response = self.table.get_item(Key={"connectionId": connection_id})
item = response.get("Item")
if not item:
return None
return ConnectionRecord(
connection_id=item["connectionId"],
user_id=item["userId"],
session_id=item["sessionId"],
connected_at=item["connectedAt"],
last_active_at=item["lastActiveAt"],
client_info=item.get("clientInfo", {}),
)
def update_activity(self, connection_id: str) -> None:
"""Update last active timestamp (prevents premature TTL expiry)."""
now = int(time.time() * 1000)
self.table.update_item(
Key={"connectionId": connection_id},
UpdateExpression="SET lastActiveAt = :ts, #ttl = :ttl",
ExpressionAttributeNames={"#ttl": "ttl"},
ExpressionAttributeValues={
":ts": now,
":ttl": int(time.time()) + 7200,
},
)
def get_user_connections(self, user_id: str) -> List[str]:
"""Get all active connections for a user (multi-device support)."""
response = self.table.query(
IndexName="userId-index",
KeyConditionExpression=Key("userId").eq(user_id),
)
return [item["connectionId"] for item in response.get("Items", [])]
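A sketch of the $connect and $disconnect route handlers that drive this registry. The query-string authentication shown is a stand-in; a production deployment would typically attach a Lambda authorizer to $connect:
"""
Sketch: WebSocket lifecycle handlers backed by ConnectionRegistry.
"""
import time

registry = ConnectionRegistry()

def on_connect(event, context):
    """$connect route: validate the caller, then persist the connection."""
    params = event.get("queryStringParameters") or {}
    user_id = params.get("userId")
    session_id = params.get("sessionId")
    if not user_id or not session_id:
        return {"statusCode": 403}  # Reject the handshake
    now_ms = int(time.time() * 1000)
    registry.register(ConnectionRecord(
        connection_id=event["requestContext"]["connectionId"],
        user_id=user_id,
        session_id=session_id,
        connected_at=now_ms,
        last_active_at=now_ms,
    ))
    return {"statusCode": 200}

def on_disconnect(event, context):
    """$disconnect route: best-effort cleanup; the DynamoDB TTL is the safety net."""
    registry.unregister(event["requestContext"]["connectionId"])
    return {"statusCode": 200}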
3. Time-to-First-Token Optimization
3.1 TTFT Waterfall Analysis
gantt
title Time-to-First-Token Waterfall (Target: < 400ms)
dateFormat X
axisFormat %Lms
section Network
WS frame parse :a1, 0, 10
Lambda cold start :crit, a2, 10, 110
Lambda → ECS HTTP :a3, 110, 140
section Preparation
Session lookup (Redis) :b1, 140, 155
Prompt assembly :b2, 155, 165
Bedrock TCP connect :b3, 165, 185
section Model
Bedrock prefill :c1, 185, 350
First token generated :milestone, m1, 350, 350
section Delivery
ECS → Lambda chunk :d1, 350, 360
Lambda → PostToConnection :d2, 360, 380
WS frame to client :d3, 380, 395
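The TTFT a user actually experiences can only be measured from outside the stack. A hedged sketch of an end-to-end probe using the third-party `websockets` package (the endpoint URL is a placeholder, and the frame schema matches the chunk/heartbeat messages defined in section 4):
"""
Sketch: client-side TTFT probe over the WebSocket API (requires `websockets`).
"""
import asyncio
import json
import time
import websockets

async def measure_ttft(url: str, message: str) -> float:
    async with websockets.connect(url) as ws:
        start = time.time()
        await ws.send(json.dumps({"action": "chat", "message": message}))
        while True:
            frame = json.loads(await ws.recv())
            if frame.get("type") == "chunk":  # first visible text
                return (time.time() - start) * 1000
            # "heartbeat" frames during prefill fall through and are ignored

ttft_ms = asyncio.run(measure_ttft(
    "wss://example.execute-api.us-east-1.amazonaws.com/prod",  # placeholder
    "おすすめのマンガは?",
))
print(f"End-to-end TTFT: {ttft_ms:.0f}ms")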
3.2 TTFT Optimization Techniques
"""
Time-to-first-token optimization strategies for MangaAssist.
Combines connection pre-warming, prompt caching, and pipeline parallelism.
"""
import time
import logging
import asyncio
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
import boto3
import redis
logger = logging.getLogger(__name__)
# Shared resources — initialized once per Lambda container / ECS task
_bedrock_client = None
_redis_client = None
_ddb_resource = None
def get_bedrock_client():
"""Lazy singleton — avoids cold-start overhead on repeated invocations."""
global _bedrock_client
if _bedrock_client is None:
from botocore.config import Config
_bedrock_client = boto3.client(
"bedrock-runtime",
config=Config(
connect_timeout=3,
read_timeout=60,
max_pool_connections=25,
tcp_keepalive=True,
),
)
return _bedrock_client
def get_redis_client():
"""Lazy singleton Redis connection."""
global _redis_client
if _redis_client is None:
import os
_redis_client = redis.Redis(
host=os.environ.get("REDIS_HOST", "localhost"),
port=6379,
socket_connect_timeout=2,
socket_timeout=2,
decode_responses=True,
)
return _redis_client
class TTFTOptimizer:
"""
Optimizes time-to-first-token through parallelism and caching.
Key strategies:
1. Parallel session load + prompt assembly
2. Connection keep-alive (TCP reuse)
3. Minimal prompt for fast prefill
4. Pre-computed system prompts
"""
# Pre-computed system prompts — avoid re-encoding on every request
SYSTEM_PROMPTS = {
"ja": "あなたはMangaAssistです。日本のマンガ書店のチャットボットです。敬語を使用してください。",
"en": "You are MangaAssist, a helpful JP manga store chatbot.",
}
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=3)
async def prepare_stream_request(
self,
connection_id: str,
message: str,
session_id: str,
language: str = "ja",
) -> Dict[str, Any]:
"""
Prepare a streaming request with parallel I/O operations.
Runs session load and cache check concurrently to minimize
the time before we can start the Bedrock stream.
"""
        loop = asyncio.get_running_loop()
# Run session load and cache check in parallel
session_future = loop.run_in_executor(
self._executor,
self._load_session_fast,
session_id,
)
cache_future = loop.run_in_executor(
self._executor,
self._check_streaming_cache,
session_id,
message,
)
# Wait for both to complete
history, cached_prefix = await asyncio.gather(session_future, cache_future)
# Build minimal prompt for fastest prefill
body = self._build_minimal_body(message, history, language)
return {
"body": body,
"cached_prefix": cached_prefix,
"model_id": self._select_streaming_model(message),
}
    def _load_session_fast(self, session_id: str) -> list:
        """Load session from Redis first, DynamoDB as fallback."""
        import json
        from boto3.dynamodb.conditions import Key
        r = get_redis_client()
        # Try Redis (< 5ms)
        cached = r.get(f"session:{session_id}")
        if cached:
            return json.loads(cached)
        # Fall back to DynamoDB (~50ms); reuse the module-level resource
        global _ddb_resource
        if _ddb_resource is None:
            _ddb_resource = boto3.resource("dynamodb")
        response = _ddb_resource.Table("manga-assist-sessions").query(
            KeyConditionExpression=Key("sessionId").eq(session_id),
            ScanIndexForward=False,
            Limit=6,  # Only last 3 turns (6 messages = 3 user + 3 assistant)
        )
        items = list(reversed(response.get("Items", [])))
        # Cache in Redis for the next request
        r.setex(f"session:{session_id}", 300, json.dumps(items, default=str))
        return items
def _check_streaming_cache(self, session_id: str, message: str) -> Optional[str]:
"""Check if we have a cached partial response for this exact query."""
import hashlib
r = get_redis_client()
key = f"stream_prefix:{hashlib.sha256(f'{session_id}:{message}'.encode()).hexdigest()[:12]}"
return r.get(key)
def _build_minimal_body(self, message: str, history: list, language: str) -> dict:
"""
Build the smallest valid request body for fastest Bedrock prefill.
Prefill time scales with input tokens — keep context minimal.
Last 3 turns (6 messages) ≈ 500-1000 tokens → ~150ms prefill.
Full history (20 messages) ≈ 3000-5000 tokens → ~400ms prefill.
"""
messages = []
for item in history[-6:]: # Last 3 turns only for streaming
messages.append({"role": item["role"], "content": item["content"]})
messages.append({"role": "user", "content": message})
return {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048, # Shorter max for faster streaming
"temperature": 0.3,
"system": self.SYSTEM_PROMPTS.get(language, self.SYSTEM_PROMPTS["en"]),
"messages": messages,
}
def _select_streaming_model(self, message: str) -> str:
"""Select model for streaming — prefer Haiku for faster TTFT."""
import re
# Haiku: ~150ms TTFT, Sonnet: ~300ms TTFT
# Use Haiku for simple queries, Sonnet for complex
jp_chars = len(re.findall(r"[\u3000-\u9fff]", message))
if len(message) < 100 and jp_chars < 30:
return "anthropic.claude-3-haiku-20240307-v1:0"
return "anthropic.claude-3-sonnet-20240229-v1:0"
4. Stream Relay Architecture
4.1 ECS Stream Relay
The ECS orchestrator acts as a relay between Bedrock's streaming response and the WebSocket API: it reads chunks from Bedrock and pushes them to the client via API Gateway's PostToConnection API. (Unlike the overview in 1.1, which relays chunks back through the Lambda handler, the implementation below has the ECS task call PostToConnection directly, removing one hop from the delivery path.)
flowchart LR
subgraph "Bedrock Stream"
B1[message_start] --> B2[content_block_start]
B2 --> B3[content_block_delta<br/>text: お]
B3 --> B4[content_block_delta<br/>text: す]
B4 --> B5[content_block_delta<br/>text: す]
B5 --> B6[content_block_delta<br/>text: め]
B6 --> B7[content_block_stop]
B7 --> B8[message_delta<br/>usage + stop_reason]
B8 --> B9[message_stop]
end
subgraph "Relay Processing"
B3 --> R1[Buffer until<br/>char boundary]
B4 --> R1
B5 --> R1
B6 --> R1
R1 --> R2[Batch small<br/>chunks: おすすめ]
R2 --> R3[PostToConnection]
end
subgraph "Client WebSocket"
R3 --> C1[Frame: おすすめ]
C1 --> C2[Render text<br/>progressively]
end
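The "buffer until char boundary" step matters because a Bedrock delta can end on a base character whose combining mark (for example the combining dakuten, U+3099) arrives in the next delta. A minimal sketch of a boundary-safe split, where the carry is prepended to the next delta and flushed when the stream ends:
# Sketch: never flush a batch that might end mid composed character.
import unicodedata
from typing import Tuple

def split_on_char_boundary(buffer: str) -> Tuple[str, str]:
    """Return (sendable, carry); carry holds the last base character plus
    any trailing combining marks, since the next delta may extend it."""
    i = len(buffer) - 1
    while i >= 0 and unicodedata.combining(buffer[i]):
        i -= 1
    if i < 0:
        return "", buffer  # buffer is all combining marks; keep waiting
    return buffer[:i], buffer[i:]

sendable, carry = split_on_char_boundary("おすすめか\u3099")
# sendable == "おすすめ", carry == "か\u3099" (か plus combining dakuten)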
4.2 Stream Relay Implementation
"""
WebSocket stream relay — bridges Bedrock streaming to API Gateway WebSocket.
Handles chunk batching, character-boundary alignment, and heartbeats.
"""
import json
import time
import logging
import threading
from typing import Optional
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class WebSocketStreamRelay:
"""
Relays Bedrock streaming responses to WebSocket clients.
Features:
- Batches small chunks to reduce PostToConnection calls
- Aligns on character boundaries (important for JP text)
- Sends periodic heartbeats during long prefill
- Handles client disconnection mid-stream
"""
def __init__(
self,
api_endpoint: str,
region: str = "us-east-1",
batch_interval_ms: int = 50,
heartbeat_interval_ms: int = 5000,
):
self.apigw = boto3.client(
"apigatewaymanagementapi",
endpoint_url=api_endpoint,
region_name=region,
)
self.batch_interval_ms = batch_interval_ms
self.heartbeat_interval_ms = heartbeat_interval_ms
def relay_stream(
self,
connection_id: str,
stream_generator,
request_id: str,
) -> dict:
"""
Relay a Bedrock stream to a WebSocket connection.
Args:
connection_id: API Gateway WebSocket connection ID
stream_generator: Generator from BedrockStreamingClient.stream_invoke
request_id: Unique request identifier for correlation
Returns:
Dict with relay metrics
"""
relay_start = time.time()
text_buffer = ""
last_send_time = time.time()
chunks_sent = 0
total_text = ""
client_connected = True
# Start heartbeat thread for long prefill periods
heartbeat_stop = threading.Event()
heartbeat_thread = threading.Thread(
target=self._heartbeat_loop,
args=(connection_id, request_id, heartbeat_stop),
daemon=True,
)
heartbeat_thread.start()
try:
for chunk in stream_generator:
if not client_connected:
break
if chunk.chunk_type == "error":
self._send_to_client(connection_id, {
"type": "error",
"requestId": request_id,
"error": chunk.text,
})
break
if chunk.chunk_type == "content_block_delta" and chunk.text:
# Stop heartbeat once content starts flowing
heartbeat_stop.set()
text_buffer += chunk.text
total_text += chunk.text
# Send when buffer is large enough or time threshold passed
now = time.time()
elapsed_ms = (now - last_send_time) * 1000
should_send = (
len(text_buffer) >= 10 # Enough text to send
or elapsed_ms >= self.batch_interval_ms # Time threshold
)
if should_send and text_buffer:
success = self._send_to_client(connection_id, {
"type": "chunk",
"requestId": request_id,
"text": text_buffer,
"index": chunks_sent,
})
if not success:
client_connected = False
break
chunks_sent += 1
text_buffer = ""
last_send_time = now
# Flush remaining buffer
if text_buffer and client_connected:
self._send_to_client(connection_id, {
"type": "chunk",
"requestId": request_id,
"text": text_buffer,
"index": chunks_sent,
})
chunks_sent += 1
# Send completion message
if client_connected:
self._send_to_client(connection_id, {
"type": "done",
"requestId": request_id,
"totalLength": len(total_text),
"chunksDelivered": chunks_sent,
})
finally:
heartbeat_stop.set()
heartbeat_thread.join(timeout=1)
relay_ms = (time.time() - relay_start) * 1000
logger.info(
"Stream relay complete | conn=%s | chunks=%d | text=%d chars | duration=%.0fms",
connection_id, chunks_sent, len(total_text), relay_ms,
)
return {
"chunks_sent": chunks_sent,
"total_text_length": len(total_text),
"relay_duration_ms": relay_ms,
"client_connected": client_connected,
}
def _send_to_client(self, connection_id: str, payload: dict) -> bool:
"""Send a message to a WebSocket client. Returns False if disconnected."""
try:
self.apigw.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
)
return True
except ClientError as exc:
error_code = exc.response["Error"]["Code"]
if error_code == "GoneException":
logger.info("Client disconnected mid-stream | conn=%s", connection_id)
return False
logger.error(
"PostToConnection failed | conn=%s | error=%s",
connection_id, error_code,
)
return False
def _heartbeat_loop(
self,
connection_id: str,
request_id: str,
stop_event: threading.Event,
):
"""Send heartbeat pings during Bedrock prefill period."""
while not stop_event.is_set():
stop_event.wait(self.heartbeat_interval_ms / 1000)
if stop_event.is_set():
break
try:
self.apigw.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({
"type": "heartbeat",
"requestId": request_id,
"timestamp": int(time.time() * 1000),
}).encode("utf-8"),
)
except ClientError:
break # Client disconnected
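Putting 1.3 and 4.2 together, the ECS handler for a single chat turn might look like this sketch (the API endpoint URL is a placeholder; closing the generator on disconnect is what actually stops Bedrock consumption):
# Sketch: ECS /stream-chat handler wiring BedrockStreamingClient into the relay.
import uuid

relay = WebSocketStreamRelay(
    api_endpoint="https://example.execute-api.us-east-1.amazonaws.com/prod",  # placeholder
)
bedrock_client = BedrockStreamingClient()

def stream_chat(connection_id: str, body: dict, model_id: str) -> dict:
    request_id = str(uuid.uuid4())
    gen = bedrock_client.stream_invoke(model_id, body)
    result = relay.relay_stream(connection_id, gen, request_id)
    if not result["client_connected"]:
        gen.close()  # stop consuming Bedrock output once the client is gone
    return result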
5. Connection Health and Metrics
5.1 Connection Monitoring
flowchart TD
subgraph "Health Signals"
H1[WebSocket ping/pong]
H2[Heartbeat messages]
H3[PostToConnection success]
H4[DynamoDB TTL check]
end
subgraph "Metrics"
M1[Active connections<br/>gauge]
M2[TTFT histogram<br/>p50/p95/p99]
M3[Stream completion<br/>rate]
M4[Chunks per stream<br/>histogram]
M5[Client disconnect<br/>during stream rate]
end
H1 --> M1
H2 --> M1
H3 --> M3
H3 --> M5
H4 --> M1
M1 --> CW[CloudWatch<br/>Dashboard]
M2 --> CW
M3 --> CW
M4 --> CW
M5 --> CW
"""
Streaming metrics collector for MangaAssist.
Publishes TTFT, stream completion, and connection health metrics.
"""
import time
import logging
from typing import Optional
from dataclasses import dataclass
import boto3
logger = logging.getLogger(__name__)
@dataclass
class StreamingMetrics:
"""Collected metrics for a single streaming interaction."""
connection_id: str
ttft_ms: float
total_duration_ms: float
chunks_sent: int
text_length: int
model_id: str
completed: bool
disconnect_during_stream: bool = False
class StreamMetricsPublisher:
"""Publishes streaming metrics to CloudWatch."""
NAMESPACE = "MangaAssist/Streaming"
def __init__(self, environment: str = "prod"):
self.cw = boto3.client("cloudwatch")
self.env = environment
def publish(self, metrics: StreamingMetrics) -> None:
"""Publish a set of streaming metrics."""
from datetime import datetime
model_short = "sonnet" if "sonnet" in metrics.model_id else "haiku"
metric_data = [
{
"MetricName": "TimeToFirstToken",
"Value": metrics.ttft_ms,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Environment", "Value": self.env},
{"Name": "Model", "Value": model_short},
],
"Timestamp": datetime.utcnow(),
},
{
"MetricName": "StreamDuration",
"Value": metrics.total_duration_ms,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Environment", "Value": self.env},
{"Name": "Model", "Value": model_short},
],
"Timestamp": datetime.utcnow(),
},
{
"MetricName": "ChunksPerStream",
"Value": metrics.chunks_sent,
"Unit": "Count",
"Dimensions": [
{"Name": "Environment", "Value": self.env},
],
"Timestamp": datetime.utcnow(),
},
{
"MetricName": "StreamCompleted",
"Value": 1 if metrics.completed else 0,
"Unit": "Count",
"Dimensions": [
{"Name": "Environment", "Value": self.env},
],
"Timestamp": datetime.utcnow(),
},
]
if metrics.disconnect_during_stream:
metric_data.append({
"MetricName": "MidStreamDisconnect",
"Value": 1,
"Unit": "Count",
"Dimensions": [
{"Name": "Environment", "Value": self.env},
],
"Timestamp": datetime.utcnow(),
})
try:
self.cw.put_metric_data(
Namespace=self.NAMESPACE,
MetricData=metric_data,
)
except Exception as exc:
logger.error("Failed to publish streaming metrics: %s", exc)
Key Takeaways
| # | Takeaway |
|---|---|
| 1 | Streaming transforms UX — TTFT of 200-400ms vs 2-3s for full response. Users perceive the bot as faster even though total time is slightly longer. |
| 2 | InvokeModelWithResponseStream yields content_block_delta events with incremental text — parse the event stream and relay chunks to the WebSocket. |
| 3 | WebSocket connection registry in DynamoDB with TTL auto-cleans stale connections. Use a GSI on userId for multi-device support. |
| 4 | Chunk batching (every 50ms or 10+ characters) reduces PostToConnection API calls while keeping perceived latency low. |
| 5 | Heartbeat during prefill — send periodic {"type":"heartbeat"} frames while Bedrock processes the prompt, preventing client-side timeout assumptions. |
| 6 | GoneException from PostToConnection means the client disconnected — stop consuming the Bedrock stream so generation is not wasted; tokens already generated before the disconnect are still billed. |
| 7 | TTFT optimization — keep conversation context to last 3 turns for streaming; fewer input tokens means faster Bedrock prefill. |
| 8 | Lazy singleton clients — initialize boto3 clients outside the handler function to reuse TCP connections across invocations, reducing TTFT by 50-100ms. |