Data processing and batch transformation capabilities for preprocessing, feature engineering, and batch inference.
Import paths:
# Core processors (available from sagemaker.core)
from sagemaker.core import Processor, ScriptProcessor, FrameworkProcessor, Transformer
# Spark processors (requires full module path)
from sagemaker.core.spark import PySparkProcessor, SparkJarProcessor
# Input/Output classes
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput, TransformInput

Base processor for running SageMaker Processing Jobs with custom containers.
class Processor:
"""
Base processor for SageMaker Processing Jobs.
Parameters:
role: str - IAM role ARN (required)
- Needs: sagemaker:CreateProcessingJob, s3:GetObject, s3:PutObject
image_uri: str - Container image URI (required)
- ECR URI: "{account}.dkr.ecr.{region}.amazonaws.com/image:tag"
- Public registry: "public.ecr.aws/..."
instance_count: int - Number of instances (default: 1)
- Range: 1-100
- Use >1 for distributed processing
instance_type: str - EC2 instance type (required)
- Processing instances: ml.m5.xlarge, ml.c5.2xlarge, etc.
volume_size_in_gb: int - EBS volume size (default: 30)
- Range: 1-16384 GB
- Must accommodate input data + processing space
volume_kms_key: Optional[str] - KMS key for volume encryption
output_kms_key: Optional[str] - KMS key for output encryption
max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
- Range: 1-432000 (5 days)
base_job_name: Optional[str] - Base job name for generated names
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, str]] - Environment variables
- Passed to container
- Maximum 512 entries
tags: Optional[List[Tag]] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
run(inputs=None, outputs=None, arguments=None, wait=True, logs=True,
job_name=None, experiment_config=None, kms_key=None) -> None
Run processing job.
Parameters:
inputs: Optional[List[ProcessingInput]] - Input data configurations
outputs: Optional[List[ProcessingOutput]] - Output configurations
arguments: Optional[List[str]] - Command-line arguments
wait: bool - Block until job completes (default: True)
logs: bool - Show CloudWatch logs (default: True)
job_name: Optional[str] - Custom job name
experiment_config: Optional - Experiment tracking configuration
kms_key: Optional[str] - KMS key for encryption
Raises:
ValueError: Invalid configuration
ClientError: AWS API errors
RuntimeError: Processing job failures
wait() -> None
Wait for processing job to complete.
Raises:
WaiterError: If job fails or times out
stop() -> None
Stop the processing job immediately.
Raises:
ClientError: If job already completed or failed
describe() -> Dict
Get processing job details.
Returns:
Dict: Complete job description including status, inputs, outputs
Attributes:
latest_job: ProcessingJob - Most recent processing job
jobs: List[ProcessingJob] - List of all processing jobs from this processor
Notes:
- Container receives inputs at /opt/ml/processing/input/{input_name}
- Container writes outputs to /opt/ml/processing/output/{output_name}
- Arguments passed as command-line args to container entrypoint
- Environment variables available in container
- With multiple instances, input data is distributed across instances and processed in parallel (see s3_data_distribution_type)
"""

Usage:
from sagemaker.core import Processor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput
# Create processor with custom container
processor = Processor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/my-processor:latest",
instance_count=1,
instance_type="ml.m5.xlarge",
volume_size_in_gb=50,
env={
"LOG_LEVEL": "INFO",
"NUM_WORKERS": "4"
}
)
# Define inputs and outputs
inputs = [
ProcessingInput(
source="s3://my-bucket/raw-data",
destination="/opt/ml/processing/input/data",
input_name="raw_data"
),
ProcessingInput(
source="s3://my-bucket/config.json",
destination="/opt/ml/processing/input/config",
input_name="config"
)
]
outputs = [
ProcessingOutput(
source="/opt/ml/processing/output/processed",
destination="s3://my-bucket/processed-data",
output_name="processed_data"
),
ProcessingOutput(
source="/opt/ml/processing/output/metrics",
destination="s3://my-bucket/metrics",
output_name="metrics"
)
]
# Run processing job with error handling
try:
processor.run(
inputs=inputs,
outputs=outputs,
arguments=["--mode", "production", "--batch-size", "1000"],
wait=True,
logs=True
)
print(f"Processing completed: {processor.latest_job.processing_job_name}")
except RuntimeError as e:
print(f"Processing failed: {e}")
# Check CloudWatch logs for details
job = processor.latest_job
print(f"Job ARN: {job.processing_job_arn}")Processor for running custom Python or other scripts with pre-built containers.
class ScriptProcessor(Processor):
"""
Processor for running scripts in processing jobs.
Parameters:
role: str - IAM role ARN (required)
image_uri: str - Container image URI (required)
- Must have Python or script interpreter installed
command: List[str] - Command to run (required)
- Example: ["python3"], ["bash"], ["Rscript"]
instance_type: str - EC2 instance type (required)
instance_count: int - Number of instances (default: 1)
volume_size_in_gb: int - EBS volume size (default: 30)
volume_kms_key: Optional[str] - KMS key for volume encryption
output_kms_key: Optional[str] - KMS key for output encryption
max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, str]] - Environment variables
tags: Optional[List[Tag]] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
run(code, inputs=None, outputs=None, arguments=None, wait=True, logs=True,
job_name=None, experiment_config=None, kms_key=None) -> None
Run processing script.
Parameters:
code: str - Script file path (required)
- Local file (uploaded automatically)
- S3 URI (must be accessible)
inputs: Optional[List[ProcessingInput]] - Input data
outputs: Optional[List[ProcessingOutput]] - Output data
arguments: Optional[List[str]] - Script arguments
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
job_name: Optional[str] - Custom job name
experiment_config: Optional - Experiment tracking
kms_key: Optional[str] - KMS key for encryption
Raises:
ValueError: If code not provided or invalid
ClientError: AWS API errors
RuntimeError: Script execution errors
wait() -> None
Wait for completion.
stop() -> None
Stop the job.
Notes:
- Script uploaded to S3 and downloaded to container
- Script available at /opt/ml/processing/input/code/{script_name}
- Use command to specify interpreter
- Arguments passed after script path
"""Usage:
from sagemaker.core import ScriptProcessor
# Create Python script processor
processor = ScriptProcessor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
image_uri="763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0-cpu-py310",
command=["python3"],
instance_type="ml.m5.xlarge",
instance_count=2, # Distributed processing
volume_size_in_gb=50
)
# Run Python script
try:
processor.run(
code="preprocess.py", # Local file
inputs=[
ProcessingInput(
source="s3://my-bucket/raw-data",
destination="/opt/ml/processing/input/data"
)
],
outputs=[
ProcessingOutput(
source="/opt/ml/processing/output/train",
destination="s3://my-bucket/processed/train"
),
ProcessingOutput(
source="/opt/ml/processing/output/val",
destination="s3://my-bucket/processed/val"
)
],
arguments=["--split-ratio", "0.8", "--random-seed", "42"],
wait=True,
logs=True
)
print("Processing completed successfully")
except RuntimeError as e:
print(f"Script error: {e}")
    # Check logs for Python stack trace

Distributed Processing Script:
# preprocess.py - Handles distributed processing
import os
import json
import argparse
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--split-ratio', type=float, default=0.8)
parser.add_argument('--random-seed', type=int, default=42)
args = parser.parse_args()
    # Distributed processing: each instance processes its own subset.
    # Processing jobs expose cluster topology via /opt/ml/config/resourceconfig.json
    # ("current_host" and "hosts"), not training-style SM_* environment variables.
    with open('/opt/ml/config/resourceconfig.json') as f:
        resource_config = json.load(f)
    hosts = resource_config['hosts']
    instance_count = len(hosts)
    current_instance = hosts.index(resource_config['current_host'])
    # Input path matches the ProcessingInput destination configured above
    input_path = '/opt/ml/processing/input/data'
train_output = '/opt/ml/processing/output/train'
val_output = '/opt/ml/processing/output/val'
# Process this instance's shard
process_shard(
input_path=input_path,
train_output=train_output,
val_output=val_output,
instance_id=current_instance,
total_instances=instance_count,
split_ratio=args.split_ratio,
seed=args.random_seed
)
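
# Hedged sketch, not part of the original script: process_shard is called above
# but never defined. One possible implementation, assuming the input prefix holds
# many files and each instance should handle a disjoint round-robin subset.
def process_shard(input_path, train_output, val_output,
                  instance_id, total_instances, split_ratio, seed):
    import random
    rng = random.Random(seed)
    os.makedirs(train_output, exist_ok=True)
    os.makedirs(val_output, exist_ok=True)
    # Every instance sees the full (FullyReplicated) input; take every Nth file.
    for name in sorted(os.listdir(input_path))[instance_id::total_instances]:
        # Route each file to train or val according to the requested split ratio.
        target = train_output if rng.random() < split_ratio else val_output
        with open(os.path.join(input_path, name), 'rb') as src, \
             open(os.path.join(target, name), 'wb') as dst:
            dst.write(src.read())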
if __name__ == '__main__':
    main()

Processor optimized for ML framework code with automatic dependency packaging.
class FrameworkProcessor(Processor):
"""
Processor for ML framework code with dependency packaging.
Parameters:
framework_version: str - Framework version (required)
- Example: "2.0" for PyTorch
role: str - IAM role ARN (required)
instance_type: str - EC2 instance type (required)
instance_count: int - Number of instances (default: 1)
volume_size_in_gb: int - EBS volume size (default: 30)
volume_kms_key: Optional[str] - KMS key for volume encryption
output_kms_key: Optional[str] - KMS key for output encryption
max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, str]] - Environment variables
tags: Optional[List[Tag]] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
run(code, source_dir=None, dependencies=None, inputs=None, outputs=None,
arguments=None, wait=True, logs=True, job_name=None, experiment_config=None) -> None
Run with framework.
Parameters:
code: str - Entry script filename (required)
source_dir: Optional[str] - Directory containing code
- Entire directory uploaded and available in container
dependencies: Optional[Union[str, List[str]]] - Dependencies
- requirements.txt path or list of packages
inputs: Optional[List[ProcessingInput]] - Input data
outputs: Optional[List[ProcessingOutput]] - Output data
arguments: Optional[List[str]] - Script arguments
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
job_name: Optional[str] - Custom job name
experiment_config: Optional - Experiment tracking
Raises:
ValueError: Invalid configuration
ClientError: AWS API errors
wait() -> None
Wait for completion.
stop() -> None
Stop the job.
Notes:
- Automatically packages source_dir contents
- Installs dependencies from requirements.txt or list
- Framework-specific optimizations
- Use for processing requiring ML libraries
"""Usage:
from sagemaker.core import FrameworkProcessor
# Create PyTorch framework processor
processor = FrameworkProcessor(
framework_version="2.0",
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.m5.xlarge",
instance_count=2,
env={"PYTHONUNBUFFERED": "1"}
)
# Run with automatic dependency packaging
processor.run(
code="process.py",
source_dir="./processing", # All files uploaded
dependencies=["requirements.txt"], # Or list: ["pandas==2.0.0", "scikit-learn==1.3.0"]
inputs=[
ProcessingInput(
source="s3://my-bucket/data",
destination="/opt/ml/processing/input"
)
],
outputs=[
ProcessingOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/output"
)
],
arguments=["--config", "config.json"],
wait=True
)
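The entry script itself is not shown above. The following is a minimal sketch of what ./processing/process.py might contain, assuming tabular CSV input and that pandas is pinned in requirements.txt; the file names and the dropna() step are illustrative assumptions, not SDK behavior.
# process.py - hypothetical entry script for the FrameworkProcessor example above
import argparse
import os

import pandas as pd  # assumed to be installed via requirements.txt

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', default='config.json')  # accepted but unused in this sketch
    args = parser.parse_args()

    input_dir = '/opt/ml/processing/input'
    output_dir = '/opt/ml/processing/output'
    os.makedirs(output_dir, exist_ok=True)

    # Read each CSV from the processing input, drop incomplete rows,
    # and write the cleaned file to the processing output path.
    for name in os.listdir(input_dir):
        if name.endswith('.csv'):
            df = pd.read_csv(os.path.join(input_dir, name))
            df.dropna().to_csv(os.path.join(output_dir, name), index=False)

if __name__ == '__main__':
    main()
Because everything in source_dir is uploaded with the job, helper modules placed next to process.py can be imported directly from the entry script.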

Processor for running PySpark applications on SageMaker.
class PySparkProcessor(ScriptProcessor):
"""
Processor for PySpark jobs on SageMaker.
Parameters:
role: str - IAM role ARN (required)
instance_type: Union[str, PipelineVariable] - EC2 instance type (required)
instance_count: Union[int, PipelineVariable] - Number of instances (default: 1)
framework_version: Optional[str] - SageMaker PySpark version (default: latest)
py_version: Optional[str] - Python version (default: "py39")
container_version: Optional[str] - Spark container version
image_uri: Optional[Union[str, PipelineVariable]] - Container image URI
volume_size_in_gb: Union[int, PipelineVariable] - EBS volume size (default: 30)
volume_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for volume
output_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for output
configuration_location: Optional[str] - S3 prefix for EMR configuration
dependency_location: Optional[str] - S3 prefix for Spark dependencies (.jar, .py files)
max_runtime_in_seconds: Optional[Union[int, PipelineVariable]] - Maximum runtime
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, Union[str, PipelineVariable]]] - Environment variables
tags: Optional[Tags] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
run(submit_app, submit_py_files=None, submit_jars=None, submit_files=None,
inputs=None, outputs=None, arguments=None, wait=True, logs=True,
job_name=None, experiment_config=None, configuration=None,
spark_event_logs_s3_uri=None, kms_key=None) -> None
Run PySpark job.
Parameters:
submit_app: str - Main PySpark script (required)
submit_py_files: Optional[List[str]] - Additional Python files
- Local paths or S3 URIs
- Available in Spark driver/executors
submit_jars: Optional[List[str]] - JAR dependencies
submit_files: Optional[List[str]] - Additional files (configs, data)
inputs: Optional[List[ProcessingInput]] - Input data
outputs: Optional[List[ProcessingOutput]] - Output data
arguments: Optional[List[str]] - Spark application arguments
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
job_name: Optional[str] - Custom job name
experiment_config: Optional - Experiment tracking
configuration: Optional[List[Dict]] - Spark configuration
- Format: [{"Classification": "spark-defaults", "Properties": {...}}]
spark_event_logs_s3_uri: Optional[str] - S3 URI for Spark event logs
kms_key: Optional[str] - KMS key
Raises:
ValueError: Invalid Spark configuration
ClientError: AWS API errors
get_run_args(...) -> Dict
Get normalized run arguments for validation.
Returns:
Dict: Normalized arguments
start_history_server(spark_event_logs_s3_uri=None) -> None
Start Spark history server for UI access.
Parameters:
spark_event_logs_s3_uri: Optional[str] - Event logs S3 URI
Returns:
None (prints history server URL)
terminate_history_server() -> None
Terminate Spark history server.
wait() -> None
Wait for completion.
stop() -> None
Stop the job.
Notes:
- Spark driver runs on one instance
- Executors distributed across all instances
- Use configuration for Spark tuning (memory, cores, etc.)
- Event logs enable Spark UI for debugging
- History server provides web UI for completed jobs
"""Usage:
from sagemaker.core.spark import PySparkProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput
# Create PySpark processor
processor = PySparkProcessor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.m5.xlarge",
instance_count=4, # 1 driver + 3 executors
framework_version="3.3",
py_version="py39"
)
# Run PySpark job with full configuration
processor.run(
submit_app="preprocess.py", # Main PySpark script
submit_py_files=[
"utils.py", # Additional Python modules
"s3://my-bucket/shared/helpers.py"
],
submit_jars=[
"s3://my-bucket/jars/custom-lib.jar"
],
submit_files=[
"config.properties"
],
inputs=[
ProcessingInput(
source="s3://my-bucket/raw-data",
destination="/opt/ml/processing/input",
s3_data_distribution_type="ShardedByS3Key" # Distribute across instances
)
],
outputs=[
ProcessingOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/processed-data",
s3_upload_mode="Continuous" # Upload during processing
)
],
arguments=["--input-format", "parquet", "--output-format", "parquet"],
configuration=[
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.memory": "4g",
"spark.executor.cores": "2",
"spark.driver.memory": "4g",
"spark.sql.shuffle.partitions": "200"
}
}
],
spark_event_logs_s3_uri="s3://my-bucket/spark-logs/",
wait=True,
logs=True
)
# Start history server to view Spark UI
processor.start_history_server()
# Access UI and analyze job performance
# When done:
processor.terminate_history_server()

PySpark Script Example:
# preprocess.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
import argparse
import sys
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--input-format', default='csv')
parser.add_argument('--output-format', default='parquet')
args = parser.parse_args()
# Create Spark session
spark = SparkSession.builder \
.appName("DataPreprocessing") \
.getOrCreate()
# Read data
input_path = "/opt/ml/processing/input"
df = spark.read.format(args.input_format).load(input_path)
# Process data
df_processed = df \
.filter(col("value").isNotNull()) \
.withColumn("normalized", col("value") / 100.0) \
.withColumn("category", when(col("score") > 0.5, "high").otherwise("low"))
# Write output
output_path = "/opt/ml/processing/output"
df_processed.write.format(args.output_format).mode("overwrite").save(output_path)
spark.stop()
if __name__ == '__main__':
    main()

Processor for running Java or Scala Spark applications on SageMaker.
class SparkJarProcessor(ScriptProcessor):
"""
Processor for Spark jobs using Java/Scala JAR files.
Parameters:
role: str - IAM role ARN (required)
instance_type: Union[str, PipelineVariable] - EC2 instance type (required)
instance_count: Union[int, PipelineVariable] - Number of instances (default: 1)
framework_version: Optional[str] - SageMaker Spark version
py_version: Optional[str] - Python version (for PySpark dependencies)
container_version: Optional[str] - Spark container version
image_uri: Optional[Union[str, PipelineVariable]] - Container image URI
volume_size_in_gb: Union[int, PipelineVariable] - EBS volume size (default: 30)
volume_kms_key: Optional[Union[str, PipelineVariable]] - KMS key
output_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for output
configuration_location: Optional[str] - S3 prefix for EMR configuration
dependency_location: Optional[str] - S3 prefix for Spark dependencies
max_runtime_in_seconds: Optional[Union[int, PipelineVariable]] - Maximum runtime
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, Union[str, PipelineVariable]]] - Environment variables
tags: Optional[Tags] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
run(submit_app, submit_class=None, submit_jars=None, submit_files=None,
inputs=None, outputs=None, arguments=None, wait=True, logs=True,
job_name=None, experiment_config=None, configuration=None,
spark_event_logs_s3_uri=None, kms_key=None) -> None
Run Spark JAR job.
Parameters:
submit_app: str - Main JAR file path (required)
- S3 URI: "s3://bucket/app.jar"
- Local file: "./target/app.jar"
submit_class: Optional[str] - Main class name
- Example: "com.example.spark.DataProcessor"
- Required for JAR without manifest
submit_jars: Optional[List[str]] - Dependency JARs
submit_files: Optional[List[str]] - Additional files
inputs: Optional[List[ProcessingInput]] - Input data
outputs: Optional[List[ProcessingOutput]] - Output data
arguments: Optional[List[str]] - Application arguments
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
job_name: Optional[str] - Custom job name
experiment_config: Optional - Experiment tracking
configuration: Optional[List[Dict]] - Spark configuration
spark_event_logs_s3_uri: Optional[str] - Event logs S3 URI
kms_key: Optional[str] - KMS key
Raises:
ValueError: Invalid JAR or configuration
ClientError: AWS API errors
get_run_args(...) -> Dict
Get normalized run arguments.
start_history_server(spark_event_logs_s3_uri=None) -> None
Start Spark history server.
terminate_history_server() -> None
Terminate history server.
Notes:
- Use for production Spark applications in Java/Scala
- Better performance than PySpark for compute-intensive tasks
- Requires compiled JAR file
- Submit class needed if JAR has no manifest
"""Usage:
from sagemaker.core.spark import SparkJarProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput
# Create SparkJar processor
processor = SparkJarProcessor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.m5.2xlarge",
instance_count=5, # 1 driver + 4 executors
framework_version="3.3"
)
# Run Spark JAR job
processor.run(
submit_app="s3://my-bucket/jars/data-processor-1.0.0.jar",
submit_class="com.mycompany.spark.DataProcessor",
submit_jars=[
"s3://my-bucket/jars/spark-avro_2.12-3.3.0.jar",
"s3://my-bucket/jars/delta-core_2.12-2.4.0.jar"
],
submit_files=["application.conf"],
inputs=[
ProcessingInput(
source="s3://my-bucket/input-data",
destination="/opt/ml/processing/input"
)
],
outputs=[
ProcessingOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/output-data"
)
],
arguments=["--mode", "production", "--date", "2024-01-15"],
configuration=[
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.memory": "8g",
"spark.driver.memory": "4g",
"spark.executor.cores": "4",
"spark.dynamicAllocation.enabled": "false",
"spark.sql.adaptive.enabled": "true"
}
},
{
"Classification": "spark-env",
"Properties": {},
"Configurations": [{
"Classification": "export",
"Properties": {
"JAVA_HOME": "/usr/lib/jvm/java-11-amazon-corretto"
}
}]
}
],
spark_event_logs_s3_uri="s3://my-bucket/spark-history/",
wait=True
)

Handles batch transform jobs for offline inference on large datasets.
class Transformer:
"""
Batch transform for offline inference.
Parameters:
model_name: str - SageMaker Model name (required)
instance_count: int - Number of instances (required)
- Range: 1-100
instance_type: str - EC2 instance type (required)
strategy: Optional[str] - Transform strategy
- "MultiRecord": Batch multiple records per request (default)
- "SingleRecord": One record per request
assemble_with: Optional[str] - Assembly strategy
- "None": No assembly
- "Line": Assemble line-delimited records
output_path: str - S3 output path (required)
output_kms_key: Optional[str] - KMS key for output encryption
accept: Optional[str] - Accept type for output
max_concurrent_transforms: Optional[int] - Maximum concurrent transforms (default: 1)
- Range: 1-100
- Concurrent requests per instance
max_payload: Optional[int] - Maximum payload size in MB (default: 6)
- Range: 1-100 MB
tags: Optional[List[Tag]] - Resource tags
env: Optional[Dict[str, str]] - Environment variables
base_transform_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
volume_kms_key: Optional[str] - KMS key for volume encryption
Methods:
transform(data, data_type="S3Prefix", content_type=None, compression_type=None,
split_type=None, job_name=None, input_filter=None, output_filter=None,
join_source=None, model_client_config=None, batch_data_capture_config=None,
wait=True, logs=True) -> None
Run batch transform job.
Parameters:
data: str - S3 URI for input data (required)
data_type: str - "S3Prefix" or "ManifestFile" (default: "S3Prefix")
content_type: Optional[str] - Input content type
compression_type: Optional[str] - "None" or "Gzip"
split_type: Optional[str] - Split type
- "None": Entire file per request
- "Line": One line per request
- "RecordIO": RecordIO records
- "TFRecord": TensorFlow records
job_name: Optional[str] - Custom job name
input_filter: Optional[str] - JSONPath to filter input
output_filter: Optional[str] - JSONPath to filter output
join_source: Optional[str] - Join output with input
- "None": Output only
- "Input": Join with input records
model_client_config: Optional[Dict] - Model client config
batch_data_capture_config: Optional[Dict] - Data capture config
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Raises:
ValueError: Invalid configuration
ClientError: AWS API errors
wait() -> None
Wait for transform job to complete.
stop() -> None
Stop the transform job.
describe() -> Dict
Get transform job details.
Returns:
Dict: Complete job description
Attributes:
latest_transform_job: TransformJob - Most recent transform job
output_path: str - S3 output path
Notes:
- More cost-effective than real-time endpoints for batch
- Processes large datasets offline
- Results written to S3
- Use MultiRecord strategy for higher throughput
- max_concurrent_transforms * max_payload determines throughput
- Input/output filtering reduces data transfer
"""Usage:
from sagemaker.core import Transformer
# Create transformer from model
transformer = Transformer(
model_name="my-model",
instance_count=4,
instance_type="ml.m5.xlarge",
output_path="s3://my-bucket/transform-output",
strategy="MultiRecord",
max_concurrent_transforms=8, # 8 concurrent requests per instance
max_payload=6, # 6 MB per request
accept="application/json"
)
# Run batch transform
try:
transformer.transform(
data="s3://my-bucket/batch-input",
content_type="application/json",
split_type="Line", # One JSON object per line
join_source="Input", # Include input with output
input_filter="$.features", # Extract only features from input
output_filter="$.predictions[0]", # Extract first prediction
wait=True,
logs=True
)
print(f"Transform completed: {transformer.output_path}")
print(f"Job: {transformer.latest_transform_job.transform_job_name}")
except RuntimeError as e:
print(f"Transform failed: {e}")Advanced Transform with Filtering:
# Input file (input.jsonl):
# {"id": 123, "features": [1.0, 2.0, 3.0], "metadata": {...}}
# {"id": 456, "features": [4.0, 5.0, 6.0], "metadata": {...}}
# Transform with filtering
transformer.transform(
data="s3://bucket/input.jsonl",
content_type="application/jsonlines",
split_type="Line",
input_filter="$.features", # Send only features to model
output_filter="$.score", # Extract only score from model output
join_source="Input" # Join with original input
)
# Output file (input.jsonl.out):
# {"id": 123, "features": [1.0, 2.0, 3.0], "metadata": {...}, "score": 0.85}
# {"id": 456, "features": [4.0, 5.0, 6.0], "metadata": {...}, "score": 0.92}class ProcessingInput:
"""
Input data configuration for processing jobs.
Fields:
source: str - S3 URI or local path (required)
destination: str - Container path (required)
- Example: "/opt/ml/processing/input"
input_name: Optional[str] - Input name
s3_data_type: Optional[str] - S3 data type (default: "S3Prefix")
- "S3Prefix": All objects under prefix
- "ManifestFile": Objects listed in manifest
s3_input_mode: Optional[str] - Input mode (default: "File")
- "File": Download before processing
- "Pipe": Stream during processing
s3_data_distribution_type: Optional[str] - Distribution type (default: "FullyReplicated")
- "FullyReplicated": Copy to all instances
- "ShardedByS3Key": Distribute across instances
s3_compression_type: Optional[str] - Compression (default: "None")
- "None" or "Gzip"
Notes:
- ShardedByS3Key for distributed processing of large datasets
- Pipe mode reduces startup time
- Gzip automatically decompressed
- Each instance mounts data at destination path
"""class ProcessingOutput:
"""
Output data configuration for processing jobs.
Fields:
source: str - Container path (required)
- Example: "/opt/ml/processing/output"
destination: str - S3 URI for output (required)
output_name: Optional[str] - Output name
s3_upload_mode: Optional[str] - Upload mode (default: "EndOfJob")
- "EndOfJob": Upload after job completes
- "Continuous": Upload during job execution
app_managed: Optional[bool] - Application-managed output (default: False)
- True: Application writes directly to S3
- False: SageMaker uploads from local path
feature_store_output: Optional[FeatureStoreOutput] - Feature Store output config
Notes:
- Continuous upload reduces final upload time
- Use app_managed for large outputs (write directly to S3)
- Multiple outputs can have different destinations
"""class TransformInput:
"""
Input data configuration for batch transform.
Fields:
data: str - S3 URI for input data (required)
data_type: str - Data type (default: "S3Prefix")
- "S3Prefix" or "ManifestFile"
content_type: Optional[str] - Content type
compression_type: Optional[str] - Compression (default: "None")
- "None" or "Gzip"
split_type: Optional[str] - Split type (default: "None")
- "None": Entire file per request
- "Line": One line per request
- "RecordIO": RecordIO splits
- "TFRecord": TensorFlow record splits
Notes:
- Split type determines request granularity
- Line split most common for JSONL/CSV
- None split for single large files
"""class TransformOutput:
"""
Output data configuration for batch transform.
Fields:
s3_output_path: str - S3 URI for output (required)
accept: Optional[str] - Accept type for output
assemble_with: Optional[str] - Assembly (default: "None")
- "None": One output file per input file
- "Line": Concatenate outputs line-by-line
kms_key_id: Optional[str] - KMS key for encryption
Notes:
- Output files mirror input structure by default
- Line assembly creates single output file
- Encryption applied to all output objects
"""from sagemaker.core import Processor, ProcessingInput, ProcessingOutput
# Multi-instance processing with data sharding
processor = Processor(
role=role,
image_uri="my-processing-image:latest",
instance_count=8, # 8 parallel instances
instance_type="ml.m5.4xlarge"
)
# Shard data across instances
inputs = [
ProcessingInput(
source="s3://my-bucket/large-dataset", # 100 GB dataset
destination="/opt/ml/processing/input",
s3_data_distribution_type="ShardedByS3Key" # Each instance gets subset
)
]
outputs = [
ProcessingOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/processed",
s3_upload_mode="Continuous" # Upload during processing
)
]
# Each instance processes ~12.5 GB
processor.run(inputs=inputs, outputs=outputs)

from sagemaker.core import Processor
# Use spot instances for cost savings
processor = Processor(
role=role,
image_uri="my-image:latest",
instance_type="ml.m5.xlarge",
instance_count=4,
use_spot_instances=True,
max_wait_time_in_seconds=7200, # 2 hours total
max_runtime_in_seconds=3600 # 1 hour actual processing
)
# Up to 70% cost savings with spot
# May be interrupted - implement checkpointing if needed
processor.run(inputs=inputs, outputs=outputs)

from sagemaker.core.inputs import ProcessingOutput, FeatureStoreOutput
# Write processing output directly to Feature Store
outputs = [
ProcessingOutput(
output_name="features",
source="/opt/ml/processing/output/features.csv",
feature_store_output=FeatureStoreOutput(
feature_group_name="customer-features"
),
app_managed=True # Application writes directly
)
]
processor.run(
code="compute_features.py",
inputs=inputs,
outputs=outputs
)

# Configure model behavior for transform
model_client_config = {
"InvocationsTimeoutInSeconds": 600, # 10 minutes per request
"InvocationsMaxRetries": 3
}
transformer.transform(
data="s3://bucket/input",
content_type="application/json",
model_client_config=model_client_config
)

Troubleshooting:
Container Disk Full:
  - Increase volume_size_in_gb so the EBS volume holds the input data plus intermediate working files.
OOM Error in Processing:
  - Move to a larger instance_type, or spread the work across more instances with s3_data_distribution_type="ShardedByS3Key".
Shard Distribution Failure:
  - "ShardedByS3Key" distributes whole S3 objects across instances; the input prefix must contain multiple objects, since a single large object cannot be split.
Transform Timeout:
  - Raise InvocationsTimeoutInSeconds in model_client_config, or reduce max_payload / switch to the "SingleRecord" strategy.
Spark Driver OOM:
  - Increase spark.driver.memory in the "spark-defaults" configuration and avoid collecting large datasets to the driver.
File Not Found in Container:
  - Check that the ProcessingInput destination matches the path the script reads; inputs are mounted at /opt/ml/processing/input/{input_name}.
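When a job ends up in one of the failure modes above, the job description usually carries the reason. A minimal check, assuming describe() returns the raw DescribeProcessingJob response (ProcessingJobStatus and FailureReason are keys of that API response):
# Inspect the most recent processing job when something goes wrong.
details = processor.describe()
status = details.get("ProcessingJobStatus")
if status == "Failed":
    print("Failure reason:", details.get("FailureReason", "not reported"))
else:
    print("Job status:", status)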