Remote Functions

Execute Python functions remotely on SageMaker infrastructure with automatic dependency management and resource allocation.

Capabilities

@remote Decorator

Decorator for converting local Python functions to remote SageMaker executions.

def remote(
    role: Optional[str] = None,
    instance_type: str = "ml.m5.large",
    instance_count: int = 1,
    keep_alive_period_in_seconds: int = 0,
    volume_size: int = 30,
    volume_kms_key: Optional[str] = None,
    max_runtime_in_seconds: int = 86400,
    image_uri: Optional[str] = None,
    environment_variables: Optional[Dict[str, str]] = None,
    dependencies: Optional[Union[str, List[str]]] = None,
    include_local_workdir: bool = False,
    custom_file_filter: Optional[CustomFileFilter] = None,
    instance_type_for_spark: Optional[str] = None,
    spark_config: Optional[SparkConfig] = None,
    s3_root_uri: Optional[str] = None,
    s3_kms_key: Optional[str] = None,
    job_name_prefix: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
    job_conda_env: Optional[str] = None,
    sagemaker_session: Optional[Session] = None
):
    """
    Decorator to run Python functions remotely on SageMaker.

    Parameters:
        role: Optional[str] - IAM role ARN
            - Required unless in SageMaker environment
        instance_type: str - EC2 instance type (default: "ml.m5.large")
            - Any SageMaker training instance type
        instance_count: int - Number of instances (default: 1)
            - Use >1 for distributed processing
        keep_alive_period_in_seconds: int - Warm pool duration (default: 0)
            - Range: 0-3600 seconds
            - Reuse instance for faster subsequent calls
        volume_size: int - EBS volume size in GB (default: 30)
            - Range: 1-16384
        volume_kms_key: Optional[str] - KMS key for volume encryption
        max_runtime_in_seconds: int - Maximum runtime (default: 86400 = 24 hours)
            - Range: 1-432000 (5 days)
        image_uri: Optional[str] - Custom container image URI
            - Auto-detected based on Python version if not provided
        environment_variables: Optional[Dict[str, str]] - Environment variables
            - Passed to remote function execution
        dependencies: Optional[Union[str, List[str]]] - Dependencies
            - Path to requirements.txt
            - List of packages: ["pandas==2.0.0", "scikit-learn"]
        include_local_workdir: bool - Include local working directory (default: False)
            - Upload the entire local working directory along with the function
        custom_file_filter: Optional[CustomFileFilter] - Custom file filter
            - Control which files are uploaded
        instance_type_for_spark: Optional[str] - Instance type for Spark processing
        spark_config: Optional[SparkConfig] - Spark configuration
        s3_root_uri: Optional[str] - S3 root for artifacts
            - Defaults to: s3://sagemaker-{region}-{account}/
        s3_kms_key: Optional[str] - KMS key for S3 encryption
        job_name_prefix: Optional[str] - Prefix for job names
        tags: Optional[Dict[str, str]] - Resource tags
        job_conda_env: Optional[str] - Conda environment name
        sagemaker_session: Optional[Session] - SageMaker session

    Returns:
        Decorated function that executes remotely on SageMaker.
        Return value: Same as original function (pickled and transferred).

    Usage:
        Apply to any Python function to run it on SageMaker infrastructure.
        Function and return values must be picklable.
    
    Notes:
        - Function code and dependencies automatically uploaded
        - Function executed in SageMaker Training job
        - Results returned automatically
        - Keep-alive reduces cold start for repeated calls
        - Use for compute-intensive tasks
        - Billing per second of execution
    
    Raises:
        ValueError: Invalid configuration
        RuntimeError: Remote execution errors
    """

Basic Usage:

from sagemaker.core.remote_function import remote

@remote(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    keep_alive_period_in_seconds=300  # 5 minute warm pool
)
def train_model(data_path, hyperparameters):
    """Train a model remotely on SageMaker."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    
    # Load data from S3
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    # Train model
    model = RandomForestClassifier(**hyperparameters)
    model.fit(X, y)
    
    # Save model to SageMaker output location
    joblib.dump(model, '/opt/ml/model/model.joblib')
    
    # Return results
    train_accuracy = model.score(X, y)
    feature_importance = dict(zip(X.columns, model.feature_importances_))
    
    return {
        "accuracy": train_accuracy,
        "feature_importance": feature_importance,
        "n_estimators": model.n_estimators
    }

# Call function - executes remotely on SageMaker
try:
    result = train_model(
        data_path="s3://my-bucket/data.csv",
        hyperparameters={"n_estimators": 100, "max_depth": 10, "random_state": 42}
    )
    
    print(f"Training accuracy: {result['accuracy']:.4f}")
    print(f"Top features: {sorted(result['feature_importance'].items(), key=lambda x: x[1], reverse=True)[:5]}")
    
except RuntimeError as e:
    print(f"Remote execution failed: {e}")

With Dependencies:

# Specify dependencies
@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    dependencies=["pandas==2.0.0", "scikit-learn==1.3.0", "joblib==1.3.2"]
)
def process_data(input_path, output_path):
    """Process data with specific package versions."""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    
    df = pd.read_csv(input_path)
    
    # Process data
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df),
        columns=df.columns
    )
    
    # Save to S3
    df_scaled.to_csv(output_path, index=False)
    
    return {"rows_processed": len(df_scaled)}

# Or use requirements.txt
@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    dependencies="requirements.txt"  # File in current directory
)
def complex_processing():
    # All packages from requirements.txt available
    pass

GPU Functions:

@remote(
    role=role,
    instance_type="ml.p3.2xlarge",  # GPU instance
    image_uri="763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0-gpu-py310"
)
def train_on_gpu(model_config, data_path):
    """Train deep learning model on GPU."""
    import torch
    import torch.nn as nn
    
    # Check GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")
        print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    
    # Build and train model (build_model, load_data, and train_epoch below are user-defined helpers)
    model = build_model(model_config).to(device)
    data = load_data(data_path, device)
    
    # Training loop
    optimizer = torch.optim.Adam(model.parameters())
    
    for epoch in range(model_config['epochs']):
        loss = train_epoch(model, data, optimizer, device)
        print(f"Epoch {epoch}: loss={loss:.4f}")
    
    # Save model
    torch.save(model.state_dict(), '/opt/ml/model/model.pt')
    
    return {"final_loss": loss, "epochs_completed": epoch + 1}

# Execute on GPU
result = train_on_gpu(
    model_config={"hidden_dims": [128, 64], "epochs": 10},
    data_path="s3://bucket/training-data.csv"
)

Spot Instances for Cost Savings:

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    use_spot_instances=True,  # Up to 70% cost savings
    max_wait_time_in_seconds=7200,  # 2 hours total
    max_runtime_in_seconds=3600  # 1 hour execution
)
def cost_effective_processing(data):
    """Run on spot instances."""
    # Implement checkpointing for interruption handling
    checkpoint_path = '/opt/ml/checkpoints/progress.pkl'
    
    # Resume from checkpoint if exists
    if os.path.exists(checkpoint_path):
        progress = load_checkpoint(checkpoint_path)
    else:
        progress = 0
    
    # Process data with checkpointing
    result = process_with_checkpoints(data, progress, checkpoint_path)
    
    return result

result = cost_effective_processing(large_dataset)

RemoteExecutor

Executor for running remote functions with advanced configuration.

class RemoteExecutor:
    """
    Executor for remote function execution.

    Parameters:
        role: str - IAM role ARN (required)
        instance_type: str - EC2 instance type (default: "ml.m5.large")
        instance_count: int - Number of instances (default: 1)
        max_parallel_job: Optional[int] - Maximum parallel jobs
            - Limits concurrent remote executions
        volume_size: int - EBS volume size (default: 30)
        volume_kms_key: Optional[str] - KMS key for encryption
        max_runtime_in_seconds: int - Maximum runtime (default: 86400)
        keep_alive_period_in_seconds: int - Warm pool duration (default: 0)
        container_log_level: int - Container logging level (default: logging.INFO)
        tags: Optional[List[Dict]] - Resource tags
        s3_root_uri: Optional[str] - S3 root URI
        s3_kms_key: Optional[str] - KMS key for S3
        environment_variables: Optional[Dict[str, str]] - Environment variables
        image_uri: Optional[str] - Container image URI
        dependencies: Optional[Union[str, List[str]]] - Dependencies
        include_local_workdir: bool - Include local workdir (default: False)
        custom_file_filter: Optional[CustomFileFilter] - File filter
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        submit(func, *args, **kwargs) -> Future
            Submit function for remote execution.
            
            Parameters:
                func: callable - Function to execute (required)
                *args: Positional arguments for function
                **kwargs: Keyword arguments for function
            
            Returns:
                Future: Future object for result retrieval
            
            Raises:
                ValueError: Invalid function or arguments
        
        map(func, *iterables) -> List
            Map function over iterables remotely.
            
            Parameters:
                func: callable - Function to map (required)
                *iterables: Iterables to map over
            
            Returns:
                List: Result of each invocation, in input order
                    (blocks until all remote jobs complete)
        
        shutdown(wait=True) -> None
            Shutdown executor and cleanup resources.
            
            Parameters:
                wait: bool - Wait for pending jobs (default: True)

    Usage:
        Execute multiple functions in parallel with consistent configuration.
        Context manager for automatic cleanup.
    
    Notes:
        - Use for batch processing multiple items
        - Manages job queue automatically
        - Reuses warm pool across jobs
        - Clean shutdown with context manager
    """

Usage:

from sagemaker.core.remote_function import RemoteExecutor
from concurrent.futures import as_completed

# Create executor for batch processing
with RemoteExecutor(
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    instance_type="ml.c5.xlarge",
    max_parallel_job=10,  # Up to 10 concurrent jobs
    keep_alive_period_in_seconds=600  # 10 minute warm pool
) as executor:
    
    # Define processing function
    def process_data(file_path):
        """Process single data file."""
        import pandas as pd
        
        df = pd.read_csv(file_path)
        
        # Process data
        df_processed = df.dropna()
        df_processed = df_processed[df_processed['value'] > 0]
        
        # Save to S3
        output_path = file_path.replace('/raw/', '/processed/')
        df_processed.to_csv(output_path, index=False)
        
        return {
            "input_rows": len(df),
            "output_rows": len(df_processed),
            "file": file_path
        }
    
    # Submit multiple jobs
    files = [f"s3://bucket/raw/file_{i}.csv" for i in range(100)]
    futures = []
    
    for file_path in files:
        future = executor.submit(process_data, file_path)
        futures.append(future)
    
    # Collect results as they complete
    results = []
    for future in as_completed(futures):
        try:
            result = future.result()
            results.append(result)
            print(f"Processed {result['file']}: {result['input_rows']} -> {result['output_rows']} rows")
        except Exception as e:
            print(f"Processing failed: {e}")

# Executor automatically shuts down
print(f"\nTotal processed: {len(results)} files")

Map Function:

# Map function over iterable
def analyze_file(file_path):
    """Analyze single file."""
    import pandas as pd
    
    df = pd.read_csv(file_path)
    
    return {
        "file": file_path,
        "rows": len(df),
        "mean": df['value'].mean(),
        "std": df['value'].std()
    }

with RemoteExecutor(
    role=role,
    instance_type="ml.m5.xlarge",
    max_parallel_job=20
) as executor:
    files = [f"s3://bucket/data/file_{i}.csv" for i in range(200)]
    
    # Map function over all files
    results = list(executor.map(analyze_file, files))
    
    # Aggregate results
    total_rows = sum(r['rows'] for r in results)
    print(f"Total rows across all files: {total_rows}")

Configuration Classes

SparkConfig

class SparkConfig:
    """
    Spark configuration for remote functions.

    Parameters:
        spark_version: str - Spark version (default: "3.3")
            - Supported: "3.0", "3.1", "3.2", "3.3"
        num_executors: Optional[int] - Number of executors
        executor_memory: Optional[str] - Executor memory (e.g., "4g")
        executor_cores: Optional[int] - Cores per executor
        driver_memory: Optional[str] - Driver memory (e.g., "4g")
        driver_cores: Optional[int] - Driver cores
        configuration: Optional[Dict[str, str]] - Additional Spark config
            - spark.* properties

    Usage:
        Configure Spark for distributed data processing in remote functions.
    
    Notes:
        - Requires instance_type_for_spark parameter in @remote
        - Spark cluster automatically provisioned
        - Configuration applied to SparkSession
    """

Usage with Spark:

from sagemaker.core.remote_function import remote, SparkConfig

spark_config = SparkConfig(
    spark_version="3.3",
    num_executors=4,
    executor_memory="8g",
    executor_cores=4,
    driver_memory="4g",
    configuration={
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
)

@remote(
    role=role,
    instance_type="ml.m5.4xlarge",  # Driver instance
    instance_count=1,
    instance_type_for_spark="ml.m5.2xlarge",  # Executor instances
    spark_config=spark_config
)
def process_large_dataset(data_path, output_path):
    """Process large dataset with Spark."""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, avg, count
    
    # Spark session automatically configured
    spark = SparkSession.builder \
        .appName("RemoteDataProcessing") \
        .getOrCreate()
    
    # Read large dataset
    df = spark.read.parquet(data_path)
    
    # Process data at scale
    result = df \
        .filter(col("value").isNotNull()) \
        .groupBy("category") \
        .agg(
            count("*").alias("count"),
            avg("value").alias("avg_value")
        )
    
    # Write output
    result.write.parquet(output_path, mode="overwrite")
    
    # Collect counts before stopping the Spark session
    input_rows = df.count()
    output_categories = result.count()
    
    spark.stop()
    
    return {
        "input_rows": input_rows,
        "output_categories": output_categories
    }

# Execute with Spark
result = process_large_dataset(
    data_path="s3://bucket/large-dataset",
    output_path="s3://bucket/aggregated"
)

print(f"Processed {result['input_rows']} rows into {result['output_categories']} categories")

CustomFileFilter

class CustomFileFilter:
    """
    Custom file filter for dependency packaging.

    Parameters:
        ignore_name_patterns: Optional[List[str]] - Patterns to ignore (glob)
            - Example: ["*.pyc", "*.log", ".git*"]
        ignore_dirs: Optional[List[str]] - Directories to ignore
            - Example: [".git", "__pycache__", "tests", ".venv"]

    Methods:
        should_include(path: str) -> bool
            Check if file should be included.
            
            Parameters:
                path: str - File path to check
            
            Returns:
                bool: True if file should be included

    Usage:
        Control which local files are packaged with remote function.
        Reduces upload size and time.
    
    Notes:
        - Applied when include_local_workdir=True
        - Default filter excludes common patterns
        - Custom filter overrides default
    """

Usage:

from sagemaker.core.remote_function import remote, CustomFileFilter

# Create custom filter
file_filter = CustomFileFilter(
    ignore_name_patterns=[
        "*.pyc",
        "*.pyo",
        "*.pyd",
        "*~",
        "*.log",
        "*.tmp",
        ".DS_Store",
        ".git*"
    ],
    ignore_dirs=[
        ".git",
        "__pycache__",
        ".pytest_cache",
        ".mypy_cache",
        "tests",
        "docs",
        ".venv",
        "venv",
        "node_modules"
    ]
)

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    include_local_workdir=True,  # Upload current directory
    custom_file_filter=file_filter  # Filter unnecessary files
)
def my_function_with_local_modules():
    """Function using local modules."""
    from my_local_module import helper_function
    
    result = helper_function()
    return result

# Only relevant Python files uploaded, not test/cache files
result = my_function_with_local_modules()
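
To sanity-check a filter before uploading, the should_include method documented above can be used to preview which files would be packaged. A minimal sketch, assuming that method behaves as documented:

import os

# Walk the local working directory and apply the filter to each file
included, excluded = [], []
for root, _, names in os.walk("."):
    for name in names:
        path = os.path.join(root, name)
        (included if file_filter.should_include(path) else excluded).append(path)

print(f"{len(included)} files would be uploaded; {len(excluded)} filtered out")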

Advanced Usage

Environment Variables

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    environment_variables={
        "API_KEY": "secret-key-value",
        "MODEL_VERSION": "v2.0",
        "DEBUG": "false",
        "LOG_LEVEL": "INFO"
    }
)
def function_with_env():
    """Access environment variables in remote function."""
    import os
    
    api_key = os.environ.get("API_KEY")
    model_version = os.environ.get("MODEL_VERSION")
    debug = os.environ.get("DEBUG") == "true"
    
    # Use environment variables (call_api is a user-defined helper)
    result = call_api(api_key, model_version, debug=debug)
    
    return result

result = function_with_env()

Warm Pools for Repeated Execution

# Use warm pools to reduce cold start time
@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    keep_alive_period_in_seconds=1800  # 30 minutes
)
def repeated_function(data):
    """Function called repeatedly."""
    return process(data)

# First call: provisions instance (~3-5 minutes)
result1 = repeated_function(data1)

# Subsequent calls within 30 min: reuse warm instance (~10-30 seconds)
result2 = repeated_function(data2)
result3 = repeated_function(data3)
result4 = repeated_function(data4)

# After 30 minutes of inactivity: instance released
# Next call provisions new instance

Note on Warm Pool Costs:

  • Warm instances billed for keep-alive period
  • Balance cost vs latency reduction
  • Good for iterative development or batch processing
  • Not recommended for infrequent calls

Job Naming and Tags

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    job_name_prefix="data-processing",
    tags={
        "Project": "MLPipeline",
        "Team": "DataScience",
        "Environment": "Production",
        "CostCenter": "Engineering"
    }
)
def tagged_function(data):
    """Function with custom naming and tags."""
    return process(data)

# Job name: data-processing-2024-01-15-10-30-00-abc123
# All tags applied to training job resource
result = tagged_function(data)

S3 Integration

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    s3_root_uri="s3://my-bucket/remote-functions",  # Custom S3 location
    s3_kms_key="arn:aws:kms:us-west-2:123:key/abc"  # Encryption
)
def function_with_s3():
    """Function with custom S3 configuration."""
    # Artifacts stored in: s3://my-bucket/remote-functions/...
    # All data encrypted with KMS key
    
    result = expensive_computation()
    
    # Intermediate results automatically saved to S3
    return result

Error Handling in Remote Functions

@remote(role=role, instance_type="ml.m5.xlarge")
def robust_function(data):
    """Function with comprehensive error handling."""
    import logging
    
    logger = logging.getLogger(__name__)
    
    try:
        # Main processing logic
        result = process_data(data)
        
        return {
            "status": "success",
            "result": result
        }
        
    except FileNotFoundError as e:
        logger.error(f"Data file not found: {e}")
        return {
            "status": "error",
            "error_type": "FileNotFoundError",
            "message": str(e)
        }
        
    except ValueError as e:
        logger.error(f"Invalid data format: {e}")
        return {
            "status": "error",
            "error_type": "ValueError",
            "message": str(e)
        }
        
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return {
            "status": "error",
            "error_type": type(e).__name__,
            "message": str(e)
        }

# Call with error handling
result = robust_function(data)

if result["status"] == "error":
    print(f"Error occurred: {result['error_type']}: {result['message']}")
else:
    print(f"Success: {result['result']}")

Local Testing Before Remote Execution

# Develop and test locally first
def my_function(data):
    """Function to test locally."""
    import pandas as pd
    
    df = pd.DataFrame(data)
    result = df.describe().to_dict()
    
    return result

# Test locally with sample data
test_data = {"values": [1, 2, 3, 4, 5]}
local_result = my_function(test_data)
print(f"Local test: {local_result}")

# After validation, add @remote decorator
from sagemaker.core.remote_function import remote

@remote(role=role, instance_type="ml.m5.xlarge")
def my_function(data):
    """Now runs remotely on SageMaker."""
    import pandas as pd
    
    df = pd.DataFrame(data)
    result = df.describe().to_dict()
    
    return result

# Execute remotely
remote_result = my_function(test_data)
print(f"Remote test: {remote_result}")

Conda Environments

@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    job_conda_env="my_conda_env"  # Pre-configured conda environment
)
def function_with_conda():
    """Runs in specified conda environment."""
    # Conda environment must be configured in container
    import specialized_package_from_conda
    
    result = specialized_package_from_conda.process()
    return result

Validation and Constraints

Remote Function Constraints

  • Function requirements: Must be picklable (no lambdas or nested functions); a quick local check is sketched after this list
  • Arguments: Must be picklable (no file handles, sockets, etc.)
  • Return value: Must be picklable
  • Maximum runtime: 5 days (432,000 seconds)
  • Maximum keep-alive: 3600 seconds (1 hour)
  • Maximum concurrent jobs: 100 per executor
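
Because the function, its arguments, and its return value must all survive pickling, a conservative local check with the standard pickle module can catch problems before a job is submitted. A minimal sketch (the helper name is illustrative):

import pickle

def assert_picklable(obj, label):
    """Raise early if an object cannot be pickled for remote transfer."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError) as e:
        raise ValueError(f"{label} is not picklable: {e}")

# Check arguments before invoking the remote function
assert_picklable({"n_estimators": 100, "max_depth": 10}, "hyperparameters")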

Dependencies Constraints

  • Requirements file size: Maximum 100 MB
  • Dependency installation time: Counted in runtime
  • Package compatibility: Must be pip-installable
  • Local workdir size: Maximum 100 MB

Data Transfer Constraints

  • S3 upload: Function code and dependencies
  • S3 download: Input arguments if large
  • S3 upload: Return value if large
  • Maximum object size: 5 TB (S3 limit)

Common Error Scenarios

  1. Function Not Picklable:

    • Cause: Using lambda or nested functions
    • Solution: Define top-level functions only
  2. Dependency Installation Failed:

    • Cause: Package not available or version conflict
    • Solution: Verify package names/versions, use requirements.txt
  3. Out of Memory:

    • Cause: Processing data too large for instance
    • Solution: Use larger instance type or process in chunks
  4. Timeout Exceeded:

    • Cause: Function runs longer than max_runtime_in_seconds
    • Solution: Increase timeout or optimize function
  5. Spot Interruption:

    • Cause: Spot instance reclaimed
    • Solution: Implement checkpointing, increase max_wait_time
  6. Return Value Too Large:

    • Cause: Return value exceeds pickle/S3 limits
    • Solution: Write large results directly to S3, return URI instead
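
As suggested in scenario 6, large outputs are better written to S3 from inside the remote function, returning only a small reference. A minimal sketch, assuming boto3 is available in the job image; the bucket and key names are placeholders:

import io
import boto3
import pandas as pd

@remote(role=role, instance_type="ml.m5.xlarge")
def produce_large_result(data_path):
    """Write the large result to S3 and return only its URI."""
    df = pd.read_csv(data_path)
    big_output = df.describe()  # stand-in for a large result

    # Upload the result directly to S3 (placeholder bucket/key)
    bucket, key = "my-bucket", "remote-function-outputs/result.csv"
    buffer = io.StringIO()
    big_output.to_csv(buffer)
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=buffer.getvalue().encode("utf-8"))

    # Return a small, picklable reference instead of the data itself
    return {"result_uri": f"s3://{bucket}/{key}", "rows": len(df)}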