0
# Remote Functions
1
2
Execute Python functions remotely on SageMaker compute with automatic dependency management, data transfer, and result retrieval. This lets you scale compute-intensive workloads without managing infrastructure.
3
4
## Capabilities
5
6
### Remote Function Decorator
7
8
The `@remote` decorator transforms regular Python functions into remotely executable functions on SageMaker managed infrastructure.
9
10
```python { .api }
11
@remote(
12
instance_type: str,
13
instance_count: int = 1,
14
role: str = None,
15
image_uri: str = None,
16
volume_size: int = 30,
17
volume_kms_key: str = None,
18
max_runtime_in_seconds: int = None,
19
keep_alive_period_in_seconds: int = 0,
20
base_job_name: str = None,
21
sagemaker_session: Session = None,
22
environment: dict = None,
23
tags: List[dict] = None,
24
subnets: List[str] = None,
25
security_group_ids: List[str] = None,
26
encrypt_inter_container_traffic: bool = False,
27
use_spot_instances: bool = False,
28
max_wait_time_in_seconds: int = None,
29
max_retry_attempts: int = 1,
30
dependencies: str = "auto",
31
pre_execution_commands: List[str] = None,
32
pre_execution_script: str = None,
33
enable_network_isolation: bool = False,
34
custom_file_filter: 'CustomFileFilter' = None,
35
spark_config: 'SparkConfig' = None,
36
**kwargs
37
)
38
def remote_function():
39
"""
40
Decorator to execute Python functions remotely on SageMaker.
41
42
Parameters:
43
- instance_type (str): EC2 instance type for remote execution
44
- instance_count (int, optional): Number of instances for distributed execution
45
- role (str, optional): IAM role ARN with SageMaker permissions
46
- image_uri (str, optional): Custom Docker image URI
47
- volume_size (int, optional): EBS volume size in GB
48
- volume_kms_key (str, optional): KMS key for volume encryption
49
- max_runtime_in_seconds (int, optional): Maximum execution time
50
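- keep_alive_period_in_seconds (int, optional): Seconds to keep the provisioned infrastructure alive after a job finishes so that later invocations can reuse it (default 0, no reuse)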
51
- base_job_name (str, optional): Base name for remote jobs
52
- sagemaker_session (Session, optional): SageMaker session
53
- environment (dict, optional): Environment variables
54
- tags (List[dict], optional): Resource tags
55
- subnets (List[str], optional): VPC subnet IDs
56
- security_group_ids (List[str], optional): VPC security group IDs
57
- encrypt_inter_container_traffic (bool, optional): Encrypt network traffic between the job's containers
58
- use_spot_instances (bool, optional): Use EC2 Spot instances
59
- max_wait_time_in_seconds (int, optional): Maximum time in seconds to wait for a Spot job to complete; only relevant when use_spot_instances is True
60
- max_retry_attempts (int, optional): Maximum retry attempts
61
- dependencies (str, optional): Dependency management ("auto" or path)
62
- pre_execution_commands (List[str], optional): Commands before execution
63
- pre_execution_script (str, optional): Script to run before execution
64
- enable_network_isolation (bool, optional): Enable network isolation
65
- custom_file_filter (CustomFileFilter, optional): File filtering
66
- spark_config (SparkConfig, optional): Spark configuration
67
68
Returns:
69
- RemoteFunction: Decorated function that executes remotely
70
"""
71
```
72
73
### Remote Executor
74
75
Direct execution manager for running functions remotely with fine-grained control over execution parameters.
76
77
```python { .api }
78
class RemoteExecutor:
79
"""
80
Executor for running Python functions remotely on SageMaker compute.
81
82
Parameters:
83
- instance_type (str): EC2 instance type
84
- instance_count (int, optional): Number of instances
85
- role (str, optional): IAM role ARN
86
- image_uri (str, optional): Docker image URI
87
- max_runtime_in_seconds (int, optional): Maximum runtime
88
- keep_alive_period_in_seconds (int, optional): Seconds to keep the provisioned infrastructure alive after a job finishes so that later jobs can reuse it
89
- volume_size (int, optional): EBS volume size in GB
90
- volume_kms_key (str, optional): KMS key for volume encryption
91
- sagemaker_session (Session, optional): SageMaker session
92
- environment (dict, optional): Environment variables
93
- dependencies (str, optional): Dependency management
94
- include_local_workdir (bool, optional): Include local working directory
95
- custom_file_filter (CustomFileFilter, optional): File filtering
96
- pre_execution_commands (List[str], optional): Pre-execution commands
97
- pre_execution_script (str, optional): Pre-execution script
98
- job_conda_env (str, optional): Conda environment name
99
- tags (List[dict], optional): Resource tags
100
- **kwargs: Additional parameters
101
"""
102
def __init__(self, instance_type: str, **kwargs): ...
103
104
def submit(self, func, *args, **kwargs) -> 'Future':
105
"""
106
Submit a function for remote execution.
107
108
Parameters:
109
- func (callable): Function to execute remotely
110
- *args: Positional arguments for the function
111
- **kwargs: Keyword arguments for the function
112
113
Returns:
114
- Future: Future object for retrieving results
115
"""
116
117
def map(self, func, *iterables) -> List['Future']:
118
"""
119
Apply function to iterables in parallel across remote instances.
120
121
Parameters:
122
- func (callable): Function to apply
123
- *iterables: Input iterables
124
125
Returns:
126
- List[Future]: List of Future objects
127
"""
128
129
def shutdown(self, wait: bool = True):
130
"""
131
Shutdown the remote executor and clean up resources.
132
133
Parameters:
134
- wait (bool, optional): Wait for running jobs to complete
135
"""
136
```
137
138
### File Management and Filtering
139
140
Classes for managing file transfer and filtering for remote execution environments.
141
142
```python { .api }
143
class CustomFileFilter:
144
"""
145
Custom file filter for controlling which local files are transferred to remote execution.
146
147
Parameters:
148
- ignore_name_patterns (List[str], optional): File name patterns to ignore
149
- ignore_path_patterns (List[str], optional): Path patterns to ignore
150
- ignore_directories (List[str], optional): Directory names to ignore
151
- ignore_files (List[str], optional): Specific files to ignore
152
"""
153
def __init__(self, ignore_name_patterns: List[str] = None,
154
ignore_path_patterns: List[str] = None,
155
ignore_directories: List[str] = None,
156
ignore_files: List[str] = None): ...
157
158
class IncludeLocalWorkDirFilter:
159
"""
160
Filter that includes the local working directory in remote execution.
161
"""
162
def __init__(self): ...
163
```
164
165
### Checkpointing and State Management
166
167
Configuration for managing long-running remote functions with checkpointing capabilities.
168
169
```python { .api }
170
class CheckpointLocation:
171
"""
172
Configuration for checkpointing remote function state to enable recovery.
173
174
Parameters:
175
- checkpoint_s3_uri (str): S3 URI for storing checkpoints
176
- local_path (str, optional): Local path for checkpoint files
177
- kms_key_id (str, optional): KMS key for checkpoint encryption
178
"""
179
def __init__(self, checkpoint_s3_uri: str, local_path: str = "/tmp/checkpoints",
180
kms_key_id: str = None): ...
181
```
182
183
### Spark Integration
184
185
Configuration for running Spark workloads as remote functions on SageMaker.
186
187
```python { .api }
188
class SparkConfig:
189
"""
190
Configuration for Apache Spark integration with remote functions.
191
192
Parameters:
193
- submit_app (str): Path to Spark application
194
- submit_py_files (List[str], optional): Python files for Spark
195
- submit_files (List[str], optional): Additional files for Spark
196
- submit_jars (List[str], optional): JAR files for Spark
197
- submit_class (str, optional): Main class for Spark application
198
- spark_event_logs_s3_uri (str, optional): S3 URI for Spark event logs
199
- configuration (dict, optional): Spark configuration properties
200
"""
201
def __init__(self, submit_app: str, **kwargs): ...
202
```
203
204
### Results and Future Objects
205
206
Classes for managing asynchronous execution results and status monitoring.
207
208
```python { .api }
209
class Future:
210
"""
211
Future object representing the result of a remote function execution.
212
213
Methods available:
214
- result(): Get the execution result (blocks until complete)
215
- done(): Check if execution is complete
216
- running(): Check if execution is running
217
- cancelled(): Check if execution was cancelled
218
- cancel(): Cancel the execution
219
- exception(): Get exception if execution failed
220
- add_done_callback(): Add callback for completion
221
"""
222
def result(self, timeout: float = None):
223
"""
224
Get the result of the remote execution.
225
226
Parameters:
227
- timeout (float, optional): Timeout in seconds
228
229
Returns:
230
- Any: Result of the remote function
231
"""
232
233
def done(self) -> bool:
234
"""Check if the remote execution is complete."""
235
236
def running(self) -> bool:
237
"""Check if the remote execution is currently running."""
238
239
def cancelled(self) -> bool:
240
"""Check if the remote execution was cancelled."""
241
242
def cancel(self) -> bool:
243
"""Cancel the remote execution if possible."""
244
245
def exception(self, timeout: float = None):
246
"""Get the exception raised by remote execution, if any."""
247
248
def add_done_callback(self, fn):
249
"""Add a callback to be called when execution completes."""
250
```
251
252
## Usage Examples
253
254
### Basic Remote Function
255
256
```python
257
from sagemaker.remote_function import remote
258
259
# Define a compute-intensive function
260
@remote(
261
instance_type="ml.m5.4xlarge",
262
role=role,
263
keep_alive_period_in_seconds=300 # Keep instance alive for 5 minutes
264
)
265
def process_large_dataset(data_path, output_path, num_workers=4):
266
import pandas as pd
267
import numpy as np
268
from multiprocessing import Pool
269
270
# Load and process data
271
df = pd.read_csv(data_path)
272
273
def process_chunk(chunk):
274
# CPU-intensive processing
275
return chunk.apply(lambda x: x ** 2 + np.log(x + 1))
276
277
# Parallel processing
278
chunks = np.array_split(df, num_workers)
279
with Pool(num_workers) as pool:
280
results = pool.map(process_chunk, chunks)
281
282
# Combine results
283
processed_df = pd.concat(results)
284
processed_df.to_csv(output_path, index=False)
285
286
return f"Processed {len(processed_df)} rows"
287
288
# Execute remotely (asynchronous)
289
future = process_large_dataset(
290
"s3://bucket/large-dataset.csv",
291
"s3://bucket/processed-dataset.csv",
292
num_workers=8
293
)
294
295
# Get result (blocks until complete)
296
result = future.result()
297
print(result) # "Processed 1000000 rows"
298
```
299
300
### Distributed Processing with Remote Executor
301
302
```python
303
from sagemaker.remote_function import RemoteExecutor
304
305
# Create remote executor for distributed processing
306
executor = RemoteExecutor(
307
instance_type="ml.c5.2xlarge",
308
instance_count=3, # 3 instances for parallel processing
309
role=role,
310
max_runtime_in_seconds=3600, # 1 hour timeout
311
use_spot_instances=True,
312
max_wait_time_in_seconds=600
313
)
314
315
def process_file_batch(file_paths):
316
"""Process a batch of files"""
317
import boto3
318
import pandas as pd
319
320
s3 = boto3.client('s3')
321
results = []
322
323
for file_path in file_paths:
324
# Download and process each file
325
bucket, key = file_path.replace('s3://', '').split('/', 1)
326
obj = s3.get_object(Bucket=bucket, Key=key)
327
df = pd.read_csv(obj['Body'])
328
329
# Process the data
330
processed = df.groupby('category').agg({
331
'value': ['mean', 'std', 'count']
332
}).round(2)
333
334
results.append({
335
'file': file_path,
336
'summary': processed.to_dict()
337
})
338
339
return results
340
341
# List of file batches to process
342
file_batches = [
343
["s3://bucket/data/batch1/file1.csv", "s3://bucket/data/batch1/file2.csv"],
344
["s3://bucket/data/batch2/file1.csv", "s3://bucket/data/batch2/file2.csv"],
345
["s3://bucket/data/batch3/file1.csv", "s3://bucket/data/batch3/file2.csv"]
346
]
347
348
# Submit parallel processing jobs
349
futures = [executor.submit(process_file_batch, batch) for batch in file_batches]
350
351
# Collect results in submission order (each result() call blocks until that job finishes)
352
results = []
353
for future in futures:
354
batch_result = future.result()
355
results.extend(batch_result)
356
357
# Clean up
358
executor.shutdown()
359
360
print(f"Processed {len(results)} file batches")
361
```
362
363
### Advanced Configuration with Custom Dependencies
364
365
```python
366
from sagemaker.remote_function import remote, CustomFileFilter
367
368
# Custom file filter to exclude unnecessary files
369
file_filter = CustomFileFilter(
370
ignore_name_patterns=["*.pyc", "*.log", ".DS_Store"],
371
ignore_directories=[".git", "__pycache__", "node_modules"],
372
ignore_path_patterns=["*/tests/*", "*/docs/*"]
373
)
374
375
@remote(
376
instance_type="ml.p3.2xlarge", # GPU instance
377
role=role,
378
image_uri="your-account.dkr.ecr.region.amazonaws.com/ml-training:latest",
379
dependencies="requirements.txt", # Install dependencies from file
380
custom_file_filter=file_filter,
381
pre_execution_commands=[
382
"pip install --upgrade pip",
383
"pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu118"
384
],
385
environment={
386
"CUDA_VISIBLE_DEVICES": "0",
387
"OMP_NUM_THREADS": "4"
388
},
389
max_runtime_in_seconds=7200, # 2 hours
390
volume_size=100 # 100 GB storage
391
)
392
def train_model(config_path, data_path, output_path):
393
"""Train a deep learning model remotely"""
394
import torch
395
import torch.nn as nn
396
import yaml
397
import joblib
398
399
# Load configuration
400
with open(config_path, 'r') as f:
401
config = yaml.safe_load(f)
402
403
# Setup training
404
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
405
print(f"Training on device: {device}")
406
407
# Your model training code here
408
# model = create_model(config)
409
# train_data = load_data(data_path)
410
# trained_model = train(model, train_data, config)
411
412
# Save model
413
# torch.save(trained_model.state_dict(), output_path)
414
415
return {
416
"status": "completed",
417
"device": str(device),
418
"model_path": output_path
419
}
420
421
# Execute with custom configuration
422
result_future = train_model(
423
config_path="./config/model_config.yaml",
424
data_path="s3://bucket/training-data/",
425
output_path="s3://bucket/models/trained_model.pth"
426
)
427
428
# Monitor progress and get result
429
result = result_future.result()
430
print(f"Training completed: {result}")
431
```
432
433
### Spark Integration
434
435
```python
436
from sagemaker.remote_function import remote, SparkConfig
437
438
# Configure Spark
439
spark_config = SparkConfig(
440
submit_app="data_processing.py",
441
submit_py_files=["utils.py", "transformers.py"],
442
configuration={
443
"spark.executor.memory": "4g",
444
"spark.executor.cores": "2",
445
"spark.sql.adaptive.enabled": "true",
446
"spark.sql.adaptive.coalescePartitions.enabled": "true"
447
},
448
spark_event_logs_s3_uri="s3://bucket/spark-logs/"
449
)
450
451
@remote(
452
instance_type="ml.m5.4xlarge",
453
instance_count=2,
454
role=role,
455
spark_config=spark_config,
456
max_runtime_in_seconds=3600
457
)
458
def process_big_data(input_path, output_path, num_partitions=100):
459
"""Process large datasets with Spark"""
460
from pyspark.sql import SparkSession
461
from pyspark.sql.functions import col, avg, count, stddev
462
463
# Create Spark session
464
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
465
466
# Read data
467
df = spark.read.option("header", "true").csv(input_path)
468
469
# Process data
470
result = df.groupBy("category") \
471
.agg(avg("value").alias("avg_value"),
472
count("*").alias("count"),
473
stddev("value").alias("std_value")) \
474
.coalesce(num_partitions)
475
476
# Write results
477
result.write.mode("overwrite").option("header", "true").csv(output_path)
478
479
# Get summary stats
480
total_rows = df.count()
481
num_categories = result.count()
482
483
spark.stop()
484
485
return {
486
"total_rows": total_rows,
487
"num_categories": num_categories,
488
"output_path": output_path
489
}
490
491
# Execute Spark job remotely
492
spark_future = process_big_data(
493
input_path="s3://bucket/big-data/*.csv",
494
output_path="s3://bucket/processed-data/",
495
num_partitions=50
496
)
497
498
result = spark_future.result()
499
print(f"Processed {result['total_rows']} rows across {result['num_categories']} categories")
500
```