Execute Python functions remotely on SageMaker infrastructure with automatic dependency management and resource allocation.
Decorator for converting local Python functions to remote SageMaker executions.
def remote(
role: Optional[str] = None,
instance_type: str = "ml.m5.large",
instance_count: int = 1,
keep_alive_period_in_seconds: int = 0,
volume_size: int = 30,
volume_kms_key: Optional[str] = None,
max_runtime_in_seconds: int = 86400,
image_uri: Optional[str] = None,
environment_variables: Optional[Dict[str, str]] = None,
dependencies: Optional[Union[str, List[str]]] = None,
include_local_workdir: bool = False,
custom_file_filter: Optional[CustomFileFilter] = None,
instance_type_for_spark: Optional[str] = None,
spark_config: Optional[SparkConfig] = None,
s3_root_uri: Optional[str] = None,
s3_kms_key: Optional[str] = None,
job_name_prefix: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
job_conda_env: Optional[str] = None,
sagemaker_session: Optional[Session] = None
):
"""
Decorator to run Python functions remotely on SageMaker.
Parameters:
role: Optional[str] - IAM role ARN
- Required unless in SageMaker environment
instance_type: str - EC2 instance type (default: "ml.m5.large")
- Any SageMaker training instance type
instance_count: int - Number of instances (default: 1)
- Use >1 for distributed processing
keep_alive_period_in_seconds: int - Warm pool duration (default: 0)
- Range: 0-3600 seconds
- Reuse instance for faster subsequent calls
volume_size: int - EBS volume size in GB (default: 30)
- Range: 1-16384
volume_kms_key: Optional[str] - KMS key for volume encryption
max_runtime_in_seconds: int - Maximum runtime (default: 86400 = 24 hours)
- Range: 1-432000 (5 days)
image_uri: Optional[str] - Custom container image URI
- Auto-detected based on Python version if not provided
environment_variables: Optional[Dict[str, str]] - Environment variables
- Passed to remote function execution
dependencies: Optional[Union[str, List[str]]] - Dependencies
- Path to requirements.txt
- List of packages: ["pandas==2.0.0", "scikit-learn"]
include_local_workdir: bool - Include local working directory (default: False)
- Upload entire directory with function
custom_file_filter: Optional[CustomFileFilter] - Custom file filter
- Control which files uploaded
instance_type_for_spark: Optional[str] - Instance type for Spark processing
spark_config: Optional[SparkConfig] - Spark configuration
s3_root_uri: Optional[str] - S3 root for artifacts
- Defaults to: s3://sagemaker-{region}-{account}/
s3_kms_key: Optional[str] - KMS key for S3 encryption
job_name_prefix: Optional[str] - Prefix for job names
tags: Optional[Dict[str, str]] - Resource tags
job_conda_env: Optional[str] - Conda environment name
sagemaker_session: Optional[Session] - SageMaker session
Returns:
Decorated function that executes remotely on SageMaker.
Return value: Same as original function (pickled and transferred).
Usage:
Apply to any Python function to run it on SageMaker infrastructure.
Function and return values must be picklable.
Notes:
- Function code and dependencies automatically uploaded
- Function executed in SageMaker Training job
- Results returned automatically
- Keep-alive reduces cold start for repeated calls
- Use for compute-intensive tasks
- Billing per second of execution
Raises:
ValueError: Invalid configuration
RuntimeError: Remote execution errors
"""Basic Usage:
from sagemaker.core.remote_function import remote
@remote(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.m5.xlarge",
keep_alive_period_in_seconds=300 # 5 minute warm pool
)
def train_model(data_path, hyperparameters):
"""Train a model remotely on SageMaker."""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
# Load data from S3
df = pd.read_csv(data_path)
X = df.drop('target', axis=1)
y = df['target']
# Train model
model = RandomForestClassifier(**hyperparameters)
model.fit(X, y)
# Save model to SageMaker output location
joblib.dump(model, '/opt/ml/model/model.joblib')
# Return results
train_accuracy = model.score(X, y)
feature_importance = dict(zip(X.columns, model.feature_importances_))
return {
"accuracy": train_accuracy,
"feature_importance": feature_importance,
"n_estimators": model.n_estimators
}
# Call function - executes remotely on SageMaker
try:
result = train_model(
data_path="s3://my-bucket/data.csv",
hyperparameters={"n_estimators": 100, "max_depth": 10, "random_state": 42}
)
print(f"Training accuracy: {result['accuracy']:.4f}")
print(f"Top features: {sorted(result['feature_importance'].items(), key=lambda x: x[1], reverse=True)[:5]}")
except RuntimeError as e:
print(f"Remote execution failed: {e}")With Dependencies:
# Specify dependencies
@remote(
role=role,
instance_type="ml.m5.xlarge",
dependencies=["pandas==2.0.0", "scikit-learn==1.3.0", "joblib==1.3.2"]
)
def process_data(input_path, output_path):
"""Process data with specific package versions."""
import pandas as pd
from sklearn.preprocessing import StandardScaler
df = pd.read_csv(input_path)
# Process data
scaler = StandardScaler()
df_scaled = pd.DataFrame(
scaler.fit_transform(df),
columns=df.columns
)
# Save to S3
df_scaled.to_csv(output_path, index=False)
return {"rows_processed": len(df_scaled)}
# Or use requirements.txt
@remote(
role=role,
instance_type="ml.m5.xlarge",
dependencies="requirements.txt" # File in current directory
)
def complex_processing():
# All packages from requirements.txt available
pass
GPU Functions:
@remote(
role=role,
instance_type="ml.p3.2xlarge", # GPU instance
image_uri="763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0-gpu-py310"
)
def train_on_gpu(model_config, data_path):
"""Train deep learning model on GPU."""
import torch
import torch.nn as nn
# Check GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
# Build and train model
model = build_model(model_config).to(device)
data = load_data(data_path, device)
# Training loop
optimizer = torch.optim.Adam(model.parameters())
for epoch in range(model_config['epochs']):
loss = train_epoch(model, data, optimizer, device)
print(f"Epoch {epoch}: loss={loss:.4f}")
# Save model
torch.save(model.state_dict(), '/opt/ml/model/model.pt')
return {"final_loss": loss, "epochs_completed": epoch + 1}
# Execute on GPU
result = train_on_gpu(
model_config={"hidden_dims": [128, 64], "epochs": 10},
data_path="s3://bucket/training-data.csv"
)
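The GPU example above leaves build_model, load_data, and train_epoch undefined; they stand for user-supplied helpers that must be importable in the remote environment (for example via include_local_workdir). A minimal sketch of what build_model might look like for the hidden_dims config shown, with hypothetical input and output dimensions:
import torch.nn as nn

def build_model(model_config, input_dim=10, output_dim=2):
    """Build a small feed-forward network from the hidden_dims config.

    input_dim and output_dim are illustrative defaults; set them to match
    the dataset returned by load_data.
    """
    dims = [input_dim] + list(model_config["hidden_dims"])
    layers = []
    for in_features, out_features in zip(dims[:-1], dims[1:]):
        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.ReLU())
    layers.append(nn.Linear(dims[-1], output_dim))
    return nn.Sequential(*layers)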
Spot Instances for Cost Savings:
@remote(
role=role,
instance_type="ml.m5.xlarge",
use_spot_instances=True, # Up to 70% cost savings
max_wait_time_in_seconds=7200, # 2 hours total
max_runtime_in_seconds=3600 # 1 hour execution
)
def cost_effective_processing(data):
"""Run on spot instances."""
import os
# Implement checkpointing for interruption handling
checkpoint_path = '/opt/ml/checkpoints/progress.pkl'
# Resume from checkpoint if exists
if os.path.exists(checkpoint_path):
progress = load_checkpoint(checkpoint_path)
else:
progress = 0
# Process data with checkpointing
result = process_with_checkpoints(data, progress, checkpoint_path)
return result
result = cost_effective_processing(large_dataset)
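The spot example above calls load_checkpoint and process_with_checkpoints, which are placeholders for user code. A minimal sketch of such helpers, assuming pickle-based progress markers under /opt/ml/checkpoints (the conventional SageMaker checkpoint directory) and a simple integer progress index:
import os
import pickle

def load_checkpoint(path):
    """Read the last saved progress index (hypothetical pickle format)."""
    with open(path, "rb") as f:
        return pickle.load(f)

def save_checkpoint(path, progress):
    """Persist the progress index so a restarted job can resume."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(progress, f)

def process_with_checkpoints(data, start_index, checkpoint_path):
    """Process items sequentially, checkpointing every 100 items."""
    results = []
    for i, item in enumerate(data[start_index:], start=start_index):
        results.append(item)  # replace with the real per-item work
        if i % 100 == 0:
            save_checkpoint(checkpoint_path, i)
    return results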
Executor for running remote functions with advanced configuration.
class RemoteExecutor:
"""
Executor for remote function execution.
Parameters:
role: str - IAM role ARN (required)
instance_type: str - EC2 instance type (default: "ml.m5.large")
instance_count: int - Number of instances (default: 1)
max_parallel_job: Optional[int] - Maximum parallel jobs
- Limits concurrent remote executions
volume_size: int - EBS volume size (default: 30)
volume_kms_key: Optional[str] - KMS key for encryption
max_runtime_in_seconds: int - Maximum runtime (default: 86400)
keep_alive_period_in_seconds: int - Warm pool duration (default: 0)
container_log_level: int - Container logging level (default: logging.INFO)
tags: Optional[List[Dict]] - Resource tags
s3_root_uri: Optional[str] - S3 root URI
s3_kms_key: Optional[str] - KMS key for S3
environment_variables: Optional[Dict[str, str]] - Environment variables
image_uri: Optional[str] - Container image URI
dependencies: Optional[Union[str, List[str]]] - Dependencies
include_local_workdir: bool - Include local workdir (default: False)
custom_file_filter: Optional[CustomFileFilter] - File filter
sagemaker_session: Optional[Session] - SageMaker session
Methods:
submit(func, *args, **kwargs) -> Future
Submit function for remote execution.
Parameters:
func: callable - Function to execute (required)
*args: Positional arguments for function
**kwargs: Keyword arguments for function
Returns:
Future: Future object for result retrieval
Raises:
ValueError: Invalid function or arguments
map(func, *iterables) -> List
Map function over iterables remotely.
Parameters:
func: callable - Function to map (required)
*iterables: Iterables to map over
Returns:
List: Results of each invocation, in input order
shutdown(wait=True) -> None
Shutdown executor and cleanup resources.
Parameters:
wait: bool - Wait for pending jobs (default: True)
Usage:
Execute multiple functions in parallel with consistent configuration.
Context manager for automatic cleanup.
Notes:
- Use for batch processing multiple items
- Manages job queue automatically
- Reuses warm pool across jobs
- Clean shutdown with context manager
"""Usage:
from sagemaker.core.remote_function import RemoteExecutor
from concurrent.futures import as_completed
# Create executor for batch processing
with RemoteExecutor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.c5.xlarge",
max_parallel_job=10, # Up to 10 concurrent jobs
keep_alive_period_in_seconds=600 # 10 minute warm pool
) as executor:
# Define processing function
def process_data(file_path):
"""Process single data file."""
import pandas as pd
df = pd.read_csv(file_path)
# Process data
df_processed = df.dropna()
df_processed = df_processed[df_processed['value'] > 0]
# Save to S3
output_path = file_path.replace('/raw/', '/processed/')
df_processed.to_csv(output_path, index=False)
return {
"input_rows": len(df),
"output_rows": len(df_processed),
"file": file_path
}
# Submit multiple jobs
files = [f"s3://bucket/raw/file_{i}.csv" for i in range(100)]
futures = []
for file_path in files:
future = executor.submit(process_data, file_path)
futures.append(future)
# Collect results as they complete
results = []
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
print(f"Processed {result['file']}: {result['input_rows']} -> {result['output_rows']} rows")
except Exception as e:
print(f"Processing failed: {e}")
# Executor automatically shuts down
print(f"\nTotal processed: {len(results)} files")Map Function:
# Map function over iterable
def analyze_file(file_path):
"""Analyze single file."""
import pandas as pd
df = pd.read_csv(file_path)
return {
"file": file_path,
"rows": len(df),
"mean": df['value'].mean(),
"std": df['value'].std()
}
with RemoteExecutor(
role=role,
instance_type="ml.m5.xlarge",
max_parallel_job=20
) as executor:
files = [f"s3://bucket/data/file_{i}.csv" for i in range(200)]
# Map function over all files
results = list(executor.map(analyze_file, files))
# Aggregate results
total_rows = sum(r['rows'] for r in results)
print(f"Total rows across all files: {total_rows}")class SparkConfig:
"""
Spark configuration for remote functions.
Parameters:
spark_version: str - Spark version (default: "3.3")
- Supported: "3.0", "3.1", "3.2", "3.3"
num_executors: Optional[int] - Number of executors
executor_memory: Optional[str] - Executor memory (e.g., "4g")
executor_cores: Optional[int] - Cores per executor
driver_memory: Optional[str] - Driver memory (e.g., "4g")
driver_cores: Optional[int] - Driver cores
configuration: Optional[Dict[str, str]] - Additional Spark config
- spark.* properties
Usage:
Configure Spark for distributed data processing in remote functions.
Notes:
- Requires instance_type_for_spark parameter in @remote
- Spark cluster automatically provisioned
- Configuration applied to SparkSession
"""Usage with Spark:
from sagemaker.core.remote_function import remote, SparkConfig
spark_config = SparkConfig(
spark_version="3.3",
num_executors=4,
executor_memory="8g",
executor_cores=4,
driver_memory="4g",
configuration={
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true"
}
)
@remote(
role=role,
instance_type="ml.m5.4xlarge", # Driver instance
instance_count=1,
instance_type_for_spark="ml.m5.2xlarge", # Executor instances
spark_config=spark_config
)
def process_large_dataset(data_path, output_path):
"""Process large dataset with Spark."""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count
# Spark session automatically configured
spark = SparkSession.builder \
.appName("RemoteDataProcessing") \
.getOrCreate()
# Read large dataset
df = spark.read.parquet(data_path)
# Process data at scale
result = df \
.filter(col("value").isNotNull()) \
.groupBy("category") \
.agg(
count("*").alias("count"),
avg("value").alias("avg_value")
)
# Write output
result.write.parquet(output_path, mode="overwrite")
# Collect summary counts before stopping the Spark session
input_rows = df.count()
output_categories = result.count()
spark.stop()
return {
"input_rows": input_rows,
"output_categories": output_categories
}
# Execute with Spark
result = process_large_dataset(
data_path="s3://bucket/large-dataset",
output_path="s3://bucket/aggregated"
)
print(f"Processed {result['input_rows']} rows into {result['output_categories']} categories")class CustomFileFilter:
"""
Custom file filter for dependency packaging.
Parameters:
ignore_name_patterns: Optional[List[str]] - Patterns to ignore (glob)
- Example: ["*.pyc", "*.log", ".git*"]
ignore_dirs: Optional[List[str]] - Directories to ignore
- Example: [".git", "__pycache__", "tests", ".venv"]
Methods:
should_include(path: str) -> bool
Check if file should be included.
Parameters:
path: str - File path to check
Returns:
bool: True if file should be included
Usage:
Control which local files are packaged with remote function.
Reduces upload size and time.
Notes:
- Applied when include_local_workdir=True
- Default filter excludes common patterns
- Custom filter overrides default
"""Usage:
from sagemaker.core.remote_function import remote, CustomFileFilter
# Create custom filter
file_filter = CustomFileFilter(
ignore_name_patterns=[
"*.pyc",
"*.pyo",
"*.pyd",
"*~",
"*.log",
"*.tmp",
".DS_Store",
".git*"
],
ignore_dirs=[
".git",
"__pycache__",
".pytest_cache",
".mypy_cache",
"tests",
"docs",
".venv",
"venv",
"node_modules"
]
)
@remote(
role=role,
instance_type="ml.m5.xlarge",
include_local_workdir=True, # Upload current directory
custom_file_filter=file_filter # Filter unnecessary files
)
def my_function_with_local_modules():
"""Function using local modules."""
from my_local_module import helper_function
result = helper_function()
return result
# Only relevant Python files uploaded, not test/cache files
result = my_function_with_local_modules()
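Because CustomFileFilter exposes should_include(path) (documented above), you can preview locally which files would be packaged before submitting a job; a small sketch using the file_filter defined above:
import os

# Walk the working directory and report what the filter would upload
for root, _dirs, filenames in os.walk("."):
    for name in filenames:
        path = os.path.join(root, name)
        status = "include" if file_filter.should_include(path) else "skip"
        print(f"{status}: {path}")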
Environment Variables:
@remote(
role=role,
instance_type="ml.m5.xlarge",
environment_variables={
"API_KEY": "secret-key-value",
"MODEL_VERSION": "v2.0",
"DEBUG": "false",
"LOG_LEVEL": "INFO"
}
)
def function_with_env():
"""Access environment variables in remote function."""
import os
api_key = os.environ.get("API_KEY")
model_version = os.environ.get("MODEL_VERSION")
debug = os.environ.get("DEBUG") == "true"
# Use environment variables
result = call_api(api_key, model_version, debug=debug)
return result
result = function_with_env()
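Avoid placing real secrets such as API_KEY in environment_variables, since they are stored with the job definition. A safer pattern is to pass only a secret name and resolve it inside the remote function with AWS Secrets Manager; a sketch with a hypothetical secret name (the execution role needs secretsmanager:GetSecretValue):
@remote(
    role=role,
    instance_type="ml.m5.xlarge",
    environment_variables={"API_SECRET_NAME": "prod/my-app/api-key"}
)
def function_with_secret():
    """Resolve the secret at runtime instead of shipping it in plain text."""
    import os
    import boto3

    secret_name = os.environ["API_SECRET_NAME"]
    secret = boto3.client("secretsmanager").get_secret_value(SecretId=secret_name)
    api_key = secret["SecretString"]
    return call_api(api_key, "v2.0", debug=False)  # call_api is the same placeholder used above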
Warm Pools:
# Use warm pools to reduce cold start time
@remote(
role=role,
instance_type="ml.m5.xlarge",
keep_alive_period_in_seconds=1800 # 30 minutes
)
def repeated_function(data):
"""Function called repeatedly."""
return process(data)
# First call: provisions instance (~3-5 minutes)
result1 = repeated_function(data1)
# Subsequent calls within 30 min: reuse warm instance (~10-30 seconds)
result2 = repeated_function(data2)
result3 = repeated_function(data3)
result4 = repeated_function(data4)
# After 30 minutes of inactivity: instance released
# Next call provisions a new instance
Note on Warm Pool Costs:
Warm pool instances continue to accrue charges for as long as they are kept alive, so choose keep_alive_period_in_seconds to balance cold-start savings against idle cost.
Job Naming and Tags:
@remote(
role=role,
instance_type="ml.m5.xlarge",
job_name_prefix="data-processing",
tags={
"Project": "MLPipeline",
"Team": "DataScience",
"Environment": "Production",
"CostCenter": "Engineering"
}
)
def tagged_function(data):
"""Function with custom naming and tags."""
return process(data)
# Job name: data-processing-2024-01-15-10-30-00-abc123
# All tags applied to training job resource
result = tagged_function(data)
Custom S3 Location and Encryption:
@remote(
role=role,
instance_type="ml.m5.xlarge",
s3_root_uri="s3://my-bucket/remote-functions", # Custom S3 location
s3_kms_key="arn:aws:kms:us-west-2:123:key/abc" # Encryption
)
def function_with_s3():
"""Function with custom S3 configuration."""
# Artifacts stored in: s3://my-bucket/remote-functions/...
# All data encrypted with KMS key
result = expensive_computation()
# Intermediate results automatically saved to S3
return result
Error Handling:
@remote(role=role, instance_type="ml.m5.xlarge")
def robust_function(data):
"""Function with comprehensive error handling."""
import logging
logger = logging.getLogger(__name__)
try:
# Main processing logic
result = process_data(data)
return {
"status": "success",
"result": result
}
except FileNotFoundError as e:
logger.error(f"Data file not found: {e}")
return {
"status": "error",
"error_type": "FileNotFoundError",
"message": str(e)
}
except ValueError as e:
logger.error(f"Invalid data format: {e}")
return {
"status": "error",
"error_type": "ValueError",
"message": str(e)
}
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return {
"status": "error",
"error_type": type(e).__name__,
"message": str(e)
}
# Call with error handling
result = robust_function(data)
if result["status"] == "error":
print(f"Error occurred: {result['error_type']}: {result['message']}")
else:
print(f"Success: {result['result']}")# Develop and test locally first
def my_function(data):
"""Function to test locally."""
import pandas as pd
df = pd.DataFrame(data)
result = df.describe().to_dict()
return result
# Test locally with sample data
test_data = {"values": [1, 2, 3, 4, 5]}
local_result = my_function(test_data)
print(f"Local test: {local_result}")
# After validation, add @remote decorator
from sagemaker.core.remote_function import remote
@remote(role=role, instance_type="ml.m5.xlarge")
def my_function(data):
"""Now runs remotely on SageMaker."""
import pandas as pd
df = pd.DataFrame(data)
result = df.describe().to_dict()
return result
# Execute remotely
remote_result = my_function(test_data)
print(f"Remote test: {remote_result}")@remote(
Conda Environments:
@remote(
role=role,
instance_type="ml.m5.xlarge",
job_conda_env="my_conda_env" # Pre-configured conda environment
)
def function_with_conda():
"""Runs in specified conda environment."""
# Conda environment must be configured in container
import specialized_package_from_conda
result = specialized_package_from_conda.process()
return result
Troubleshooting:
Function Not Picklable:
- Arguments and return values are serialized before transfer; avoid passing objects that cannot be pickled, such as open file handles, database connections, or locks. Create those resources inside the remote function instead.
Dependency Installation Failed:
- Verify package names and pinned versions in the dependencies list or requirements.txt, and confirm the job has network access to the package index (for example when running in a VPC without a NAT gateway or endpoints).
Out of Memory:
- Choose an instance_type with more memory, reduce batch sizes, or process the data in chunks.
Timeout Exceeded:
- Increase max_runtime_in_seconds (up to 432000 seconds, i.e. 5 days) or split the work into smaller jobs.
Spot Interruption:
- Checkpoint progress regularly (see the spot instance example above) so an interrupted job can resume, and allow enough max_wait_time_in_seconds for retries.
Return Value Too Large:
- Write large outputs to S3 from inside the function and return only a reference such as the S3 URI, as shown in the sketch below.
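For the last item, a common pattern is to write the large artifact to S3 inside the function and return only a lightweight reference; a minimal sketch with a hypothetical bucket and key:
@remote(role=role, instance_type="ml.m5.xlarge")
def compute_large_result(data_path):
    """Persist the heavy output to S3 and return only its location."""
    import boto3
    import pandas as pd

    df = pd.read_csv(data_path)
    summary = df.describe()  # stand-in for an expensive computation

    local_path = "/tmp/summary.csv"
    summary.to_csv(local_path)
    bucket, key = "my-bucket", "remote-results/summary.csv"  # hypothetical location
    boto3.client("s3").upload_file(local_path, bucket, key)
    return {"result_uri": f"s3://{bucket}/{key}", "rows": len(df)}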