Processing & Transform

Data processing and batch transformation capabilities for preprocessing, feature engineering, and batch inference.

Package Information

Import paths:

# Core processors (available from sagemaker.core)
from sagemaker.core import Processor, ScriptProcessor, FrameworkProcessor, Transformer

# Spark processors (requires full module path)
from sagemaker.core.spark import PySparkProcessor, SparkJarProcessor

# Input/Output classes
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput, TransformInput

Capabilities

Processor

Base processor for running SageMaker Processing Jobs with custom containers.

class Processor:
    """
    Base processor for SageMaker Processing Jobs.

    Parameters:
        role: str - IAM role ARN (required)
            - Needs: sagemaker:CreateProcessingJob, s3:GetObject, s3:PutObject
        
        image_uri: str - Container image URI (required)
            - ECR URI: "{account}.dkr.ecr.{region}.amazonaws.com/image:tag"
            - Public registry: "public.ecr.aws/..."
        
        instance_count: int - Number of instances (default: 1)
            - Range: 1-100
            - Use >1 for distributed processing
        
        instance_type: str - EC2 instance type (required)
            - Processing instances: ml.m5.xlarge, ml.c5.2xlarge, etc.
        
        volume_size_in_gb: int - EBS volume size (default: 30)
            - Range: 1-16384 GB
            - Must accommodate input data + processing space
        
        volume_kms_key: Optional[str] - KMS key for volume encryption
        output_kms_key: Optional[str] - KMS key for output encryption
        
        max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
            - Range: 1-432000 (5 days)
        
        base_job_name: Optional[str] - Base job name for generated names
        sagemaker_session: Optional[Session] - SageMaker session
        
        env: Optional[Dict[str, str]] - Environment variables
            - Passed to container
            - Maximum 512 entries
        
        tags: Optional[List[Tag]] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Methods:
        run(inputs=None, outputs=None, arguments=None, wait=True, logs=True, 
            job_name=None, experiment_config=None, kms_key=None) -> None
            Run processing job.
            
            Parameters:
                inputs: Optional[List[ProcessingInput]] - Input data configurations
                outputs: Optional[List[ProcessingOutput]] - Output configurations
                arguments: Optional[List[str]] - Command-line arguments
                wait: bool - Block until job completes (default: True)
                logs: bool - Show CloudWatch logs (default: True)
                job_name: Optional[str] - Custom job name
                experiment_config: Optional - Experiment tracking configuration
                kms_key: Optional[str] - KMS key for encryption
            
            Raises:
                ValueError: Invalid configuration
                ClientError: AWS API errors
                RuntimeError: Processing job failures
        
        wait() -> None
            Wait for processing job to complete.
            
            Raises:
                WaiterError: If job fails or times out
        
        stop() -> None
            Stop the processing job immediately.
            
            Raises:
                ClientError: If job already completed or failed
        
        describe() -> Dict
            Get processing job details.
            
            Returns:
                Dict: Complete job description including status, inputs, outputs

    Attributes:
        latest_job: ProcessingJob - Most recent processing job
        jobs: List[ProcessingJob] - List of all processing jobs from this processor
    
    Notes:
        - Container receives inputs at /opt/ml/processing/input/{input_name}
        - Container writes outputs to /opt/ml/processing/output/{output_name}
        - Arguments passed as command-line args to container entrypoint
        - Environment variables available in container
        - Multiple instances process data in parallel with distribution
    """

Usage:

from sagemaker.core import Processor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Create processor with custom container
processor = Processor(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/my-processor:latest",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=50,
    env={
        "LOG_LEVEL": "INFO",
        "NUM_WORKERS": "4"
    }
)

# Define inputs and outputs
inputs = [
    ProcessingInput(
        source="s3://my-bucket/raw-data",
        destination="/opt/ml/processing/input/data",
        input_name="raw_data"
    ),
    ProcessingInput(
        source="s3://my-bucket/config.json",
        destination="/opt/ml/processing/input/config",
        input_name="config"
    )
]

outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/output/processed",
        destination="s3://my-bucket/processed-data",
        output_name="processed_data"
    ),
    ProcessingOutput(
        source="/opt/ml/processing/output/metrics",
        destination="s3://my-bucket/metrics",
        output_name="metrics"
    )
]

# Run processing job with error handling
try:
    processor.run(
        inputs=inputs,
        outputs=outputs,
        arguments=["--mode", "production", "--batch-size", "1000"],
        wait=True,
        logs=True
    )
    
    print(f"Processing completed: {processor.latest_job.processing_job_name}")
    
except RuntimeError as e:
    print(f"Processing failed: {e}")
    # Check CloudWatch logs for details
    job = processor.latest_job
    print(f"Job ARN: {job.processing_job_arn}")

ScriptProcessor

Processor for running custom scripts (Python, shell, R, and so on) in a container that provides the required interpreter.

class ScriptProcessor(Processor):
    """
    Processor for running scripts in processing jobs.

    Parameters:
        role: str - IAM role ARN (required)
        image_uri: str - Container image URI (required)
            - Must have Python or script interpreter installed
        command: List[str] - Command to run (required)
            - Example: ["python3"], ["bash"], ["Rscript"]
        instance_type: str - EC2 instance type (required)
        instance_count: int - Number of instances (default: 1)
        volume_size_in_gb: int - EBS volume size (default: 30)
        volume_kms_key: Optional[str] - KMS key for volume encryption
        output_kms_key: Optional[str] - KMS key for output encryption
        max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
        base_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        env: Optional[Dict[str, str]] - Environment variables
        tags: Optional[List[Tag]] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Methods:
        run(code, inputs=None, outputs=None, arguments=None, wait=True, logs=True, 
            job_name=None, experiment_config=None, kms_key=None) -> None
            Run processing script.
            
            Parameters:
                code: str - Script file path (required)
                    - Local file (uploaded automatically)
                    - S3 URI (must be accessible)
                inputs: Optional[List[ProcessingInput]] - Input data
                outputs: Optional[List[ProcessingOutput]] - Output data
                arguments: Optional[List[str]] - Script arguments
                wait: bool - Block until completion (default: True)
                logs: bool - Show logs (default: True)
                job_name: Optional[str] - Custom job name
                experiment_config: Optional - Experiment tracking
                kms_key: Optional[str] - KMS key for encryption
            
            Raises:
                ValueError: If code not provided or invalid
                ClientError: AWS API errors
                RuntimeError: Script execution errors
        
        wait() -> None
            Wait for completion.
        
        stop() -> None
            Stop the job.

    Notes:
        - Script uploaded to S3 and downloaded to container
        - Script available at /opt/ml/processing/input/code/{script_name}
        - Use command to specify interpreter
        - Arguments passed after script path
    """

Usage:

from sagemaker.core import ScriptProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Create Python script processor
processor = ScriptProcessor(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    image_uri="763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0-cpu-py310",
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=2,  # Distributed processing
    volume_size_in_gb=50
)

# Run Python script
try:
    processor.run(
        code="preprocess.py",  # Local file
        inputs=[
            ProcessingInput(
                source="s3://my-bucket/raw-data",
                destination="/opt/ml/processing/input/data"
            )
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/output/train",
                destination="s3://my-bucket/processed/train"
            ),
            ProcessingOutput(
                source="/opt/ml/processing/output/val",
                destination="s3://my-bucket/processed/val"
            )
        ],
        arguments=["--split-ratio", "0.8", "--random-seed", "42"],
        wait=True,
        logs=True
    )
    
    print("Processing completed successfully")
    
except RuntimeError as e:
    print(f"Script error: {e}")
    # Check logs for Python stack trace

Distributed Processing Script:

# preprocess.py - Handles distributed processing
import os
import json
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--split-ratio', type=float, default=0.8)
    parser.add_argument('--random-seed', type=int, default=42)
    args = parser.parse_args()
    
    # Distributed processing: each instance processes subset
    instance_count = int(os.environ.get('SM_NUM_INSTANCES', 1))
    current_instance = int(os.environ.get('SM_CURRENT_INSTANCE', 0))
    
    # Input/output paths from environment
    input_path = os.environ['SM_CHANNEL_DATA']
    train_output = '/opt/ml/processing/output/train'
    val_output = '/opt/ml/processing/output/val'
    
    # Process this instance's shard (process_shard is a user-defined helper;
    # one possible implementation is sketched after this script)
    process_shard(
        input_path=input_path,
        train_output=train_output,
        val_output=val_output,
        instance_id=current_instance,
        total_instances=instance_count,
        split_ratio=args.split_ratio,
        seed=args.random_seed
    )

if __name__ == '__main__':
    main()
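
The process_shard helper above is user code, not part of the SDK. One possible implementation, assuming FullyReplicated inputs (every instance sees all files, so each instance hashes file names to pick its own share) and a simple per-file train/val split:

# Hypothetical process_shard implementation (illustrative only)
import hashlib
import os
import random
import shutil

def process_shard(input_path, train_output, val_output,
                  instance_id, total_instances, split_ratio, seed):
    os.makedirs(train_output, exist_ok=True)
    os.makedirs(val_output, exist_ok=True)
    rng = random.Random(seed)

    for name in sorted(os.listdir(input_path)):
        # Deterministic assignment: each file is handled by exactly one instance
        digest = int(hashlib.md5(name.encode()).hexdigest(), 16)
        if digest % total_instances != instance_id:
            continue
        src = os.path.join(input_path, name)
        dest_dir = train_output if rng.random() < split_ratio else val_output
        shutil.copy(src, os.path.join(dest_dir, name))

If the input is declared with s3_data_distribution_type="ShardedByS3Key" instead, each instance already receives only its subset and the hash filter can be dropped.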

FrameworkProcessor

Processor optimized for ML framework code with automatic dependency packaging.

class FrameworkProcessor(Processor):
    """
    Processor for ML framework code with dependency packaging.

    Parameters:
        framework_version: str - Framework version (required)
            - Example: "2.0" for PyTorch
        role: str - IAM role ARN (required)
        instance_type: str - EC2 instance type (required)
        instance_count: int - Number of instances (default: 1)
        volume_size_in_gb: int - EBS volume size (default: 30)
        volume_kms_key: Optional[str] - KMS key for volume encryption
        output_kms_key: Optional[str] - KMS key for output encryption
        max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 86400)
        base_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        env: Optional[Dict[str, str]] - Environment variables
        tags: Optional[List[Tag]] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Methods:
        run(code, source_dir=None, dependencies=None, inputs=None, outputs=None, 
            arguments=None, wait=True, logs=True, job_name=None, experiment_config=None) -> None
            Run with framework.
            
            Parameters:
                code: str - Entry script filename (required)
                source_dir: Optional[str] - Directory containing code
                    - Entire directory uploaded and available in container
                dependencies: Optional[Union[str, List[str]]] - Dependencies
                    - requirements.txt path or list of packages
                inputs: Optional[List[ProcessingInput]] - Input data
                outputs: Optional[List[ProcessingOutput]] - Output data
                arguments: Optional[List[str]] - Script arguments
                wait: bool - Block until completion (default: True)
                logs: bool - Show logs (default: True)
                job_name: Optional[str] - Custom job name
                experiment_config: Optional - Experiment tracking
            
            Raises:
                ValueError: Invalid configuration
                ClientError: AWS API errors
        
        wait() -> None
            Wait for completion.
        
        stop() -> None
            Stop the job.

    Notes:
        - Automatically packages source_dir contents
        - Installs dependencies from requirements.txt or list
        - Framework-specific optimizations
        - Use for processing requiring ML libraries
    """

Usage:

from sagemaker.core import FrameworkProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Create PyTorch framework processor
processor = FrameworkProcessor(
    framework_version="2.0",
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    instance_count=2,
    env={"PYTHONUNBUFFERED": "1"}
)

# Run with automatic dependency packaging
processor.run(
    code="process.py",
    source_dir="./processing",  # All files uploaded
    dependencies=["requirements.txt"],  # Or list: ["pandas==2.0.0", "scikit-learn==1.3.0"]
    inputs=[
        ProcessingInput(
            source="s3://my-bucket/data",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination="s3://my-bucket/output"
        )
    ],
    arguments=["--config", "config.json"],
    wait=True
)
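
The entry script referenced above might look like the following. The source_dir layout, the pandas usage, and the contents of config.json are assumptions for illustration, not part of the SDK; only the container paths and the --config argument come from the call above.

# ./processing/process.py (illustrative entry script)
import argparse
import json
import os

import pandas as pd  # installed from requirements.txt / dependencies

INPUT_DIR = "/opt/ml/processing/input"    # matches the ProcessingInput destination
OUTPUT_DIR = "/opt/ml/processing/output"  # matches the ProcessingOutput source

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True)
    args = parser.parse_args()

    # config.json is assumed to ship inside source_dir alongside this script
    with open(args.config) as f:
        config = json.load(f)

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for name in os.listdir(INPUT_DIR):
        if not name.endswith(".csv"):
            continue
        df = pd.read_csv(os.path.join(INPUT_DIR, name))
        df = df.dropna(subset=config.get("required_columns", []))
        df.to_csv(os.path.join(OUTPUT_DIR, name), index=False)

if __name__ == "__main__":
    main()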

PySparkProcessor

Processor for running PySpark applications on SageMaker.

class PySparkProcessor(ScriptProcessor):
    """
    Processor for PySpark jobs on SageMaker.

    Parameters:
        role: str - IAM role ARN (required)
        instance_type: Union[str, PipelineVariable] - EC2 instance type (required)
        instance_count: Union[int, PipelineVariable] - Number of instances (default: 1)
        framework_version: Optional[str] - SageMaker PySpark version (default: latest)
        py_version: Optional[str] - Python version (default: "py39")
        container_version: Optional[str] - Spark container version
        image_uri: Optional[Union[str, PipelineVariable]] - Container image URI
        volume_size_in_gb: Union[int, PipelineVariable] - EBS volume size (default: 30)
        volume_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for volume
        output_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for output
        configuration_location: Optional[str] - S3 prefix for EMR configuration
        dependency_location: Optional[str] - S3 prefix for Spark dependencies (.jar, .py files)
        max_runtime_in_seconds: Optional[Union[int, PipelineVariable]] - Maximum runtime
        base_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        env: Optional[Dict[str, Union[str, PipelineVariable]]] - Environment variables
        tags: Optional[Tags] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Methods:
        run(submit_app, submit_py_files=None, submit_jars=None, submit_files=None, 
            inputs=None, outputs=None, arguments=None, wait=True, logs=True, 
            job_name=None, experiment_config=None, configuration=None, 
            spark_event_logs_s3_uri=None, kms_key=None) -> None
            Run PySpark job.
            
            Parameters:
                submit_app: str - Main PySpark script (required)
                submit_py_files: Optional[List[str]] - Additional Python files
                    - Local paths or S3 URIs
                    - Available in Spark driver/executors
                submit_jars: Optional[List[str]] - JAR dependencies
                submit_files: Optional[List[str]] - Additional files (configs, data)
                inputs: Optional[List[ProcessingInput]] - Input data
                outputs: Optional[List[ProcessingOutput]] - Output data
                arguments: Optional[List[str]] - Spark application arguments
                wait: bool - Block until completion (default: True)
                logs: bool - Show logs (default: True)
                job_name: Optional[str] - Custom job name
                experiment_config: Optional - Experiment tracking
                configuration: Optional[List[Dict]] - Spark configuration
                    - Format: [{"Classification": "spark-defaults", "Properties": {...}}]
                spark_event_logs_s3_uri: Optional[str] - S3 URI for Spark event logs
                kms_key: Optional[str] - KMS key
            
            Raises:
                ValueError: Invalid Spark configuration
                ClientError: AWS API errors
        
        get_run_args(...) -> Dict
            Get normalized run arguments for validation.
            
            Returns:
                Dict: Normalized arguments
        
        start_history_server(spark_event_logs_s3_uri=None) -> None
            Start Spark history server for UI access.
            
            Parameters:
                spark_event_logs_s3_uri: Optional[str] - Event logs S3 URI
            
            Returns:
                None (prints history server URL)
        
        terminate_history_server() -> None
            Terminate Spark history server.
        
        wait() -> None
            Wait for completion.
        
        stop() -> None
            Stop the job.

    Notes:
        - Spark driver runs on one instance
        - Executors distributed across all instances
        - Use configuration for Spark tuning (memory, cores, etc.)
        - Event logs enable Spark UI for debugging
        - History server provides web UI for completed jobs
    """

Usage:

from sagemaker.core.spark import PySparkProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Create PySpark processor
processor = PySparkProcessor(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    instance_count=4,  # 1 driver + 3 executors
    framework_version="3.3",
    py_version="py39"
)

# Run PySpark job with full configuration
processor.run(
    submit_app="preprocess.py",  # Main PySpark script
    submit_py_files=[
        "utils.py",  # Additional Python modules
        "s3://my-bucket/shared/helpers.py"
    ],
    submit_jars=[
        "s3://my-bucket/jars/custom-lib.jar"
    ],
    submit_files=[
        "config.properties"
    ],
    inputs=[
        ProcessingInput(
            source="s3://my-bucket/raw-data",
            destination="/opt/ml/processing/input",
            s3_data_distribution_type="ShardedByS3Key"  # Distribute across instances
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination="s3://my-bucket/processed-data",
            s3_upload_mode="Continuous"  # Upload during processing
        )
    ],
    arguments=["--input-format", "parquet", "--output-format", "parquet"],
    configuration=[
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.executor.memory": "4g",
                "spark.executor.cores": "2",
                "spark.driver.memory": "4g",
                "spark.sql.shuffle.partitions": "200"
            }
        }
    ],
    spark_event_logs_s3_uri="s3://my-bucket/spark-logs/",
    wait=True,
    logs=True
)

# Start history server to view Spark UI
processor.start_history_server()
# Access UI and analyze job performance
# When done:
processor.terminate_history_server()

PySpark Script Example:

# preprocess.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
import argparse
import sys

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-format', default='csv')
    parser.add_argument('--output-format', default='parquet')
    args = parser.parse_args()
    
    # Create Spark session
    spark = SparkSession.builder \
        .appName("DataPreprocessing") \
        .getOrCreate()
    
    # Read data
    input_path = "/opt/ml/processing/input"
    df = spark.read.format(args.input_format).load(input_path)
    
    # Process data
    df_processed = df \
        .filter(col("value").isNotNull()) \
        .withColumn("normalized", col("value") / 100.0) \
        .withColumn("category", when(col("score") > 0.5, "high").otherwise("low"))
    
    # Write output
    output_path = "/opt/ml/processing/output"
    df_processed.write.format(args.output_format).mode("overwrite").save(output_path)
    
    spark.stop()

if __name__ == '__main__':
    main()

SparkJarProcessor

Processor for running Java or Scala Spark applications on SageMaker.

class SparkJarProcessor(ScriptProcessor):
    """
    Processor for Spark jobs using Java/Scala JAR files.

    Parameters:
        role: str - IAM role ARN (required)
        instance_type: Union[str, PipelineVariable] - EC2 instance type (required)
        instance_count: Union[int, PipelineVariable] - Number of instances (default: 1)
        framework_version: Optional[str] - SageMaker Spark version
        py_version: Optional[str] - Python version (for PySpark dependencies)
        container_version: Optional[str] - Spark container version
        image_uri: Optional[Union[str, PipelineVariable]] - Container image URI
        volume_size_in_gb: Union[int, PipelineVariable] - EBS volume size (default: 30)
        volume_kms_key: Optional[Union[str, PipelineVariable]] - KMS key
        output_kms_key: Optional[Union[str, PipelineVariable]] - KMS key for output
        configuration_location: Optional[str] - S3 prefix for EMR configuration
        dependency_location: Optional[str] - S3 prefix for Spark dependencies
        max_runtime_in_seconds: Optional[Union[int, PipelineVariable]] - Maximum runtime
        base_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        env: Optional[Dict[str, Union[str, PipelineVariable]]] - Environment variables
        tags: Optional[Tags] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Methods:
        run(submit_app, submit_class=None, submit_jars=None, submit_files=None, 
            inputs=None, outputs=None, arguments=None, wait=True, logs=True, 
            job_name=None, experiment_config=None, configuration=None, 
            spark_event_logs_s3_uri=None, kms_key=None) -> None
            Run Spark JAR job.
            
            Parameters:
                submit_app: str - Main JAR file path (required)
                    - S3 URI: "s3://bucket/app.jar"
                    - Local file: "./target/app.jar"
                submit_class: Optional[str] - Main class name
                    - Example: "com.example.spark.DataProcessor"
                    - Required for JAR without manifest
                submit_jars: Optional[List[str]] - Dependency JARs
                submit_files: Optional[List[str]] - Additional files
                inputs: Optional[List[ProcessingInput]] - Input data
                outputs: Optional[List[ProcessingOutput]] - Output data
                arguments: Optional[List[str]] - Application arguments
                wait: bool - Block until completion (default: True)
                logs: bool - Show logs (default: True)
                job_name: Optional[str] - Custom job name
                experiment_config: Optional - Experiment tracking
                configuration: Optional[List[Dict]] - Spark configuration
                spark_event_logs_s3_uri: Optional[str] - Event logs S3 URI
                kms_key: Optional[str] - KMS key
            
            Raises:
                ValueError: Invalid JAR or configuration
                ClientError: AWS API errors
        
        get_run_args(...) -> Dict
            Get normalized run arguments.
        
        start_history_server(spark_event_logs_s3_uri=None) -> None
            Start Spark history server.
        
        terminate_history_server() -> None
            Terminate history server.

    Notes:
        - Use for production Spark applications in Java/Scala
        - Better performance than PySpark for compute-intensive tasks
        - Requires compiled JAR file
        - Submit class needed if JAR has no manifest
    """

Usage:

from sagemaker.core.spark import SparkJarProcessor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Create SparkJar processor
processor = SparkJarProcessor(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    instance_type="ml.m5.2xlarge",
    instance_count=5,  # 1 driver + 4 executors
    framework_version="3.3"
)

# Run Spark JAR job
processor.run(
    submit_app="s3://my-bucket/jars/data-processor-1.0.0.jar",
    submit_class="com.mycompany.spark.DataProcessor",
    submit_jars=[
        "s3://my-bucket/jars/spark-avro_2.12-3.3.0.jar",
        "s3://my-bucket/jars/delta-core_2.12-2.4.0.jar"
    ],
    submit_files=["application.conf"],
    inputs=[
        ProcessingInput(
            source="s3://my-bucket/input-data",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination="s3://my-bucket/output-data"
        )
    ],
    arguments=["--mode", "production", "--date", "2024-01-15"],
    configuration=[
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.executor.memory": "8g",
                "spark.driver.memory": "4g",
                "spark.executor.cores": "4",
                "spark.dynamicAllocation.enabled": "false",
                "spark.sql.adaptive.enabled": "true"
            }
        },
        {
            "Classification": "spark-env",
            "Properties": {},
            "Configurations": [{
                "Classification": "export",
                "Properties": {
                    "JAVA_HOME": "/usr/lib/jvm/java-11-amazon-corretto"
                }
            }]
        }
    ],
    spark_event_logs_s3_uri="s3://my-bucket/spark-history/",
    wait=True
)

Transformer

Handles batch transform jobs for offline inference on large datasets.

class Transformer:
    """
    Batch transform for offline inference.

    Parameters:
        model_name: str - SageMaker Model name (required)
        instance_count: int - Number of instances (required)
            - Range: 1-100
        instance_type: str - EC2 instance type (required)
        strategy: Optional[str] - Transform strategy
            - "MultiRecord": Batch multiple records per request (default)
            - "SingleRecord": One record per request
        assemble_with: Optional[str] - Assembly strategy
            - "None": No assembly
            - "Line": Assemble line-delimited records
        output_path: str - S3 output path (required)
        output_kms_key: Optional[str] - KMS key for output encryption
        accept: Optional[str] - Accept type for output
        max_concurrent_transforms: Optional[int] - Maximum concurrent transforms (default: 1)
            - Range: 1-100
            - Concurrent requests per instance
        max_payload: Optional[int] - Maximum payload size in MB (default: 6)
            - Range: 1-100 MB
        tags: Optional[List[Tag]] - Resource tags
        env: Optional[Dict[str, str]] - Environment variables
        base_transform_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        volume_kms_key: Optional[str] - KMS key for volume encryption

    Methods:
        transform(data, data_type="S3Prefix", content_type=None, compression_type=None, 
                 split_type=None, job_name=None, input_filter=None, output_filter=None, 
                 join_source=None, model_client_config=None, batch_data_capture_config=None,
                 wait=True, logs=True) -> None
            Run batch transform job.
            
            Parameters:
                data: str - S3 URI for input data (required)
                data_type: str - "S3Prefix" or "ManifestFile" (default: "S3Prefix")
                content_type: Optional[str] - Input content type
                compression_type: Optional[str] - "None" or "Gzip"
                split_type: Optional[str] - Split type
                    - "None": Entire file per request
                    - "Line": One line per request
                    - "RecordIO": RecordIO records
                    - "TFRecord": TensorFlow records
                job_name: Optional[str] - Custom job name
                input_filter: Optional[str] - JSONPath to filter input
                output_filter: Optional[str] - JSONPath to filter output
                join_source: Optional[str] - Join output with input
                    - "None": Output only
                    - "Input": Join with input records
                model_client_config: Optional[Dict] - Model client config
                batch_data_capture_config: Optional[Dict] - Data capture config
                wait: bool - Block until completion (default: True)
                logs: bool - Show logs (default: True)
            
            Raises:
                ValueError: Invalid configuration
                ClientError: AWS API errors
        
        wait() -> None
            Wait for transform job to complete.
        
        stop() -> None
            Stop the transform job.
        
        describe() -> Dict
            Get transform job details.
            
            Returns:
                Dict: Complete job description

    Attributes:
        latest_transform_job: TransformJob - Most recent transform job
        output_path: str - S3 output path
    
    Notes:
        - More cost-effective than real-time endpoints for batch
        - Processes large datasets offline
        - Results written to S3
        - Use MultiRecord strategy for higher throughput
        - Throughput scales with instance_count and max_concurrent_transforms; max_payload caps each request's size
        - Input/output filtering reduces data transfer
    """

Usage:

from sagemaker.core import Transformer

# Create transformer from model
transformer = Transformer(
    model_name="my-model",
    instance_count=4,
    instance_type="ml.m5.xlarge",
    output_path="s3://my-bucket/transform-output",
    strategy="MultiRecord",
    max_concurrent_transforms=8,  # 8 concurrent requests per instance
    max_payload=6,  # 6 MB per request
    accept="application/json"
)

# Run batch transform
try:
    transformer.transform(
        data="s3://my-bucket/batch-input",
        content_type="application/json",
        split_type="Line",  # One JSON object per line
        join_source="Input",  # Include input with output
        input_filter="$.features",  # Extract only features from input
        output_filter="$.predictions[0]",  # Extract first prediction
        wait=True,
        logs=True
    )
    
    print(f"Transform completed: {transformer.output_path}")
    print(f"Job: {transformer.latest_transform_job.transform_job_name}")
    
except RuntimeError as e:
    print(f"Transform failed: {e}")

Advanced Transform with Filtering:

# Input file (input.jsonl):
# {"id": 123, "features": [1.0, 2.0, 3.0], "metadata": {...}}
# {"id": 456, "features": [4.0, 5.0, 6.0], "metadata": {...}}

# Transform with filtering
transformer.transform(
    data="s3://bucket/input.jsonl",
    content_type="application/jsonlines",
    split_type="Line",
    input_filter="$.features",  # Send only features to model
    output_filter="$.score",  # Extract only score from model output
    join_source="Input"  # Join with original input
)

# Output file (input.jsonl.out):
# {"id": 123, "features": [1.0, 2.0, 3.0], "metadata": {...}, "score": 0.85}
# {"id": 456, "features": [4.0, 5.0, 6.0], "metadata": {...}, "score": 0.92}

Input/Output Configuration

ProcessingInput

class ProcessingInput:
    """
    Input data configuration for processing jobs.

    Fields:
        source: str - S3 URI or local path (required)
        destination: str - Container path (required)
            - Example: "/opt/ml/processing/input"
        input_name: Optional[str] - Input name
        s3_data_type: Optional[str] - S3 data type (default: "S3Prefix")
            - "S3Prefix": All objects under prefix
            - "ManifestFile": Objects listed in manifest
        s3_input_mode: Optional[str] - Input mode (default: "File")
            - "File": Download before processing
            - "Pipe": Stream during processing
        s3_data_distribution_type: Optional[str] - Distribution type (default: "FullyReplicated")
            - "FullyReplicated": Copy to all instances
            - "ShardedByS3Key": Distribute across instances
        s3_compression_type: Optional[str] - Compression (default: "None")
            - "None" or "Gzip"

    Notes:
        - ShardedByS3Key for distributed processing of large datasets
        - Pipe mode reduces startup time
        - Gzip automatically decompressed
        - Each instance mounts data at destination path
    """

ProcessingOutput

class ProcessingOutput:
    """
    Output data configuration for processing jobs.

    Fields:
        source: str - Container path (required)
            - Example: "/opt/ml/processing/output"
        destination: str - S3 URI for output (required)
        output_name: Optional[str] - Output name
        s3_upload_mode: Optional[str] - Upload mode (default: "EndOfJob")
            - "EndOfJob": Upload after job completes
            - "Continuous": Upload during job execution
        app_managed: Optional[bool] - Application-managed output (default: False)
            - True: Application writes directly to S3
            - False: SageMaker uploads from local path
        feature_store_output: Optional[FeatureStoreOutput] - Feature Store output config

    Notes:
        - Continuous upload reduces final upload time
        - Use app_managed for large outputs (write directly to S3)
        - Multiple outputs can have different destinations
    """

TransformInput

class TransformInput:
    """
    Input data configuration for batch transform.

    Fields:
        data: str - S3 URI for input data (required)
        data_type: str - Data type (default: "S3Prefix")
            - "S3Prefix" or "ManifestFile"
        content_type: Optional[str] - Content type
        compression_type: Optional[str] - Compression (default: "None")
            - "None" or "Gzip"
        split_type: Optional[str] - Split type (default: "None")
            - "None": Entire file per request
            - "Line": One line per request
            - "RecordIO": RecordIO splits
            - "TFRecord": TensorFlow record splits

    Notes:
        - Split type determines request granularity
        - Line split most common for JSONL/CSV
        - None split for single large files
    """

TransformOutput

class TransformOutput:
    """
    Output data configuration for batch transform.

    Fields:
        s3_output_path: str - S3 URI for output (required)
        accept: Optional[str] - Accept type for output
        assemble_with: Optional[str] - Assembly (default: "None")
            - "None": One output file per input file
            - "Line": Concatenate outputs line-by-line
        kms_key_id: Optional[str] - KMS key for encryption

    Notes:
        - Output files mirror input structure by default
        - Line assembly creates single output file
        - Encryption applied to all output objects
    """

Advanced Usage

Distributed Processing with Sharding

from sagemaker.core import Processor
from sagemaker.core.inputs import ProcessingInput, ProcessingOutput

# Multi-instance processing with data sharding
processor = Processor(
    role=role,
    image_uri="my-processing-image:latest",
    instance_count=8,  # 8 parallel instances
    instance_type="ml.m5.4xlarge"
)

# Shard data across instances
inputs = [
    ProcessingInput(
        source="s3://my-bucket/large-dataset",  # 100 GB dataset
        destination="/opt/ml/processing/input",
        s3_data_distribution_type="ShardedByS3Key"  # Each instance gets subset
    )
]

outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/output",
        destination="s3://my-bucket/processed",
        s3_upload_mode="Continuous"  # Upload during processing
    )
]

# Each instance processes ~12.5 GB
processor.run(inputs=inputs, outputs=outputs)

Processing with Spot Instances

from sagemaker.core import Processor

# Use spot instances for cost savings
processor = Processor(
    role=role,
    image_uri="my-image:latest",
    instance_type="ml.m5.xlarge",
    instance_count=4,
    use_spot_instances=True,
    max_wait_time_in_seconds=7200,  # 2 hours total
    max_runtime_in_seconds=3600  # 1 hour actual processing
)

# Up to 70% cost savings with spot
# May be interrupted - implement checkpointing if needed
processor.run(inputs=inputs, outputs=outputs)
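
Because a spot interruption restarts the job from scratch, the processing script itself should be resumable. A minimal sketch of one approach (the bucket, prefix, and process_file step are illustrative user code): skip inputs whose results already exist in the output location, so a restarted job only redoes unfinished work.

# Inside the processing script: idempotent, resumable file processing (sketch)
import os
import boto3

s3 = boto3.client("s3")
OUTPUT_BUCKET = "my-bucket"        # placeholder
OUTPUT_PREFIX = "processed/"       # placeholder
INPUT_DIR = "/opt/ml/processing/input"

def already_done(name):
    resp = s3.list_objects_v2(Bucket=OUTPUT_BUCKET, Prefix=OUTPUT_PREFIX + name)
    return resp.get("KeyCount", 0) > 0

for name in sorted(os.listdir(INPUT_DIR)):
    if already_done(name):
        continue  # finished before the interruption
    result_path = process_file(os.path.join(INPUT_DIR, name))  # user-defined step
    s3.upload_file(result_path, OUTPUT_BUCKET, OUTPUT_PREFIX + name)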

Feature Store Integration

from sagemaker.core.inputs import ProcessingOutput, FeatureStoreOutput

# Write processing output directly to Feature Store
outputs = [
    ProcessingOutput(
        output_name="features",
        source="/opt/ml/processing/output/features.csv",
        feature_store_output=FeatureStoreOutput(
            feature_group_name="customer-features"
        ),
        app_managed=True  # Application writes directly
    )
]

# Assumes a script-capable processor (e.g. ScriptProcessor or FrameworkProcessor),
# since run() takes a code argument
processor.run(
    code="compute_features.py",
    inputs=inputs,
    outputs=outputs
)
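
With app_managed=True, compute_features.py is responsible for the write itself. A hedged sketch using the Feature Store runtime API directly (the feature names and values are placeholders):

# Hypothetical snippet from compute_features.py
import boto3

featurestore = boto3.client("sagemaker-featurestore-runtime")

def put_feature_record(customer_id, score, event_time):
    featurestore.put_record(
        FeatureGroupName="customer-features",
        Record=[
            {"FeatureName": "customer_id", "ValueAsString": str(customer_id)},
            {"FeatureName": "score", "ValueAsString": str(score)},
            {"FeatureName": "event_time", "ValueAsString": event_time},
        ],
    )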

Transform with Model Client Config

# Configure model behavior for transform
model_client_config = {
    "InvocationsTimeoutInSeconds": 600,  # 10 minutes per request
    "InvocationsMaxRetries": 3
}

transformer.transform(
    data="s3://bucket/input",
    content_type="application/json",
    model_client_config=model_client_config
)

Validation and Constraints

Processing Job Constraints

  • Maximum runtime: 5 days (432,000 seconds)
  • Instance count: 1-100
  • Volume size: 1-16384 GB
  • Maximum inputs: 20
  • Maximum outputs: 20
  • Environment variables: Maximum 512
  • Job name length: 1-63 characters
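
These limits can also be checked client-side before submitting a job; a hypothetical pre-flight helper (not part of the SDK) mirroring the constraints above:

def validate_processing_config(instance_count, volume_size_in_gb, max_runtime_in_seconds,
                               inputs, outputs, env, job_name):
    assert 1 <= instance_count <= 100, "instance_count must be 1-100"
    assert 1 <= volume_size_in_gb <= 16384, "volume size must be 1-16384 GB"
    assert 1 <= max_runtime_in_seconds <= 432_000, "max runtime is 5 days (432,000 s)"
    assert len(inputs) <= 20 and len(outputs) <= 20, "at most 20 inputs and 20 outputs"
    assert len(env or {}) <= 512, "at most 512 environment variables"
    assert 1 <= len(job_name) <= 63, "job name must be 1-63 characters"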

Transform Job Constraints

  • Maximum instances: 100
  • Max concurrent transforms: 1-100 per instance
  • Max payload: 100 MB per request
  • Container timeout: 3600 seconds
  • Input data size: Unlimited (distributed across instances)
  • Split types: None, Line, RecordIO, TFRecord

Spark Constraints

  • Minimum instances: 2 (1 driver + 1 executor)
  • Maximum instances: 100
  • Spark version: 2.4, 3.0, 3.1, 3.2, 3.3
  • Python version: py37, py38, py39, py310
  • JVM heap: Configured via spark.executor.memory

Common Error Scenarios

  1. Container Disk Full:
    • Cause: Insufficient volume_size_in_gb for data
    • Solution: Increase volume size or use streaming (Pipe mode, Continuous upload)
  2. OOM Error in Processing:
    • Cause: Insufficient instance memory
    • Solution: Use memory-optimized instances (ml.r5) or reduce batch size
  3. Shard Distribution Failure:
    • Cause: Data not shardable by S3 key
    • Solution: Organize S3 keys properly or use FullyReplicated
  4. Transform Timeout:
    • Cause: Model inference too slow
    • Solution: Increase max_payload, reduce max_concurrent_transforms, or optimize model
  5. Spark Driver OOM:
    • Cause: Driver collecting too much data
    • Solution: Increase spark.driver.memory, avoid collect(), use write operations
  6. File Not Found in Container:
    • Cause: Incorrect destination path
    • Solution: Verify paths match between input config and processing code
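
When any of these failures occur, the job description is usually the fastest place to look. A hedged sketch, assuming describe() surfaces the underlying DescribeProcessingJob fields such as ProcessingJobStatus, FailureReason, and ExitMessage:

details = processor.describe()
if details.get("ProcessingJobStatus") == "Failed":
    print("Failure reason:", details.get("FailureReason"))
    print("Exit message:", details.get("ExitMessage"))
    # Full container output is in the /aws/sagemaker/ProcessingJobs CloudWatch log group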