Module 9 of 11 · AWS SageMaker — End-to-End ML Platform · Intermediate

SageMaker Feature Store

Duration: 55 min

Feature Store centralizes feature management for ML workflows. This module covers feature groups, online/offline stores, feature ingestion, and retrieval for training and inference.

What is Feature Store?

Feature Store is a centralized repository for ML features. It manages feature definitions, stores features in online and offline stores, and provides APIs for feature retrieval during training and inference.

Creating Feature Groups

import time

import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

# Shared setup reused by the later snippets in this module.
session = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'
bucket = session.default_bucket()

# Create sample data: one row per customer.
df = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'age': [25, 30, 35],
    'income': [50000, 60000, 70000],
    'event_time': pd.date_range('2024-01-01', periods=3)
})

# Feature Store has no datetime feature type: the event-time feature must be
# either Fractional (Unix epoch seconds) or String (ISO-8601). A datetime64
# column cannot be mapped by load_feature_definitions(), so convert it to
# ISO-8601 text and use the pandas "string" dtype, which maps to String.
df['event_time'] = (
    df['event_time'].dt.strftime('%Y-%m-%dT%H:%M:%SZ').astype('string')
)

# Local handle for the feature group; nothing is created in AWS yet.
feature_group = FeatureGroup(
    name='customer-features',
    sagemaker_session=session
)

# Infer the feature definitions (names and types) from the DataFrame columns.
feature_group.load_feature_definitions(data_frame=df)

# Create the feature group with an S3 offline store and an online store.
feature_group.create(
    s3_uri=f's3://{bucket}/feature-store/',
    record_identifier_name='customer_id',
    event_time_feature_name='event_time',
    role_arn=role,
    enable_online_store=True
)

# create() returns as soon as the request is accepted. Records can only be
# written once the group has left the "Creating" state, so poll for it.
status = feature_group.describe().get('FeatureGroupStatus')
while status == 'Creating':
    time.sleep(5)
    status = feature_group.describe().get('FeatureGroupStatus')
if status != 'Created':
    raise RuntimeError(f"Feature group creation ended in status {status}")

# Ingest data: three worker threads, blocking until every row is written.
feature_group.ingest(df, max_workers=3, wait=True)

Online and Offline Stores

import time

import boto3
from sagemaker.feature_store.feature_definition import StringFeatureDefinition
from sagemaker.feature_store.feature_group import FeatureGroup

# Create feature group with both stores.
# A feature group cannot be created without feature definitions, and the
# record identifier and event-time features must be among them. They are
# declared explicitly here because this snippet has no DataFrame to infer
# them from; add the remaining transaction features to this list as needed.
feature_group = FeatureGroup(
    name='transaction-features',
    sagemaker_session=session,
    feature_definitions=[
        StringFeatureDefinition(feature_name='transaction_id'),
        # Event time as an ISO-8601 string (a Fractional epoch also works).
        StringFeatureDefinition(feature_name='timestamp'),
    ]
)

feature_group.create(
    s3_uri=f's3://{bucket}/feature-store/',  # Offline store location
    record_identifier_name='transaction_id',
    event_time_feature_name='timestamp',
    role_arn=role,
    enable_online_store=True,  # For real-time inference
    online_store_kms_key_id=None,
    offline_store_kms_key_id=None
)

# create() is asynchronous; the online store cannot be read until the group
# has left the "Creating" state.
status = feature_group.describe().get('FeatureGroupStatus')
while status == 'Creating':
    time.sleep(5)
    status = feature_group.describe().get('FeatureGroupStatus')
if status != 'Created':
    raise RuntimeError(f"Feature group creation ended in status {status}")

# Query online store for inference
fs_client = boto3.client('sagemaker-featurestore-runtime')

response = fs_client.get_record(
    FeatureGroupName='transaction-features',
    RecordIdentifierValueAsString='txn-123'
)

# get_record omits the "Record" key when no record exists for the identifier,
# so look it up defensively instead of indexing.
record = response.get('Record')
if record is None:
    print("Record: not found")
else:
    print(f"Record: {record}")

Feature Ingestion

import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup

# Read the raw feature rows from S3 into a DataFrame.
df = pd.read_csv('s3://my-bucket/raw-features.csv')

# Build a local handle for the 'product-features' feature group.
# NOTE(review): this snippet does not call create(); presumably the group
# already exists in the account - confirm before running.
feature_group = FeatureGroup(name='product-features', sagemaker_session=session)

