SageMaker Feature Store
Duration: 55 min
Feature Store centralizes feature management for ML workflows. This module covers feature groups, online/offline stores, feature ingestion, and retrieval for training and inference.
What is Feature Store?
Feature Store is a centralized repository for ML features. It manages feature definitions, stores features in online and offline stores, and provides APIs for feature retrieval during training and inference.
Creating Feature Groups
import time

import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

session = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'
bucket = session.default_bucket()

# Create sample data.
# Feature Store only has Integral, Fractional and String feature types, so a
# datetime64 column cannot be turned into a feature definition. The event
# time is therefore stored as ISO-8601 text with the pandas "string" dtype.
event_times = pd.date_range('2024-01-01', periods=3).strftime('%Y-%m-%dT%H:%M:%SZ')
df = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'age': [25, 30, 35],
    'income': [50000, 60000, 70000],
    'event_time': pd.Series(event_times, dtype='string'),
})

# Local handle for the feature group (nothing is created in AWS yet).
feature_group = FeatureGroup(
    name='customer-features',
    sagemaker_session=session,
)

# Derive the feature definitions from the DataFrame column dtypes.
feature_group.load_feature_definitions(data_frame=df)

# Create the feature group in SageMaker.
feature_group.create(
    s3_uri=f's3://{bucket}/feature-store/',
    record_identifier_name='customer_id',
    event_time_feature_name='event_time',
    role_arn=role,
    enable_online_store=True,
)

# create() returns once the request is accepted; records can only be
# ingested after the group reaches the 'Created' status, so poll for it.
status = feature_group.describe().get('FeatureGroupStatus')
while status == 'Creating':
    time.sleep(5)
    status = feature_group.describe().get('FeatureGroupStatus')
if status != 'Created':
    raise RuntimeError(f'Feature group creation ended with status {status!r}')
feature_group.ingest(df, max_workers=3, wait=True)Online and Offline Stores
import time

import boto3
from sagemaker.feature_store.feature_definition import (
    FractionalFeatureDefinition,
    StringFeatureDefinition,
)
from sagemaker.feature_store.feature_group import FeatureGroup

# Create feature group with both stores.
# A feature group cannot be created without feature definitions, and the
# record identifier and event-time features must be among them. 'amount' is
# an example feature; replace it with the real transaction attributes.
feature_group = FeatureGroup(
    name='transaction-features',
    sagemaker_session=session,
    feature_definitions=[
        StringFeatureDefinition('transaction_id'),
        StringFeatureDefinition('timestamp'),  # ISO-8601 event time
        FractionalFeatureDefinition('amount'),
    ],
)
feature_group.create(
    s3_uri=f's3://{bucket}/feature-store/',  # offline store location
    record_identifier_name='transaction_id',
    event_time_feature_name='timestamp',
    role_arn=role,
    enable_online_store=True,  # For real-time inference
    online_store_kms_key_id=None,
    offline_store_kms_key_id=None,
)

# Creation is asynchronous: wait for 'Created' before reading or writing.
status = feature_group.describe().get('FeatureGroupStatus')
while status == 'Creating':
    time.sleep(5)
    status = feature_group.describe().get('FeatureGroupStatus')
if status != 'Created':
    raise RuntimeError(f'Feature group creation ended with status {status!r}')

fs_client = boto3.client('sagemaker-featurestore-runtime')

# Write the sample record first: GetRecord omits the 'Record' key when the
# identifier is not present in the online store.
fs_client.put_record(
    FeatureGroupName='transaction-features',
    Record=[
        {'FeatureName': 'transaction_id', 'ValueAsString': 'txn-123'},
        {'FeatureName': 'timestamp', 'ValueAsString': '2024-01-01T00:00:00Z'},
        {'FeatureName': 'amount', 'ValueAsString': '42.50'},
    ],
)

# Query online store for inference
response = fs_client.get_record(
    FeatureGroupName='transaction-features',
    RecordIdentifierValueAsString='txn-123',
)
print(f"Record: {response['Record']}")
Feature Ingestion
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup

# Read the raw feature rows from S3 into a DataFrame.
raw_features_uri = 's3://my-bucket/raw-features.csv'
df = pd.read_csv(raw_features_uri)

