Task 2.5: Implement Application Integration Patterns and Development Tools
Overview
This task covers building GenAI-specific API interfaces, accessible UI components, business system enhancements, developer productivity tools, advanced agent patterns, and troubleshooting approaches.
Skill 2.5.1: Create FM API Interfaces for GenAI Workloads
Core Concepts
- Streaming Response Handling: API Gateway must handle SSE/chunked streaming responses differently from standard request/response REST integrations
- Token Limit Management: Input/output token budgets need API-level enforcement
- Retry Strategies: GenAI-specific retries for model timeouts (longer than typical API timeouts)
User Story 21: Production GenAI API Gateway with Token Management
As a backend engineer, I want an API layer that handles streaming, enforces token limits, and manages timeouts specific to FM workloads, So that downstream consumers don't need to understand FM-specific quirks.
Deep Dive Scenario
Company: AIaaS Corp - serving GenAI APIs to 200 B2B clients
Architecture:
[Client SDK]
|
v
[API Gateway]
|--- Custom domain: api.genai.company.com
|--- Integration timeout: 29s (API GW max)
|--- For longer requests: WebSocket API (persistent connection; chunks are pushed to the client, so responses are not bound by the 29s integration limit)
|
v
[Lambda@Edge / Authorizer]
|--- Token budget validation (check remaining tokens for client)
|--- Request size validation (input tokens)
|
v
[Lambda: Request Processor]
|--- Truncate/chunk input if exceeding model context window
|--- Apply client-specific system prompts
|--- Route streaming vs non-streaming
|
v
[Bedrock FM]
|--- invoke_model (sync, <29s responses)
|--- invoke_model_with_response_stream (streaming)
|
v
[Response Processing]
|--- Token usage tracking per client
|--- Response caching for common queries
|--- Cost metering
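For requests that outlive the 29s REST integration limit, the WebSocket path relays Bedrock stream chunks to the client as they arrive. A minimal sketch, assuming the $connect route has already stored the client's connection ID (the management-API endpoint URL is a placeholder):
import json
import boto3

bedrock_runtime = boto3.client("bedrock-runtime")
# Management API endpoint comes from the WebSocket API's stage (placeholder URL)
apigw = boto3.client(
    "apigatewaymanagementapi",
    endpoint_url="https://ws-api-id.execute-api.us-east-1.amazonaws.com/prod"
)

def stream_to_websocket(connection_id, model_id, body):
    """Relay Bedrock stream chunks over an open WebSocket connection."""
    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId=model_id,
        body=json.dumps(body)
    )
    for event in response["body"]:
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk.get("type") == "content_block_delta":
            text = chunk["delta"].get("text", "")
            # Push each delta to the client as soon as it arrives
            apigw.post_to_connection(
                ConnectionId=connection_id,
                Data=text.encode("utf-8")
            )
    # Signal completion so the client can close or re-enable input
    apigw.post_to_connection(ConnectionId=connection_id, Data=b'{"done": true}')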
Token Limit Management:
import boto3
from datetime import date

dynamodb = boto3.client("dynamodb")

class TokenBudgetExceeded(Exception):
    """Raised when a client's daily token budget is exhausted."""

def today():
    """Date string used as the DynamoDB sort key, e.g. '2025-06-01'."""
    return date.today().isoformat()

class TokenManager:
"""Manage token budgets per client, per request."""
# Approximate token limits by model
MODEL_LIMITS = {
"anthropic.claude-haiku-4-5-20251001": {"max_input": 200000, "max_output": 8192},
"anthropic.claude-sonnet-4-20250514": {"max_input": 200000, "max_output": 8192},
"anthropic.claude-opus-4-20250514": {"max_input": 200000, "max_output": 8192},
}
def validate_and_adjust_request(self, request, client_id, model_id):
"""Validate token limits and adjust request if needed."""
limits = self.MODEL_LIMITS[model_id]
# Estimate input tokens (rough: 1 token ~= 4 chars for English)
estimated_input_tokens = sum(
len(m["content"]) // 4 for m in request["messages"]
)
# Check client's remaining daily budget
remaining_budget = self.get_remaining_budget(client_id)
if estimated_input_tokens > remaining_budget:
raise TokenBudgetExceeded(
f"Estimated {estimated_input_tokens} input tokens exceeds remaining "
f"daily budget of {remaining_budget} tokens"
)
# Check model's context window
if estimated_input_tokens > limits["max_input"] * 0.9:
# Truncate oldest messages to fit
request["messages"] = self.truncate_to_fit(
request["messages"],
max_tokens=int(limits["max_input"] * 0.8) # Leave 20% for safety
)
# Cap output tokens
max_output = min(
request.get("max_tokens", limits["max_output"]),
limits["max_output"],
remaining_budget - estimated_input_tokens
)
request["max_tokens"] = max_output
return request
def truncate_to_fit(self, messages, max_tokens):
"""Keep system message + last N messages that fit in context."""
system_messages = [m for m in messages if m["role"] == "system"]
user_messages = [m for m in messages if m["role"] != "system"]
total = sum(len(m["content"]) // 4 for m in system_messages)
kept = list(system_messages)
# Add messages from most recent, working backwards
for msg in reversed(user_messages):
msg_tokens = len(msg["content"]) // 4
if total + msg_tokens <= max_tokens:
kept.insert(len(system_messages), msg)
total += msg_tokens
else:
break
return kept
def track_usage(self, client_id, usage):
"""Track actual token usage for billing and budget enforcement."""
        # Low-level DynamoDB client: attribute values must be typed
        dynamodb.update_item(
            TableName="TokenUsage",
            Key={"clientId": {"S": client_id}, "date": {"S": today()}},
            UpdateExpression=(
                "ADD inputTokens :input, outputTokens :output, requestCount :one"
            ),
            ExpressionAttributeValues={
                ":input": {"N": str(usage["input_tokens"])},
                ":output": {"N": str(usage["output_tokens"])},
                ":one": {"N": "1"}
            }
        )
Retry Strategy for FM Timeouts:
import json
import random
import time

import boto3

bedrock_runtime = boto3.client("bedrock-runtime")

class FMInvocationError(Exception):
    """Raised when all retries are exhausted."""
class FMRetryStrategy:
"""GenAI-specific retry strategy accounting for model timeouts."""
def __init__(self):
self.max_retries = 3
self.base_delay = 1.0
def invoke_with_retry(self, model_id, body):
"""Retry with exponential backoff + jitter for FM calls."""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
response = bedrock_runtime.invoke_model(
modelId=model_id,
body=json.dumps(body)
)
return json.loads(response['body'].read())
except bedrock_runtime.exceptions.ThrottlingException:
# Throttled: exponential backoff with jitter
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
last_exception = "ThrottlingException"
except bedrock_runtime.exceptions.ModelTimeoutException:
# Model took too long: reduce max_tokens and retry
body["max_tokens"] = body.get("max_tokens", 1024) // 2
last_exception = "ModelTimeout"
except bedrock_runtime.exceptions.ModelNotReadyException:
# Model cold start: wait longer
time.sleep(5)
last_exception = "ModelNotReady"
except bedrock_runtime.exceptions.ServiceUnavailableException:
# Service down: try fallback region
return self._invoke_fallback_region(model_id, body)
raise FMInvocationError(f"All retries exhausted. Last error: {last_exception}")
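The _invoke_fallback_region helper referenced above is left undefined; a minimal sketch (the fallback region choice is an assumption, and the model must be enabled in that region):
    def _invoke_fallback_region(self, model_id, body, fallback_region="us-west-2"):
        """Retry the same request against a secondary region (hypothetical choice)."""
        fallback = boto3.client("bedrock-runtime", region_name=fallback_region)
        response = fallback.invoke_model(modelId=model_id, body=json.dumps(body))
        return json.loads(response["body"].read())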
Skill 2.5.2: Develop Accessible AI Interfaces
User Story 22: No-Code AI Workflow Builder for Business Users
As a business operations manager (non-technical), I want to create AI-powered workflows by connecting pre-built components visually, So that I can automate document processing, email drafting, and data analysis without writing code.
Deep Dive Scenario
Company: ProcessCo - 500 business users wanting AI automation, 10 developers to support them
Architecture:
[Business Users]
|
|--- [AWS Amplify: No-Code UI] (drag-and-drop workflow builder)
| |--- Declarative React components for AI features
| |--- Pre-built blocks: Summarize, Classify, Extract, Generate
|
|--- [Bedrock Prompt Flows: No-Code Orchestration]
| |--- Visual workflow designer
| |--- Connect prompts, knowledge bases, and tools
| |--- No Lambda/Step Functions needed
|
|--- [OpenAPI-First Development]
|--- Auto-generated client SDKs
|--- Interactive API docs
|--- Consistent interface across teams
[Developers]
|--- Build reusable AI components
|--- Publish to internal component library
|--- Expose via API-first patterns
Amplify Declarative UI Components:
// React component using the Amplify AI kit
import { generateClient } from 'aws-amplify/api';
import { AIConversation, createAIHooks } from '@aws-amplify/ui-react-ai';

const client = generateClient({ authMode: 'userPool' });
const { useAIConversation } = createAIHooks(client);

// Declarative AI conversation component - no backend code needed
function CustomerSupportChat() {
  // 'chat' is the conversation route defined in the Amplify backend
  const [{ data: { messages }, isLoading }, handleSendMessage] =
    useAIConversation('chat');
  return (
    <AIConversation
      // Connects to Bedrock automatically via the Amplify backend
      messages={messages}
      isLoading={isLoading}
      handleSendMessage={handleSendMessage}
aiContext={() => ({
currentUser: getUserContext(),
recentOrders: getRecentOrders()
})}
welcomeMessage="Hi! I can help with orders, returns, and product questions."
suggestedPrompts={[
{ header: "Track Order", prompt: "Where is my most recent order?" },
{ header: "Return Item", prompt: "I need to return an item" },
{ header: "Product Help", prompt: "Help me choose a product" }
]}
// Streaming enabled by default
responseComponents={{
OrderCard: ({ order }) => <OrderTrackingCard order={order} />,
ProductCard: ({ product }) => <ProductRecommendation product={product} />
}}
/>
);
}
Bedrock Prompt Flows - No-Code Workflow:
# Creating a prompt flow programmatically (business users do this via console UI)
bedrock_agent = boto3.client('bedrock-agent')
# Define a no-code workflow: Document -> Extract -> Classify -> Route
flow = bedrock_agent.create_flow(
name="InvoiceProcessingFlow",
description="Extract, classify, and route invoices automatically",
executionRoleArn="arn:aws:iam::123:role/BedrockFlowRole",
definition={
"nodes": [
{
"name": "Input",
"type": "Input",
"configuration": {
"input": {
"document": {"type": "String"}
}
}
},
{
"name": "ExtractInfo",
"type": "Prompt",
"configuration": {
"prompt": {
"sourceConfiguration": {
"inline": {
"modelId": "anthropic.claude-haiku-4-5-20251001",
"templateConfiguration": {
"text": {
"text": "Extract the following from this invoice: vendor name, invoice number, date, total amount, line items.\n\nInvoice:\n{{document}}\n\nRespond in JSON format."
}
}
}
}
}
}
},
{
"name": "ClassifyPriority",
"type": "Prompt",
"configuration": {
"prompt": {
"sourceConfiguration": {
"inline": {
"modelId": "anthropic.claude-haiku-4-5-20251001",
"templateConfiguration": {
"text": {
"text": "Classify this invoice as HIGH, MEDIUM, or LOW priority based on amount and due date.\n\nInvoice data: {{extracted_info}}\n\nRespond with just the priority level."
}
}
}
}
}
}
},
{
"name": "Output",
"type": "Output",
"configuration": {
"output": {
"extracted_info": {"type": "String"},
"priority": {"type": "String"}
}
}
}
],
"connections": [
{"name": "c1", "source": "Input", "target": "ExtractInfo"},
{"name": "c2", "source": "ExtractInfo", "target": "ClassifyPriority"},
{"name": "c3", "source": "ClassifyPriority", "target": "Output"}
]
}
)
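After a flow is prepared and aliased (prepare_flow / create_flow_version / create_flow_alias, elided here), it is invoked through the bedrock-agent-runtime client. A minimal sketch; the alias identifier is a placeholder:
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

response = bedrock_agent_runtime.invoke_flow(
    flowIdentifier=flow["id"],            # from the create_flow response
    flowAliasIdentifier="FLOW_ALIAS_ID",  # placeholder: created via create_flow_alias
    inputs=[{
        "nodeName": "Input",
        "nodeOutputName": "document",
        "content": {"document": "Invoice #4521 from Acme Supplies ..."}
    }]
)
# The response is an event stream; output nodes emit flowOutputEvent entries
for event in response["responseStream"]:
    if "flowOutputEvent" in event:
        print(event["flowOutputEvent"]["content"]["document"])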
OpenAPI Specification for API-First Development:
openapi: "3.0.3"
info:
title: GenAI Platform API
version: "1.0"
description: "AI capabilities exposed as standard REST APIs"
paths:
/v1/summarize:
post:
operationId: summarizeDocument
summary: Summarize a document using AI
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/SummarizeRequest'
responses:
'200':
description: Summary generated
content:
application/json:
schema:
$ref: '#/components/schemas/SummarizeResponse'
'202':
description: Processing (for large documents)
content:
application/json:
schema:
$ref: '#/components/schemas/AsyncJobResponse'
components:
schemas:
SummarizeRequest:
type: object
required: [text]
properties:
text:
type: string
maxLength: 500000
description: "Document text to summarize"
max_length:
type: integer
default: 200
description: "Target summary length in words"
format:
type: string
enum: [paragraph, bullet_points, executive_summary]
default: paragraph
language:
type: string
default: "en"
Skill 2.5.3: Create Business System Enhancements
User Story 23: AI-Enhanced CRM with Document Processing and Knowledge Tools
As a sales operations director, I want AI capabilities embedded in our CRM, document workflows, and internal knowledge base, So that sales reps get AI-assisted deal insights, auto-generated proposals, and instant company knowledge access.
Deep Dive Scenario
Company: SalesForce Corp - 500 sales reps, 10K deals/quarter
Architecture:
[Business Systems Enhancement]
|
|--- [CRM Enhancement: Lambda Functions]
| |--- Deal scoring (AI analyzes deal attributes, predicts win probability)
| |--- Email drafting (generate personalized outreach from deal context)
| |--- Meeting notes summarization
|
|--- [Document Processing: Step Functions]
| |--- Proposal generation (template + deal data -> custom proposal)
| |--- Contract analysis (extract key terms, flag risks)
| |--- Invoice processing (extract, validate, route)
|
|--- [Internal Knowledge: Amazon Q Business]
| |--- Index company docs, Confluence, SharePoint, Salesforce
| |--- Natural language search across all data sources
| |--- Role-based access (sales sees sales docs, HR sees HR docs)
|
|--- [Data Automation: Bedrock Data Automation]
|--- Automated data extraction from unstructured documents
|--- Convert PDFs, images, emails to structured data
CRM Enhancement with Lambda:
# Lambda: AI-powered deal scoring for CRM
def score_deal(event, context):
"""Enhance CRM with AI-powered deal scoring."""
deal = event["deal"]
prompt = f"""Analyze this sales deal and predict win probability (0-100).
Deal Details:
- Company: {deal['company']} (Industry: {deal['industry']})
- Deal Size: ${deal['amount']}
- Stage: {deal['stage']}
- Days in Pipeline: {deal['days_in_pipeline']}
- Champion Identified: {deal['has_champion']}
- Competitor: {deal['competitor']}
- Last Activity: {deal['last_activity_date']}
- Number of Stakeholders: {deal['stakeholder_count']}
Historical Win Rates for Similar Deals:
- Same industry: {deal.get('industry_win_rate', 'N/A')}%
- Same size range: {deal.get('size_win_rate', 'N/A')}%
- Same stage: {deal.get('stage_win_rate', 'N/A')}%
Provide:
1. Win probability (0-100)
2. Top 3 risk factors
3. Top 3 recommended next actions
4. Similar deals that won/lost and why
Respond in JSON format."""
response = bedrock_runtime.invoke_model(
modelId="anthropic.claude-sonnet-4-20250514",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}]
})
)
    scoring_text = json.loads(response['body'].read())["content"][0]["text"]
    scoring = json.loads(scoring_text)  # parse the model's JSON answer once
    # Update CRM record via webhook
    crm_client.update_deal(deal["id"], {
        "ai_win_probability": scoring["win_probability"],
        "ai_risk_factors": scoring["risk_factors"],
        "ai_next_actions": scoring["recommended_actions"],
        "ai_scored_at": datetime.utcnow().isoformat()
    })
    return scoring
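The email-drafting enhancement listed in the architecture follows the same Lambda pattern; a minimal sketch (the CRM field names are assumptions):
def draft_outreach_email(event, context):
    """Generate a personalized outreach email from deal context."""
    deal = event["deal"]
    prompt = (
        f"Draft a concise, personalized outreach email to {deal['contact_name']} "
        f"at {deal['company']}. Deal stage: {deal['stage']}. "
        f"Last activity: {deal['last_activity_date']}. "
        "Professional tone, under 150 words, with a clear call to action."
    )
    response = bedrock_runtime.invoke_model(
        modelId="anthropic.claude-haiku-4-5-20251001",  # drafting doesn't need Sonnet-level reasoning
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 512,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    return {"draft": json.loads(response["body"].read())["content"][0]["text"]}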
Document Processing with Step Functions:
{
"Comment": "Document Processing Pipeline",
"StartAt": "ClassifyDocument",
"States": {
"ClassifyDocument": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:ClassifyDoc",
"Next": "RouteByType"
},
"RouteByType": {
"Type": "Choice",
"Choices": [
{"Variable": "$.doc_type", "StringEquals": "invoice", "Next": "ProcessInvoice"},
{"Variable": "$.doc_type", "StringEquals": "contract", "Next": "ProcessContract"},
{"Variable": "$.doc_type", "StringEquals": "proposal_request", "Next": "GenerateProposal"}
      ],
      "Default": "HumanReview"
    },
"ProcessInvoice": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "ExtractInvoiceData",
"States": {
"ExtractInvoiceData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:ExtractData",
"End": true
}
}
},
{
"StartAt": "ValidateAgainstPO",
"States": {
"ValidateAgainstPO": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:ValidatePO",
"End": true
}
}
}
],
"Next": "RouteToApprover"
},
"ProcessContract": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:AnalyzeContract",
"Parameters": {
"document.$": "$.document",
"analysis_type": "risk_extraction"
},
"Next": "HumanReview"
},
"GenerateProposal": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:GenerateProposal",
"Next": "HumanReview"
},
"HumanReview": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "NotifyReviewer",
"Payload": {"taskToken.$": "$$.Task.Token", "document.$": "$.document"}
},
"TimeoutSeconds": 86400,
"End": true
}
}
}
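The HumanReview state's waitForTaskToken integration pauses the workflow until the reviewer responds. A minimal sketch of the completion handler the review UI would call:
sfn = boto3.client("stepfunctions")

def handle_review_decision(event, context):
    """Webhook target invoked when a reviewer approves or rejects."""
    if event["decision"] == "approve":
        sfn.send_task_success(
            taskToken=event["taskToken"],
            output=json.dumps({"approved": True, "reviewer": event["reviewer"]})
        )
    else:
        sfn.send_task_failure(
            taskToken=event["taskToken"],
            error="ReviewRejected",
            cause=event.get("comments", "No comments provided")
        )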
Amazon Q Business for Internal Knowledge:
# Set up Amazon Q Business as enterprise knowledge tool
q_business = boto3.client('qbusiness')
# Create application
app = q_business.create_application(
displayName="SalesKnowledge",
description="Internal knowledge base for sales team",
roleArn="arn:aws:iam::123:role/QBusinessRole"
)
# Create an index to hold ingested documents (index_id is used by the data sources below)
index = q_business.create_index(
    applicationId=app["applicationId"],
    displayName="SalesIndex"
)
index_id = index["indexId"]
# Add data sources
q_business.create_data_source(
applicationId=app["applicationId"],
indexId=index_id,
displayName="Confluence",
configuration={
"type": "CONFLUENCE",
"confluenceConfiguration": {
"sourceConfiguration": {
"hostUrl": "https://company.atlassian.net",
"authType": "OAUTH2"
},
"spaceConfiguration": {
"spaceFilter": ["SALES", "PRODUCTS", "COMPETITORS"]
}
}
}
)
# Add SharePoint data source
q_business.create_data_source(
applicationId=app["applicationId"],
indexId=index_id,
displayName="SharePoint",
configuration={
"type": "SHAREPOINT",
"sharePointConfiguration": {
"siteUrls": ["https://company.sharepoint.com/teams/sales"]
}
}
)
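Once the sources finish syncing, the index can be queried conversationally; a minimal sketch using chat_sync:
answer = q_business.chat_sync(
    applicationId=app["applicationId"],
    userMessage="What discounts can we offer on enterprise deals this quarter?"
)
print(answer["systemMessage"])  # generated, citation-backed answer
for source in answer.get("sourceAttributions", []):
    print("-", source.get("title"), source.get("url"))  # citations back to Confluence/SharePoint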
Bedrock Data Automation:
# Automated data extraction from unstructured documents
bedrock_data = boto3.client('bedrock-data-automation')  # control plane: blueprints
bedrock_data_runtime = boto3.client('bedrock-data-automation-runtime')  # data plane: invocations
# Create a blueprint for invoice extraction
blueprint = bedrock_data.create_blueprint(
blueprintName="InvoiceExtraction",
type="DOCUMENT",
schema=json.dumps({  # blueprint schemas are JSON documents passed as strings
"vendor_name": {"type": "string", "description": "Name of the vendor"},
"invoice_number": {"type": "string", "description": "Invoice number"},
"date": {"type": "date", "description": "Invoice date"},
"line_items": {
"type": "array",
"items": {
"description": {"type": "string"},
"quantity": {"type": "number"},
"unit_price": {"type": "number"},
"total": {"type": "number"}
}
},
"total_amount": {"type": "number", "description": "Total invoice amount"}
})
)
# Process documents (the extraction call is asynchronous; results land in S3).
# Note: exact parameters vary by SDK version; newer versions also require a
# data-automation profile ARN.
result = bedrock_data_runtime.invoke_data_automation_async(
    inputConfiguration={
        "s3Uri": "s3://documents/invoices/invoice_001.pdf"
    },
    outputConfiguration={
        "s3Uri": "s3://results/extracted/"
    },
    blueprints=[{"blueprintArn": blueprint["blueprint"]["blueprintArn"]}]  # response nests blueprint metadata
)
Skill 2.5.4: Enhance Developer Productivity
User Story 24: AI-Accelerated Development Workflow
As a development team lead, I want AI-powered tools that help developers write, test, and optimize GenAI application code faster, So that our team ships AI features 3x faster with fewer bugs.
Deep Dive Scenario
Amazon Q Developer Capabilities:
[Development Lifecycle Acceleration]
|
|--- [Code Generation]
| |--- Generate API integration code from natural language
| |--- Auto-complete FM invocation patterns
| |--- Generate test cases for GenAI components
|
|--- [Code Refactoring]
| |--- Optimize prompt engineering patterns
| |--- Improve error handling in FM calls
| |--- Convert sync to async processing
|
|--- [API Assistance]
| |--- Suggest correct Bedrock API parameters
| |--- Auto-complete IAM policies for FM access
| |--- Generate CloudFormation/CDK for AI infra
|
|--- [Testing]
| |--- Generate unit tests for FM integration code
| |--- Create mock responses for Bedrock calls
| |--- Prompt regression test generation
|
|--- [Performance Optimization]
| |--- Identify unnecessary FM calls
| |--- Suggest caching strategies
| |--- Optimize token usage in prompts
Example: Using Q Developer for GenAI Code:
# Developer types: "Create a function to invoke Bedrock with streaming and error handling"
# Q Developer generates:
import json

import boto3

def invoke_bedrock_streaming(
model_id: str,
messages: list[dict],
system_prompt: str = "",
max_tokens: int = 1024,
on_chunk: callable = None
) -> dict:
"""Invoke Bedrock FM with streaming response and robust error handling.
Args:
model_id: Bedrock model identifier
messages: Conversation messages
system_prompt: Optional system prompt
max_tokens: Maximum tokens to generate
on_chunk: Callback function for each text chunk
Returns:
Complete response with usage statistics
"""
bedrock = boto3.client('bedrock-runtime')
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": messages
}
if system_prompt:
body["system"] = system_prompt
try:
response = bedrock.invoke_model_with_response_stream(
modelId=model_id,
body=json.dumps(body)
)
full_text = ""
usage = {}
for event in response['body']:
chunk = json.loads(event['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
text = chunk['delta'].get('text', '')
full_text += text
if on_chunk:
on_chunk(text)
elif chunk['type'] == 'message_delta':
usage = chunk.get('usage', {})
return {
"text": full_text,
"usage": usage,
"model": model_id
}
except bedrock.exceptions.ThrottlingException:
raise RetryableError("Model throttled, retry with backoff")
except bedrock.exceptions.ModelTimeoutException:
raise RetryableError("Model timeout, reduce max_tokens and retry")
except bedrock.exceptions.ValidationException as e:
raise InvalidRequestError(f"Invalid request: {e}")
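Example usage, streaming chunks to stdout as they arrive (the model choice is an assumption):
result = invoke_bedrock_streaming(
    model_id="anthropic.claude-haiku-4-5-20251001",
    messages=[{"role": "user", "content": "Summarize our Q3 results in three bullets."}],
    system_prompt="You are a concise business analyst.",
    max_tokens=512,
    on_chunk=lambda text: print(text, end="", flush=True)
)
print(f"\nUsage: {result['usage']}")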
Skill 2.5.5: Develop Advanced GenAI Applications
Core Concepts
- Strands Agents + Agent Squad: open-source AWS frameworks for building and orchestrating multi-agent systems (see the sketch after the architecture below)
- Step Functions for Agent Patterns: Orchestrate complex multi-step agent workflows
- Prompt Chaining: Connect FM outputs sequentially, each step refining the previous
User Story 25: Multi-Agent Research and Report Generation System
As a consulting firm partner, I want an AI system that researches topics, synthesizes findings, and produces analyst-quality reports, So that analysts spend time on insights and recommendations instead of data gathering.
Deep Dive Scenario
Multi-Agent Architecture with Agent Squad:
[Research Request: "Analyze the impact of AI on healthcare"]
|
v
[Agent Squad: Research Coordinator]
|
|--- [Agent 1: Data Gatherer] (Strands Agent)
| |--- Tools: Web search, database query, document retrieval
| |--- Output: Raw data and sources
|
|--- [Agent 2: Analyst] (Strands Agent)
| |--- Tools: Statistical analysis, trend detection
| |--- Input: Data from Agent 1
| |--- Output: Key findings and trends
|
|--- [Agent 3: Writer] (Strands Agent)
| |--- Tools: Document formatter, citation manager
| |--- Input: Findings from Agent 2
| |--- Output: Polished report sections
|
|--- [Agent 4: Reviewer] (Strands Agent)
| |--- Tools: Fact checker, grammar checker
| |--- Input: Draft from Agent 3
| |--- Output: Final reviewed report
|
v
[Final Research Report]
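A minimal sketch of one specialist agent built with the open-source Strands Agents SDK (the tool body and model choice are assumptions; Agent Squad coordination between the four agents is elided):
from strands import Agent, tool

@tool
def search_web(query: str) -> str:
    """Search the web and return raw findings (stub for illustration)."""
    return run_search_api(query)  # hypothetical search helper

data_gatherer = Agent(
    model="anthropic.claude-sonnet-4-20250514",
    system_prompt="You are a research data gatherer. Collect facts with sources.",
    tools=[search_web]
)

# The coordinator would route each sub-topic to the right specialist agent
findings = data_gatherer("Gather recent data on AI adoption in healthcare.")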
Prompt Chaining with Bedrock:
class PromptChain:
"""Prompt chaining pattern: each step builds on the previous."""
def execute_research_chain(self, topic: str) -> dict:
"""Multi-step prompt chain for research report generation."""
# Step 1: Generate research outline
outline = self.invoke_fm(
system="You are a research strategist.",
prompt=f"""Create a detailed research outline for the topic: {topic}
Include:
1. Key research questions (5-7)
2. Data sources to investigate
3. Report structure recommendation
Respond in JSON format with a top-level "research_questions" array.""",
model="anthropic.claude-sonnet-4-20250514"
)
# Step 2: For each research question, gather analysis
sections = []
for question in json.loads(outline)["research_questions"]:
analysis = self.invoke_fm(
system="You are a research analyst with deep domain expertise.",
prompt=f"""Research question: {question}
Context from outline: {outline}
Provide a thorough analysis with:
- Key findings (with specific data points)
- Supporting evidence
- Counterarguments
- Implications
Be specific and cite sources where possible.""",
model="anthropic.claude-sonnet-4-20250514"
)
sections.append({"question": question, "analysis": analysis})
# Step 3: Synthesize into coherent report
report_draft = self.invoke_fm(
system="You are an expert report writer.",
prompt=f"""Synthesize these research sections into a coherent report.
Topic: {topic}
Outline: {outline}
Sections: {json.dumps(sections)}
Write a professional report with:
- Executive summary
- Introduction
- Main findings (organized by theme, not by question)
- Analysis and implications
- Recommendations
- Conclusion
Maintain an analytical, objective tone.""",
model="anthropic.claude-sonnet-4-20250514" # Use best model for synthesis
)
# Step 4: Review and refine
final_report = self.invoke_fm(
system="You are a senior editor and fact-checker.",
prompt=f"""Review and improve this report:
{report_draft}
Check for:
1. Factual accuracy and consistency
2. Logical flow and argument strength
3. Grammar and style
4. Missing perspectives or gaps
Return the improved report with tracked changes noted.""",
model="anthropic.claude-sonnet-4-20250514"
)
return {
"outline": outline,
"sections": sections,
"draft": report_draft,
"final_report": final_report
}
def invoke_fm(self, system, prompt, model):
"""Helper to invoke FM."""
response = bedrock_runtime.invoke_model(
modelId=model,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"system": system,
"messages": [{"role": "user", "content": prompt}]
})
)
return json.loads(response['body'].read())["content"][0]["text"]
Step Functions for Agent Pattern Orchestration:
{
"Comment": "Agent Design Pattern: Plan -> Execute -> Validate -> Refine",
"StartAt": "PlanPhase",
"States": {
"PlanPhase": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:AgentPlanner",
"Comment": "Agent creates execution plan",
"ResultPath": "$.plan",
"Next": "ExecutePhase"
},
"ExecutePhase": {
"Type": "Map",
"ItemsPath": "$.plan.steps",
"MaxConcurrency": 3,
"Comment": "Execute plan steps (parallel where possible)",
"Iterator": {
"StartAt": "ExecuteStep",
"States": {
"ExecuteStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:AgentExecutor",
"End": true
}
}
},
"ResultPath": "$.results",
"Next": "ValidatePhase"
},
"ValidatePhase": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:AgentValidator",
"Comment": "Validate results meet quality criteria",
"ResultPath": "$.validation",
"Next": "CheckQuality"
},
"CheckQuality": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.validation.meets_criteria",
"BooleanEquals": true,
"Next": "FinalOutput"
},
{
"Variable": "$.validation.iteration",
"NumericLessThan": 3,
"Next": "RefinePhase"
}
],
"Default": "FinalOutput"
},
"RefinePhase": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:AgentRefiner",
"Comment": "Refine based on validation feedback",
"Next": "ValidatePhase"
},
"FinalOutput": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:FormatOutput",
"End": true
}
}
}
Skill 2.5.6: Improve Troubleshooting Efficiency for FM Applications
User Story 26: GenAI Observability and Troubleshooting Platform
As an SRE (Site Reliability Engineer), I want comprehensive observability for our GenAI applications with automated anomaly detection, So that I can quickly diagnose prompt regressions, latency spikes, and cost anomalies.
Deep Dive Scenario
Company: AIScale - runs 20 FM-powered services, needs <5 min MTTR
Observability Architecture:
[GenAI Application]
|
|--- [CloudWatch Logs] (prompts, responses, errors)
| |
| v
| [CloudWatch Logs Insights]
| |--- Query prompt patterns
| |--- Analyze error distributions
| |--- Detect response quality trends
|
|--- [X-Ray Traces] (end-to-end request flow)
| |--- API Gateway -> Lambda -> Bedrock latency
| |--- Service map visualization
| |--- Error correlation across services
|
|--- [CloudWatch Metrics] (custom GenAI metrics)
| |--- Token usage (input/output)
| |--- Model latency (TTFT, total)
| |--- Error rates by model/endpoint
| |--- Cost per request
|
|--- [Amazon Q Developer]
|--- GenAI-specific error pattern recognition
|--- "Why is latency spiking for Claude Sonnet?"
|--- Auto-suggests fixes based on error patterns
CloudWatch Logs Insights Queries for GenAI:
# Query 1: Analyze prompt patterns and token usage
fields @timestamp, model_id, input_tokens, output_tokens, latency_ms
| filter @logGroup = '/genai/api/invocations'
| stats
avg(input_tokens) as avg_input,
avg(output_tokens) as avg_output,
avg(latency_ms) as avg_latency,
pct(latency_ms, 99) as p99_latency,
count(*) as request_count
by bin(5m), model_id
| sort @timestamp desc
# Query 2: Find failing prompts (error patterns)
fields @timestamp, error_type, model_id, prompt_template, error_message
| filter @logGroup = '/genai/api/errors'
| filter error_type = 'ValidationException' OR error_type = 'ThrottlingException'
| stats count(*) as error_count by error_type, model_id, bin(15m)
| sort error_count desc
# Query 3: Detect prompt injection attempts
fields @timestamp, user_id, prompt_text
| filter @logGroup = '/genai/api/invocations'
| filter prompt_text like /(?i)(ignore|forget|disregard).*(instruction|prompt|system)/
    or prompt_text like /(?i)(you are now|act as|pretend)/
| stats count(*) as injection_attempts by user_id, bin(1h)
| sort injection_attempts desc
# Query 4: Cost analysis by feature/team
fields @timestamp, team_id, feature, model_id, input_tokens, output_tokens
| filter @logGroup = '/genai/api/invocations'
| stats
sum(input_tokens * 0.000003 + output_tokens * 0.000015) as estimated_cost,
count(*) as request_count,
sum(input_tokens + output_tokens) as total_tokens
by team_id, feature, bin(1d)
| sort estimated_cost desc
# Query 5: Response quality monitoring (if quality scores are logged)
fields @timestamp, model_id, quality_score, response_length
| filter @logGroup = '/genai/api/quality'
| stats
avg(quality_score) as avg_quality,
min(quality_score) as min_quality,
stddev(quality_score) as quality_stddev,
count(*) as sample_size
by model_id, bin(1h)
| filter avg_quality < 0.8 # Alert on quality drops
X-Ray Tracing for FM API Calls:
import json
import time

import boto3
from aws_xray_sdk.core import xray_recorder, patch_all

patch_all()  # Auto-instrument boto3, requests
bedrock_runtime = boto3.client("bedrock-runtime")
def genai_handler(event, context):
"""Lambda with X-Ray tracing for FM troubleshooting."""
# Custom subsegment for detailed FM timing
with xray_recorder.in_subsegment("fm_invocation") as subsegment:
start_time = time.time()
# Annotations (indexed, searchable in X-Ray console)
subsegment.put_annotation("model_id", "anthropic.claude-sonnet-4-20250514")
subsegment.put_annotation("feature", "chat")
subsegment.put_annotation("team_id", event.get("team_id", "unknown"))
        # Request body built from the incoming event (shape assumed for illustration)
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": event["messages"]
        }
        response = bedrock_runtime.invoke_model(
modelId="anthropic.claude-sonnet-4-20250514",
body=json.dumps(body)
)
result = json.loads(response['body'].read())
total_time = (time.time() - start_time) * 1000
# Metadata (not indexed, but visible in trace details)
subsegment.put_metadata("input_tokens", result["usage"]["input_tokens"])
subsegment.put_metadata("output_tokens", result["usage"]["output_tokens"])
subsegment.put_metadata("total_latency_ms", total_time)
subsegment.put_metadata("stop_reason", result.get("stop_reason"))
# Extract Bedrock-specific timing from headers
bedrock_latency = response["ResponseMetadata"]["HTTPHeaders"].get(
"x-amzn-bedrock-invocation-latency"
)
subsegment.put_annotation("bedrock_latency_ms", int(bedrock_latency or 0))
return result
Amazon Q Developer for Error Pattern Recognition:
# Example Q Developer interactions for troubleshooting:
Developer: "I'm seeing ThrottlingException errors on Bedrock Claude Sonnet in us-east-1."
Q Developer Analysis:
"Based on the error pattern:
1. Your request rate (150 RPM) approaches the default quota of 200 RPM
2. Burst pattern: 80% of requests occur between 9am-11am
3. Recommendations:
- Request a quota increase via Service Quotas
- Implement request queuing with SQS for non-real-time calls
- Add cross-region failover to us-west-2
- Consider Provisioned Throughput for predictable capacity"
Developer: "Why are my Claude Sonnet responses taking 15 seconds?"
Q Developer Analysis:
"Latency breakdown from your X-Ray traces:
- Network: 50ms
- Bedrock processing: 14,500ms (97% of total)
- Lambda processing: 450ms
Root cause: Your average output is 3,500 tokens. At ~250 tokens/sec, that's 14 seconds.
Recommendations:
- Reduce max_tokens if you don't need long responses
- Use streaming (invoke_model_with_response_stream) for better perceived latency
- Consider Claude Haiku for queries that don't need Sonnet-level reasoning
- Add a system prompt instruction to be concise"
Custom CloudWatch Dashboard for GenAI Monitoring:
# Create GenAI-specific monitoring dashboard
cloudwatch = boto3.client('cloudwatch')
dashboard_body = {
"widgets": [
{
"type": "metric",
"properties": {
"title": "FM Invocation Latency (p50, p90, p99)",
"metrics": [
["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p50"}],
["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p90"}],
["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p99"}]
],
"period": 300,
"view": "timeSeries"
}
},
{
"type": "metric",
"properties": {
"title": "Token Usage (Input vs Output)",
"metrics": [
["GenAI", "InputTokens", "ModelId", "claude-sonnet", {"stat": "Sum"}],
["GenAI", "OutputTokens", "ModelId", "claude-sonnet", {"stat": "Sum"}]
]
}
},
{
"type": "metric",
"properties": {
"title": "Error Rate by Type",
"metrics": [
["GenAI", "Errors", "ErrorType", "ThrottlingException", {"stat": "Sum"}],
["GenAI", "Errors", "ErrorType", "ValidationException", {"stat": "Sum"}],
["GenAI", "Errors", "ErrorType", "ModelTimeout", {"stat": "Sum"}]
]
}
},
{
"type": "metric",
"properties": {
"title": "Estimated Cost per Hour",
"metrics": [
["GenAI", "EstimatedCost", "ModelId", "claude-sonnet", {"stat": "Sum"}],
["GenAI", "EstimatedCost", "ModelId", "claude-haiku", {"stat": "Sum"}]
]
}
},
{
"type": "log",
"properties": {
"title": "Recent Errors",
"query": "fields @timestamp, error_type, model_id, error_message | filter @logGroup = '/genai/api/errors' | sort @timestamp desc | limit 20",
"region": "us-east-1"
}
}
]
}
cloudwatch.put_dashboard(
DashboardName="GenAI-Operations",
DashboardBody=json.dumps(dashboard_body)
)
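The dashboard above assumes the application publishes these custom metrics into the GenAI namespace; a minimal sketch of the publishing side (dimension values match the widgets above):
def publish_invocation_metrics(model_short_name, latency_ms, usage, cost_usd):
    """Emit per-invocation metrics consumed by the GenAI-Operations dashboard."""
    cloudwatch.put_metric_data(
        Namespace="GenAI",
        MetricData=[
            {"MetricName": "InvocationLatency",
             "Dimensions": [{"Name": "ModelId", "Value": model_short_name}],
             "Value": latency_ms, "Unit": "Milliseconds"},
            {"MetricName": "InputTokens",
             "Dimensions": [{"Name": "ModelId", "Value": model_short_name}],
             "Value": usage["input_tokens"], "Unit": "Count"},
            {"MetricName": "OutputTokens",
             "Dimensions": [{"Name": "ModelId", "Value": model_short_name}],
             "Value": usage["output_tokens"], "Unit": "Count"},
            {"MetricName": "EstimatedCost",
             "Dimensions": [{"Name": "ModelId", "Value": model_short_name}],
             "Value": cost_usd, "Unit": "None"}
        ]
    )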
Troubleshooting Decision Tree:
GenAI Application Issue
|
|--- High Latency?
| |--- Check X-Ray: Where is time spent?
| | |--- Bedrock processing: Reduce max_tokens, use streaming, use faster model
| | |--- Lambda cold start: Increase memory, use provisioned concurrency
| | |--- Network: Check VPC endpoints, NAT gateway
| |--- Check CloudWatch Logs Insights: Token count trends
|
|--- Errors?
| |--- ThrottlingException: Request quota increase, add backoff, cross-region failover
| |--- ValidationException: Check prompt format, token limits, model compatibility
| |--- ModelTimeout: Reduce complexity, lower max_tokens, check model status
| |--- AccessDeniedException: Verify IAM policies, model access permissions
|
|--- Quality Degradation?
| |--- Check Logs Insights: Prompt template changes recently?
| |--- Compare response quality scores over time
| |--- Run prompt regression tests
| |--- Verify knowledge base data freshness
|
|--- Cost Spike?
| |--- Check Logs Insights: Token usage by team/feature
| |--- Identify top consumers
| |--- Check for prompt bloat (unnecessary context in prompts)
| |--- Verify model routing (expensive model used for simple tasks?)
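A minimal sketch of the prompt regression tests referenced in the decision tree, runnable in CI (the golden cases and substring assertions are assumptions; real suites often score semantic similarity instead):
import json
import pytest

GOLDEN_CASES = [  # hypothetical fixtures: prompt plus substrings the answer must contain
    {"prompt": "Summarize: revenue grew 12% to $4M in Q3.",
     "must_include": ["12%", "$4M"]},
]

@pytest.mark.parametrize("case", GOLDEN_CASES)
def test_prompt_regression(case):
    response = bedrock_runtime.invoke_model(  # bedrock_runtime as defined earlier
        modelId="anthropic.claude-haiku-4-5-20251001",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 256,
            "messages": [{"role": "user", "content": case["prompt"]}]
        })
    )
    text = json.loads(response["body"].read())["content"][0]["text"]
    for expected in case["must_include"]:
        assert expected in text, f"Regression: '{expected}' missing from response"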
Exam-Relevant Points:
- CloudWatch Logs Insights: primary tool for analyzing GenAI prompts and responses
- X-Ray: traces FM API calls across service boundaries (API GW -> Lambda -> Bedrock)
- Amazon Q Developer: GenAI-specific error pattern recognition and fix suggestions
- Custom CloudWatch metrics for token usage, cost, and quality scores
- Prompt regression testing in CI/CD catches quality drops before production
- Key GenAI-specific errors: ThrottlingException, ModelTimeout, ValidationException
- X-Ray annotations are indexed and searchable; metadata is for debugging details
- Streaming APIs (SSE, WebSocket) reduce perceived latency even if total time is the same
- Bedrock Prompt Flows for no-code workflow building by business users
- Amazon Q Business indexes enterprise data sources for natural language knowledge access
- Prompt chaining: sequential FM calls where each builds on the previous output
- Agent patterns via Step Functions: Plan -> Execute -> Validate -> Refine loops