Task 2.1: Implement Agentic AI Solutions and Tool Integrations
Overview
This task covers building autonomous AI systems that can reason, act, use tools, coordinate with humans, and operate safely within defined boundaries.
Skill 2.1.1: Develop Intelligent Autonomous Systems with Memory and State Management
Core Concepts
- Strands Agents: AWS-native agent framework for building autonomous AI agents with tool use, memory, and planning capabilities
- AWS Agent Squad (formerly Multi-Agent Orchestrator): Framework for coordinating multiple specialized agents
- MCP (Model Context Protocol): Open standard for agent-tool interactions, enabling agents to discover and use tools dynamically
- Memory Types: Short-term (conversation context), long-term (persisted knowledge), episodic (past interaction recall)
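The three memory tiers can be illustrated as plain data structures before mapping them onto AWS stores (a toy sketch; the AWS analogs noted in the comments are the ones used later in this skill):

```python
import time

class AgentMemory:
    """Toy three-tier agent memory; AWS analogs noted per tier."""
    def __init__(self, short_term_ttl=3600):
        self.short_term = {}   # session context -> DynamoDB items with TTL
        self.long_term = {}    # persisted preferences -> DynamoDB / Knowledge Base
        self.episodic = []     # past-session summaries -> OpenSearch
        self.ttl = short_term_ttl

    def remember_turn(self, session_id, text, now=None):
        now = time.time() if now is None else now
        self.short_term.setdefault(session_id, []).append((now, text))

    def recall_session(self, session_id, now=None):
        now = time.time() if now is None else now
        # Drop turns older than the TTL, mimicking DynamoDB item expiry
        fresh = [(t, x) for t, x in self.short_term.get(session_id, []) if now - t < self.ttl]
        self.short_term[session_id] = fresh
        return [x for _, x in fresh]

    def end_session(self, session_id, summary):
        # Persist a summary episodically, then clear the session buffer
        self.episodic.append({"session": session_id, "summary": summary})
        self.short_term.pop(session_id, None)
```

The point of the hierarchy: short-term recall is cheap and self-expiring, while episodic summaries survive the session and can be searched later.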
User Story 1: E-Commerce Personal Shopping Assistant
As a retail platform product manager, I want an AI shopping assistant that remembers customer preferences across sessions, So that customers receive increasingly personalized recommendations without repeating themselves.
Deep Dive Scenario
Company: MegaMart Online - 50M monthly active users
Problem: Customers complain about repeating preferences every chat session. The existing chatbot treats every conversation as brand new.
Architecture:
User Message
|
v
[API Gateway] --> [Lambda: Session Router]
|
v
[Strands Agent - Shopping Assistant]
|--- Short-term Memory: DynamoDB (session context, current cart)
|--- Long-term Memory: Amazon Bedrock Knowledge Base (purchase history, preferences)
|--- Episodic Memory: OpenSearch (past conversation summaries)
|
|--- Tool: Product Catalog Search (via MCP)
|--- Tool: Price Checker (via MCP)
|--- Tool: Inventory Lookup (via MCP)
|--- Tool: Order Placement (via MCP)
|
v
[Response to User]
Implementation Details:
Agent Definition (Strands):
from strands import Agent, tool
from strands.models.bedrock import BedrockModel

@tool
def search_products(query: str, category: str = None, price_max: float = None) -> dict:
    """Search the product catalog with filters."""
    # Query OpenSearch product index
    results = opensearch_client.search(
        index="products",
        body=build_product_query(query, category, price_max)
    )
    return format_results(results)

@tool
def check_user_preferences(user_id: str) -> dict:
    """Retrieve stored user preferences and purchase history."""
    prefs = dynamodb.get_item(TableName="UserPreferences", Key={"userId": user_id})
    history = dynamodb.query(
        TableName="PurchaseHistory",
        KeyConditionExpression="userId = :uid",
        ExpressionAttributeValues={":uid": user_id}
    )
    return {"preferences": prefs, "recent_purchases": history}

model = BedrockModel(model_id="anthropic.claude-sonnet-4-20250514")
shopping_agent = Agent(
    model=model,
    tools=[search_products, check_user_preferences, check_inventory, place_order],
    system_prompt="""You are a personal shopping assistant for MegaMart.
    Always check user preferences before making recommendations.
    Remember context from the current conversation.
    Be proactive about suggesting complementary products."""
)
Memory Management:
# Short-term: Conversation buffer in DynamoDB with TTL
def save_conversation_turn(session_id, role, content):
    dynamodb.put_item(
        TableName="ConversationHistory",
        Item={
            "sessionId": session_id,
            "timestamp": int(time.time()),
            "role": role,
            "content": content,
            "ttl": int(time.time()) + 3600  # 1-hour TTL for short-term
        }
    )

# Long-term: Summarize and persist preferences after each session
def persist_session_learnings(session_id, user_id):
    conversation = get_full_conversation(session_id)
    summary = bedrock.invoke_model(
        modelId="anthropic.claude-haiku-4-5-20251001",
        body={"prompt": f"Extract user preferences from: {conversation}"}
    )
    dynamodb.update_item(
        TableName="UserPreferences",
        Key={"userId": user_id},
        UpdateExpression="SET preferences = list_append(preferences, :new)",
        ExpressionAttributeValues={":new": [summary]}
    )
Multi-Agent Coordination (Agent Squad):
from agent_squad import AgentSquad, BedrockAgent

# Specialized agents for different departments
product_agent = BedrockAgent(name="ProductExpert", instructions="...")
returns_agent = BedrockAgent(name="ReturnsSpecialist", instructions="...")
style_agent = BedrockAgent(name="StyleAdvisor", instructions="...")

squad = AgentSquad(
    agents=[product_agent, returns_agent, style_agent],
    classifier_model="anthropic.claude-sonnet-4-20250514"
)

# The squad automatically routes to the right agent
response = await squad.route("I want to return these shoes and find better ones")
# Routes to returns_agent first, then style_agent
State Management Decision Matrix:
| State Type | Storage | TTL | Use Case |
|---|---|---|---|
| Session context | DynamoDB | 1 hour | Current conversation |
| User preferences | DynamoDB | Permanent | Likes, sizes, brands |
| Conversation summaries | OpenSearch | 90 days | Past interaction recall |
| Product embeddings | OpenSearch Serverless | Permanent | Semantic product search |
| Cart state | ElastiCache (Redis) | 24 hours | Active shopping cart |
Exam-Relevant Points:
- Strands Agents provide native AWS integration with tool use and memory
- Agent Squad handles multi-agent routing and classifier-based delegation
- MCP standardizes how agents discover and interact with tools
- Memory hierarchy: short-term (session) vs long-term (persistent) vs episodic (historical)
- DynamoDB TTL for automatic cleanup of short-term state
- Choose memory store based on access pattern (key-value vs. semantic search vs. time-series)
User Story 2: Multi-Agent Customer Service Platform
As a customer service director, I want specialized AI agents that collaborate to resolve complex customer issues, So that resolution time drops while handling multi-department queries in a single interaction.
Deep Dive Scenario
Company: TelecoMax - telecom provider with billing, technical, and account management departments
Architecture:
Customer Query
|
v
[Agent Squad - Classifier]
|
|--- [Billing Agent] --> Tools: Invoice lookup, Payment processing, Plan comparison
|--- [Technical Agent] --> Tools: Network diagnostics, Device troubleshooting, Speed test
|--- [Account Agent] --> Tools: Plan changes, Address update, Identity verification
|--- [Escalation Agent] --> Tools: Ticket creation, Manager notification, SLA tracking
|
|--- Shared Memory Store (DynamoDB + OpenSearch)
|
v
[Unified Response]
Key Implementation - Agent Handoff with Context Preservation:
from agent_squad import AgentSquad, BedrockAgent, ConversationMessage
# Each agent has its own memory but shares a global context
billing_agent = BedrockAgent(
name="BillingExpert",
model_id="anthropic.claude-sonnet-4-20250514",
instructions="""You handle billing inquiries. You have access to:
- Invoice history and payment records
- Plan pricing and promotions
- Refund processing capabilities
When a query involves technical issues, indicate handoff needed.""",
tools=[lookup_invoice, process_payment, compare_plans]
)
technical_agent = BedrockAgent(
name="TechSupport",
model_id="anthropic.claude-sonnet-4-20250514",
instructions="""You handle technical support. You have access to:
- Network diagnostics and outage maps
- Device configuration guides
- Speed test results
When a query involves billing, indicate handoff needed.""",
tools=[run_diagnostics, check_outages, device_troubleshoot]
)
squad = AgentSquad(
agents=[billing_agent, technical_agent, account_agent, escalation_agent],
storage=DynamoDbStorage(table_name="AgentSquadState"),
classifier_model="anthropic.claude-sonnet-4-20250514"
)
# Customer says: "My internet is slow AND my bill seems too high"
# Classifier routes to technical_agent first, then billing_agent
# Both share the conversation context via DynamoDB storage
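The classifier's job can be illustrated with a rule-based stand-in (Agent Squad actually uses an FM classifier; the keyword table below is purely illustrative):

```python
def classify_query(query: str) -> list:
    """Rule-based stand-in for an FM classifier: return the ordered list
    of specialist agents that should handle the query."""
    routes = {
        "billing": ["bill", "invoice", "charge", "refund", "payment"],
        "technical": ["slow", "outage", "internet", "router", "speed"],
        "account": ["plan change", "address", "upgrade", "password"],
    }
    q = query.lower()
    matched = [agent for agent, keywords in routes.items()
               if any(k in q for k in keywords)]
    # Nothing matched -> hand off to the escalation agent
    return matched or ["escalation"]
```

A multi-department query like the one above matches more than one agent, which is exactly the case the squad's handoff-with-shared-context design exists for.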
Skill 2.1.2: Create Advanced Problem-Solving Systems (ReAct, Chain-of-Thought)
Core Concepts
- ReAct Pattern: Reason + Act - the agent thinks about what to do, takes an action, observes the result, then reasons again
- Chain-of-Thought (CoT): Breaking complex problems into sequential reasoning steps
- Step Functions: AWS service to orchestrate multi-step workflows as state machines
- Structured Reasoning: Explicit decomposition of problems into sub-tasks
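Before translating it to Step Functions, the ReAct loop itself is small enough to sketch framework-free. Here `reason` and `tools` are injected stand-ins (assumptions) for the FM call and the Lambda-backed tools:

```python
def react_loop(alert, reason, tools, max_iterations=10):
    """Generic ReAct driver: REASON -> ACT -> OBSERVE until the reasoner
    declares the evidence sufficient, or the iteration cap is hit."""
    observations = {}
    for _ in range(max_iterations):            # safety: bounded loop
        step = reason(alert, observations)     # REASON about evidence so far
        action = step["next_action"]
        if action == "SUFFICIENT_EVIDENCE":
            return {"verdict": step, "observations": observations}
        observations[action] = tools[action](alert)  # ACT, then store OBSERVE result
    return {"verdict": {"next_action": "ITERATION_LIMIT"}, "observations": observations}
```

The Step Functions version below implements the same loop, with the Choice state playing the role of the `next_action` dispatch and a counter state enforcing the iteration cap.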
User Story 3: Financial Fraud Investigation Agent
As a fraud investigation team lead, I want an AI system that methodically investigates suspicious transactions using structured reasoning, So that investigators receive pre-analyzed cases with evidence chains rather than raw alerts.
Deep Dive Scenario
Company: SecureBank - processes 2M transactions/day, generates 5,000 fraud alerts daily
Problem: Analysts spend 80% of their time gathering data before they can even start investigating. Need an AI that follows the same investigative methodology human analysts use.
ReAct Pattern Implementation via Step Functions:
{
"Comment": "Fraud Investigation ReAct Pattern",
"StartAt": "InitialAssessment",
"States": {
"InitialAssessment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:ReasonAboutAlert",
"Comment": "REASON: Analyze the alert and determine what information is needed",
"Parameters": {
"alert_data.$": "$.alert",
"prompt": "Given this fraud alert, what are the key risk indicators? What information do we need to investigate? Break this into specific investigation steps."
},
"ResultPath": "$.reasoning",
"Next": "DetermineNextAction"
},
"DetermineNextAction": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.reasoning.next_action",
"StringEquals": "CHECK_TRANSACTION_HISTORY",
"Next": "FetchTransactionHistory"
},
{
"Variable": "$.reasoning.next_action",
"StringEquals": "CHECK_DEVICE_FINGERPRINT",
"Next": "AnalyzeDeviceFingerprint"
},
{
"Variable": "$.reasoning.next_action",
"StringEquals": "CHECK_GEOLOCATION",
"Next": "GeolocationAnalysis"
},
{
"Variable": "$.reasoning.next_action",
"StringEquals": "SUFFICIENT_EVIDENCE",
"Next": "GenerateReport"
}
],
"Default": "FetchTransactionHistory"
},
"FetchTransactionHistory": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:FetchTransactions",
"Comment": "ACT: Retrieve transaction history for the account",
"ResultPath": "$.observations.transactions",
"Next": "ReasonAfterObservation"
},
"AnalyzeDeviceFingerprint": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:DeviceAnalysis",
"Comment": "ACT: Check device fingerprint against known patterns",
"ResultPath": "$.observations.device",
"Next": "ReasonAfterObservation"
},
"GeolocationAnalysis": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:GeoAnalysis",
"Comment": "ACT: Analyze geographic patterns (impossible travel, VPN detection)",
"ResultPath": "$.observations.geolocation",
"Next": "ReasonAfterObservation"
},
"ReasonAfterObservation": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:ReasonAboutObservation",
"Comment": "OBSERVE + REASON: Analyze new evidence and decide next step",
"Parameters": {
"all_observations.$": "$.observations",
"original_alert.$": "$.alert",
"previous_reasoning.$": "$.reasoning",
"iteration_count.$": "$.iteration_count",
"prompt": "Given all evidence collected so far, reason about: 1) What does this evidence tell us? 2) What patterns emerge? 3) Do we have enough to make a determination, or what else do we need?"
},
"ResultPath": "$.reasoning",
"Next": "CheckIterationLimit"
},
"CheckIterationLimit": {
"Type": "Choice",
"Comment": "Safety: Prevent infinite reasoning loops",
"Choices": [
{
"Variable": "$.iteration_count",
"NumericGreaterThan": 10,
"Next": "GenerateReport"
}
],
"Default": "IncrementCounter"
},
"IncrementCounter": {
"Type": "Pass",
"Parameters": {
"iteration_count.$": "States.MathAdd($.iteration_count, 1)"
},
"ResultPath": "$.iteration_count",
"Next": "DetermineNextAction"
},
"GenerateReport": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:GenerateInvestigationReport",
"Comment": "Final synthesis: Generate structured investigation report",
"Parameters": {
"all_observations.$": "$.observations",
"reasoning_chain.$": "$.reasoning",
"prompt": "Create a structured fraud investigation report with: Executive Summary, Evidence Chain, Risk Score (1-100), Recommended Action (block/monitor/clear), and Confidence Level."
},
"End": true
}
}
}
Chain-of-Thought Reasoning Lambda:
import json

import boto3

bedrock = boto3.client("bedrock-runtime")

def reason_about_observation(event, context):
    """REASON step in ReAct loop - uses CoT prompting."""
observations = event["all_observations"]
previous_reasoning = event["previous_reasoning"]
prompt = f"""You are a fraud investigator. Think step by step.
ALERT: {event['original_alert']}
EVIDENCE COLLECTED SO FAR:
{json.dumps(observations, indent=2)}
PREVIOUS REASONING:
{previous_reasoning.get('chain_of_thought', 'Initial investigation')}
Think through this step by step:
1. What does the new evidence reveal?
2. Are there contradictions or confirmations with previous evidence?
3. What patterns are emerging (velocity, geography, device, behavior)?
4. What is the current fraud probability estimate and why?
5. Do we have sufficient evidence to make a determination?
6. If not, what specific information would be most valuable next?
Respond in JSON:
{{
"chain_of_thought": "Your detailed reasoning...",
"fraud_probability": 0.0-1.0,
"key_findings": ["finding1", "finding2"],
"next_action": "CHECK_TRANSACTION_HISTORY|CHECK_DEVICE_FINGERPRINT|CHECK_GEOLOCATION|SUFFICIENT_EVIDENCE",
"confidence": "low|medium|high"
}}"""
    response = bedrock.invoke_model(
        modelId="anthropic.claude-sonnet-4-20250514",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    model_output = json.loads(response["body"].read())
    return json.loads(model_output["content"][0]["text"])
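The Lambda assumes the model returns clean JSON, but FMs sometimes wrap it in prose or markdown fences. A defensive parser (a hypothetical helper, not part of the original snippet) avoids brittle `json.loads` failures:

```python
import json
import re

def parse_model_json(text: str) -> dict:
    """Extract the first JSON object from model output, tolerating
    markdown code fences and surrounding prose."""
    # Prefer a ```json ... ``` fenced block if one is present
    fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
    if fenced:
        return json.loads(fenced.group(1))
    # Otherwise take the outermost {...} span
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in model output")
    return json.loads(text[start:end + 1])
```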
ReAct Execution Flow Example:
Alert: $4,500 wire transfer from Account #12345 to new payee
REASON (Step 1): "This is a large wire to a new payee. I need to check:
- Recent transaction history for unusual patterns
- Whether the sending device is recognized
- Geographic consistency"
ACT (Step 1): Fetch transaction history
OBSERVE: "Account normally has $200-$500 transactions. No wire transfers in 2 years."
REASON (Step 2): "Unusual amount pattern. Volume anomaly detected.
Need device fingerprint to check if authorized device."
ACT (Step 2): Check device fingerprint
OBSERVE: "Login from new device (Android, Brazil). Account owner's known devices are iPhone, US."
REASON (Step 3): "New device + new geography + unusual amount = high risk.
Let me check geolocation for impossible travel."
ACT (Step 3): Geolocation analysis
OBSERVE: "Last known US login was 2 hours ago. Brazil login is impossible travel."
REASON (Step 4): "SUFFICIENT_EVIDENCE. Fraud probability: 0.94.
Evidence chain: New device + impossible travel + unusual amount + new payee."
REPORT: Risk Score 94/100, Recommended Action: BLOCK, Confidence: HIGH
Exam-Relevant Points:
- ReAct = iterative loop of Reason -> Act -> Observe
- Step Functions are the AWS-native way to implement ReAct workflows
- Chain-of-Thought is implemented via prompting ("think step by step")
- Always include iteration limits to prevent infinite loops
- Choice states in Step Functions implement the routing logic between reasoning steps
- Each "Act" step is a Lambda function calling an external data source
- The reasoning step is a Lambda calling an FM with accumulated context
Skill 2.1.3: Develop Safeguarded AI Workflows
Core Concepts
- Stopping Conditions: Rules that halt agent execution (max iterations, cost limits, safety thresholds)
- Timeout Mechanisms: Time-based boundaries on FM operations
- IAM Policies: Resource-level permissions for what agents can access
- Circuit Breakers: Patterns that prevent cascading failures when downstream services fail
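A client-side time boundary can be sketched with the standard library (illustrative; in a Step Functions workflow the equivalent control is the task's TimeoutSeconds field):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def call_with_timeout(fn, timeout_s, *args, **kwargs):
    """Enforce a hard wall-clock deadline on a call (e.g. an FM invocation).
    Note: Python cannot kill the worker thread, so the call may keep
    running in the background after the deadline fires."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(fn, *args, **kwargs)
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"Operation exceeded the {timeout_s}s time boundary")
    finally:
        pool.shutdown(wait=False)  # don't block waiting for the overdue call
```

The caller gets control back at the deadline either way, which is the property the safeguard exists to guarantee.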
User Story 4: Autonomous Code Deployment Agent with Safety Rails
As a DevOps engineering manager, I want an AI agent that can autonomously deploy code changes with built-in safety controls, So that routine deployments happen faster while catastrophic failures are prevented.
Deep Dive Scenario
Company: CloudScale SaaS - deploys 50 times/day across 12 microservices
Safeguard Architecture:
[Deployment Request]
|
v
[Step Functions: SafeDeployment Workflow]
|
|--- [Gate 1: Pre-deployment Validation] (Lambda + IAM scoped)
| |--- Code review status check
| |--- Test coverage threshold (>80%)
| |--- Security scan results
| |--- STOPPING CONDITION: Any critical vulnerability = HALT
|
|--- [Gate 2: Canary Deployment] (Lambda + Timeout)
| |--- Deploy to 5% of traffic
| |--- TIMEOUT: 15-minute observation window
| |--- CIRCUIT BREAKER: Error rate >2% = ROLLBACK
|
|--- [Gate 3: Progressive Rollout] (Lambda + IAM scoped)
| |--- 5% -> 25% -> 50% -> 100%
| |--- STOPPING CONDITION: Latency p99 >500ms = PAUSE
| |--- CIRCUIT BREAKER: 3 consecutive failures = ROLLBACK
|
|--- [Gate 4: Post-deployment Verification]
| |--- Health check all endpoints
| |--- TIMEOUT: 5-minute health check window
|
v
[Success / Rollback Notification]
Step Functions with Safeguards:
{
"StartAt": "PreDeploymentValidation",
"States": {
"PreDeploymentValidation": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:ValidateDeployment",
"TimeoutSeconds": 300,
"HeartbeatSeconds": 60,
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 5,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["CriticalVulnerabilityFound"],
"Next": "HaltDeployment",
"ResultPath": "$.error"
},
{
"ErrorEquals": ["States.Timeout"],
"Next": "HaltDeployment",
"ResultPath": "$.error"
}
],
"Next": "CanaryDeployment"
},
"CanaryDeployment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:DeployCanary",
"TimeoutSeconds": 900,
"Next": "MonitorCanary"
},
"MonitorCanary": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:MonitorMetrics",
"TimeoutSeconds": 900,
"Parameters": {
"duration_minutes": 15,
"error_rate_threshold": 0.02,
"latency_p99_threshold_ms": 500
},
"Catch": [
{
"ErrorEquals": ["CircuitBreakerTripped"],
"Next": "AutoRollback",
"ResultPath": "$.error"
}
],
"Next": "ProgressiveRollout"
},
"ProgressiveRollout": {
"Type": "Map",
"ItemsPath": "$.rollout_stages",
"MaxConcurrency": 1,
"Iterator": {
"StartAt": "DeployStage",
"States": {
"DeployStage": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:DeployStage",
"TimeoutSeconds": 600,
"Next": "MonitorStage"
},
"MonitorStage": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:MonitorMetrics",
"TimeoutSeconds": 600,
"Catch": [
{
"ErrorEquals": ["CircuitBreakerTripped", "StoppingConditionMet"],
"Next": "StageRollback"
}
],
"End": true
},
"StageRollback": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:RollbackStage",
"End": true
}
}
},
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "AutoRollback"
}
],
"Next": "PostDeploymentVerification"
},
"AutoRollback": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:FullRollback",
"Next": "NotifyFailure"
},
"HaltDeployment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:NotifyHalt",
"End": true
},
"NotifyFailure": {
"Type": "Task",
"Resource": "arn:aws:sns:us-east-1:123:topic:DeploymentFailures",
"End": true
},
"PostDeploymentVerification": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:HealthCheck",
"TimeoutSeconds": 300,
"Next": "NotifySuccess"
},
"NotifySuccess": {
"Type": "Task",
"Resource": "arn:aws:sns:us-east-1:123:topic:DeploymentSuccess",
"End": true
}
}
}
Circuit Breaker Lambda:
import time

class CircuitBreakerTripped(Exception):
    """Raised when the breaker rejects calls or a monitored threshold trips."""
    pass

class CircuitBreaker:
"""Circuit breaker for FM/service calls."""
CLOSED = "CLOSED" # Normal operation
OPEN = "OPEN" # Failing - reject all calls
HALF_OPEN = "HALF_OPEN" # Testing if service recovered
def __init__(self, failure_threshold=3, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = self.CLOSED
self.failure_count = 0
self.last_failure_time = 0
def call(self, func, *args, **kwargs):
if self.state == self.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = self.HALF_OPEN
else:
raise CircuitBreakerTripped(
f"Circuit breaker OPEN. {self.failure_count} consecutive failures. "
f"Recovery in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
        try:
            result = func(*args, **kwargs)
            if self.state == self.HALF_OPEN:
                self.state = self.CLOSED
            self.failure_count = 0  # any success resets the consecutive-failure count
            return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = self.OPEN
raise
# Usage in deployment monitoring
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=120)
def monitor_canary_metrics(event, context):
"""Monitor canary deployment with circuit breaker."""
try:
metrics = breaker.call(
cloudwatch.get_metric_statistics,
Namespace="MyApp",
MetricName="5xxErrors",
Period=60,
Statistics=["Sum"]
)
error_rate = calculate_error_rate(metrics)
if error_rate > event["error_rate_threshold"]:
raise CircuitBreakerTripped(f"Error rate {error_rate} exceeds threshold")
return {"status": "healthy", "error_rate": error_rate}
except CircuitBreakerTripped:
raise # Caught by Step Functions Catch block -> AutoRollback
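The monitoring Lambda above calls a `calculate_error_rate` helper it never defines. One plausible shape (an assumption), where the caller also fetches a request-count metric so the rate has a denominator:

```python
def calculate_error_rate(error_datapoints, request_datapoints):
    """Turn CloudWatch GetMetricStatistics 'Datapoints' lists
    (Statistics=['Sum']) into an error rate in [0, 1]."""
    errors = sum(dp["Sum"] for dp in error_datapoints)
    requests = sum(dp["Sum"] for dp in request_datapoints)
    if requests == 0:
        return 0.0  # no traffic in the window -> nothing to judge
    return errors / requests
```

With the canary's 2% threshold, 10 errors over 500 requests (0.02) would sit exactly at the trip point.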
IAM Policy for Agent - Least Privilege:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDeploymentActions",
"Effect": "Allow",
"Action": [
"ecs:UpdateService",
"ecs:DescribeServices",
"codedeploy:CreateDeployment"
],
"Resource": "arn:aws:ecs:us-east-1:123:service/production/*",
"Condition": {
"StringEquals": {
"aws:RequestTag/ManagedBy": "DeploymentAgent"
}
}
},
{
"Sid": "DenyDangerousActions",
"Effect": "Deny",
"Action": [
"ecs:DeleteService",
"ecs:DeleteCluster",
"rds:DeleteDBInstance"
],
"Resource": "*"
},
{
"Sid": "AllowReadOnlyMonitoring",
"Effect": "Allow",
"Action": [
"cloudwatch:GetMetricStatistics",
"cloudwatch:DescribeAlarms",
"logs:GetLogEvents"
],
"Resource": "*"
}
]
}
Safeguard Summary Table:
| Safeguard | Implementation | Purpose |
|---|---|---|
| Stopping Conditions | Step Functions Choice states | Halt on critical thresholds |
| Timeouts | TimeoutSeconds in Step Functions | Prevent hung operations |
| Heartbeats | HeartbeatSeconds in Step Functions | Detect stalled tasks |
| Circuit Breakers | Lambda code pattern | Prevent cascading failures |
| IAM Boundaries | IAM policies with deny rules | Limit blast radius |
| Retry with Backoff | Step Functions Retry config | Handle transient errors |
| Catch/Fallback | Step Functions Catch blocks | Graceful error handling |
| Iteration Limits | Counter + Choice state | Prevent infinite loops |
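The table covers iteration limits; a cost limit, the other stopping condition named in this skill's core concepts, can be implemented the same way: a budget tracker consulted before each FM call. The per-1K-token prices below are illustrative placeholders, not real Bedrock pricing:

```python
class CostBudget:
    """Stopping condition on spend: halt the agent once estimated
    FM cost for this run crosses a hard ceiling."""
    def __init__(self, max_usd):
        self.max_usd = max_usd
        self.spent = 0.0

    def charge(self, input_tokens, output_tokens,
               in_price_per_1k=0.003, out_price_per_1k=0.015):  # placeholder prices
        self.spent += (input_tokens / 1000) * in_price_per_1k
        self.spent += (output_tokens / 1000) * out_price_per_1k

    def exceeded(self):
        return self.spent >= self.max_usd
```

In a Step Functions workflow the same check becomes a Choice state on an accumulated `$.spent_usd` field, exactly like the iteration-count pattern in Skill 2.1.2.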
Skill 2.1.4: Create Sophisticated Model Coordination Systems
User Story 5: Multi-Model Legal Document Analysis
As a legal operations director, I want a system that coordinates multiple specialized FMs to analyze complex legal documents, So that contracts are reviewed faster with higher accuracy than any single model.
Deep Dive Scenario
Company: LegalEagle - processes 500 contracts/month for Fortune 500 clients
Model Ensemble Architecture:
[Contract Uploaded to S3]
|
v
[Step Functions: Document Analysis Pipeline]
|
|--- [Parallel Branch 1: Entity Extraction]
| |--- FM: Claude (Bedrock) - Extract parties, dates, obligations
|
|--- [Parallel Branch 2: Risk Analysis]
| |--- FM: Claude (Bedrock) - Identify risky clauses
|
|--- [Parallel Branch 3: Compliance Check]
| |--- FM: Fine-tuned model (SageMaker) - Industry-specific compliance
|
|--- [Parallel Branch 4: Financial Terms]
| |--- FM: Specialized math model - Payment terms, penalties, SLAs
|
v
[Custom Aggregation Lambda]
|--- Weighted consensus across models
|--- Conflict resolution logic
|--- Confidence scoring
|
v
[Unified Analysis Report]
Model Selection Framework:
class ModelSelector:
"""Dynamic model selection based on task characteristics."""
MODEL_CAPABILITIES = {
"claude-sonnet": {
"strengths": ["reasoning", "nuance", "long_context"],
"cost_per_1k_tokens": 0.003,
"latency_ms": 2000,
"max_context": 200000
},
"claude-haiku": {
"strengths": ["speed", "classification", "extraction"],
"cost_per_1k_tokens": 0.00025,
"latency_ms": 500,
"max_context": 200000
},
"custom-compliance-model": {
"strengths": ["industry_compliance", "regulatory"],
"cost_per_1k_tokens": 0.005,
"latency_ms": 3000,
"max_context": 32000
}
}
def select_model(self, task_type, document_length, latency_requirement, budget):
"""Select optimal model based on task requirements."""
candidates = []
for model_id, caps in self.MODEL_CAPABILITIES.items():
score = 0
# Task fit
if task_type in caps["strengths"]:
score += 40
# Context window fit
if document_length <= caps["max_context"]:
score += 20
else:
continue # Can't handle this document
# Latency fit
if caps["latency_ms"] <= latency_requirement:
score += 20
# Cost fit
estimated_cost = (document_length / 1000) * caps["cost_per_1k_tokens"]
if estimated_cost <= budget:
score += 20
candidates.append((model_id, score, estimated_cost))
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[0] if candidates else None
# Usage
selector = ModelSelector()
# Quick classification task -> Haiku
model = selector.select_model("classification", 1000, 1000, 0.01)
# Complex reasoning task -> Sonnet
model = selector.select_model("reasoning", 50000, 5000, 1.00)
# Compliance check -> Custom model
model = selector.select_model("regulatory", 10000, 5000, 0.50)
Custom Aggregation Logic:
def aggregate_model_outputs(event, context):
"""Aggregate outputs from multiple specialized models."""
entity_results = event["entity_extraction"]
risk_results = event["risk_analysis"]
compliance_results = event["compliance_check"]
financial_results = event["financial_terms"]
# Weighted consensus for risk scores
weights = {
"risk_analysis": 0.4, # Primary risk model gets highest weight
"compliance_check": 0.35, # Compliance model is authoritative for regulations
"entity_extraction": 0.15, # Entity model provides supporting context
"financial_terms": 0.10 # Financial model for monetary impact
}
overall_risk_score = (
risk_results["risk_score"] * weights["risk_analysis"] +
compliance_results["risk_score"] * weights["compliance_check"] +
entity_results.get("risk_indicators", 0) * weights["entity_extraction"] +
financial_results.get("financial_risk", 0) * weights["financial_terms"]
)
# Conflict resolution: When models disagree
    conflicts = detect_conflicts(entity_results, risk_results, compliance_results)
    resolution = None
    if conflicts:
        # Route conflicts to a "judge" model for resolution
        resolution = bedrock.invoke_model(
            modelId="anthropic.claude-sonnet-4-20250514",
            body={
                "messages": [{
                    "role": "user",
                    "content": f"""Multiple analysis models disagree on these points:
{json.dumps(conflicts)}
Original analyses:
Risk: {risk_results}
Compliance: {compliance_results}
Provide a reasoned resolution for each conflict."""
                }]
            }
        )
    return {
        "overall_risk_score": overall_risk_score,
        "entities": entity_results,
        "risk_findings": risk_results,
        "compliance_status": compliance_results,
        "financial_summary": financial_results,
        "conflicts_resolved": resolution,
        "confidence": calculate_ensemble_confidence(event)
    }
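`calculate_ensemble_confidence` is left undefined in the snippet above; a simple agreement-based sketch (an assumption, not the source's definition) scores confidence by how closely the models' risk scores agree:

```python
def calculate_ensemble_confidence(results: dict) -> float:
    """Agreement-based confidence: 1.0 when the models report identical
    risk scores (0-1 scale), decreasing with the spread between them."""
    scores = [
        results["risk_analysis"]["risk_score"],
        results["compliance_check"]["risk_score"],
    ]
    spread = max(scores) - min(scores)
    return round(1.0 - spread, 3)
```

Any monotone mapping from disagreement to confidence works here; the key design point is that ensemble confidence is a property of *agreement*, not of any single model's self-reported certainty.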
Exam-Relevant Points:
- Model ensembles use multiple FMs in parallel for different aspects
- Aggregation logic combines outputs using weighted scoring or voting
- Model selection frameworks choose the optimal FM based on task, latency, and cost
- Conflict resolution can be handled by a "judge" model or rule-based logic
- Step Functions Parallel state enables concurrent model invocations
- Use specialized fine-tuned models for domain-specific tasks
Skill 2.1.5: Develop Collaborative AI Systems (Human-in-the-Loop)
User Story 6: AI-Assisted Medical Diagnosis with Human Oversight
As a chief medical informatics officer, I want an AI system that generates diagnostic suggestions requiring physician review before being recorded, So that we leverage AI speed while maintaining clinical accountability.
Deep Dive Scenario
Company: MedTech Hospital Network - 200 physicians, 5,000 patients/day
Architecture with Human-in-the-Loop:
[Patient Data + Symptoms]
|
v
[Step Functions: Diagnostic Workflow]
|
|--- [Task 1: FM Generates Differential Diagnosis]
| |--- Claude analyzes symptoms, lab results, history
| |--- Outputs ranked diagnoses with confidence + reasoning
|
|--- [Task 2: Human Review Gateway]
| |--- Step Functions Task Token (.waitForTaskToken)
| |--- API Gateway endpoint for physician approval UI
| |--- Physician can: Approve / Modify / Reject / Request more tests
| |--- TIMEOUT: 4 hours (escalation if not reviewed)
|
|--- [Task 3: Feedback Loop]
| |--- Capture physician's modifications
| |--- Store as training signal for model improvement
| |--- Update diagnostic accuracy metrics
|
v
[Final Diagnosis Recorded in EHR]
Step Functions Human Task Pattern:
{
"HumanReview": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "SendForPhysicianReview",
"Payload": {
"taskToken.$": "$$.Task.Token",
"diagnosis.$": "$.ai_diagnosis",
"patient_id.$": "$.patient_id",
"urgency.$": "$.urgency_level"
}
},
"TimeoutSeconds": 14400,
"HeartbeatSeconds": 1800,
"Catch": [
{
"ErrorEquals": ["States.Timeout"],
"Next": "EscalateToOnCallPhysician"
}
],
"Next": "ProcessPhysicianDecision"
},
"ProcessPhysicianDecision": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.physician_decision",
"StringEquals": "APPROVED",
"Next": "RecordDiagnosis"
},
{
"Variable": "$.physician_decision",
"StringEquals": "MODIFIED",
"Next": "RecordModifiedDiagnosis"
},
{
"Variable": "$.physician_decision",
"StringEquals": "REJECTED",
"Next": "RequestAdditionalTests"
}
  ],
  "Default": "EscalateToOnCallPhysician"
}
}
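The `SendForPhysicianReview` Lambda named in the state machine must deliver the task token to the review UI. A sketch assuming delivery via an SNS topic; the topic ARN is a placeholder and the client is injected so the delivery channel is swappable:

```python
import json

def send_for_physician_review(event, sns_client,
                              topic_arn="arn:aws:sns:us-east-1:123:PhysicianReviewQueue"):
    """Forward the AI diagnosis plus the Step Functions task token to the
    review channel; the physician UI later posts the token back."""
    message = {
        "task_token": event["taskToken"],  # needed to resume the execution
        "diagnosis": event["diagnosis"],
        "patient_id": event["patient_id"],
        "urgency": event.get("urgency", "routine"),
    }
    sns_client.publish(
        TopicArn=topic_arn,
        Subject=f"Diagnosis review needed: patient {message['patient_id']}",
        Message=json.dumps(message),
    )
    return {"status": "awaiting_review", "patient_id": message["patient_id"]}
```

The crucial detail is that the task token travels with the review request: without it, the handler shown next has no way to call `send_task_success` and resume the workflow.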
Feedback Collection via API Gateway:
# API Gateway -> Lambda: Physician submits review
def handle_physician_review(event, context):
"""Process physician's review and send task token back to Step Functions."""
body = json.loads(event["body"])
task_token = body["taskToken"]
decision = body["decision"] # APPROVED, MODIFIED, REJECTED
modifications = body.get("modifications", {})
feedback = body.get("feedback_text", "")
# Store feedback for model improvement
dynamodb.put_item(
TableName="DiagnosticFeedback",
Item={
"review_id": str(uuid.uuid4()),
"patient_id": body["patient_id"],
"ai_diagnosis": body["original_diagnosis"],
"physician_decision": decision,
"modifications": modifications,
"feedback": feedback,
"physician_id": event["requestContext"]["authorizer"]["claims"]["sub"],
"timestamp": int(time.time())
}
)
# Resume Step Functions execution
sfn_client.send_task_success(
taskToken=task_token,
output=json.dumps({
"physician_decision": decision,
"final_diagnosis": modifications.get("diagnosis", body["original_diagnosis"]),
"physician_notes": feedback
})
)
return {"statusCode": 200, "body": json.dumps({"message": "Review submitted"})}
Exam-Relevant Points:
- .waitForTaskToken is the key Step Functions feature for human-in-the-loop
- API Gateway provides the HTTP interface for human feedback
- Always set TimeoutSeconds for human review tasks with escalation paths
- Feedback collection enables continuous model improvement
- Human augmentation patterns: Review, Approve, Modify, Reject, Escalate
- HeartbeatSeconds detects if the review interface is still active
Skill 2.1.6: Implement Intelligent Tool Integrations
User Story 7: AI-Powered IT Operations Agent with Tool Arsenal
As an IT operations manager, I want an AI agent with access to infrastructure management tools that validates parameters before acting, So that routine ops tasks are automated safely without misconfigured commands causing outages.
Deep Dive Scenario
Company: CloudOps Inc. - manages 500 EC2 instances, 50 RDS databases, 200 Lambda functions
Strands Agent with Custom Tool Behaviors:
from strands import Agent, tool
from strands.models.bedrock import BedrockModel
import re
# Tool with parameter validation and error handling
@tool
def restart_ec2_instance(instance_id: str, force: bool = False) -> dict:
"""Restart an EC2 instance. Use force=True only for unresponsive instances.
Args:
instance_id: The EC2 instance ID (format: i-xxxxxxxxxxxxxxxxx)
force: If True, performs a hard stop/start. Default is soft reboot.
"""
# Parameter validation
if not re.match(r'^i-[a-f0-9]{17}$', instance_id):
return {"error": f"Invalid instance ID format: {instance_id}. Expected: i-xxxxxxxxxxxxxxxxx"}
# Safety check: Don't force-restart production databases
try:
tags = ec2.describe_tags(
Filters=[{"Name": "resource-id", "Values": [instance_id]}]
)
tag_dict = {t["Key"]: t["Value"] for t in tags["Tags"]}
if force and tag_dict.get("Environment") == "production":
return {
"error": "Cannot force-restart production instances. Use force=False for graceful reboot.",
"suggestion": "If instance is truly unresponsive, escalate to on-call engineer."
}
if tag_dict.get("Protected") == "true":
return {"error": "Instance is tagged as Protected. Manual intervention required."}
# Execute the restart
if force:
ec2.stop_instances(InstanceIds=[instance_id], Force=True)
waiter = ec2.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
ec2.start_instances(InstanceIds=[instance_id])
else:
ec2.reboot_instances(InstanceIds=[instance_id])
return {
"status": "success",
"instance_id": instance_id,
"action": "force_restart" if force else "graceful_reboot",
"message": f"Instance {instance_id} reboot initiated"
}
except ec2.exceptions.ClientError as e:
return {"error": f"AWS API error: {str(e)}", "instance_id": instance_id}
@tool
def query_cloudwatch_metrics(
instance_id: str,
metric_name: str,
period_minutes: int = 60
) -> dict:
"""Query CloudWatch metrics for an instance.
Args:
instance_id: The resource ID to query metrics for
metric_name: One of: CPUUtilization, NetworkIn, NetworkOut, DiskReadOps, StatusCheckFailed
period_minutes: Time range to query (default: 60 minutes, max: 1440)
"""
ALLOWED_METRICS = ["CPUUtilization", "NetworkIn", "NetworkOut", "DiskReadOps", "StatusCheckFailed"]
if metric_name not in ALLOWED_METRICS:
return {"error": f"Metric '{metric_name}' not allowed. Choose from: {ALLOWED_METRICS}"}
if period_minutes > 1440:
return {"error": "Maximum period is 1440 minutes (24 hours)"}
response = cloudwatch.get_metric_statistics(
Namespace="AWS/EC2",
MetricName=metric_name,
Dimensions=[{"Name": "InstanceId", "Value": instance_id}],
StartTime=datetime.utcnow() - timedelta(minutes=period_minutes),
EndTime=datetime.utcnow(),
Period=300,
Statistics=["Average", "Maximum"]
)
return {
"metric": metric_name,
"instance_id": instance_id,
"datapoints": sorted(response["Datapoints"], key=lambda x: x["Timestamp"]),
"period_minutes": period_minutes
}
@tool
def scale_ecs_service(cluster: str, service: str, desired_count: int) -> dict:
"""Scale an ECS service to a desired task count.
Args:
cluster: ECS cluster name
service: ECS service name
desired_count: Target number of tasks (1-100)
"""
if not 1 <= desired_count <= 100:
return {"error": f"desired_count must be 1-100, got {desired_count}"}
current = ecs.describe_services(cluster=cluster, services=[service])
current_count = current["services"][0]["desiredCount"]
# Safety: Don't allow >3x scale-up in single operation
if desired_count > current_count * 3:
return {
"error": f"Scale-up too aggressive: {current_count} -> {desired_count} (>3x).",
"suggestion": f"Scale incrementally. Recommended next step: {current_count * 2} tasks.",
"current_count": current_count
}
ecs.update_service(cluster=cluster, service=service, desiredCount=desired_count)
return {"status": "success", "previous_count": current_count, "new_count": desired_count}
# Agent with all tools and system guardrails
ops_agent = Agent(
model=BedrockModel(model_id="anthropic.claude-sonnet-4-20250514"),
tools=[restart_ec2_instance, query_cloudwatch_metrics, scale_ecs_service,
check_rds_status, query_logs, create_incident_ticket],
system_prompt="""You are an IT Operations AI agent. Follow these rules strictly:
1. ALWAYS check metrics before taking remediation actions
2. NEVER force-restart without first attempting graceful reboot
3. ALWAYS explain your reasoning before executing any action
4. For scaling operations, verify current load before scaling
5. If unsure, create an incident ticket instead of acting autonomously
6. Log all actions taken for audit purposes"""
)
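The validation path in `restart_ec2_instance` can be exercised without touching AWS, because malformed IDs are rejected before any API call. A standalone sketch of just that guard (the helper name `validate_instance_id` is ours, introduced for illustration):

```python
import re

def validate_instance_id(instance_id: str):
    """Mirror of the tool's guard: return a structured error dict for
    malformed instance IDs, or None when the format is valid."""
    if not re.match(r'^i-[a-f0-9]{17}$', instance_id):
        return {"error": f"Invalid instance ID format: {instance_id}. "
                         "Expected: i-xxxxxxxxxxxxxxxxx"}
    return None
```

Returning a dict rather than raising keeps the failure inside the agent loop, so the model can read the error and self-correct its next tool call.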
Exam-Relevant Points:
- Strands @tool decorator defines tools with type hints and docstrings
- Standardized function definitions include parameter types, descriptions, and validation
- Error handling should return structured errors (not exceptions) so the agent can reason
- Parameter validation prevents dangerous inputs at the tool level
- Safety checks (tag-based, threshold-based) add domain-specific guardrails
- Tools should be self-documenting via docstrings for agent understanding
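The points above note that tool definitions are derived from type hints and docstrings. A minimal, framework-agnostic sketch of how such a decorator could build a JSON-Schema-style definition, this is an illustration of the mechanism, not the actual Strands implementation:

```python
import inspect
from typing import get_type_hints

# Simplified mapping from Python annotations to JSON Schema types
TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean", dict: "object"}

def tool(func):
    """Attach a JSON-Schema-style spec derived from the function signature."""
    hints = get_type_hints(func)
    hints.pop("return", None)
    properties, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": TYPE_MAP.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)           # no default => required parameter
    func.tool_spec = {
        "name": func.__name__,
        "description": (func.__doc__ or "").strip(),
        "inputSchema": {"type": "object", "properties": properties, "required": required},
    }
    return func

@tool
def restart_ec2_instance(instance_id: str, force: bool = False) -> dict:
    """Restart an EC2 instance."""
    return {"status": "ok"}
```

The resulting `tool_spec` is what the model sees at planning time, which is why precise type hints and docstrings directly improve tool-selection accuracy.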
Skill 2.1.7: Develop Model Extension Frameworks (MCP Servers)
User Story 8: Enterprise MCP Server Network for FM Tool Access
As a platform engineering lead, I want a standardized network of MCP servers providing tool access to our AI agents, So that any agent can discover and use enterprise tools without custom integrations.
Deep Dive Scenario
Company: DataFlow Enterprise - 20 internal AI agents needing access to 50+ enterprise tools
Architecture:
[AI Agents (Strands, Custom)]
|
v
[MCP Client Libraries] <-- Consistent access patterns
|
|--> [Lambda MCP Server: Lightweight Tools]
| |--- Calculator, Date parser, Unit converter
| |--- Stateless, short executions (Lambda caps at 15 min)
| |--- Pay-per-use, auto-scaling
|
|--> [ECS MCP Server: Complex Tools]
| |--- Database queries (connection pooling)
| |--- Document processing (PDF parsing, OCR)
| |--- API integrations (Salesforce, SAP)
| |--- Stateful, long-running, persistent connections
|
|--> [ECS MCP Server: ML Tools]
|--- Custom model inference
|--- Data transformation pipelines
|--- Requires GPU, high memory
Lambda-Based Stateless MCP Server (lightweight tools):
# Lambda function implementing MCP server protocol
import json
def lambda_handler(event, context):
"""Stateless MCP server for lightweight tool access."""
method = event.get("method")
if method == "tools/list":
# Tool discovery - agents call this to see what's available
return {
"tools": [
{
"name": "currency_convert",
"description": "Convert between currencies using real-time rates",
"inputSchema": {
"type": "object",
"properties": {
"amount": {"type": "number", "description": "Amount to convert"},
"from_currency": {"type": "string", "description": "Source currency code (e.g., USD)"},
"to_currency": {"type": "string", "description": "Target currency code (e.g., EUR)"}
},
"required": ["amount", "from_currency", "to_currency"]
}
},
{
"name": "calculate_compound_interest",
"description": "Calculate compound interest over a period",
"inputSchema": {
"type": "object",
"properties": {
"principal": {"type": "number"},
"annual_rate": {"type": "number"},
"years": {"type": "number"},
"compounds_per_year": {"type": "integer", "default": 12}
},
"required": ["principal", "annual_rate", "years"]
}
}
]
}
elif method == "tools/call":
tool_name = event["params"]["name"]
arguments = event["params"]["arguments"]
if tool_name == "currency_convert":
return currency_convert(**arguments)
elif tool_name == "calculate_compound_interest":
return calculate_compound_interest(**arguments)
else:
return {"error": f"Unknown tool: {tool_name}"}
def calculate_compound_interest(principal, annual_rate, years, compounds_per_year=12):
    """Compound interest: A = P * (1 + r/n)^(n*t)."""
    amount = principal * (1 + annual_rate / compounds_per_year) ** (compounds_per_year * years)
    return {"content": [{"type": "text", "text": f"Final amount: {amount:.2f}"}]}
def currency_convert(amount, from_currency, to_currency):
"""Stateless currency conversion - no persistent connections needed."""
# Fetch rate from DynamoDB cache (updated by separate Lambda on schedule)
rate = get_exchange_rate(from_currency, to_currency)
converted = amount * rate
return {
"content": [{
"type": "text",
"text": f"{amount} {from_currency} = {converted:.2f} {to_currency} (rate: {rate})"
}]
}
ECS-Based Stateful MCP Server (complex tools):
# ECS container running MCP server with persistent connections
from mcp.server import Server
from mcp.types import Tool, TextContent
import asyncpg
import json
import os
class DatabaseMCPServer:
"""MCP server with connection pooling for database operations."""
def __init__(self):
self.server = Server("enterprise-database-tools")
self.db_pool = None
self.setup_handlers()
async def initialize(self):
"""Create persistent database connection pool."""
self.db_pool = await asyncpg.create_pool(
dsn=os.environ["DATABASE_URL"],
min_size=5,
max_size=20,
command_timeout=30
)
def setup_handlers(self):
@self.server.list_tools()
async def list_tools():
return [
Tool(
name="query_customer_data",
description="Query customer database with read-only access",
inputSchema={
"type": "object",
"properties": {
"customer_id": {"type": "string"},
"fields": {
"type": "array",
"items": {"type": "string"},
"description": "Fields to retrieve: name, email, plan, usage"
}
},
"required": ["customer_id"]
}
),
Tool(
name="run_analytics_query",
description="Run pre-approved analytics queries against the data warehouse",
inputSchema={
"type": "object",
"properties": {
"query_template": {
"type": "string",
"enum": ["monthly_revenue", "churn_analysis", "usage_trends"]
},
"date_range_days": {"type": "integer", "default": 30}
},
"required": ["query_template"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_customer_data":
return await self.query_customer(arguments)
elif name == "run_analytics_query":
return await self.run_analytics(arguments)
    async def query_customer(self, args):
        """Uses persistent connection pool - ECS advantage over Lambda."""
        # Column names cannot be bound as query parameters, so validate
        # requested fields against an allowlist before interpolating them
        ALLOWED_FIELDS = {"name", "email", "plan", "usage"}
        fields = [f for f in args.get("fields", ["name", "email", "plan"]) if f in ALLOWED_FIELDS]
        if not fields:
            return [TextContent(type="text", text=json.dumps({"error": "No valid fields requested"}))]
        async with self.db_pool.acquire() as conn:
            # customer_id is bound as a parameter to prevent SQL injection
            row = await conn.fetchrow(
                f"SELECT {', '.join(fields)} FROM customers WHERE id = $1",
                args["customer_id"]
            )
        return [TextContent(type="text", text=json.dumps(dict(row)))]
MCP Client - Consistent Access Pattern:
from mcp import ClientSession
from mcp.client.stdio import stdio_client
class EnterpriseMCPClient:
"""Unified MCP client for consistent access across all MCP servers."""
def __init__(self):
self.servers = {
"lightweight": {"type": "lambda", "arn": "arn:aws:lambda:..."},
"database": {"type": "ecs", "url": "http://db-mcp.internal:8080"},
"ml": {"type": "ecs", "url": "http://ml-mcp.internal:8080"}
}
async def discover_tools(self, server_name=None):
"""Discover available tools across all or specific MCP servers."""
tools = []
servers = [server_name] if server_name else self.servers.keys()
for name in servers:
server_tools = await self._call_server(name, "tools/list")
for tool in server_tools["tools"]:
tool["_server"] = name # Track which server provides this tool
tools.append(tool)
return tools
async def call_tool(self, tool_name, arguments):
"""Route tool call to the correct MCP server."""
# Find which server provides this tool
all_tools = await self.discover_tools()
tool = next((t for t in all_tools if t["name"] == tool_name), None)
if not tool:
raise ValueError(f"Tool '{tool_name}' not found in any registered MCP server")
return await self._call_server(
tool["_server"],
"tools/call",
{"name": tool_name, "arguments": arguments}
)
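The discover-then-route pattern can be illustrated with in-memory stand-ins for the servers. Everything below (server names, tool names, the `DemoMCPClient` class) is hypothetical scaffolding for the demonstration, not part of the production client:

```python
import asyncio

# In-memory stand-ins for two MCP servers and their tool catalogs
FAKE_SERVERS = {
    "lightweight": {"tools": [{"name": "currency_convert"}]},
    "database": {"tools": [{"name": "query_customer_data"}]},
}

class DemoMCPClient:
    async def _call_server(self, name, method, params=None):
        if method == "tools/list":
            return FAKE_SERVERS[name]
        return {"served_by": name, "params": params}   # echo which server handled the call

    async def discover_tools(self):
        tools = []
        for name in FAKE_SERVERS:
            for tool in (await self._call_server(name, "tools/list"))["tools"]:
                tools.append({**tool, "_server": name})  # tag tool with its provider
        return tools

    async def call_tool(self, tool_name, arguments):
        tool = next(t for t in await self.discover_tools() if t["name"] == tool_name)
        return await self._call_server(tool["_server"], "tools/call",
                                       {"name": tool_name, "arguments": arguments})

result = asyncio.run(DemoMCPClient().call_tool("query_customer_data", {"customer_id": "c-1"}))
```

A production client would cache the discovery result rather than re-listing tools on every call, but the routing logic is the same.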
Lambda vs ECS MCP Server Decision Matrix:
| Factor | Lambda MCP Server | ECS MCP Server |
|---|---|---|
| Best For | Stateless, quick tools | Stateful, complex tools |
| Execution Time | Up to 15 min (hard limit); best for quick tools | Minutes to hours |
| Connection Pooling | No (cold starts) | Yes (persistent) |
| Cost Model | Pay-per-invocation | Pay-per-hour |
| Scaling | Automatic, instant | Task-based, configurable |
| Memory | Up to 10GB | Up to 30GB+ |
| GPU Access | No | Yes |
| Use Cases | Calculators, parsers, converters | DB queries, ML inference, document processing |
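The matrix above can be collapsed into a simple placement heuristic. A sketch, where the thresholds follow the table and the function name is ours:

```python
def choose_mcp_host(stateful: bool, needs_gpu: bool, max_runtime_s: int,
                    memory_gb: int = 1) -> str:
    """Pick a hosting option for an MCP server per the decision matrix."""
    if stateful or needs_gpu:
        return "ecs"        # persistent connections or GPU require containers
    if max_runtime_s > 900 or memory_gb > 10:
        return "ecs"        # beyond Lambda's 15-minute / 10 GB limits
    return "lambda"         # stateless, quick, pay-per-use
```

For example, a unit converter lands on Lambda, while a connection-pooled database tool or GPU-backed inference tool lands on ECS.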
Exam-Relevant Points:
- MCP provides standardized tool discovery (tools/list) and invocation (tools/call)
- Lambda MCP servers: stateless, lightweight, auto-scaling, pay-per-use
- ECS MCP servers: stateful, connection pooling, GPU access, complex operations
- MCP client libraries ensure consistent access patterns regardless of server type
- Tool definitions use JSON Schema for input validation
- Use Lambda for simple tools, ECS for tools needing persistent state or heavy compute