
# Executors

1

2

Parsl executors are the execution backends that run parallel tasks on different computing resources. Each executor type is optimized for specific use cases, from local parallel execution to large-scale distributed computing on HPC systems and cloud platforms.

## Capabilities

### HighThroughputExecutor

Scalable executor designed for high-throughput parallel workflows using an interchange process and worker pools. Ideal for running many tasks across multiple nodes.

```python { .api }
class HighThroughputExecutor:
    def __init__(self, label='HighThroughputExecutor', provider=None,
                 launch_cmd=None, address=None, worker_ports=None,
                 worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000),
                 storage_access=None, working_dir=None, worker_debug=False,
                 cores_per_worker=1, mem_per_worker=None, max_workers=float('inf'),
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 poll_period=10, address_probe_timeout=30, worker_logdir_root=None,
                 container_image=None, encrypted=None, cert_dir=None):
        """
        High-throughput executor for scalable parallel execution.

        Parameters:
        - label: Executor label for task targeting
        - provider: ExecutionProvider for resource management
        - cores_per_worker: CPU cores per worker process (default: 1)
        - max_workers: Maximum number of workers (default: unlimited)
        - mem_per_worker: Memory per worker in MB
        - prefetch_capacity: Number of tasks to prefetch per worker
        - heartbeat_threshold: Worker heartbeat timeout in seconds
        - worker_debug: Enable worker debugging logs
        - container_image: Container image for containerized execution
        - encrypted: Enable encrypted communication
        """
```

**Usage Example:**

```python
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider

# Local high-throughput execution
htex = HighThroughputExecutor(
    label='local_htex',
    cores_per_worker=2,
    max_workers=8,
    provider=LocalProvider(
        init_blocks=1,
        max_blocks=2
    )
)

# HPC cluster execution
htex_hpc = HighThroughputExecutor(
    label='cluster_htex',
    cores_per_worker=4,
    mem_per_worker=4000,  # 4 GB per worker
    max_workers=100,
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=2,
        init_blocks=1,
        max_blocks=10,
        walltime='02:00:00'
    )
)
```

### ThreadPoolExecutor

Local thread-based executor for lightweight parallel tasks that don't require distributed execution. Best for I/O-bound tasks and quick local parallelism.

```python { .api }
class ThreadPoolExecutor:
    def __init__(self, max_threads=2, thread_name_prefix='', label='threads'):
        """
        Thread pool executor for local parallel execution.

        Parameters:
        - max_threads: Maximum number of concurrent threads (default: 2)
        - thread_name_prefix: Prefix for thread names
        - label: Executor label for task targeting
        """
```

**Usage Example:**

```python
from parsl.executors import ThreadPoolExecutor

# Light parallel tasks
threads_exec = ThreadPoolExecutor(
    max_threads=4,
    label='local_threads'
)

# I/O-intensive tasks
io_exec = ThreadPoolExecutor(
    max_threads=10,
    label='io_tasks'
)
```

### WorkQueueExecutor

Integration with the Work Queue distributed computing system, enabling dynamic resource allocation and fault tolerance across diverse computing resources.

```python { .api }
class WorkQueueExecutor:
    def __init__(self, label='WorkQueue', port=9123, project_name=None,
                 env=None, shared_fs=True, use_cache=True, init_command='',
                 worker_executable=None, container_image=None,
                 autolabel=True, autocategory=True, should_transfer_worker_stdout=False,
                 worker_options=None, factory_options=None):
        """
        Work Queue executor for dynamic resource management.

        Parameters:
        - port: Port for Work Queue master (default: 9123)
        - project_name: Project name for worker discovery
        - shared_fs: Whether workers share a filesystem (default: True)
        - use_cache: Enable result caching (default: True)
        - container_image: Container image for workers
        - autolabel: Enable automatic worker labeling
        - autocategory: Enable automatic task categorization
        """
```

**Usage Example:**

```python
from parsl.executors import WorkQueueExecutor

# Work Queue with dynamic workers
wq_exec = WorkQueueExecutor(
    label='work_queue',
    port=9123,
    project_name='my_parsl_project',
    shared_fs=False,  # Handle file transfers explicitly
    autolabel=True,
    autocategory=True
)
```

