
Task 2.5: Implement Application Integration Patterns and Development Tools

Overview

This task covers building GenAI-specific API interfaces, accessible UI components, business system enhancements, developer productivity tools, advanced agent patterns, and troubleshooting approaches.


Skill 2.5.1: Create FM API Interfaces for GenAI Workloads

Core Concepts

  • Streaming Response Handling: API Gateway must handle SSE/chunked responses differently than REST
  • Token Limit Management: Input/output token budgets need API-level enforcement
  • Retry Strategies: GenAI-specific retries for model timeouts (longer than typical API timeouts)

User Story 21: Production GenAI API Gateway with Token Management

As a backend engineer, I want an API layer that handles streaming, enforces token limits, and manages timeouts specific to FM workloads, So that downstream consumers don't need to understand FM-specific quirks.

Deep Dive Scenario

Company: AIaaS Corp - serving GenAI APIs to 200 B2B clients

Architecture:

[Client SDK]
    |
    v
[API Gateway]
    |--- Custom domain: api.genai.company.com
    |--- Integration timeout: 29s (API GW max)
    |--- For longer requests: WebSocket API (persistent connection sidesteps the 29s cap)
    |
    v
[Lambda@Edge / Authorizer]
    |--- Token budget validation (check remaining tokens for client)
    |--- Request size validation (input tokens)
    |
    v
[Lambda: Request Processor]
    |--- Truncate/chunk input if exceeding model context window
    |--- Apply client-specific system prompts
    |--- Route streaming vs non-streaming
    |
    v
[Bedrock FM]
    |--- invoke_model (sync, <29s responses)
    |--- invoke_model_with_response_stream (streaming)
    |
    v
[Response Processing]
    |--- Token usage tracking per client
    |--- Response caching for common queries
    |--- Cost metering

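The response-caching step above needs a stable cache key. A minimal sketch (the hashing scheme and field names are illustrative choices, not part of the original design):

```python
import hashlib
import json

def cache_key(model_id: str, messages: list, max_tokens: int) -> str:
    """Deterministic key for caching repeated FM requests.

    Identical (model, messages, max_tokens) tuples hash to the same key,
    so common queries can be served from a cache (DynamoDB, ElastiCache)
    instead of re-invoking the model.
    """
    payload = json.dumps(
        {"model": model_id, "messages": messages, "max_tokens": max_tokens},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Note this serves only exact-match hits; semantically similar queries would need an embedding-based cache instead.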
Token Limit Management:

class TokenManager:
    """Manage token budgets per client, per request."""

    # Approximate token limits by model
    MODEL_LIMITS = {
        "anthropic.claude-haiku-4-5-20251001": {"max_input": 200000, "max_output": 8192},
        "anthropic.claude-sonnet-4-20250514": {"max_input": 200000, "max_output": 8192},
        "anthropic.claude-opus-4-20250514": {"max_input": 200000, "max_output": 8192},
    }

    def validate_and_adjust_request(self, request, client_id, model_id):
        """Validate token limits and adjust request if needed."""
        limits = self.MODEL_LIMITS[model_id]

        # Estimate input tokens (rough: 1 token ~= 4 chars for English)
        estimated_input_tokens = sum(
            len(m["content"]) // 4 for m in request["messages"]
        )

        # Check client's remaining daily budget
        remaining_budget = self.get_remaining_budget(client_id)
        if estimated_input_tokens > remaining_budget:
            raise TokenBudgetExceeded(
                f"Estimated {estimated_input_tokens} input tokens exceeds remaining "
                f"daily budget of {remaining_budget} tokens"
            )

        # Check model's context window
        if estimated_input_tokens > limits["max_input"] * 0.9:
            # Truncate oldest messages to fit
            request["messages"] = self.truncate_to_fit(
                request["messages"],
                max_tokens=int(limits["max_input"] * 0.8)  # Leave 20% for safety
            )

        # Cap output tokens
        max_output = min(
            request.get("max_tokens", limits["max_output"]),
            limits["max_output"],
            remaining_budget - estimated_input_tokens
        )
        request["max_tokens"] = max_output

        return request

    def truncate_to_fit(self, messages, max_tokens):
        """Keep system message + last N messages that fit in context."""
        system_messages = [m for m in messages if m["role"] == "system"]
        user_messages = [m for m in messages if m["role"] != "system"]

        total = sum(len(m["content"]) // 4 for m in system_messages)
        kept = list(system_messages)

        # Add messages from most recent, working backwards
        for msg in reversed(user_messages):
            msg_tokens = len(msg["content"]) // 4
            if total + msg_tokens <= max_tokens:
                kept.insert(len(system_messages), msg)
                total += msg_tokens
            else:
                break

        return kept

    def track_usage(self, client_id, usage):
        """Track actual token usage for billing and budget enforcement."""
        dynamodb.update_item(
            TableName="TokenUsage",
            Key={"clientId": client_id, "date": today()},
            UpdateExpression="""
                ADD inputTokens :input, outputTokens :output, requestCount :one
            """,
            ExpressionAttributeValues={
                ":input": usage["input_tokens"],
                ":output": usage["output_tokens"],
                ":one": 1
            }
        )
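`get_remaining_budget` is referenced above but not shown. A minimal sketch, assuming a per-client daily limit and the `TokenUsage` table layout written by `track_usage` (both assumptions):

```python
from datetime import date

DAILY_LIMIT_DEFAULT = 1_000_000  # hypothetical default per-client daily budget

def today() -> str:
    """Date key used by track_usage (a UTC day would be safer in production)."""
    return date.today().isoformat()

def remaining_budget(daily_limit: int, input_used: int, output_used: int) -> int:
    """Pure calculation: tokens left in today's budget, never negative."""
    return max(0, daily_limit - input_used - output_used)

def get_remaining_budget(client_id: str, table_name: str = "TokenUsage") -> int:
    """Read today's usage counters and subtract them from the daily limit."""
    import boto3  # local import keeps the pure helpers testable without AWS
    dynamodb = boto3.client("dynamodb")
    item = dynamodb.get_item(
        TableName=table_name,
        Key={"clientId": {"S": client_id}, "date": {"S": today()}},
    ).get("Item", {})
    used_in = int(item.get("inputTokens", {"N": "0"})["N"])
    used_out = int(item.get("outputTokens", {"N": "0"})["N"])
    return remaining_budget(DAILY_LIMIT_DEFAULT, used_in, used_out)
```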

Retry Strategy for FM Timeouts:

import json
import random
import time

class FMRetryStrategy:
    """GenAI-specific retry strategy accounting for model timeouts."""

    def __init__(self):
        self.max_retries = 3
        self.base_delay = 1.0

    def invoke_with_retry(self, model_id, body):
        """Retry with exponential backoff + jitter for FM calls."""
        last_exception = None

        for attempt in range(self.max_retries + 1):
            try:
                response = bedrock_runtime.invoke_model(
                    modelId=model_id,
                    body=json.dumps(body)
                )
                return json.loads(response['body'].read())

            except bedrock_runtime.exceptions.ThrottlingException:
                # Throttled: exponential backoff with jitter
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
                last_exception = "ThrottlingException"

            except bedrock_runtime.exceptions.ModelTimeoutException:
                # Model took too long: halve max_tokens (with a floor) and retry
                body["max_tokens"] = max(64, body.get("max_tokens", 1024) // 2)
                last_exception = "ModelTimeout"

            except bedrock_runtime.exceptions.ModelNotReadyException:
                # Model cold start: wait longer
                time.sleep(5)
                last_exception = "ModelNotReady"

            except bedrock_runtime.exceptions.ServiceUnavailableException:
                # Service down: try fallback region
                return self._invoke_fallback_region(model_id, body)

        raise FMInvocationError(f"All retries exhausted. Last error: {last_exception}")
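`_invoke_fallback_region` is referenced but not defined. A hedged sketch, assuming the same model is enabled in a secondary region (the region choice and the failover error list are assumptions):

```python
import json

# Errors worth failing over for; throttling is better retried in-region
FAILOVER_ERRORS = {"ServiceUnavailableException", "InternalServerException"}

def should_failover(error_code: str) -> bool:
    """Cross-region retry only for availability errors, not client errors."""
    return error_code in FAILOVER_ERRORS

def invoke_fallback_region(model_id: str, body: dict,
                           region: str = "us-west-2") -> dict:
    """Replay the request against a secondary region's Bedrock endpoint."""
    import boto3  # local import keeps should_failover testable without AWS
    client = boto3.client("bedrock-runtime", region_name=region)
    response = client.invoke_model(modelId=model_id, body=json.dumps(body))
    return json.loads(response["body"].read())
```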


Skill 2.5.2: Develop Accessible AI Interfaces

User Story 22: No-Code AI Workflow Builder for Business Users

As a business operations manager (non-technical), I want to create AI-powered workflows by connecting pre-built components visually, So that I can automate document processing, email drafting, and data analysis without writing code.

Deep Dive Scenario

Company: ProcessCo - 500 business users wanting AI automation, 10 developers to support them

Architecture:

[Business Users]
    |
    |--- [AWS Amplify: No-Code UI] (drag-and-drop workflow builder)
    |       |--- Declarative React components for AI features
    |       |--- Pre-built blocks: Summarize, Classify, Extract, Generate
    |
    |--- [Bedrock Prompt Flows: No-Code Orchestration]
    |       |--- Visual workflow designer
    |       |--- Connect prompts, knowledge bases, and tools
    |       |--- No Lambda/Step Functions needed
    |
    |--- [OpenAPI-First Development]
            |--- Auto-generated client SDKs
            |--- Interactive API docs
            |--- Consistent interface across teams

[Developers]
    |--- Build reusable AI components
    |--- Publish to internal component library
    |--- Expose via API-first patterns

Amplify Declarative UI Components:

// React component using Amplify AI Kit
import { AIConversation } from '@aws-amplify/ui-react-ai';
import { generateClient } from 'aws-amplify/api';

// Declarative AI conversation component - no backend code needed
function CustomerSupportChat() {
  return (
    <AIConversation
      // Connects to Bedrock automatically via Amplify backend
      aiContext={() => ({
        currentUser: getUserContext(),
        recentOrders: getRecentOrders()
      })}
      welcomeMessage="Hi! I can help with orders, returns, and product questions."
      suggestedPrompts={[
        { header: "Track Order", prompt: "Where is my most recent order?" },
        { header: "Return Item", prompt: "I need to return an item" },
        { header: "Product Help", prompt: "Help me choose a product" }
      ]}
      // Streaming enabled by default
      responseComponents={{
        OrderCard: ({ order }) => <OrderTrackingCard order={order} />,
        ProductCard: ({ product }) => <ProductRecommendation product={product} />
      }}
    />
  );
}

Bedrock Prompt Flows - No-Code Workflow:

# Creating a prompt flow programmatically (business users do this via console UI)
bedrock_agent = boto3.client('bedrock-agent')

# Define a no-code workflow: Document -> Extract -> Classify -> Route
flow = bedrock_agent.create_flow(
    name="InvoiceProcessingFlow",
    description="Extract, classify, and route invoices automatically",
    executionRoleArn="arn:aws:iam::123:role/BedrockFlowRole",
    definition={
        "nodes": [
            {
                "name": "Input",
                "type": "Input",
                "configuration": {
                    "input": {
                        "document": {"type": "String"}
                    }
                }
            },
            {
                "name": "ExtractInfo",
                "type": "Prompt",
                "configuration": {
                    "prompt": {
                        "sourceConfiguration": {
                            "inline": {
                                "modelId": "anthropic.claude-haiku-4-5-20251001",
                                "templateConfiguration": {
                                    "text": {
                                        "text": "Extract the following from this invoice: vendor name, invoice number, date, total amount, line items.\n\nInvoice:\n{{document}}\n\nRespond in JSON format."
                                    }
                                }
                            }
                        }
                    }
                }
            },
            {
                "name": "ClassifyPriority",
                "type": "Prompt",
                "configuration": {
                    "prompt": {
                        "sourceConfiguration": {
                            "inline": {
                                "modelId": "anthropic.claude-haiku-4-5-20251001",
                                "templateConfiguration": {
                                    "text": {
                                        "text": "Classify this invoice as HIGH, MEDIUM, or LOW priority based on amount and due date.\n\nInvoice data: {{extracted_info}}\n\nRespond with just the priority level."
                                    }
                                }
                            }
                        }
                    }
                }
            },
            {
                "name": "Output",
                "type": "Output",
                "configuration": {
                    "output": {
                        "extracted_info": {"type": "String"},
                        "priority": {"type": "String"}
                    }
                }
            }
        ],
        "connections": [
            {"name": "c1", "source": "Input", "target": "ExtractInfo"},
            {"name": "c2", "source": "ExtractInfo", "target": "ClassifyPriority"},
            {"name": "c3", "source": "ClassifyPriority", "target": "Output"}
        ]
    }
)
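Once the flow is prepared and aliased, it can be invoked through the bedrock-agent-runtime client. A sketch with hypothetical flow and alias IDs; the event-stream parsing assumes the InvokeFlow response shape:

```python
def collect_flow_outputs(response_stream) -> dict:
    """Gather {node_name: document} pairs from an InvokeFlow event stream."""
    outputs = {}
    for event in response_stream:
        if "flowOutputEvent" in event:
            out = event["flowOutputEvent"]
            outputs[out["nodeName"]] = out["content"]["document"]
    return outputs

def run_invoice_flow(flow_id: str, alias_id: str, document_text: str) -> dict:
    """Invoke the prepared InvoiceProcessingFlow and collect its outputs."""
    import boto3  # local import keeps collect_flow_outputs testable without AWS
    runtime = boto3.client("bedrock-agent-runtime")
    response = runtime.invoke_flow(
        flowIdentifier=flow_id,
        flowAliasIdentifier=alias_id,
        inputs=[{
            "nodeName": "Input",            # the Input node defined above
            "nodeOutputName": "document",
            "content": {"document": document_text},
        }],
    )
    return collect_flow_outputs(response["responseStream"])
```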

OpenAPI Specification for API-First Development:

openapi: "3.0.3"
info:
  title: GenAI Platform API
  version: "1.0"
  description: "AI capabilities exposed as standard REST APIs"

paths:
  /v1/summarize:
    post:
      operationId: summarizeDocument
      summary: Summarize a document using AI
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/SummarizeRequest'
      responses:
        '200':
          description: Summary generated
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/SummarizeResponse'
        '202':
          description: Processing (for large documents)
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/AsyncJobResponse'

components:
  schemas:
    SummarizeRequest:
      type: object
      required: [text]
      properties:
        text:
          type: string
          maxLength: 500000
          description: "Document text to summarize"
        max_length:
          type: integer
          default: 200
          description: "Target summary length in words"
        format:
          type: string
          enum: [paragraph, bullet_points, executive_summary]
          default: paragraph
        language:
          type: string
          default: "en"


Skill 2.5.3: Create Business System Enhancements

User Story 23: AI-Enhanced CRM with Document Processing and Knowledge Tools

As a sales operations director, I want AI capabilities embedded in our CRM, document workflows, and internal knowledge base, So that sales reps get AI-assisted deal insights, auto-generated proposals, and instant company knowledge access.

Deep Dive Scenario

Company: SalesForce Corp - 500 sales reps, 10K deals/quarter

Architecture:

[Business Systems Enhancement]
    |
    |--- [CRM Enhancement: Lambda Functions]
    |       |--- Deal scoring (AI analyzes deal attributes, predicts win probability)
    |       |--- Email drafting (generate personalized outreach from deal context)
    |       |--- Meeting notes summarization
    |
    |--- [Document Processing: Step Functions]
    |       |--- Proposal generation (template + deal data -> custom proposal)
    |       |--- Contract analysis (extract key terms, flag risks)
    |       |--- Invoice processing (extract, validate, route)
    |
    |--- [Internal Knowledge: Amazon Q Business]
    |       |--- Index company docs, Confluence, SharePoint, Salesforce
    |       |--- Natural language search across all data sources
    |       |--- Role-based access (sales sees sales docs, HR sees HR docs)
    |
    |--- [Data Automation: Bedrock Data Automation]
            |--- Automated data extraction from unstructured documents
            |--- Convert PDFs, images, emails to structured data

CRM Enhancement with Lambda:

# Lambda: AI-powered deal scoring for CRM
def score_deal(event, context):
    """Enhance CRM with AI-powered deal scoring."""

    deal = event["deal"]

    prompt = f"""Analyze this sales deal and predict win probability (0-100).

    Deal Details:
    - Company: {deal['company']} (Industry: {deal['industry']})
    - Deal Size: ${deal['amount']}
    - Stage: {deal['stage']}
    - Days in Pipeline: {deal['days_in_pipeline']}
    - Champion Identified: {deal['has_champion']}
    - Competitor: {deal['competitor']}
    - Last Activity: {deal['last_activity_date']}
    - Number of Stakeholders: {deal['stakeholder_count']}

    Historical Win Rates for Similar Deals:
    - Same industry: {deal.get('industry_win_rate', 'N/A')}%
    - Same size range: {deal.get('size_win_rate', 'N/A')}%
    - Same stage: {deal.get('stage_win_rate', 'N/A')}%

    Provide:
    1. Win probability (0-100)
    2. Top 3 risk factors
    3. Top 3 recommended next actions
    4. Similar deals that won/lost and why

    Respond in JSON format."""

    response = bedrock_runtime.invoke_model(
        modelId="anthropic.claude-sonnet-4-20250514",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    result_text = json.loads(response['body'].read())["content"][0]["text"]
    scoring = json.loads(result_text)  # parse the model's JSON response once

    # Update CRM record via webhook
    crm_client.update_deal(deal["id"], {
        "ai_win_probability": scoring["win_probability"],
        "ai_risk_factors": scoring["risk_factors"],
        "ai_next_actions": scoring["recommended_actions"],
        "ai_scored_at": datetime.utcnow().isoformat()
    })

    return scoring

Document Processing with Step Functions:

{
  "Comment": "Document Processing Pipeline",
  "StartAt": "ClassifyDocument",
  "States": {
    "ClassifyDocument": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:ClassifyDoc",
      "Next": "RouteByType"
    },
    "RouteByType": {
      "Type": "Choice",
      "Choices": [
        {"Variable": "$.doc_type", "StringEquals": "invoice", "Next": "ProcessInvoice"},
        {"Variable": "$.doc_type", "StringEquals": "contract", "Next": "ProcessContract"},
        {"Variable": "$.doc_type", "StringEquals": "proposal_request", "Next": "GenerateProposal"}
      ]
    },
    "ProcessInvoice": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ExtractInvoiceData",
          "States": {
            "ExtractInvoiceData": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123:function:ExtractData",
              "End": true
            }
          }
        },
        {
          "StartAt": "ValidateAgainstPO",
          "States": {
            "ValidateAgainstPO": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123:function:ValidatePO",
              "End": true
            }
          }
        }
      ],
      "Next": "RouteToApprover"
    },
    "ProcessContract": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:AnalyzeContract",
      "Parameters": {
        "document.$": "$.document",
        "analysis_type": "risk_extraction"
      },
      "Next": "HumanReview"
    },
    "GenerateProposal": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:GenerateProposal",
      "Next": "HumanReview"
    },
    "HumanReview": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "NotifyReviewer",
        "Payload": {"taskToken.$": "$$.Task.Token", "document.$": "$.document"}
      },
      "TimeoutSeconds": 86400,
      "End": true
    }
  }
}
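The HumanReview state parks the execution on a task token. The reviewer-facing app resumes it with SendTaskSuccess or SendTaskFailure; a minimal sketch (the output payload shape is an assumption):

```python
import json

def review_output(approved: bool, comments: str = "") -> dict:
    """Payload the state machine receives when the review completes."""
    return {"status": "APPROVED" if approved else "REJECTED",
            "comments": comments}

def complete_review(task_token: str, approved: bool, comments: str = "") -> None:
    """Resume the waiting execution once a human has reviewed the document."""
    import boto3  # local import keeps review_output testable without AWS
    sfn = boto3.client("stepfunctions")
    if approved:
        sfn.send_task_success(
            taskToken=task_token,
            output=json.dumps(review_output(True, comments)),
        )
    else:
        sfn.send_task_failure(
            taskToken=task_token,
            error="ReviewRejected",
            cause=comments or "Reviewer rejected the document",
        )
```

The state's TimeoutSeconds of 86400 means a review must land within a day, or the execution fails with States.Timeout.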

Amazon Q Business for Internal Knowledge:

# Set up Amazon Q Business as enterprise knowledge tool
q_business = boto3.client('qbusiness')

# Create application
app = q_business.create_application(
    displayName="SalesKnowledge",
    description="Internal knowledge base for sales team",
    roleArn="arn:aws:iam::123:role/QBusinessRole"
)

# Add data sources
q_business.create_data_source(
    applicationId=app["applicationId"],
    indexId=index_id,
    displayName="Confluence",
    configuration={
        "type": "CONFLUENCE",
        "confluenceConfiguration": {
            "sourceConfiguration": {
                "hostUrl": "https://company.atlassian.net",
                "authType": "OAUTH2"
            },
            "spaceConfiguration": {
                "spaceFilter": ["SALES", "PRODUCTS", "COMPETITORS"]
            }
        }
    }
)

# Add SharePoint data source
q_business.create_data_source(
    applicationId=app["applicationId"],
    indexId=index_id,
    displayName="SharePoint",
    configuration={
        "type": "SHAREPOINT",
        "sharePointConfiguration": {
            "siteUrls": ["https://company.sharepoint.com/teams/sales"]
        }
    }
)

Bedrock Data Automation:

# Automated data extraction from unstructured documents
bedrock_data = boto3.client('bedrock-data-automation')

# Create a blueprint for invoice extraction
blueprint = bedrock_data.create_blueprint(
    blueprintName="InvoiceExtraction",
    type="DOCUMENT",
    schema=json.dumps({  # the blueprint schema is passed as a JSON string
        "vendor_name": {"type": "string", "description": "Name of the vendor"},
        "invoice_number": {"type": "string", "description": "Invoice number"},
        "date": {"type": "date", "description": "Invoice date"},
        "line_items": {
            "type": "array",
            "items": {
                "description": {"type": "string"},
                "quantity": {"type": "number"},
                "unit_price": {"type": "number"},
                "total": {"type": "number"}
            }
        },
        "total_amount": {"type": "number", "description": "Total invoice amount"}
    })
)

# Process documents; invocation is asynchronous and goes through the runtime client
bedrock_data_runtime = boto3.client('bedrock-data-automation-runtime')

result = bedrock_data_runtime.invoke_data_automation_async(
    inputConfiguration={
        "s3Uri": "s3://documents/invoices/invoice_001.pdf"
    },
    outputConfiguration={
        "s3Uri": "s3://results/extracted/"
    },
    blueprints=[{"blueprintArn": blueprint["blueprint"]["blueprintArn"]}]
)


Skill 2.5.4: Enhance Developer Productivity

User Story 24: AI-Accelerated Development Workflow

As a development team lead, I want AI-powered tools that help developers write, test, and optimize GenAI application code faster, So that our team ships AI features 3x faster with fewer bugs.

Deep Dive Scenario

Amazon Q Developer Capabilities:

[Development Lifecycle Acceleration]
    |
    |--- [Code Generation]
    |       |--- Generate API integration code from natural language
    |       |--- Auto-complete FM invocation patterns
    |       |--- Generate test cases for GenAI components
    |
    |--- [Code Refactoring]
    |       |--- Optimize prompt engineering patterns
    |       |--- Improve error handling in FM calls
    |       |--- Convert sync to async processing
    |
    |--- [API Assistance]
    |       |--- Suggest correct Bedrock API parameters
    |       |--- Auto-complete IAM policies for FM access
    |       |--- Generate CloudFormation/CDK for AI infra
    |
    |--- [Testing]
    |       |--- Generate unit tests for FM integration code
    |       |--- Create mock responses for Bedrock calls
    |       |--- Prompt regression test generation
    |
    |--- [Performance Optimization]
    |       |--- Identify unnecessary FM calls
    |       |--- Suggest caching strategies
    |       |--- Optimize token usage in prompts

Example: Using Q Developer for GenAI Code:

# Developer types: "Create a function to invoke Bedrock with streaming and error handling"
# Q Developer generates:

import json
from collections.abc import Callable

import boto3

def invoke_bedrock_streaming(
    model_id: str,
    messages: list[dict],
    system_prompt: str = "",
    max_tokens: int = 1024,
    on_chunk: Callable[[str], None] | None = None
) -> dict:
    """Invoke Bedrock FM with streaming response and robust error handling.

    Args:
        model_id: Bedrock model identifier
        messages: Conversation messages
        system_prompt: Optional system prompt
        max_tokens: Maximum tokens to generate
        on_chunk: Callback function for each text chunk

    Returns:
        Complete response with usage statistics
    """
    bedrock = boto3.client('bedrock-runtime')

    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": messages
    }
    if system_prompt:
        body["system"] = system_prompt

    try:
        response = bedrock.invoke_model_with_response_stream(
            modelId=model_id,
            body=json.dumps(body)
        )

        full_text = ""
        usage = {}

        for event in response['body']:
            chunk = json.loads(event['chunk']['bytes'])

            if chunk['type'] == 'content_block_delta':
                text = chunk['delta'].get('text', '')
                full_text += text
                if on_chunk:
                    on_chunk(text)

            elif chunk['type'] == 'message_delta':
                usage = chunk.get('usage', {})

        return {
            "text": full_text,
            "usage": usage,
            "model": model_id
        }

    except bedrock.exceptions.ThrottlingException:
        raise RetryableError("Model throttled, retry with backoff")
    except bedrock.exceptions.ModelTimeoutException:
        raise RetryableError("Model timeout, reduce max_tokens and retry")
    except bedrock.exceptions.ValidationException as e:
        raise InvalidRequestError(f"Invalid request: {e}")


Skill 2.5.5: Develop Advanced GenAI Applications

Core Concepts

  • Strands Agents + Agent Squad: AWS-native agent orchestration
  • Step Functions for Agent Patterns: Orchestrate complex multi-step agent workflows
  • Prompt Chaining: Connect FM outputs sequentially, each step refining the previous

User Story 25: Multi-Agent Research and Report Generation System

As a consulting firm partner, I want an AI system that researches topics, synthesizes findings, and produces analyst-quality reports, So that analysts spend time on insights and recommendations instead of data gathering.

Deep Dive Scenario

Multi-Agent Architecture with Agent Squad:

[Research Request: "Analyze the impact of AI on healthcare"]
    |
    v
[Agent Squad: Research Coordinator]
    |
    |--- [Agent 1: Data Gatherer] (Strands Agent)
    |       |--- Tools: Web search, database query, document retrieval
    |       |--- Output: Raw data and sources
    |
    |--- [Agent 2: Analyst] (Strands Agent)
    |       |--- Tools: Statistical analysis, trend detection
    |       |--- Input: Data from Agent 1
    |       |--- Output: Key findings and trends
    |
    |--- [Agent 3: Writer] (Strands Agent)
    |       |--- Tools: Document formatter, citation manager
    |       |--- Input: Findings from Agent 2
    |       |--- Output: Polished report sections
    |
    |--- [Agent 4: Reviewer] (Strands Agent)
    |       |--- Tools: Fact checker, grammar checker
    |       |--- Input: Draft from Agent 3
    |       |--- Output: Final reviewed report
    |
    v
[Final Research Report]

Prompt Chaining with Bedrock:

class PromptChain:
    """Prompt chaining pattern: each step builds on the previous."""

    def execute_research_chain(self, topic: str) -> dict:
        """Multi-step prompt chain for research report generation."""

        # Step 1: Generate research outline
        outline = self.invoke_fm(
            system="You are a research strategist.",
            prompt=f"""Create a detailed research outline for the topic: {topic}

            Include:
            1. Key research questions (5-7)
            2. Data sources to investigate
            3. Report structure recommendation

            Respond in JSON format.""",
            model="anthropic.claude-sonnet-4-20250514"
        )

        # Step 2: For each research question, gather analysis
        sections = []
        for question in json.loads(outline)["research_questions"]:
            analysis = self.invoke_fm(
                system="You are a research analyst with deep domain expertise.",
                prompt=f"""Research question: {question}

                Context from outline: {outline}

                Provide a thorough analysis with:
                - Key findings (with specific data points)
                - Supporting evidence
                - Counterarguments
                - Implications

                Be specific and cite sources where possible.""",
                model="anthropic.claude-sonnet-4-20250514"
            )
            sections.append({"question": question, "analysis": analysis})

        # Step 3: Synthesize into coherent report
        report_draft = self.invoke_fm(
            system="You are an expert report writer.",
            prompt=f"""Synthesize these research sections into a coherent report.

            Topic: {topic}
            Outline: {outline}
            Sections: {json.dumps(sections)}

            Write a professional report with:
            - Executive summary
            - Introduction
            - Main findings (organized by theme, not by question)
            - Analysis and implications
            - Recommendations
            - Conclusion

            Maintain an analytical, objective tone.""",
            model="anthropic.claude-sonnet-4-20250514"  # Use best model for synthesis
        )

        # Step 4: Review and refine
        final_report = self.invoke_fm(
            system="You are a senior editor and fact-checker.",
            prompt=f"""Review and improve this report:

            {report_draft}

            Check for:
            1. Factual accuracy and consistency
            2. Logical flow and argument strength
            3. Grammar and style
            4. Missing perspectives or gaps

            Return the improved report with tracked changes noted.""",
            model="anthropic.claude-sonnet-4-20250514"
        )

        return {
            "outline": outline,
            "sections": sections,
            "draft": report_draft,
            "final_report": final_report
        }

    def invoke_fm(self, system, prompt, model):
        """Helper to invoke FM."""
        response = bedrock_runtime.invoke_model(
            modelId=model,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "system": system,
                "messages": [{"role": "user", "content": prompt}]
            })
        )
        return json.loads(response['body'].read())["content"][0]["text"]

Step Functions for Agent Pattern Orchestration:

{
  "Comment": "Agent Design Pattern: Plan -> Execute -> Validate -> Refine",
  "StartAt": "PlanPhase",
  "States": {
    "PlanPhase": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:AgentPlanner",
      "Comment": "Agent creates execution plan",
      "ResultPath": "$.plan",
      "Next": "ExecutePhase"
    },
    "ExecutePhase": {
      "Type": "Map",
      "ItemsPath": "$.plan.steps",
      "MaxConcurrency": 3,
      "Comment": "Execute plan steps (parallel where possible)",
      "Iterator": {
        "StartAt": "ExecuteStep",
        "States": {
          "ExecuteStep": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123:function:AgentExecutor",
            "End": true
          }
        }
      },
      "ResultPath": "$.results",
      "Next": "ValidatePhase"
    },
    "ValidatePhase": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:AgentValidator",
      "Comment": "Validate results meet quality criteria",
      "ResultPath": "$.validation",
      "Next": "CheckQuality"
    },
    "CheckQuality": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.validation.meets_criteria",
          "BooleanEquals": true,
          "Next": "FinalOutput"
        },
        {
          "Variable": "$.validation.iteration",
          "NumericLessThan": 3,
          "Next": "RefinePhase"
        }
      ],
      "Default": "FinalOutput"
    },
    "RefinePhase": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:AgentRefiner",
      "Comment": "Refine based on validation feedback",
      "Next": "ValidatePhase"
    },
    "FinalOutput": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:FormatOutput",
      "End": true
    }
  }
}
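The CheckQuality choice reads $.validation.meets_criteria and $.validation.iteration, so AgentValidator must emit both. A sketch of that Lambda with a placeholder quality criterion (the non-empty-output check is an assumption):

```python
def lambda_handler(event: dict, context=None) -> dict:
    """AgentValidator sketch: emits the fields the CheckQuality choice reads."""
    results = event.get("results", [])
    # Count iterations so the refine loop gives up after 3 passes
    iteration = event.get("validation", {}).get("iteration", 0) + 1
    # Placeholder criterion: every executed step produced non-empty output
    meets = bool(results) and all(step.get("output") for step in results)
    return {"meets_criteria": meets, "iteration": iteration}
```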


Skill 2.5.6: Improve Troubleshooting Efficiency for FM Applications

User Story 26: GenAI Observability and Troubleshooting Platform

As an SRE (Site Reliability Engineer), I want comprehensive observability for our GenAI applications with automated anomaly detection, So that I can quickly diagnose prompt regressions, latency spikes, and cost anomalies.

Deep Dive Scenario

Company: AIScale - runs 20 FM-powered services, needs <5 min MTTR

Observability Architecture:

[GenAI Application]
    |
    |--- [CloudWatch Logs] (prompts, responses, errors)
    |       |
    |       v
    |   [CloudWatch Logs Insights]
    |       |--- Query prompt patterns
    |       |--- Analyze error distributions
    |       |--- Detect response quality trends
    |
    |--- [X-Ray Traces] (end-to-end request flow)
    |       |--- API Gateway -> Lambda -> Bedrock latency
    |       |--- Service map visualization
    |       |--- Error correlation across services
    |
    |--- [CloudWatch Metrics] (custom GenAI metrics)
    |       |--- Token usage (input/output)
    |       |--- Model latency (TTFT, total)
    |       |--- Error rates by model/endpoint
    |       |--- Cost per request
    |
    |--- [Amazon Q Developer]
            |--- GenAI-specific error pattern recognition
            |--- "Why is latency spiking for Claude Sonnet?"
            |--- Auto-suggests fixes based on error patterns

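The custom-metrics branch above assumes the application publishes a `GenAI` namespace. A minimal sketch with `put_metric_data`, using the same metric names and `ModelId` dimension the dashboard widgets later in this section expect; `build_token_metrics` is a hypothetical helper introduced here:

```python
def build_token_metrics(model_id: str, input_tokens: int,
                        output_tokens: int, latency_ms: float) -> list:
    """Build put_metric_data entries for the custom 'GenAI' namespace."""
    dims = [{"Name": "ModelId", "Value": model_id}]
    return [
        {"MetricName": "InputTokens", "Dimensions": dims,
         "Value": input_tokens, "Unit": "Count"},
        {"MetricName": "OutputTokens", "Dimensions": dims,
         "Value": output_tokens, "Unit": "Count"},
        {"MetricName": "InvocationLatency", "Dimensions": dims,
         "Value": latency_ms, "Unit": "Milliseconds"},
    ]

def publish_invocation_metrics(model_id, input_tokens, output_tokens, latency_ms):
    """Push one invocation's metrics to CloudWatch."""
    import boto3  # imported lazily so the pure helper is usable without the SDK
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="GenAI",
        MetricData=build_token_metrics(model_id, input_tokens,
                                       output_tokens, latency_ms),
    )
```

Publishing per-invocation keeps the dashboard's `Sum` and percentile stats meaningful; batching calls (up to 1,000 entries per `put_metric_data`) reduces API cost at high volume.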
CloudWatch Logs Insights Queries for GenAI:

# Query 1: Analyze prompt patterns and token usage
# (run against log group /genai/api/invocations)
fields @timestamp, model_id, input_tokens, output_tokens, latency_ms
| stats
    avg(input_tokens) as avg_input,
    avg(output_tokens) as avg_output,
    avg(latency_ms) as avg_latency,
    pct(latency_ms, 99) as p99_latency,
    count(*) as request_count
by bin(5m) as time_window, model_id
| sort time_window desc

# Query 2: Find failing prompts (error patterns)
# (run against log group /genai/api/errors)
fields @timestamp, error_type, model_id, prompt_template, error_message
| filter error_type in ['ValidationException', 'ThrottlingException']
| stats count(*) as error_count by error_type, model_id, bin(15m)
| sort error_count desc

# Query 3: Detect prompt injection attempts
# (run against log group /genai/api/invocations)
fields @timestamp, user_id, prompt_text
| filter prompt_text like /(?i)(ignore|forget|disregard).*(instruction|prompt|system)/
    or prompt_text like /(?i)(you are now|act as|pretend)/
| stats count(*) as injection_attempts by user_id, bin(1h)
| sort injection_attempts desc

# Query 4: Cost analysis by feature/team
# (run against log group /genai/api/invocations; per-token rates are illustrative)
fields @timestamp, team_id, feature, model_id, input_tokens, output_tokens
| stats
    sum(input_tokens * 0.000003 + output_tokens * 0.000015) as estimated_cost,
    count(*) as request_count,
    sum(input_tokens + output_tokens) as total_tokens
by team_id, feature, bin(1d)
| sort estimated_cost desc

# Query 5: Response quality monitoring (if quality scores are logged)
# (run against log group /genai/api/quality)
fields @timestamp, model_id, quality_score, response_length
| stats
    avg(quality_score) as avg_quality,
    min(quality_score) as min_quality,
    stddev(quality_score) as quality_stddev,
    count(*) as sample_size
by model_id, bin(1h)
# surface quality drops for alerting
| filter avg_quality < 0.8
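
These queries can also run programmatically (for example, from runbook automation) with the CloudWatch Logs `start_query` / `get_query_results` APIs. A hedged sketch; `run_insights_query` and `parse_results` are helper names introduced here, not AWS APIs:

```python
import time

def parse_results(rows: list) -> list:
    """Flatten Logs Insights result rows ([{'field':..,'value':..}, ...])
    into plain dicts, one per result row."""
    return [{col["field"]: col["value"] for col in row} for row in rows]

def run_insights_query(log_group: str, query: str, minutes: int = 60) -> list:
    """Start a Logs Insights query and poll until it finishes."""
    import boto3  # lazy import: keeps parse_results usable without the SDK
    logs = boto3.client("logs")
    now = int(time.time())
    qid = logs.start_query(
        logGroupName=log_group,
        startTime=now - minutes * 60,
        endTime=now,
        queryString=query,
    )["queryId"]
    while True:
        resp = logs.get_query_results(queryId=qid)
        if resp["status"] in ("Complete", "Failed", "Cancelled"):
            return parse_results(resp.get("results", []))
        time.sleep(1)
```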

X-Ray Tracing for FM API Calls:

import json
import time

import boto3
from aws_xray_sdk.core import xray_recorder, patch_all

patch_all()  # Auto-instrument boto3, requests

bedrock_runtime = boto3.client("bedrock-runtime")

def genai_handler(event, context):
    """Lambda with X-Ray tracing for FM troubleshooting."""

    # Build a minimal Anthropic Messages request from the event
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": event["prompt"]}],
    }

    # Custom subsegment for detailed FM timing
    with xray_recorder.in_subsegment("fm_invocation") as subsegment:
        start_time = time.time()

        # Annotations (indexed, searchable in X-Ray console)
        subsegment.put_annotation("model_id", "anthropic.claude-sonnet-4-20250514")
        subsegment.put_annotation("feature", "chat")
        subsegment.put_annotation("team_id", event.get("team_id", "unknown"))

        response = bedrock_runtime.invoke_model(
            modelId="anthropic.claude-sonnet-4-20250514",
            body=json.dumps(body)
        )

        result = json.loads(response['body'].read())
        total_time = (time.time() - start_time) * 1000

        # Metadata (not indexed, but visible in trace details)
        subsegment.put_metadata("input_tokens", result["usage"]["input_tokens"])
        subsegment.put_metadata("output_tokens", result["usage"]["output_tokens"])
        subsegment.put_metadata("total_latency_ms", total_time)
        subsegment.put_metadata("stop_reason", result.get("stop_reason"))

        # Extract Bedrock-specific timing from headers
        bedrock_latency = response["ResponseMetadata"]["HTTPHeaders"].get(
            "x-amzn-bedrock-invocation-latency"
        )
        subsegment.put_annotation("bedrock_latency_ms", int(bedrock_latency or 0))

        return result

Amazon Q Developer for Error Pattern Recognition:

# Example Q Developer interactions for troubleshooting:

Developer: "I'm seeing ThrottlingException errors on Bedrock Claude Sonnet in us-east-1."

Q Developer Analysis:
"Based on the error pattern:
1. Your request rate (150 RPM) approaches the default quota of 200 RPM
2. Burst pattern: 80% of requests occur between 9am-11am
3. Recommendations:
   - Request a quota increase via Service Quotas
   - Implement request queuing with SQS for non-real-time calls
   - Add cross-region failover to us-west-2
   - Consider Provisioned Throughput for predictable capacity"

Developer: "Why are my Claude Sonnet responses taking 15 seconds?"

Q Developer Analysis:
"Latency breakdown from your X-Ray traces:
- Network: 50ms
- Bedrock processing: 14,500ms (97% of total)
- Lambda processing: 450ms

Root cause: Your average output is 3,500 tokens. At ~250 tokens/sec, that's 14 seconds.
Recommendations:
- Reduce max_tokens if you don't need long responses
- Use streaming (invoke_model_with_response_stream) for better perceived latency
- Consider Claude Haiku for queries that don't need Sonnet-level reasoning
- Add a system prompt instruction to be concise"

Custom CloudWatch Dashboard for GenAI Monitoring:

# Create GenAI-specific monitoring dashboard
import json

import boto3

cloudwatch = boto3.client('cloudwatch')

dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "title": "FM Invocation Latency (p50, p90, p99)",
                "metrics": [
                    ["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p50"}],
                    ["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p90"}],
                    ["GenAI", "InvocationLatency", "ModelId", "claude-sonnet", {"stat": "p99"}]
                ],
                "period": 300,
                "view": "timeSeries"
            }
        },
        {
            "type": "metric",
            "properties": {
                "title": "Token Usage (Input vs Output)",
                "metrics": [
                    ["GenAI", "InputTokens", "ModelId", "claude-sonnet", {"stat": "Sum"}],
                    ["GenAI", "OutputTokens", "ModelId", "claude-sonnet", {"stat": "Sum"}]
                ]
            }
        },
        {
            "type": "metric",
            "properties": {
                "title": "Error Rate by Type",
                "metrics": [
                    ["GenAI", "Errors", "ErrorType", "ThrottlingException", {"stat": "Sum"}],
                    ["GenAI", "Errors", "ErrorType", "ValidationException", {"stat": "Sum"}],
                    ["GenAI", "Errors", "ErrorType", "ModelTimeout", {"stat": "Sum"}]
                ]
            }
        },
        {
            "type": "metric",
            "properties": {
                "title": "Estimated Cost per Hour",
                "metrics": [
                    ["GenAI", "EstimatedCost", "ModelId", "claude-sonnet", {"stat": "Sum"}],
                    ["GenAI", "EstimatedCost", "ModelId", "claude-haiku", {"stat": "Sum"}]
                ]
            }
        },
        {
            "type": "log",
            "properties": {
                "title": "Recent Errors",
                "query": "fields @timestamp, error_type, model_id, error_message | filter @logGroup = '/genai/api/errors' | sort @timestamp desc | limit 20",
                "region": "us-east-1"
            }
        }
    ]
}

cloudwatch.put_dashboard(
    DashboardName="GenAI-Operations",
    DashboardBody=json.dumps(dashboard_body)
)

Troubleshooting Decision Tree:

GenAI Application Issue
|
|--- High Latency?
|    |--- Check X-Ray: Where is time spent?
|    |    |--- Bedrock processing: Reduce max_tokens, use streaming, use faster model
|    |    |--- Lambda cold start: Increase memory, use provisioned concurrency
|    |    |--- Network: Check VPC endpoints, NAT gateway
|    |--- Check CloudWatch Logs Insights: Token count trends
|
|--- Errors?
|    |--- ThrottlingException: Request quota increase, add backoff, cross-region failover
|    |--- ValidationException: Check prompt format, token limits, model compatibility
|    |--- ModelTimeout: Reduce complexity, lower max_tokens, check model status
|    |--- AccessDeniedException: Verify IAM policies, model access permissions
|
|--- Quality Degradation?
|    |--- Check Logs Insights: Prompt template changes recently?
|    |--- Compare response quality scores over time
|    |--- Run prompt regression tests
|    |--- Verify knowledge base data freshness
|
|--- Cost Spike?
|    |--- Check Logs Insights: Token usage by team/feature
|    |--- Identify top consumers
|    |--- Check for prompt bloat (unnecessary context in prompts)
|    |--- Verify model routing (expensive model used for simple tasks?)

Exam-Relevant Points:

- CloudWatch Logs Insights: Primary tool for analyzing GenAI prompts and responses
- X-Ray: Traces FM API calls across service boundaries (API GW -> Lambda -> Bedrock)
- Amazon Q Developer: GenAI-specific error pattern recognition and fix suggestions
- Custom CloudWatch metrics for token usage, cost, and quality scores
- Prompt regression testing in CI/CD catches quality drops before production
- Key GenAI-specific errors: ThrottlingException, ModelTimeout, ValidationException
- X-Ray annotations are indexed and searchable; metadata is for debugging details
- Streaming APIs (SSE, WebSocket) reduce perceived latency even if total time is the same
- Bedrock Prompt Flows for no-code workflow building by business users
- Amazon Q Business indexes enterprise data sources for natural language knowledge access
- Prompt chaining: sequential FM calls where each builds on the previous output
- Agent patterns via Step Functions: Plan -> Execute -> Validate -> Refine loops