Module 10 of 11 · AWS SageMaker — End-to-End ML Platform · Intermediate

Monitoring & Cost Optimization

Duration: 50 min

Production ML systems require continuous monitoring and cost management. This module covers Model Monitor for data drift detection, endpoint auto-scaling, and cost optimization strategies.

Model Monitor for Data Drift

Model Monitor detects data drift and model performance degradation. It compares production data against a baseline to identify distribution shifts.

from sagemaker.model_monitor import (
    CronExpressionGenerator,
    DataCaptureConfig,
    DatasetFormat,
    DefaultModelMonitor,
)
import sagemaker

session = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'
bucket = session.default_bucket()

# Enable data capture on endpoint.
# The SDK keyword is `enable_capture`; 100% sampling records every request.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f's3://{bucket}/data-capture/'
)

# Deploy endpoint with data capture.
# NOTE(review): `estimator` is not defined in this snippet -- presumably the
# trained estimator from an earlier module; confirm before running.
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    data_capture_config=data_capture_config,
    endpoint_name='monitored-endpoint'
)

# Create the data-quality monitor (DefaultModelMonitor is the SDK class that
# runs the built-in data-quality analysis).
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    sagemaker_session=session
)

# Create baseline from training data.
# `dataset_format` must be a DatasetFormat dict, not a MIME type string.
# NOTE(review): header=True assumes data.csv has a header row -- use
# header=False if it does not.
monitor.suggest_baseline(
    baseline_dataset=f's3://{bucket}/train-data/data.csv',
    dataset_format=DatasetFormat.csv(header=True)
)

# Schedule monitoring.
# `endpoint_input` identifies the endpoint whose captured traffic is analysed
# (an endpoint name or EndpointInput), not the S3 capture prefix.
monitor.create_monitoring_schedule(
    monitor_schedule_name='data-quality-monitor',
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=f's3://{bucket}/monitoring-output/',
    statistics=monitor.baseline_statistics(),
    constraints=monitor.baseline_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly()  # Hourly
)

Detecting Data Drift

import boto3

sm_client = boto3.client('sagemaker')

# Get monitoring execution details
response = sm_client.list_monitoring_executions(
    MonitoringScheduleName='data-quality-monitor'
)

for execution in response['MonitoringExecutionSummaries']:
    status = execution['MonitoringExecutionStatus']

    # Execution summaries have no ARN of their own; the processing job that
    # ran the analysis is the usable identifier, and it may be absent.
    print(f"Execution: {execution.get('ProcessingJobArn', 'n/a')}")
    print(f"Status: {status}")

    # Drift shows up as a dedicated status; the summary text is carried in
    # FailureReason. The full report is the constraint_violations.json file
    # written to the schedule's output location.
    if status == 'CompletedWithViolations':
        print(f"Violations: {execution.get('FailureReason', 'see constraint_violations.json')}")

Endpoint Auto-Scaling

import boto3

autoscaling = boto3.client('application-autoscaling')

# Both calls below address the same endpoint variant, so its identifying
# arguments are written once and shared.
scalable_target = {
    'ServiceNamespace': 'sagemaker',
    'ResourceId': 'endpoint/my-endpoint/variant/AllTraffic',
    'ScalableDimension': 'sagemaker:variant:DesiredInstanceCount',
}

# Register endpoint for auto-scaling: instance count may vary from 1 to 10.
autoscaling.register_scalable_target(
    MinCapacity=1,
    MaxCapacity=10,
    **scalable_target
)

# Target-tracking settings: hold the per-instance invocation metric at 70.
tracking_configuration = {
    'TargetValue': 70.0,
    'PredefinedMetricSpecification': {
        'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
    },
    'ScaleOutCooldown': 300,
    'ScaleInCooldown': 300
}

# Create scaling policy
autoscaling.put_scaling_policy(
    PolicyName='endpoint-scaling-policy',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration=tracking_configuration,
    **scalable_target
)

Cost Optimization Strategies

from sagemaker.estimator import Estimator

# Use spot instances for training
# Resolve the built-in XGBoost image for the session's region instead of
# hard-coding a registry account, region and tag.
xgboost_image_uri = sagemaker.image_uris.retrieve(
    framework='xgboost',
    region=session.boto_region_name,
    version='1.7-1'
)