### MPIExecutor

A simplified interface to HighThroughputExecutor, tuned for executing multi-node (e.g., MPI) tasks. It places a single pool of workers on the first node of a block; each worker can then launch a multi-node application via a system call to an MPI launcher such as `mpiexec` or `srun`.

```python { .api }
class MPIExecutor:
    def __init__(self, label='MPIExecutor', provider=None, launch_cmd=None,
                 interchange_launch_cmd=None, address=None, loopback_address='127.0.0.1',
                 worker_ports=None, worker_port_range=(54000, 55000),
                 interchange_port_range=(55000, 56000), storage_access=None,
                 working_dir=None, worker_debug=False, max_workers_per_block=1,
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 drain_period=None, poll_period=10, address_probe_timeout=None,
                 worker_logdir_root=None, mpi_launcher='mpiexec',
                 block_error_handler=True, encrypted=False):
        """
        MPI executor for multi-node parallel applications.

        Parameters:
        - label: Executor label for task targeting (default: 'MPIExecutor')
        - provider: ExecutionProvider for resource management (default: LocalProvider)
        - max_workers_per_block: Maximum MPI applications per block (default: 1)
        - mpi_launcher: MPI launcher type ('mpiexec', 'srun', 'aprun') (default: 'mpiexec')
        - block_error_handler: Enable automatic block error handling (default: True)
        - encrypted: Enable encrypted communication (default: False)
        - All other parameters are inherited from HighThroughputExecutor
        """
```

**Usage Example:**

```python
from parsl.executors import MPIExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.launchers import SimpleLauncher

# Local MPI execution (requires SimpleLauncher)
mpi_exec = MPIExecutor(
    label='local_mpi',
    max_workers_per_block=1,
    mpi_launcher='mpiexec',
    provider=LocalProvider(
        launcher=SimpleLauncher()  # Required for MPI mode
    )
)

# HPC cluster MPI execution
mpi_hpc = MPIExecutor(
    label='cluster_mpi',
    max_workers_per_block=1,
    mpi_launcher='srun',
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=4,
        launcher=SimpleLauncher(),  # Must use SimpleLauncher
        walltime='01:00:00'
    )
)
```

### FluxExecutor


Executor that uses Flux to schedule and run jobs, wrapping every callable into a Flux job. Excellent for jobs with large resource requirements and varying resource needs, but not suitable for large numbers of small, fast jobs (Flux is capped at ~50 jobs per second).

```python { .api }
class FluxExecutor:
    def __init__(self, provider=None, working_dir=None, label='FluxExecutor',
                 flux_executor_kwargs={}, flux_path=None, launch_cmd=None):
        """
        Flux executor for advanced resource management with per-task resource specifications.

        Parameters:
        - provider: ExecutionProvider for compute resources (default: LocalProvider)
        - working_dir: Directory for executor files (auto-generated if None)
        - label: Executor label for task targeting (default: 'FluxExecutor')
        - flux_executor_kwargs: Keyword arguments passed to flux.job.FluxExecutor
        - flux_path: Path to the flux installation (searches PATH if None)
        - launch_cmd: Command for launching the executor backend (has a reasonable default)
        """
```

**Resource Specification Support:**


FluxExecutor supports detailed per-task resource specifications:

```python
# Supported resource specification keys:
resource_spec = {
    'num_tasks': 1,       # Number of tasks (MPI ranks)
    'cores_per_task': 1,  # Cores per task
    'gpus_per_task': 0,   # GPUs per task
    'num_nodes': 0        # Distribute across N nodes if > 0
}
```

**Usage Examples:**