# Write every row of the frame with five workers and block until ingestion
# has finished.
ingest_options = {'max_workers': 5, 'wait': True}
feature_group.ingest(data_frame=df, **ingest_options)

print(f"Ingested {len(df)} records")

Querying Offline Store

import time

import boto3
import pandas as pd

athena = boto3.client('athena')

# Query offline store using Athena
query = """
SELECT * FROM "sagemaker_featurestore"."customer_features_1234567890"
WHERE event_time >= '2024-01-01'
LIMIT 100
"""

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'sagemaker_featurestore'},
    ResultConfiguration={'OutputLocation': f's3://{bucket}/athena-results/'}
)
query_id = response['QueryExecutionId']

# start_query_execution only submits the query. Results are not available
# until it reaches a terminal state, so poll before fetching them.
while True:
    execution = athena.get_query_execution(QueryExecutionId=query_id)
    query_status = execution['QueryExecution']['Status']
    state = query_status['State']
    if state not in ('QUEUED', 'RUNNING'):
        break
    time.sleep(1)

if state != 'SUCCEEDED':
    reason = query_status.get('StateChangeReason', 'no reason reported')
    raise RuntimeError(f"Athena query {query_id} ended in state {state}: {reason}")

# Get results
results = athena.get_query_results(QueryExecutionId=query_id)

# Each row is {'Data': [{'VarCharValue': ...}, ...]}; for a SELECT the first
# row holds the column names. Flatten the cells so the DataFrame has real
# columns instead of one column of raw dicts. NULL cells have no
# 'VarCharValue' key and become None; all other values arrive as strings.
rows = [
    [cell.get('VarCharValue') for cell in row['Data']]
    for row in results['ResultSet']['Rows']
]
if rows:
    header, *records = rows
    df = pd.DataFrame(records, columns=header)
else:
    df = pd.DataFrame()
print(df)

Feature Store Configuration

{
  "feature_group_config": {
    "feature_group_name": "customer-features",
    "record_identifier_name": "customer_id",
    "event_time_feature_name": "event_time",
    "feature_definitions": [
      {
        "feature_name": "age",
        "feature_type": "Integral"
      },
      {
        "feature_name": "income",
        "feature_type": "Integral"
      },
      {
        "feature_name": "credit_score",
        "feature_type": "Integral"
      }
    ],
    "online_store_config": {
      "enable_online_store": true
    },
    "offline_store_config": {
      "s3_storage_config": {
        "s3_uri": "s3://my-bucket/feature-store/"
      }
    }
  }
}

Using Features in Training

from sagemaker import image_uris
from sagemaker.estimator import Estimator
from sagemaker.feature_store.feature_group import FeatureGroup
import pandas as pd

# Get feature group
feature_group = FeatureGroup(
    name='customer-features',
    sagemaker_session=session
)

# Query offline store for training data
query = feature_group.athena_query()

query.run(
    query_string=f'SELECT * FROM "{query.table_name}"',
    output_location=f's3://{bucket}/query-results/'
)

# run() only starts the Athena query; block until it finishes, otherwise
# as_dataframe() is called while the query is still executing.
query.wait()

# Load results
df = query.as_dataframe()

# NOTE(review): df is not written anywhere by this snippet, while fit() below
# reads from the training-data/ prefix. Presumably a preparation step exports
# the selected features (label column first, no header for CSV) to that
# prefix - confirm.

# Use for training
# Resolve the built-in XGBoost image for the session's region with a pinned
# version rather than hard-coding a registry account and a ":latest" tag.
xgboost_image_uri = image_uris.retrieve(
    framework='xgboost',
    region=session.boto_region_name,
    version='1.7-1'
)

estimator = Estimator(
    image_uri=xgboost_image_uri,
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session
)

# NOTE(review): built-in XGBoost also needs hyperparameters such as num_round
# set via estimator.set_hyperparameters(...) before fit() - confirm the values.

# The built-in XGBoost algorithm requires its input channel to be named 'train'.
estimator.fit({'train': f's3://{bucket}/training-data/'})

Quiz 1

❓ What is the primary purpose of Feature Store?

Quiz 2

❓ What is the online store used for?

Quiz 3

❓ What is the offline store used for?

Quiz 4

❓ How do you query the offline store?

Quiz 5

❓ What is feature ingestion?

← Previous Continue interactively → Next →

Related Courses