or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

amazon-algorithms.md, automl.md, core-training.md, data-processing.md, debugging-profiling.md, experiments.md, framework-training.md, hyperparameter-tuning.md, index.md, model-monitoring.md, model-serving.md, remote-functions.md

docs/remote-functions.md

0

# Remote Functions

1

2

Execute Python functions remotely on SageMaker compute with automatic dependency management, data transfer, and result retrieval, enabling seamless scaling of compute-intensive workloads without infrastructure management.

3

4

## Capabilities

5

6

### Remote Function Decorator

7

8

The `@remote` decorator transforms regular Python functions into remotely executable functions on SageMaker managed infrastructure.

9

10

```python { .api }

11

@remote(

12

instance_type: str,

13

instance_count: int = 1,

14

role: str = None,

15

image_uri: str = None,

16

volume_size: int = 30,

17

volume_kms_key: str = None,

18

max_runtime_in_seconds: int = None,

19

keep_alive_period_in_seconds: int = 0,

20

base_job_name: str = None,

21

sagemaker_session: Session = None,

22

environment: dict = None,

23

tags: List[dict] = None,

24

subnets: List[str] = None,

25

security_group_ids: List[str] = None,

26

encrypt_inter_container_traffic: bool = False,

27

use_spot_instances: bool = False,

28

max_wait_time_in_seconds: int = None,

29

max_retry_attempts: int = 1,

30

dependencies: str = "auto",

31

pre_execution_commands: List[str] = None,

32

pre_execution_script: str = None,

33

enable_network_isolation: bool = False,

34

custom_file_filter: 'CustomFileFilter' = None,

35

spark_config: 'SparkConfig' = None,

36

**kwargs

37

)

38

def remote_function():

39

"""

40

Decorator to execute Python functions remotely on SageMaker.

41

42

Parameters:

43

- instance_type (str): EC2 instance type for remote execution

44

- instance_count (int, optional): Number of instances for distributed execution

45

- role (str, optional): IAM role ARN with SageMaker permissions

46

- image_uri (str, optional): Custom Docker image URI

47

- volume_size (int, optional): EBS volume size in GB

48

- volume_kms_key (str, optional): KMS key for volume encryption

49

- max_runtime_in_seconds (int, optional): Maximum execution time

50

- keep_alive_period_in_seconds (int, optional): Seconds to keep the provisioned infrastructure alive after a job completes so it can be reused

51

- base_job_name (str, optional): Base name for remote jobs

52

- sagemaker_session (Session, optional): SageMaker session

53

- environment (dict, optional): Environment variables

54

- tags (List[dict], optional): Resource tags

55

- subnets (List[str], optional): VPC subnet IDs

56

- security_group_ids (List[str], optional): VPC security group IDs

57

- encrypt_inter_container_traffic (bool, optional): Encrypt traffic between containers

58

- use_spot_instances (bool, optional): Use EC2 Spot instances

59

- max_wait_time_in_seconds (int, optional): Max wait time for Spot

60

- max_retry_attempts (int, optional): Maximum retry attempts

61

- dependencies (str, optional): Dependency management ("auto" or path)

62

- pre_execution_commands (List[str], optional): Commands before execution

63

- pre_execution_script (str, optional): Script to run before execution

64

- enable_network_isolation (bool, optional): Enable network isolation

65

- custom_file_filter (CustomFileFilter, optional): File filtering

66

- spark_config (SparkConfig, optional): Spark configuration

67

68

Returns:

69

- RemoteFunction: Decorated function that executes remotely

70

"""

71

```

72

73

### Remote Executor

74

75

Direct execution manager for running functions remotely with fine-grained control over execution parameters.

76

77

