Module 10 of 11 · AWS Bedrock — Build with Foundation Models · Intermediate

Production Patterns

Duration: 55 min

Building production-grade Bedrock applications requires error handling, cost optimization, caching, monitoring, A/B testing, rate limiting, and request validation. This module covers real-world patterns and best practices for each.

Error Handling & Resilience

import boto3
import time
from botocore.exceptions import ClientError

# Shared Bedrock runtime client, created once at import time and referenced
# by the invocation snippets below; the region is pinned to us-east-1.
client = boto3.client('bedrock-runtime', region_name='us-east-1')

def invoke_with_retry(model_id, body, max_retries=3):
    """Invoke a model, retrying throttled calls with exponential backoff.

    Args:
        model_id: Model identifier forwarded as ``modelId``.
        body: Request payload forwarded as ``body``.
        max_retries: Maximum number of invocation attempts.

    Returns:
        The response returned by ``client.invoke_model``.

    Raises:
        ClientError: Re-raised immediately for any error code other than
            ``ThrottlingException``.
        Exception: ``'Max retries exceeded'`` when every attempt was
            throttled; the last ``ClientError`` is attached as its cause.
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            return client.invoke_model(
                modelId=model_id,
                body=body
            )
        except ClientError as e:
            error_code = e.response['Error']['Code']

            # Non-retryable errors: report the well-known ones, then re-raise.
            if error_code != 'ThrottlingException':
                if error_code == 'AccessDeniedException':
                    print('Model access not enabled')
                elif error_code == 'ValidationException':
                    print('Invalid request format')
                raise

            last_error = e

            # Sleeping after the final attempt would only delay the failure.
            if attempt == max_retries - 1:
                break

            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f'Rate limited. Waiting {wait_time}s...')
            time.sleep(wait_time)

    # Chain the last throttling error so callers can inspect the root cause.
    raise Exception('Max retries exceeded') from last_error

Cost Optimization

import json

def estimate_cost(model_id, input_text, output_tokens=100):
    """Estimate the cost of one invocation before making it.

    Args:
        model_id: Full model identifier; it is matched against the pricing
            table by substring (e.g. ``'claude-3-sonnet'``).
        input_text: Prompt text; its token count is approximated as
            ``len(input_text) / 4``.
        output_tokens: Expected number of generated tokens.

    Returns:
        A dict with ``input_tokens`` (truncated to int), ``output_tokens``,
        ``input_cost``, ``output_cost`` and ``total_cost``.

    Raises:
        ValueError: If no pricing entry matches ``model_id``.
    """
    # Pricing per 1K tokens (example rates)
    pricing = {
        'claude-3-sonnet': {'input': 0.003, 'output': 0.015},
        'claude-3-haiku': {'input': 0.00025, 'output': 0.00125},
        'llama3-70b': {'input': 0.00195, 'output': 0.00256},
        'titan-express': {'input': 0.00013, 'output': 0.00017}
    }

    # Rough token count (1 token ≈ 4 chars)
    input_tokens = len(input_text) / 4

    # First pricing entry whose key appears in the model ID wins.
    rates = next(
        (model_rates for key, model_rates in pricing.items() if key in model_id),
        None,
    )
    if rates is None:
        known = ', '.join(pricing)
        raise ValueError(
            f"No pricing available for model {model_id!r}; known models: {known}"
        )

    # Rates are quoted per 1K tokens.
    input_cost = (input_tokens / 1000) * rates['input']
    output_cost = (output_tokens / 1000) * rates['output']

    return {
        'input_tokens': int(input_tokens),
        'output_tokens': output_tokens,
        'input_cost': input_cost,
        'output_cost': output_cost,
        'total_cost': input_cost + output_cost
    }

# Example: estimate a short prompt sent to Claude 3 Sonnet, then print the total
cost = estimate_cost(
    model_id='anthropic.claude-3-sonnet-20240229-v1:0',
    input_text='What is AWS Bedrock?',
    output_tokens=100,
)
print(f"Estimated cost: ${cost['total_cost']:.6f}")

Caching Strategies

import hashlib
import json
from functools import lru_cache

# In-memory cache: maps a hashed request key to the parsed model response.
# Nothing in the code shown here evicts entries, so the dict only grows.
response_cache = {}

def get_cached_response(model_id, prompt, temperature=0.7):
    """Return the model response for a prompt, reusing a cached copy if present.

    Args:
        model_id: Model identifier forwarded as ``modelId``.
        prompt: User message text.
        temperature: Sampling temperature; part of the cache key.

    Returns:
        The parsed JSON response body. On a cache hit this is the same dict
        object that was stored, so callers should not mutate it.
    """
    # Hash a JSON list rather than a ':'-joined string: model IDs contain
    # ':' themselves, which would make a joined key ambiguous.
    key_material = json.dumps([model_id, prompt, temperature])
    cache_key = hashlib.sha256(key_material.encode()).hexdigest()

    # Check cache
    if cache_key in response_cache:
        print("Cache hit!")
        return response_cache[cache_key]

    # Invoke model
    print("Cache miss. Invoking model...")
    response = client.invoke_model(
        modelId=model_id,
        body=json.dumps({
            # Bedrock's Anthropic Messages API expects exactly this value.
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    result = json.loads(response['body'].read())

    # Store in cache
    response_cache[cache_key] = result

    return result

# Redis cache for distributed systems
import redis

# Module-level Redis connection used by get_cached_response_redis; it targets
# a server on localhost:6379, database 0.
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_response_redis(model_id, prompt, ttl=3600, temperature=0.7):
    """Return the model response for a prompt, caching it in Redis.

    Args:
        model_id: Model identifier forwarded as ``modelId``.
        prompt: User message text.
        ttl: Lifetime of the cached entry, in seconds.
        temperature: Sampling temperature; part of the cache key so that
            responses generated with different settings are not mixed.

    Returns:
        The parsed JSON response body, either from Redis or freshly invoked.
    """
    # Hash a JSON list rather than a ':'-joined string: model IDs contain
    # ':' themselves, which would make a joined key ambiguous.
    key_material = json.dumps([model_id, prompt, temperature])
    cache_key = hashlib.sha256(key_material.encode()).hexdigest()

    # Check Redis
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Invoke model
    response = client.invoke_model(
        modelId=model_id,
        body=json.dumps({
            # Bedrock's Anthropic Messages API expects exactly this value.
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    result = json.loads(response['body'].read())

    # Store in Redis with TTL
    redis_client.setex(
        cache_key,
        ttl,
        json.dumps(result)
    )

    return result

Monitoring & Logging

import logging
import time
from datetime import datetime

# Logging setup: INFO and above, each record prefixed with the timestamp,
# logger name and level; `logger` is this module's named logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def _send_metrics_safely(metrics):
    """Publish metrics without letting a monitoring failure escape."""
    try:
        send_metrics(metrics)
    except Exception:
        # Monitoring is best-effort: record the failure, keep the request path intact.
        logger.exception("Failed to publish metrics")


def invoke_with_monitoring(model_id, body):
    """Invoke a model, logging and publishing latency and token usage.

    Args:
        model_id: Model identifier forwarded as ``modelId``.
        body: Request payload forwarded as ``body``.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: Whatever the invocation or response parsing raised, after
            it has been logged and reported with ``status='error'``. Failures
            while publishing metrics are logged and never propagated.
    """
    # Monotonic clock: durations must not be affected by wall-clock changes.
    start_time = time.monotonic()
    logger.info("Invoking model: %s", model_id)

    # Only the invocation and parsing are guarded, so a metrics failure can
    # never be mistaken for a model failure.
    try:
        response = client.invoke_model(
            modelId=model_id,
            body=body
        )
        result = json.loads(response['body'].read())
    except Exception as e:
        duration = time.monotonic() - start_time
        logger.error("Error: %s | Duration: %.2fs", e, duration)

        _send_metrics_safely({
            'model': model_id,
            'duration': duration,
            'status': 'error',
            'error': str(e)
        })

        raise

    # Extract metrics; token counts default to 0 when the response has no usage block.
    duration = time.monotonic() - start_time
    usage = result.get('usage', {})
    input_tokens = usage.get('input_tokens', 0)
    output_tokens = usage.get('output_tokens', 0)

    logger.info(
        "Success | Duration: %.2fs | Input: %s | Output: %s",
        duration, input_tokens, output_tokens,
    )

    # Send to monitoring service (CloudWatch, DataDog, etc.)
    _send_metrics_safely({
        'model': model_id,
        'duration': duration,
        'input_tokens': input_tokens,
        'output_tokens': output_tokens,
        'status': 'success'
    })

    return result

@lru_cache(maxsize=1)
def _cloudwatch_client():
    """Create the CloudWatch client on first use and reuse it afterwards."""
    return boto3.client('cloudwatch')


def send_metrics(metrics):
    """Publish one invocation's duration to CloudWatch.

    Args:
        metrics: Dict with at least ``duration`` (seconds), ``model`` and
            ``status``; the latter two become metric dimensions. Any other
            keys are ignored.
    """
    # Example: CloudWatch. The client is built once instead of on every call.
    _cloudwatch_client().put_metric_data(
        Namespace='BedrockApp',
        MetricData=[
            {
                'MetricName': 'ModelInvocation',
                'Value': metrics['duration'],
                'Unit': 'Seconds',
                'Dimensions': [
                    {'Name': 'Model', 'Value': metrics['model']},
                    {'Name': 'Status', 'Value': metrics['status']}
                ]
            }
        ]
    )

A/B Testing

import random

def invoke_with_ab_test(prompt, variant_a_model, variant_b_model):
    """Send a prompt to one of two models chosen at random and record the result.

    Args:
        prompt: User message text.
        variant_a_model: Model ID used when variant ``'A'`` is drawn.
        variant_b_model: Model ID used when variant ``'B'`` is drawn.

    Returns:
        The parsed JSON response body from the selected model.
    """
    # Randomly select variant (50/50 split)
    variant = random.choice(['A', 'B'])
    model_id = variant_a_model if variant == 'A' else variant_b_model

    # Invoke model
    response = client.invoke_model(
        modelId=model_id,
        body=json.dumps({
            # Bedrock's Anthropic Messages API expects exactly this value.
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    result = json.loads(response['body'].read())

    # Log for analysis
    logger.info(f"A/B Test | Variant: {variant} | Model: {model_id}")

    # Store result with variant for later analysis
    store_ab_result({
        'variant': variant,
        'model': model_id,
        'response': result,
        'timestamp': datetime.now().isoformat()
    })

    return result

def analyze_ab_test_results():
    """Placeholder for A/B test analysis; currently does nothing.

    Outline of the intended steps:
      1. Query stored results.
      2. Calculate metrics: latency, cost, user satisfaction.
      3. Determine winner.
    """
    return None

Rate Limiting

from threading import Lock, Semaphore
import time

class RateLimiter:
    """Sliding-window limiter allowing a fixed number of calls per 60 seconds.

    Attributes are kept for compatibility: ``max_requests`` is the limit,
    ``requests`` holds the timestamps of calls still inside the window
    (monotonic-clock seconds, oldest first), ``lock`` guards both.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, max_requests_per_minute=100):
        """Create a limiter.

        Args:
            max_requests_per_minute: Calls allowed per rolling 60 seconds.

        Raises:
            ValueError: If the limit is smaller than 1.
        """
        if max_requests_per_minute < 1:
            raise ValueError("max_requests_per_minute must be at least 1")
        self.max_requests = max_requests_per_minute
        self.requests = []
        self.lock = Lock()

    def wait_if_needed(self):
        """Block until a call is allowed, then record it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Remove old requests (older than the window)
                self.requests = [
                    stamp for stamp in self.requests
                    if now - stamp < self.WINDOW_SECONDS
                ]

                if len(self.requests) < self.max_requests:
                    # Record the time the slot is actually taken, not a
                    # timestamp captured before any waiting.
                    self.requests.append(now)
                    return

                # Time until the oldest recorded request leaves the window.
                wait_time = self.WINDOW_SECONDS - (now - self.requests[0])

            # Sleep outside the lock, then re-check: requests still inside
            # the window stay counted, so the limit holds after waking.
            print(f"Rate limit reached. Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)

# Usage: issue 150 calls while the limiter holds them to 100 per minute.
# NOTE: `...` below is a placeholder for the real modelId/body keyword
# arguments and must be replaced before this loop can run.
limiter = RateLimiter(max_requests_per_minute=100)

for i in range(150):
    limiter.wait_if_needed()
    response = client.invoke_model(...)

Request Validation

def validate_bedrock_request(model_id, prompt, max_tokens=1024):
    """Validate request parameters before invoking a model.

    All problems are collected and reported together.

    Args:
        model_id: Model identifier; must be non-empty.
        prompt: Prompt text; must be non-empty and at most 100000 characters.
        max_tokens: Output token limit; must be an integer from 1 to 4096.

    Returns:
        True when every check passes.

    Raises:
        ValueError: Listing every failed check, comma-separated.
    """
    errors = []

    # Validate model ID
    if not model_id:
        errors.append("Model ID is required")

    # Validate prompt; the length check only runs on a non-empty prompt, so
    # a None prompt is reported as empty instead of crashing in len().
    if not prompt:
        errors.append("Prompt cannot be empty")
    elif len(prompt) > 100000:
        errors.append("Prompt exceeds maximum length")

    # Validate max_tokens; a non-integer is reported rather than compared.
    if not isinstance(max_tokens, int):
        errors.append("max_tokens must be an integer")
    elif max_tokens < 1 or max_tokens > 4096:
        errors.append("max_tokens must be between 1 and 4096")

    if errors:
        raise ValueError(f"Validation errors: {', '.join(errors)}")

    return True

References

AWS Bedrock Documentation:

Model Documentation:

LangChain:

Best Practices:

Community:


❓ What is exponential backoff used for?

❓ What is the main benefit of caching responses?

❓ What is A/B testing used for in production?

❓ What should you do before invoking a model in production?

← Previous Continue interactively → Next →

Related Courses