Module 3 of 11 · AWS SageMaker — End-to-End ML Platform · Intermediate

Data Preparation with Processing Jobs

Duration: 55 min

Processing Jobs let you run large-scale data preparation without provisioning or managing servers yourself. This module covers SKLearnProcessor, PySparkProcessor, FrameworkProcessor, and custom processing containers for ETL workflows.

What are Processing Jobs?

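Processing Jobs run your data processing scripts on managed infrastructure. SageMaker launches the instance type and count you request, copies input data from S3 into the container, runs your code, uploads whatever the script writes to its output paths back to S3, and then shuts the instances down. They do not scale automatically: to distribute work you request more than one instance and either shard the input across instances or use a distributed framework such as Spark. Unlike training jobs, they are meant for data transformation, feature engineering, and model evaluation rather than model training.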
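
By default (`s3_data_distribution_type='FullyReplicated'` on `ProcessingInput`) every instance receives a full copy of the input, so a multi-instance job only divides the work if the script does so itself; passing `'ShardedByS3Key'` instead makes SageMaker split the S3 objects across instances. The sketch below shows the do-it-yourself variant. It assumes the `/opt/ml/config/resourceconfig.json` file that SageMaker Processing provides inside the container, with `current_host` and `hosts` keys; `host_rank` and `files_for_this_host` are hypothetical helper names, not SDK functions.

```python
import json
from pathlib import Path

# Cluster metadata file available inside SageMaker Processing containers (assumed layout)
RESOURCE_CONFIG = "/opt/ml/config/resourceconfig.json"


def host_rank(config_path=RESOURCE_CONFIG):
    """Return (rank, world_size) for this instance, e.g. (1, 3) on algo-2 of 3."""
    config = json.loads(Path(config_path).read_text())
    hosts = sorted(config["hosts"])  # same order on every instance
    return hosts.index(config["current_host"]), len(hosts)


def files_for_this_host(input_dir, rank, world_size, pattern="*.csv"):
    """Round-robin split: each instance gets a disjoint subset of the input files."""
    files = sorted(Path(input_dir).rglob(pattern))
    return [path for i, path in enumerate(files) if i % world_size == rank]


if __name__ == "__main__":
    rank, world_size = host_rank()
    for path in files_for_this_host("/opt/ml/processing/input", rank, world_size):
        print(f"host {rank} processing {path}")
```

Sorting the host list means every instance computes the same ranking without any coordination between them.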

SKLearnProcessor for Data Cleaning

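from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
import sagemaker

session = sagemaker.Session()
# Replace with your own execution role; in SageMaker notebooks, sagemaker.get_execution_role() returns it
role = 'arn:aws:iam::123456789012:role/SageMakerRole'

# Create SKLearnProcessor (runs in the prebuilt scikit-learn container image)
sklearn_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=session
)

# Run processing job: SageMaker copies the S3 input into the container before
# preprocessing.py starts and uploads /opt/ml/processing/output to S3 afterwards
sklearn_processor.run(
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source='s3://my-bucket/raw-data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination='s3://my-bucket/processed-data/'
        )
    ],
    # Passed to preprocessing.py as command-line arguments
    arguments=['--input-data', '/opt/ml/processing/input']
)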
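
The job above expects a `preprocessing.py` next to the notebook; the SDK uploads it and runs it inside the container. The following is a minimal sketch of such a script, not the module's own: it reads the `--input-data` argument passed in `arguments`, drops CSV rows with empty fields, and writes the result to `/opt/ml/processing/output`, which SageMaker uploads to the `ProcessingOutput` destination when the job ends. The `--output-data` flag and the cleaning rule are illustrative assumptions, and only the standard library is used so the script can be tried locally.

```python
"""preprocessing.py: hypothetical cleaning script for the SKLearnProcessor job."""
import argparse
import csv
from pathlib import Path


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # Matches arguments=['--input-data', ...] passed to sklearn_processor.run()
    parser.add_argument("--input-data", default="/opt/ml/processing/input")
    # Hypothetical flag; SageMaker uploads whatever lands in this directory to S3
    parser.add_argument("--output-data", default="/opt/ml/processing/output")
    return parser.parse_args(argv)


def clean_file(src, dst):
    """Copy a CSV, dropping rows with a missing or empty field. Returns rows kept."""
    kept = 0
    with open(src, newline="") as fin, open(dst, "w", newline="") as fout:
        reader = csv.reader(fin)
        writer = csv.writer(fout)
        header = next(reader, None)
        if header is None:  # empty file: nothing to copy
            return 0
        writer.writerow(header)
        for row in reader:
            if len(row) == len(header) and all(field.strip() for field in row):
                writer.writerow(row)
                kept += 1
    return kept


def main(argv=None):
    args = parse_args(argv)
    out_dir = Path(args.output_data)
    out_dir.mkdir(parents=True, exist_ok=True)
    total = 0
    # Non-recursive glob, so the uploaded code under input/code/ is not picked up
    for src in sorted(Path(args.input_data).glob("*.csv")):
        total += clean_file(src, out_dir / src.name)
    print(f"Kept {total} rows")  # stdout is streamed to CloudWatch Logs
    return total


if __name__ == "__main__":
    main()
```

Locally, the same script runs as `python preprocessing.py --input-data ./raw --output-data ./processed`.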

PySparkProcessor for Distributed ETL

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.spark.processing import PySparkProcessor

# Reuses role and session from the SKLearnProcessor example
spark_processor = PySparkProcessor(
    framework_version='2.4',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=3,  # Spark runs across all 3 instances
    sagemaker_session=session
)

# Run Spark job: submit_app is the PySpark script submitted to the cluster
spark_processor.run(
    submit_app='spark_etl.py',
    inputs=[
        ProcessingInput(
            source='s3://my-bucket/raw-data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination='s3://my-bucket/processed-data/'
        )
    ]
)
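
Spark settings such as executor memory are not arguments of the processor itself; `PySparkProcessor.run()` accepts a `configuration` argument in the EMR-style classification format. A small hypothetical helper that wraps Spark properties in that shape, shown with illustrative rather than tuned values:

```python
def spark_defaults(properties):
    """Wrap Spark properties in the classification format PySparkProcessor.run() accepts."""
    return [
        {
            "Classification": "spark-defaults",
            # Values are kept as strings, as they would appear in spark-defaults.conf
            "Properties": {key: str(value) for key, value in properties.items()},
        }
    ]


# Illustrative values for 3 x ml.m5.xlarge (4 vCPU, 16 GiB each), not recommendations
configuration = spark_defaults({
    "spark.executor.memory": "4g",
    "spark.executor.cores": 2,
    "spark.sql.shuffle.partitions": 24,
})
```

The list is then passed alongside the other arguments, for example `spark_processor.run(submit_app='spark_etl.py', ..., configuration=configuration)`.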

FrameworkProcessor for TensorFlow/PyTorch

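from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.tensorflow import TensorFlow

# FrameworkProcessor runs your script in the prebuilt container of the given
# estimator class (here TensorFlow); reuses role and session from above
tf_processor = FrameworkProcessor(
    estimator_cls=TensorFlow,
    framework_version='2.8',
    py_version='py39',  # must be a Python version the TF 2.8 image provides
    role=role,
    instance_type='ml.p3.2xlarge',  # GPU instance, so the GPU image is used
    instance_count=1,
    sagemaker_session=session
)

# Run TensorFlow processing job
tf_processor.run(
    code='data_augmentation.py',
    inputs=[
        ProcessingInput(
            source='s3://my-bucket/images/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination='s3://my-bucket/augmented-images/'
        )
    ]
)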

Custom Processing Container

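# Dockerfile for custom processor
FROM python:3.9

RUN pip install --no-cache-dir pandas numpy scikit-learn

# Flush print() output immediately so it reaches CloudWatch Logs while the job runs
ENV PYTHONUNBUFFERED=TRUE

COPY processing_script.py /opt/ml/code/processing_script.py

ENTRYPOINT ["python", "/opt/ml/code/processing_script.py"]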
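
Because the script is the image's ENTRYPOINT, it runs as soon as the container starts, and its exit code decides the outcome: a non-zero exit marks the processing job as Failed. A hypothetical `processing_script.py` that uses this to reject input with an unexpected CSV header (the expected column names are placeholders):

```python
"""processing_script.py: hypothetical schema check baked into the custom image."""
import csv
import sys
from pathlib import Path

INPUT_DIR = Path("/opt/ml/processing/input")
# Placeholder schema; replace with your own columns
EXPECTED_COLUMNS = ["id", "value"]


def find_bad_files(input_dir, expected):
    """Return the names of CSV files whose header differs from `expected`."""
    bad = []
    for path in sorted(Path(input_dir).rglob("*.csv")):
        with open(path, newline="") as f:
            header = next(csv.reader(f), [])
        if header != expected:
            bad.append(path.name)
    return bad


def main(input_dir=INPUT_DIR):
    bad = find_bad_files(input_dir, EXPECTED_COLUMNS)
    if bad:
        # stderr ends up in CloudWatch Logs alongside stdout
        print(f"Schema mismatch in: {', '.join(bad)}", file=sys.stderr)
        return 1  # non-zero exit code: the processing job is marked Failed
    print("All input files match the expected schema")
    return 0


if __name__ == "__main__":
    sys.exit(main())
```

After pushing the image to Amazon ECR, it can be launched with the SDK's generic `sagemaker.processing.Processor` class by passing its `image_uri`, together with the same kind of `ProcessingInput` and `ProcessingOutput` objects used earlier.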

Processing Job Configuration

Under the hood, the SDK classes call the CreateProcessingJob API. The equivalent request body uses the API's PascalCase field names and must include an AppSpecification naming the container image to run (the account ID, region, and image URI below are placeholders):

{
  "ProcessingJobName": "data-prep-job",
  "RoleArn": "arn:aws:iam::123456789012:role/SageMakerRole",
  "AppSpecification": {
    "ImageUri": "123456789012.dkr.ecr.us-east-1.amazonaws.com/custom-processor:latest"
  },
  "ProcessingInputs": [
    {
      "InputName": "input-1",
      "S3Input": {
        "S3Uri": "s3://my-bucket/raw-data/",
        "LocalPath": "/opt/ml/processing/input",
        "S3DataType": "S3Prefix",
        "S3InputMode": "File"
      }
    }
  ],
  "ProcessingOutputConfig": {
    "Outputs": [
      {
        "OutputName": "output-1",
        "S3Output": {
          "S3Uri": "s3://my-bucket/processed-data/",
          "LocalPath": "/opt/ml/processing/output",
          "S3UploadMode": "EndOfJob"
        }
      }
    ]
  },
  "ProcessingResources": {
    "ClusterConfig": {
      "InstanceCount": 1,
      "InstanceType": "ml.m5.xlarge",
      "VolumeSizeInGB": 30
    }
  }
}
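
When submitting directly with boto3, `create_processing_job` takes the API's PascalCase field names, and `AppSpecification.ImageUri` (the container to run) is required. Job names must also be unique within an account and region, which is why the SDK appends a timestamp to base job names. The sketch below builds such a request; `unique_job_name` and `build_request` are hypothetical helpers, and the image URI is whatever image you pushed, for example the custom container above.

```python
import re
from datetime import datetime, timezone

# Processing job names: 1-63 letters, digits, or hyphens, not starting or ending with a hyphen
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def unique_job_name(base, now=None):
    """Append a UTC timestamp, truncating `base` so the name fits in 63 characters."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y-%m-%d-%H-%M-%S")
    name = f"{base[:63 - len(stamp) - 1]}-{stamp}"
    if not JOB_NAME_PATTERN.match(name):
        raise ValueError(f"invalid processing job name: {name}")
    return name


def build_request(job_name, role_arn, image_uri, input_uri, output_uri,
                  instance_type="ml.m5.xlarge", instance_count=1, volume_gb=30):
    """Assemble keyword arguments for boto3's sagemaker create_processing_job()."""
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": {"ImageUri": image_uri},
        "ProcessingInputs": [{
            "InputName": "input-1",
            "S3Input": {
                "S3Uri": input_uri,
                "LocalPath": "/opt/ml/processing/input",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
            },
        }],
        "ProcessingOutputConfig": {"Outputs": [{
            "OutputName": "output-1",
            "S3Output": {
                "S3Uri": output_uri,
                "LocalPath": "/opt/ml/processing/output",
                "S3UploadMode": "EndOfJob",
            },
        }]},
        "ProcessingResources": {"ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": volume_gb,
        }},
    }
```

With AWS credentials configured, the result is submitted as `boto3.client('sagemaker').create_processing_job(**request)`.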

Monitoring Processing Jobs

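# Check processing job status
import boto3

sm_client = boto3.client('sagemaker')

response = sm_client.describe_processing_job(
    ProcessingJobName='data-prep-job-2024-01-15-12-30-45'
)

print(f"Status: {response['ProcessingJobStatus']}")
# ExitMessage and FailureReason are only returned for some jobs, so use .get()
print(f"Exit message: {response.get('ExitMessage', 'n/a')}")
print(f"Failure reason: {response.get('FailureReason', 'n/a')}")

# The response contains no log text; container stdout/stderr goes to
# CloudWatch Logs under the /aws/sagemaker/ProcessingJobs log group
for output in response.get('ProcessingOutputConfig', {}).get('Outputs', []):
    print(f"Output {output['OutputName']}: {output['S3Output']['S3Uri']}")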
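
The SDK's `run()` blocks until the job finishes by default (`wait=True`). When a job was started with `wait=False` or from another process, you can poll `describe_processing_job` yourself. boto3 also provides a `processing_job_completed_or_stopped` waiter, but it raises an error when the job fails; the hypothetical helper below instead returns the final response so `FailureReason` can be inspected. The `sleep` parameter exists only so the loop can be tested without real waiting.

```python
import time

# ProcessingJobStatus values after which the job will not change again
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(client, job_name, poll_seconds=30,
                            timeout_seconds=3600, sleep=time.sleep):
    """Poll describe_processing_job until the job reaches a terminal status.

    Returns the final describe response; raises TimeoutError if the job is
    still InProgress or Stopping after timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.describe_processing_job(ProcessingJobName=job_name)
        status = response["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{job_name} still {status} after {timeout_seconds}s")
        sleep(poll_seconds)
```

In practice `client` is `boto3.client('sagemaker')`; any object with a `describe_processing_job` method works, which keeps the loop easy to test.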

Quiz 1

❓ What is the primary purpose of SageMaker Processing Jobs?

Quiz 2

❓ Which processor is best for distributed Spark jobs?

Quiz 3

❓ What does SKLearnProcessor use for data processing?

Quiz 4

❓ Where do Processing Jobs read input data from?

Quiz 5

❓ What is FrameworkProcessor used for?