```python { .api }

78

class RemoteExecutor:

79

"""

80

Executor for running Python functions remotely on SageMaker compute.

81

82

Parameters:

83

- instance_type (str): EC2 instance type

84

- instance_count (int, optional): Number of instances

85

- role (str, optional): IAM role ARN

86

- image_uri (str, optional): Docker image URI

87

- max_runtime_in_seconds (int, optional): Maximum runtime

88

- keep_alive_period_in_seconds (int, optional): Keep alive period

89

- volume_size (int, optional): EBS volume size in GB

90

- volume_kms_key (str, optional): KMS key for volume encryption

91

- sagemaker_session (Session, optional): SageMaker session

92

- environment (dict, optional): Environment variables

93

- dependencies (str, optional): Dependency management

94

- include_local_workdir (bool, optional): Include local working directory

95

- custom_file_filter (CustomFileFilter, optional): File filtering

96

- pre_execution_commands (List[str], optional): Pre-execution commands

97

- pre_execution_script (str, optional): Pre-execution script

98

- job_conda_env (str, optional): Conda environment name

99

- tags (List[dict], optional): Resource tags

100

- **kwargs: Additional parameters

101

"""

102

def __init__(self, instance_type: str, **kwargs): ...

103

104

def submit(self, func, *args, **kwargs) -> 'Future':

105

"""

106

Submit a function for remote execution.

107

108

Parameters:

109

- func (callable): Function to execute remotely

110

- *args: Positional arguments for the function

111

- **kwargs: Keyword arguments for the function

112

113

Returns:

114

- Future: Future object for retrieving results

115

"""

116

117

def map(self, func, *iterables) -> List['Future']:

118

"""

119

Apply function to iterables in parallel across remote instances.

120

121

Parameters:

122

- func (callable): Function to apply

123

- *iterables: Input iterables

124

125

Returns:

126

- List[Future]: List of Future objects

127

"""

128

129

def shutdown(self, wait: bool = True):

130

"""

131

Shutdown the remote executor and clean up resources.

132

133

Parameters:

134

- wait (bool, optional): Wait for running jobs to complete

135

"""

136

```

137

138

### File Management and Filtering

139

140

Classes for managing file transfer and filtering for remote execution environments.

141

142

```python { .api }

143

class CustomFileFilter:

144

"""

145

Custom file filter for controlling which local files are transferred to remote execution.

146

147

Parameters:

148

- ignore_name_patterns (List[str], optional): File name patterns to ignore

149

- ignore_path_patterns (List[str], optional): Path patterns to ignore

150

- ignore_directories (List[str], optional): Directory names to ignore

151

- ignore_files (List[str], optional): Specific files to ignore

152

"""

153

def __init__(self, ignore_name_patterns: List[str] = None,

154

ignore_path_patterns: List[str] = None,

155

ignore_directories: List[str] = None,

156

ignore_files: List[str] = None): ...

157

158

class IncludeLocalWorkDirFilter:

159

"""

160

Filter that includes the local working directory in remote execution.

161

"""

162

def __init__(self): ...

163

```

164

165

### Checkpointing and State Management

166

167

Configuration for managing long-running remote functions with checkpointing capabilities.

168

169

```python { .api }

170

class CheckpointLocation:

171

"""

172

Configuration for checkpointing remote function state to enable recovery.

173

174

Parameters:

175

- checkpoint_s3_uri (str): S3 URI for storing checkpoints

176

- local_path (str, optional): Local path for checkpoint files

177

- kms_key_id (str, optional): KMS key for checkpoint encryption

178

"""

179

def __init__(self, checkpoint_s3_uri: str, local_path: str = "/tmp/checkpoints",

180

kms_key_id: str = None): ...

181

```

182

183

### Spark Integration

184

185

Configuration for running Spark workloads as remote functions on SageMaker.

186

187

```python { .api }

188

class SparkConfig:

189

"""

190

Configuration for Apache Spark integration with remote functions.

191

192

Parameters:

193

- submit_app (str): Path to Spark application

194

- submit_py_files (List[str], optional): Python files for Spark

195

- submit_files (List[str], optional): Additional files for Spark

196

- submit_jars (List[str], optional): JAR files for Spark

197

- submit_class (str, optional): Main class for Spark application

198

- spark_event_logs_s3_uri (str, optional): S3 URI for Spark event logs

199

- configuration (dict, optional): Spark configuration properties

200

"""

201

def __init__(self, submit_app: str, **kwargs): ...

202

```

203

204

### Results and Future Objects

205

206

Classes for managing asynchronous execution results and status monitoring.

207

208