```python
from parsl import python_app
from parsl.executors import FluxExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.launchers import SrunLauncher

# Local Flux execution
flux_exec = FluxExecutor(
    label='local_flux',
    provider=LocalProvider()
)

# HPC cluster Flux execution
flux_hpc = FluxExecutor(
    label='cluster_flux',
    provider=SlurmProvider(
        partition='compute',
        account='my_account',
        launcher=SrunLauncher(overrides='--mpibind=off'),
        nodes_per_block=1,
        walltime='00:30:00'
    )
)

# Submit tasks with resource specifications
@python_app(executors=['cluster_flux'])
def compute_task(data, parsl_resource_specification={}):
    # Task requiring specific resources
    return process_data(data)

# Execute with 4 cores per task
future = compute_task(
    large_dataset,
    parsl_resource_specification={'cores_per_task': 4}
)

# MPI task with multiple ranks across nodes
@python_app(executors=['cluster_flux'])
def mpi_task(data, parsl_resource_specification={}):
    # MPI-aware computation
    return mpi_computation(data)

mpi_future = mpi_task(
    mpi_data,
    parsl_resource_specification={
        'num_tasks': 8,       # 8 MPI ranks
        'cores_per_task': 2,  # 2 cores per rank
        'num_nodes': 2        # Distribute across 2 nodes
    }
)
```

### RadicalPilotExecutor


Integration with the RADICAL Pilot framework for large-scale distributed computing with advanced resource management.

```python { .api }
class RadicalPilotExecutor:
    def __init__(self, label='RadicalPilot', resource_config=None,
                 bulk_mode=True, launch_cmd=None):
        """
        RADICAL Pilot executor for large-scale distributed computing.

        Parameters:
        - label: Executor label for task targeting
        - resource_config: ResourceConfig for RADICAL Pilot setup
        - bulk_mode: Enable bulk task submission (default: True)
        - launch_cmd: Custom launch command
        """
```

**Usage Example:**

```python
from parsl.executors.radical import RadicalPilotExecutor, ResourceConfig

# RADICAL Pilot configuration
resource_config = ResourceConfig(
    resource='local.localhost',
    walltime=60,  # minutes
    cpus=4,
    gpus=0,
    project='my_project'
)

rp_exec = RadicalPilotExecutor(
    label='radical_pilot',
    resource_config=resource_config,
    bulk_mode=True
)
```

## Executor Selection and Task Targeting


Control which executors run specific tasks using executor labels:

```python
from parsl import python_app

@python_app(executors=['local_threads'])
def light_task():
    return "completed on threads"

@python_app(executors=['cluster_htex'])
def heavy_task():
    import time
    time.sleep(60)  # Heavy computation
    return "completed on cluster"

@python_app(executors=['mpi_tasks'])
def mpi_task():
    # MPI-aware code
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return f"MPI rank {comm.Get_rank()}"
```

## Resource Specification


Specify resource requirements for tasks:

```python
from parsl import python_app

@python_app
def resource_intensive_task(data, parsl_resource_specification={}):
    """Task with specific resource needs."""
    # Process data requiring specific resources
    return processed_data

# Execute with resource requirements
# (the set of supported keys depends on the executor in use)
future = resource_intensive_task(
    large_dataset,
    parsl_resource_specification={
        'cores': 8,
        'memory': '16GB',
        'disk': '100GB',
        'walltime': '02:00:00'
    }
)
```

## Executor Lifecycle Management


Executors are managed automatically by the DataFlowKernel:

```python
import parsl
from parsl.config import Config

# Executors start when the configuration is loaded
# (htex, threads_exec, and wq_exec are defined in the examples above)
config = Config(executors=[htex, threads_exec, wq_exec])
parsl.load(config)

# Submit tasks to different executors
futures = []
for i in range(100):
    if i % 2 == 0:
        futures.append(light_task())  # -> threads_exec
    else:
        futures.append(heavy_task())  # -> htex

# Wait for completion
results = [f.result() for f in futures]

# Executors shut down when the DFK is cleared
parsl.clear()
```

## Executor Error Handling


Handle executor-specific errors and failures:

```python
import parsl
from parsl.executors.errors import ExecutorError, ScalingError

try:
    parsl.load(config)
except ExecutorError as e:
    print(f"Executor initialization failed: {e}")
except ScalingError as e:
    print(f"Resource scaling error: {e}")

# Monitor executor status
dfk = parsl.dfk()
for executor in dfk.executors.values():
    print(f"Executor {executor.label}: {executor.status()}")
```