SageMaker Pipelines
Duration: 60 min
SageMaker Pipelines enable MLOps by orchestrating end-to-end ML workflows. This module covers pipeline steps, conditions, model registry, and automation for production ML systems.
What are SageMaker Pipelines?
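A pipeline automates an ML workflow as a series of steps for data processing, training, evaluation, and deployment. The steps form a directed acyclic graph, and SageMaker runs each step once the steps it depends on have finished. Pipelines integrate with the model registry so that trained models can be versioned, and the whole workflow can be rerun automatically as data or code changes.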
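To make the idea of dependency-ordered steps concrete, here is a minimal sketch in plain Python. It is a toy model and does not use the SageMaker SDK; the step names and the `execution_order` helper are hypothetical and chosen only for illustration.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical step names mapped to the steps they depend on.
# This is a toy model of a pipeline graph, not the SageMaker SDK.
dependencies = {
    "Processing": set(),
    "Training": {"Processing"},
    "Evaluation": {"Training"},
    "RegisterModel": {"Evaluation"},
}


def execution_order(deps):
    """Return step names in an order that respects every dependency."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because each step here depends on exactly one predecessor, only one valid order exists. In a real pipeline, steps with no dependency between them may run in parallel.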
Creating a Basic Pipeline
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
session = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'
bucket = session.default_bucket()
# Define processing step
# SKLearnProcessor accepts framework_version; a plain ScriptProcessor
# would need image_uri and command instead.
processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=session
)
processing_step = ProcessingStep(
    name='ProcessingStep',
    processor=processor,
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source=f's3://{bucket}/raw-data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination=f's3://{bucket}/processed-data/'
        )
    ]
)
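# Define training step
estimator = Estimator(
    image_uri='382416733822.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:latest',
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session
)
training_step = TrainingStep(
    name='TrainingStep',
    estimator=estimator,
    # The built-in XGBoost algorithm reads its training data from the 'train' channel
    inputs={'train': f's3://{bucket}/processed-data/'},
    # The input is a fixed S3 path, so the dependency on processing is declared explicitly
    depends_on=['ProcessingStep']
)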
# Create pipeline
pipeline = Pipeline(
    name='ml-pipeline',
    parameters=[],
    steps=[processing_step, training_step]
)
# Execute pipeline
pipeline.upsert(role_arn=role)
pipeline.start()
Pipeline Parameters
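Parameters let one pipeline definition serve many runs: each parameter declares a name, a type, and a default, and a run can override the default (in the SDK, through `pipeline.start(parameters={...})`). The plain-Python sketch below models only that resolution logic. `DECLARED` and `resolve_parameters` are hypothetical names, not part of the SageMaker SDK.

```python
# Toy model of pipeline parameters; not the SageMaker SDK.
# Each declared parameter has a name, a Python type, and a default value.
DECLARED = {
    "InstanceType": (str, "ml.m5.xlarge"),
    "Epochs": (int, 10),
}


def resolve_parameters(declared, overrides=None):
    """Merge per-run overrides into declared defaults, checking names and types."""
    overrides = overrides or {}
    unknown = set(overrides) - set(declared)
    if unknown:
        raise KeyError(f"undeclared parameters: {sorted(unknown)}")
    resolved = {}
    for name, (expected_type, default) in declared.items():
        value = overrides.get(name, default)
        if not isinstance(value, expected_type):
            raise TypeError(f"{name} expects {expected_type.__name__}")
        resolved[name] = value
    return resolved


default_run = resolve_parameters(DECLARED)
tuned_run = resolve_parameters(DECLARED, {"Epochs": 25})
print(default_run)
print(tuned_run)
```

The SDK declarations for the same two parameters follow.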
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
# Define parameters
instance_type = ParameterString(
    name='InstanceType',
    default_value='ml.m5.xlarge'
)
epochs = ParameterInteger(
    name='Epochs',
    default_value=10
)
# Use parameters in steps
estimator = Estimator(
    image_uri='382416733822.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:latest',
    role=role,
    instance_count=1,
    instance_type=instance_type,
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session
)
# XGBoost names its number of boosting rounds num_round
estimator.set_hyperparameters(num_round=epochs)
# Declare the parameters on the pipeline so they can be overridden per run:
# Pipeline(name='ml-pipeline', parameters=[instance_type, epochs], steps=[processing_step, training_step])
Conditional Steps
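A condition step compares a value produced by an earlier step against a threshold and then runs one of two lists of steps. The sketch below mimics that branching in plain Python. `choose_branch`, the step name, and the metric values are hypothetical, and nothing here calls the SageMaker SDK.

```python
# Toy model of a condition step; not the SageMaker SDK.
def choose_branch(metric_value, threshold, if_steps, else_steps):
    """Return the step names to run, mimicking a greater-than condition."""
    return if_steps if metric_value > threshold else else_steps


# Hypothetical step name and metric values, for illustration only.
deploy_branch = choose_branch(0.86, 0.8, ["DeployModel"], [])
skip_branch = choose_branch(0.75, 0.8, ["DeployModel"], [])
print(deploy_branch)
print(skip_branch)
```

A metric exactly equal to the threshold takes the else branch, because the comparison is strictly greater-than.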
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
# Create condition
condition = ConditionGreaterThan(
    left=training_step.properties.FinalMetricDataList[0].Value,
    right=0.8
)
# Create conditional step
# deployment_step is assumed to be defined elsewhere in the pipeline script
conditional_step = ConditionStep(
    name='ConditionalDeploymentStep',
    conditions=[condition],
    if_steps=[deployment_step],
    else_steps=[]
)
Model Registry Integration
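The registry groups related models and assigns each newly registered model the next version number within its group. As a rough mental model only, the in-memory sketch below shows that versioning behavior. `ToyModelRegistry` and the S3 URIs are hypothetical and unrelated to the real SageMaker API.

```python
from collections import defaultdict


class ToyModelRegistry:
    """In-memory stand-in for a model registry; not the SageMaker API."""

    def __init__(self):
        # Maps a group name to its list of artifact URIs, oldest first.
        self._groups = defaultdict(list)

    def register(self, group_name, model_data_uri):
        """Store the artifact URI and return its 1-based version number."""
        versions = self._groups[group_name]
        versions.append(model_data_uri)
        return len(versions)

    def latest(self, group_name):
        """Return (version, artifact URI) for the newest entry in a group."""
        versions = self._groups[group_name]
        return len(versions), versions[-1]


registry = ToyModelRegistry()
# The S3 URIs are hypothetical placeholders.
v1 = registry.register("xgboost-models", "s3://example-bucket/run-1/model.tar.gz")
v2 = registry.register("xgboost-models", "s3://example-bucket/run-2/model.tar.gz")
print(v1, v2, registry.latest("xgboost-models"))
```

The SDK code for registering a model from a pipeline follows.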
import boto3
from sagemaker.model import Model
from sagemaker.workflow.step_collections import RegisterModel
# Create model
model = Model(
    image_uri='382416733822.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:latest',
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=session
)
# Create the model package group that will hold the model versions
sm_client = boto3.client('sagemaker')
sm_client.create_model_package_group(
    ModelPackageGroupName='xgboost-models',
    ModelPackageGroupDescription='XGBoost models'
)
# Register model as a pipeline step; model_data is only resolved when the pipeline runs
register_step = RegisterModel(
    name='RegisterModelStep',
    model=model,
    model_package_group_name='xgboost-models',
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=['ml.m5.large'],
    transform_instances=['ml.m5.large']
)
Pipeline Configuration
{
  "pipeline_config": {
    "pipeline_name": "ml-pipeline",
    "pipeline_definition": {
      "Version": "2020-12-01",
      "Metadata": {},
      "Parameters": [
        {
          "Name": "InstanceType",
          "Type": "String",
          "DefaultValue": "ml.m5.xlarge"
        }
      ],
      "Steps": [
        {
          "Name": "ProcessingStep",
          "Type": "Processing",
          "Arguments": {}
        },
        {
          "Name": "TrainingStep",
          "Type": "Training",
          "Arguments": {}
        }
      ]
    }
  }
}
Monitoring Pipeline Execution
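A pipeline execution runs asynchronously, so a single status check usually reports that it is still executing. A common pattern is to poll until the status becomes terminal. The sketch below assumes a client exposing `describe_pipeline_execution`, as the boto3 SageMaker client does. `wait_for_execution`, `FakeSageMakerClient`, and the example ARN are hypothetical and exist only so the loop can run offline.

```python
import time

# Statuses after which an execution will not change any further.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(client, execution_arn, poll_seconds=30, max_polls=120):
    """Poll describe_pipeline_execution until the run reaches a terminal status."""
    for _ in range(max_polls):
        response = client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn
        )
        status = response["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"{execution_arn} still running after {max_polls} polls")


class FakeSageMakerClient:
    """Hypothetical stand-in for boto3.client('sagemaker'), for offline demos."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_pipeline_execution(self, PipelineExecutionArn):
        return {"PipelineExecutionStatus": next(self._statuses)}


fake = FakeSageMakerClient(["Executing", "Executing", "Succeeded"])
# The ARN below is a placeholder, not a real resource.
final_status = wait_for_execution(fake, "arn:aws:sagemaker:example", poll_seconds=0)
print(final_status)
```

With a real client, pass `boto3.client('sagemaker')` and the ARN of a started execution. The execution object returned by `pipeline.start()` also offers a `wait()` method that blocks until the run finishes. The one-off status check follows.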
# Get pipeline execution details
execution = pipeline.start()
execution_arn = execution.arn
# Check execution status
import boto3
sm_client = boto3.client('sagemaker')
response = sm_client.describe_pipeline_execution(
PipelineExecutionArn=execution_arn
)
print(f"Status: {response['PipelineExecutionStatus']}")
print(f"Start time: {response['CreationTime']}")
Quiz 1
❓ What is the primary purpose of SageMaker Pipelines?
Quiz 2
❓ What are pipeline parameters used for?
Quiz 3
❓ What do conditional steps enable?
Quiz 4
❓ What is the model registry used for?
Quiz 5
❓ What enables MLOps in SageMaker?