```python { .api }

209

class Future:

210

"""

211

Future object representing the result of a remote function execution.

212

213

Methods available:

214

- result(): Get the execution result (blocks until complete)

215

- done(): Check if execution is complete

216

- running(): Check if execution is running

217

- cancelled(): Check if execution was cancelled

218

- cancel(): Cancel the execution

219

- exception(): Get exception if execution failed

220

- add_done_callback(): Add callback for completion

221

"""

222

def result(self, timeout: float = None):

223

"""

224

Get the result of the remote execution.

225

226

Parameters:

227

- timeout (float, optional): Timeout in seconds

228

229

Returns:

230

- Any: Result of the remote function

231

"""

232

233

def done(self) -> bool:

234

"""Check if the remote execution is complete."""

235

236

def running(self) -> bool:

237

"""Check if the remote execution is currently running."""

238

239

def cancelled(self) -> bool:

240

"""Check if the remote execution was cancelled."""

241

242

def cancel(self) -> bool:

243

"""Cancel the remote execution if possible."""

244

245

def exception(self, timeout: float = None):

246

"""Get the exception raised by remote execution, if any."""

247

248

def add_done_callback(self, fn):

249

"""Add a callback to be called when execution completes."""

250

```

251

252

## Usage Examples

253

254

### Basic Remote Function

255

256

```python

257

from sagemaker.remote_function import remote

258

259

# Define a compute-intensive function

260

@remote(

261

instance_type="ml.m5.4xlarge",

262

role=role,

263

keep_alive_period_in_seconds=300 # Keep instance alive for 5 minutes

264

)

265

def process_large_dataset(data_path, output_path, num_workers=4):

266

import pandas as pd

267

import numpy as np

268

from multiprocessing import Pool

269

270

# Load and process data

271

df = pd.read_csv(data_path)

272

273

def process_chunk(chunk):

274

# CPU-intensive processing

275

return chunk.apply(lambda x: x ** 2 + np.log(x + 1))

276

277

# Parallel processing

278

chunks = np.array_split(df, num_workers)

279

with Pool(num_workers) as pool:

280

results = pool.map(process_chunk, chunks)

281

282

# Combine results

283

processed_df = pd.concat(results)

284

processed_df.to_csv(output_path, index=False)

285

286

return f"Processed {len(processed_df)} rows"

287

288

# Execute remotely (asynchronous)

289

future = process_large_dataset(

290

"s3://bucket/large-dataset.csv",

291

"s3://bucket/processed-dataset.csv",

292

num_workers=8

293

)

294

295

# Get result (blocks until complete)

296

result = future.result()

297

print(result) # "Processed 1000000 rows"

298

```

299

300

### Distributed Processing with Remote Executor

301

302

```python

303

from sagemaker.remote_function import RemoteExecutor

304

305

# Create remote executor for distributed processing

306

executor = RemoteExecutor(

307

instance_type="ml.c5.2xlarge",

308

instance_count=3, # 3 instances for parallel processing

309

role=role,

310

max_runtime_in_seconds=3600, # 1 hour timeout

311

use_spot_instances=True,

312

max_wait_time_in_seconds=600

313

)

314

315

def process_file_batch(file_paths):

316

"""Process a batch of files"""

317

import boto3

318

import pandas as pd

319

320

s3 = boto3.client('s3')

321

results = []

322

323

for file_path in file_paths:

324

# Download and process each file

325

bucket, key = file_path.replace('s3://', '').split('/', 1)

326

obj = s3.get_object(Bucket=bucket, Key=key)

327

df = pd.read_csv(obj['Body'])

328

329

# Process the data

330

processed = df.groupby('category').agg({

331

'value': ['mean', 'std', 'count']

332

}).round(2)

333

334

results.append({

335

'file': file_path,

336

'summary': processed.to_dict()

337

})

338

339

return results

340

341

# List of file batches to process

342

file_batches = [

343

["s3://bucket/data/batch1/file1.csv", "s3://bucket/data/batch1/file2.csv"],

344

["s3://bucket/data/batch2/file1.csv", "s3://bucket/data/batch2/file2.csv"],

345

["s3://bucket/data/batch3/file1.csv", "s3://bucket/data/batch3/file2.csv"]

346

]

347

348

# Submit parallel processing jobs

349

futures = [executor.submit(process_file_batch, batch) for batch in file_batches]

350

351

# Collect results in submission order (each result() call blocks until that job finishes)

352

results = []

353

for future in futures:

354

batch_result = future.result()

355

results.extend(batch_result)

356

357

# Clean up

358

executor.shutdown()

359

360

print(f"Processed {len(results)} file batches")

361

```