# Handle for the feature group that receives the rows; `session` is the
# SageMaker session created earlier in this module.
feature_group = FeatureGroup(name='product-features', sagemaker_session=session)

# Send the whole DataFrame to the feature group using five workers.
ingest_options = {'max_workers': 5, 'wait': True}
feature_group.ingest(data_frame=df, **ingest_options)
print(f"Ingested {len(df)} records")
Querying Offline Store
import time

import boto3
import pandas as pd

athena = boto3.client('athena')

# Query offline store using Athena
query = """
SELECT * FROM "sagemaker_featurestore"."customer_features_1234567890"
WHERE event_time >= '2024-01-01'
LIMIT 100
"""
response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'sagemaker_featurestore'},
    ResultConfiguration={'OutputLocation': f's3://{bucket}/athena-results/'},
)
query_id = response['QueryExecutionId']

# start_query_execution only submits the query. Results cannot be fetched
# until the execution leaves the QUEUED/RUNNING states, so poll for that.
while True:
    execution = athena.get_query_execution(QueryExecutionId=query_id)
    query_status = execution['QueryExecution']['Status']
    if query_status['State'] not in ('QUEUED', 'RUNNING'):
        break
    time.sleep(1)
if query_status['State'] != 'SUCCEEDED':
    reason = query_status.get('StateChangeReason', 'no reason given')
    raise RuntimeError(
        f"Athena query {query_id} ended in state {query_status['State']}: {reason}"
    )

# Get results. The query is capped at 100 rows, so a single result page is
# enough. The first returned row holds the column names; every cell is a
# dict whose 'VarCharValue' key is absent for NULL values.
results = athena.get_query_results(QueryExecutionId=query_id)
table = [
    [cell.get('VarCharValue') for cell in row['Data']]
    for row in results['ResultSet']['Rows']
]
df = pd.DataFrame(table[1:], columns=table[0]) if table else pd.DataFrame()
print(df)
Feature Store Configuration
{
"feature_group_config": {
"feature_group_name": "customer-features",
"record_identifier_name": "customer_id",
"event_time_feature_name": "event_time",
"feature_definitions": [
{
"feature_name": "age",
"feature_type": "Integral"
},
{
"feature_name": "income",
"feature_type": "Integral"
},
{
"feature_name": "credit_score",
"feature_type": "Integral"
}
],
"online_store_config": {
"enable_online_store": true
},
"offline_store_config": {
"s3_storage_config": {
"s3_uri": "s3://my-bucket/feature-store/"
}
}
}
}
Using Features in Training
from sagemaker import image_uris
from sagemaker.estimator import Estimator
from sagemaker.feature_store.feature_group import FeatureGroup
import pandas as pd

# Get feature group
feature_group = FeatureGroup(
    name='customer-features',
    sagemaker_session=session,
)

# Query offline store for training data
query = feature_group.athena_query()
query.run(
    query_string=f'SELECT * FROM "{query.table_name}"',
    output_location=f's3://{bucket}/query-results/',
)
# run() only starts the Athena query; as_dataframe() fails while the query
# is still executing, so block until it has finished.
query.wait()

# Load results
df = query.as_dataframe()
# NOTE(review): `df` is not written anywhere in this snippet; the training
# job reads whatever already exists under the training-data/ prefix.

# Use for training.
# Resolve the XGBoost image through the SDK instead of hard-coding a
# registry account, region and ':latest' tag, which is easy to get wrong and
# only valid in a single region.
xgboost_image = image_uris.retrieve(
    framework='xgboost',
    region=session.boto_region_name,
    version='1.7-1',
)
estimator = Estimator(
    image_uri=xgboost_image,
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{bucket}/training-output',
    sagemaker_session=session,
)
# NOTE(review): the built-in XGBoost container presumably expects its input
# channel to be named 'train'; the fit() call that follows uses 'training'.
# Confirm against the XGBoost algorithm documentation.
estimator.fit({'training': f's3://{bucket}/training-data/'})
Quiz 1
❓ What is the primary purpose of Feature Store?
Quiz 2
❓ What is the online store used for?
Quiz 3
❓ What is the offline store used for?
Quiz 4
❓ How do you query the offline store?
Quiz 5
❓ What is feature ingestion?