Module 8 of 11 · AWS SageMaker — End-to-End ML Platform · Intermediate

SageMaker Pipelines

Duration: 60 min

SageMaker Pipelines supports MLOps by orchestrating end-to-end ML workflows as repeatable, versioned pipelines. This module covers pipeline steps, parameters, conditional steps, the model registry, and execution monitoring for production ML systems.

What are SageMaker Pipelines?

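A pipeline is a directed acyclic graph (DAG) of steps for data processing, training, evaluation, and model registration or deployment. Each step starts once the steps it depends on have finished, so steps with no dependency between them can run in parallel. Pipelines integrate with the model registry for model version control, which makes it possible to retrain and re-register models on a schedule or on new data.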
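
One way to picture this is as a dependency graph in which every step waits for the steps it depends on. The following is a conceptual sketch in plain Python, not the SageMaker API; the step names and the `execution_order` helper are hypothetical and only illustrate the ordering idea.

```python
from graphlib import TopologicalSorter

# Hypothetical step graph: each key maps to the set of steps it depends on.
step_dependencies = {
    "Processing": set(),
    "Training": {"Processing"},
    "Evaluation": {"Training"},
    "RegisterModel": {"Evaluation"},
}


def execution_order(dependencies):
    """Return one valid run order in which every step follows its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(step_dependencies)
print(order)
```

Because this example graph is a simple chain, there is only one valid order. A graph with independent branches would admit several orders, which is why independent steps can run concurrently.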

Creating a Basic Pipeline

import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

session = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'  # placeholder; use your own execution role ARN
bucket = session.default_bucket()

# Define processing step
# SKLearnProcessor accepts framework_version; a plain ScriptProcessor
# would need image_uri and command instead
processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=session
)

processing_step = ProcessingStep(
    name='ProcessingStep',
    processor=processor,
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source=f's3://{bucket}/raw-data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination=f's3://{bucket}/processed-data/'
        )
    ]
)

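# Define training step
# Look up the built-in XGBoost image for the session's region
# instead of hard-coding a registry account and the 'latest' tag
estimator = Estimator(
    image_uri=sagemaker.image_uris.retrieve(
        framework='xgboost',
        region=session.boto_region_name,
        version='1.5-1'
    ),
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session
)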

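training_step = TrainingStep(
    name='TrainingStep',
    estimator=estimator,
    inputs={'training': f's3://{bucket}/processed-data/'},
    # The input is a plain S3 path, so declare the ordering explicitly;
    # otherwise training could start before processing has finished
    depends_on=[processing_step]
)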

# Create pipeline
pipeline = Pipeline(
    name='ml-pipeline',
    parameters=[],
    steps=[processing_step, training_step]
)

# Create or update the pipeline definition, then start an execution
pipeline.upsert(role_arn=role)
pipeline.start()

Pipeline Parameters

from sagemaker.workflow.parameters import ParameterString, ParameterInteger

# Define parameters
instance_type = ParameterString(
    name='InstanceType',
    default_value='ml.m5.xlarge'
)

epochs = ParameterInteger(
    name='Epochs',
    default_value=10
)

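# Use parameters in steps
estimator = Estimator(
    image_uri=sagemaker.image_uris.retrieve(
        framework='xgboost',
        region=session.boto_region_name,
        version='1.5-1'
    ),
    role=role,
    instance_count=1,
    instance_type=instance_type,
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session
)

# XGBoost takes its number of boosting rounds as num_round
estimator.set_hyperparameters(num_round=epochs)

# Parameters only take effect once they are declared on the pipeline:
# Pipeline(name='ml-pipeline', parameters=[instance_type, epochs], steps=[...])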
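
Parameters let one pipeline definition serve many runs: each execution uses the declared defaults unless it is started with overrides. The sketch below mimics that resolution in plain Python so it runs without AWS access. `resolve_parameters` is a hypothetical helper, the override value is made up, and rejecting unknown names or mismatched types is a choice made for this sketch rather than documented service behavior.

```python
def resolve_parameters(defaults, overrides=None):
    """Merge per-execution overrides into the declared defaults.

    Raises KeyError for names that were never declared and TypeError
    when an override's type differs from the default's type.
    """
    resolved = dict(defaults)
    for name, value in (overrides or {}).items():
        if name not in defaults:
            raise KeyError(f"Unknown pipeline parameter: {name}")
        if type(value) is not type(defaults[name]):
            raise TypeError(f"{name} expects {type(defaults[name]).__name__}")
        resolved[name] = value
    return resolved


declared = {"InstanceType": "ml.m5.xlarge", "Epochs": 10}
run_parameters = resolve_parameters(declared, {"Epochs": 25})
print(run_parameters)
```

With a real pipeline, a dictionary of overrides like this would be passed as `pipeline.start(parameters={...})`.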

Conditional Steps

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep

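# Create condition: compare the first metric reported by the training job with 0.8
condition = ConditionGreaterThan(
    left=training_step.properties.FinalMetricDataList[0].Value,
    right=0.8
)

# Create conditional step
# deployment_step is not defined in this module; it stands for any
# step that should run only when the condition holds
conditional_step = ConditionStep(
    name='ConditionalDeploymentStep',
    conditions=[condition],
    if_steps=[deployment_step],
    else_steps=[]
)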
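
The branching logic of a conditional step can be sketched in plain Python. This is an illustration rather than SageMaker code: `select_branch` is a hypothetical helper, the accuracy value is made up, and the sketch assumes that every listed condition must hold for the `if_steps` branch to run.

```python
import operator


def select_branch(conditions, if_steps, else_steps):
    """Return if_steps when every (left, op, right) condition holds, else else_steps."""
    all_true = all(op(left, right) for left, op, right in conditions)
    return if_steps if all_true else else_steps


accuracy = 0.86  # made-up metric value for illustration
branch = select_branch([(accuracy, operator.gt, 0.8)], ["DeployModel"], [])
print(branch)
```

A value that only equals the threshold does not satisfy a greater-than condition, so a model scoring exactly 0.8 would fall through to the else branch.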

Model Registry Integration

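import boto3
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

# A PipelineSession defers register() until the pipeline runs,
# so model_data can reference the training step's output
pipeline_session = PipelineSession()

# Create model
model = Model(
    image_uri=sagemaker.image_uris.retrieve(
        framework='xgboost',
        region=session.boto_region_name,
        version='1.5-1'
    ),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session
)

# Create the model package group (one-time setup; the call fails if the group already exists)
sm_client = boto3.client('sagemaker')
sm_client.create_model_package_group(
    ModelPackageGroupName='xgboost-models',
    ModelPackageGroupDescription='XGBoost models'
)

# Register model as a pipeline step; add register_step to the pipeline's steps list
register_step = ModelStep(
    name='RegisterModel',
    step_args=model.register(
        model_package_group_name='xgboost-models',
        content_types=['text/csv'],
        response_types=['text/csv'],
        inference_instances=['ml.m5.large'],
        transform_instances=['ml.m5.large']
    )
)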
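
Once versions accumulate in a model package group, a deployment job typically needs the newest approved one. The sketch below shows one way to look it up with the boto3 `list_model_packages` call. `latest_approved_model_package` is a hypothetical helper name, and the client is passed in so the function can be exercised with a stub.

```python
def latest_approved_model_package(sm_client, group_name):
    """Return the ARN of the newest approved version in a group, or None."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response.get("ModelPackageSummaryList", [])
    return summaries[0]["ModelPackageArn"] if summaries else None


# Usage with a real client (requires AWS credentials):
#   import boto3
#   arn = latest_approved_model_package(boto3.client("sagemaker"), "xgboost-models")
```

Returning `None` for an empty group lets the caller decide whether a missing approved model is an error or simply means there is nothing to deploy yet.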

Pipeline Configuration

SageMaker stores a pipeline as a JSON definition, which pipeline.definition() returns as a string. In the abbreviated example below, pipeline_definition follows that schema: each step has a Name, a Type such as Processing or Training, and an Arguments object holding the job request (left empty here for brevity).

{
  "pipeline_config": {
    "pipeline_name": "ml-pipeline",
    "pipeline_definition": {
      "Version": "2020-12-01",
      "Metadata": {},
      "Parameters": [
        {
          "Name": "InstanceType",
          "Type": "String",
          "DefaultValue": "ml.m5.xlarge"
        }
      ],
      "Steps": [
        {
          "Name": "ProcessingStep",
          "Type": "Processing",
          "Arguments": {}
        },
        {
          "Name": "TrainingStep",
          "Type": "Training",
          "Arguments": {}
        }
      ]
    }
  }
}
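
A definition like this is ordinary JSON, so it can be inspected or validated before deployment. The sketch below parses a small, abbreviated definition and lists its steps and parameter defaults. `summarize_definition` is a hypothetical helper, and the embedded JSON is a trimmed illustration rather than a complete definition.

```python
import json

# Abbreviated definition for illustration; real step entries also carry Arguments.
definition_json = """
{
  "Version": "2020-12-01",
  "Parameters": [
    {"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}
  ],
  "Steps": [
    {"Name": "ProcessingStep", "Type": "Processing"},
    {"Name": "TrainingStep", "Type": "Training"}
  ]
}
"""


def summarize_definition(text):
    """Return ([(step name, step type)], {parameter name: default value})."""
    definition = json.loads(text)
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    defaults = {
        p["Name"]: p.get("DefaultValue")
        for p in definition.get("Parameters", [])
    }
    return steps, defaults


steps, defaults = summarize_definition(definition_json)
print(steps)
print(defaults)
```

With a real pipeline object, the string returned by `pipeline.definition()` could be passed to the same function.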

Monitoring Pipeline Execution

# Start an execution and keep its ARN (each start() call launches a new run)
execution = pipeline.start()
execution_arn = execution.arn

# Check execution status
import boto3

sm_client = boto3.client('sagemaker')
response = sm_client.describe_pipeline_execution(
    PipelineExecutionArn=execution_arn
)

print(f"Status: {response['PipelineExecutionStatus']}")
print(f"Start time: {response['CreationTime']}")
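
A single status check is rarely enough, because executions can run for a long time. The sketch below polls `describe_pipeline_execution` until the execution reaches a terminal status. `wait_for_execution` and its `poll_seconds` and `max_polls` arguments are hypothetical names chosen for this example, and the client is passed in so the loop can be tested with a stub.

```python
import time

# Statuses after which an execution will not change again.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(sm_client, execution_arn, poll_seconds=30, max_polls=120):
    """Poll until the execution finishes and return its final status.

    Raises TimeoutError if no terminal status is seen within max_polls checks.
    """
    for _ in range(max_polls):
        response = sm_client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn
        )
        status = response["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"Execution {execution_arn} did not finish in time")


# Usage with a real client (requires AWS credentials):
#   final_status = wait_for_execution(sm_client, execution_arn)
```

The execution object returned by `pipeline.start()` also offers `wait()` and `list_steps()`, which may be more convenient when working only within the SageMaker Python SDK.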

Quiz 1

❓ What is the primary purpose of SageMaker Pipelines?

Quiz 2

❓ What are pipeline parameters used for?

Quiz 3

❓ What do conditional steps enable?

Quiz 4

❓ What is the model registry used for?

Quiz 5

❓ What enables MLOps in SageMaker?