362

363

### Advanced Configuration with Custom Dependencies

364

365

```python

366

from sagemaker.remote_function import remote, CustomFileFilter

367

368

# Custom file filter to exclude unnecessary files

369

file_filter = CustomFileFilter(

370

ignore_name_patterns=["*.pyc", "*.log", ".DS_Store"],

371

ignore_directories=[".git", "__pycache__", "node_modules"],

372

ignore_path_patterns=["*/tests/*", "*/docs/*"]

373

)

374

375

@remote(

376

instance_type="ml.p3.2xlarge", # GPU instance

377

role=role,

378

image_uri="your-account.dkr.ecr.region.amazonaws.com/ml-training:latest",

379

dependencies="requirements.txt", # Install dependencies from file

380

custom_file_filter=file_filter,

381

pre_execution_commands=[

382

"pip install --upgrade pip",

383

"pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu118"

384

],

385

environment={

386

"CUDA_VISIBLE_DEVICES": "0",

387

"OMP_NUM_THREADS": "4"

388

},

389

max_runtime_in_seconds=7200, # 2 hours

390

volume_size=100 # 100 GB storage

391

)

392

def train_model(config_path, data_path, output_path):

393

"""Train a deep learning model remotely"""

394

import torch

395

import torch.nn as nn

396

import yaml

397

import joblib

398

399

# Load configuration

400

with open(config_path, 'r') as f:

401

config = yaml.safe_load(f)

402

403

# Setup training

404

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

405

print(f"Training on device: {device}")

406

407

# Your model training code here

408

# model = create_model(config)

409

# train_data = load_data(data_path)

410

# trained_model = train(model, train_data, config)

411

412

# Save model

413

# torch.save(trained_model.state_dict(), output_path)

414

415

return {

416

"status": "completed",

417

"device": str(device),

418

"model_path": output_path

419

}

420

421

# Execute with custom configuration

422

result_future = train_model(

423

config_path="./config/model_config.yaml",

424

data_path="s3://bucket/training-data/",

425

output_path="s3://bucket/models/trained_model.pth"

426

)

427

428

# Monitor progress and get result

429

result = result_future.result()

430

print(f"Training completed: {result}")

431

```

432

433

### Spark Integration

434

435

```python

436

from sagemaker.remote_function import remote, SparkConfig

437

438

# Configure Spark

439

spark_config = SparkConfig(

440

submit_app="data_processing.py",

441

submit_py_files=["utils.py", "transformers.py"],

442

configuration={

443

"spark.executor.memory": "4g",

444

"spark.executor.cores": "2",

445

"spark.sql.adaptive.enabled": "true",

446

"spark.sql.adaptive.coalescePartitions.enabled": "true"

447

},

448

spark_event_logs_s3_uri="s3://bucket/spark-logs/"

449

)

450

451

@remote(

452

instance_type="ml.m5.4xlarge",

453

instance_count=2,

454

role=role,

455

spark_config=spark_config,

456

max_runtime_in_seconds=3600

457

)

458

def process_big_data(input_path, output_path, num_partitions=100):

459

"""Process large datasets with Spark"""

460

from pyspark.sql import SparkSession

461

from pyspark.sql.functions import col, avg, count, stddev

462

463

# Create Spark session

464

spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

465

466

# Read data

467

df = spark.read.option("header", "true").csv(input_path)

468

469

# Process data

470

result = df.groupBy("category") \

471

.agg(avg("value").alias("avg_value"),

472

count("*").alias("count"),

473

stddev("value").alias("std_value")) \

474

.coalesce(num_partitions)

475

476

# Write results

477

result.write.mode("overwrite").option("header", "true").csv(output_path)

478

479

# Get summary stats

480

total_rows = df.count()

481

num_categories = result.count()

482

483

spark.stop()

484

485

return {

486

"total_rows": total_rows,

487

"num_categories": num_categories,

488

"output_path": output_path

489

}

490

491

# Execute Spark job remotely

492

spark_future = process_big_data(

493

input_path="s3://bucket/big-data/*.csv",

494

output_path="s3://bucket/processed-data/",

495

num_partitions=50

496

)

497

498

result = spark_future.result()

499

print(f"Processed {result['total_rows']} rows across {result['num_categories']} categories")

500

```