# Use spot instances for training.
# max_wait (total time including waiting for spot capacity) must be at least
# max_run (training time limit).
estimator = Estimator(
    image_uri=xgboost_image_uri,
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session,
    use_spot_instances=True,
    max_run=3600,
    max_wait=5400
)

# Use serverless endpoints for variable traffic
from sagemaker.serverless import ServerlessInferenceConfig

serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=1024,
    max_concurrency=10
)

# NOTE(review): deploy() needs a completed training job -- this snippet does
# not show estimator.fit(...); confirm it is run before this call.
# No instance type/count is passed because the serverless config replaces them.
predictor = estimator.deploy(
    serverless_inference_config=serverless_config,
    endpoint_name='cost-optimized-endpoint'
)

# Use multi-model endpoints
from sagemaker.multidatamodel import MultiDataModel

# One container serves every model artifact stored under model_data_prefix.
# `name` is the SageMaker model name; the container is passed as `image_uri`
# (MultiDataModel has no `container_uri` or `model_name` parameters).
multi_model = MultiDataModel(
    name='xgboost-multi',
    model_data_prefix=f's3://{bucket}/models/',
    image_uri=sagemaker.image_uris.retrieve(
        framework='xgboost',
        region=session.boto_region_name,
        version='1.7-1'
    ),
    role=role,
    sagemaker_session=session
)

# The endpoint name is chosen at deploy time.
predictor = multi_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    endpoint_name='multi-model-endpoint'
)

Monitoring Configuration

{
  "monitoring_config": {
    "monitoring_schedule_name": "data-quality-monitor",
    "monitoring_job_definition": {
      "baseline_config": {
        "baselining_job_name": "baseline-job"
      },
      "monitoring_inputs": [
        {
          "endpoint_input": {
            "endpoint_name": "my-endpoint",
            "local_path": "/opt/ml/processing/input",
            "s3_input_mode": "File",
            "s3_data_distribution_type": "FullyReplicated"
          }
        }
      ],
      "monitoring_output_config": {
        "monitoring_outputs": [
          {
            "s3_output": {
              "s3_uri": "s3://my-bucket/monitoring-output/",
              "local_path": "/opt/ml/processing/output",
              "s3_upload_mode": "EndOfJob"
            }
          }
        ]
      },
      "monitoring_resources": {
        "cluster_config": {
          "instance_count": 1,
          "instance_type": "ml.m5.xlarge",
          "volume_size_in_gb": 30
        }
      }
    },
    "schedule_expression": "cron(0 * * * ? *)"
  }
}

CloudWatch Metrics

import boto3

cloudwatch = boto3.client('cloudwatch')

# Dimensions that select one variant of one endpoint.
variant_dimensions = [
    {'Name': dimension_name, 'Value': dimension_value}
    for dimension_name, dimension_value in (
        ('EndpointName', 'my-endpoint'),
        ('VariantName', 'AllTraffic'),
    )
]

# Get endpoint invocation metrics: average and sum of InvocationsPerInstance
# over the fixed window below.
metric_query = {
    'Namespace': 'AWS/SageMaker',
    'MetricName': 'InvocationsPerInstance',
    'Dimensions': variant_dimensions,
    'StartTime': '2024-01-01T00:00:00Z',
    'EndTime': '2024-01-02T00:00:00Z',
    'Period': 3600,
    'Statistics': ['Average', 'Sum'],
}
response = cloudwatch.get_metric_statistics(**metric_query)

datapoints = response['Datapoints']
print(f"Metrics: {datapoints}")

References

AWS SageMaker Documentation

Key Services

Best Practices

Cost Optimization Tips

  1. Use serverless endpoints for unpredictable traffic
  2. Implement auto-scaling for consistent performance
  3. Use spot training instances
  4. Consolidate models on multi-model endpoints
  5. Monitor and clean up unused resources
  6. Use SageMaker Savings Plans for predictable workloads

Quiz 1

❓ What does Model Monitor detect?

Quiz 2

❓ What is data capture used for?

Quiz 3

❓ What does endpoint auto-scaling do?

Quiz 4

❓ Which strategy saves the most on training costs?

Quiz 5

❓ When should you use serverless endpoints?

← Previous Continue interactively → Next →

Related Courses