# Executors

Parsl executors are the execution backends that run parallel tasks on different computing resources. Each executor type is optimized for specific use cases, from local parallel execution to large-scale distributed computing on HPC systems and cloud platforms.

## Capabilities

### HighThroughputExecutor

Scalable executor designed for high-throughput parallel workflows using an interchange process and worker pools. Ideal for running many tasks across multiple nodes.

```python { .api }
class HighThroughputExecutor:
    def __init__(self, label='HighThroughputExecutor', provider=None,
                 launch_cmd=None, address=None, worker_ports=None,
                 worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000),
                 storage_access=None, working_dir=None, worker_debug=False,
                 cores_per_worker=1, mem_per_worker=None, max_workers=float('inf'),
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 poll_period=10, address_probe_timeout=30, worker_logdir_root=None,
                 container_image=None, encrypted=None, cert_dir=None):
        """
        High-throughput executor for scalable parallel execution.

        Parameters:
        - label: Executor label for task targeting
        - provider: ExecutionProvider for resource management
        - cores_per_worker: CPU cores per worker process (default: 1)
        - max_workers: Maximum number of workers (default: unlimited)
        - mem_per_worker: GB of memory required per worker
        - prefetch_capacity: Number of tasks to prefetch per worker
        - heartbeat_threshold: Worker heartbeat timeout in seconds
        - worker_debug: Enable worker debugging logs
        - container_image: Container image for containerized execution
        - encrypted: Enable encrypted communication
        """
```

**Usage Example:**

```python
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider

# Local high-throughput execution
htex = HighThroughputExecutor(
    label='local_htex',
    cores_per_worker=2,
    max_workers=8,
    provider=LocalProvider(
        init_blocks=1,
        max_blocks=2
    )
)

# HPC cluster execution
htex_hpc = HighThroughputExecutor(
    label='cluster_htex',
    cores_per_worker=4,
    mem_per_worker=4,  # 4 GB per worker
    max_workers=100,
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=2,
        init_blocks=1,
        max_blocks=10,
        walltime='02:00:00'
    )
)
```

### ThreadPoolExecutor

Local thread-based executor for lightweight parallel tasks that don't require distributed execution. Best for I/O-bound tasks and quick local parallelism.

```python { .api }
class ThreadPoolExecutor:
    def __init__(self, max_threads=2, thread_name_prefix='', label='threads'):
        """
        Thread pool executor for local parallel execution.

        Parameters:
        - max_threads: Maximum number of concurrent threads (default: 2)
        - thread_name_prefix: Prefix for thread names
        - label: Executor label for task targeting
        """
```

**Usage Example:**

```python
from parsl.executors import ThreadPoolExecutor

# Light parallel tasks
threads_exec = ThreadPoolExecutor(
    max_threads=4,
    label='local_threads'
)

# I/O intensive tasks
io_exec = ThreadPoolExecutor(
    max_threads=10,
    label='io_tasks'
)
```

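A minimal end-to-end wiring of this executor into a Parsl configuration looks like the following. This is a sketch, not canonical usage: it assumes a working local Parsl installation, and the `fetch` app and its URL argument are hypothetical illustrations (only the `local_threads` label comes from the example above):

```python
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Executors are registered by passing them to a Config, then loading it
config = Config(executors=[ThreadPoolExecutor(max_threads=4, label='local_threads')])
parsl.load(config)

@python_app(executors=['local_threads'])
def fetch(url):
    # I/O-bound work is a good fit for the thread executor
    import urllib.request
    with urllib.request.urlopen(url) as resp:
        return resp.status

# future = fetch('https://example.org')
# print(future.result())
```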
### WorkQueueExecutor

Integration with the Work Queue distributed computing system, enabling dynamic resource allocation and fault tolerance across diverse computing resources.

```python { .api }
class WorkQueueExecutor:
    def __init__(self, label='WorkQueue', port=9123, project_name=None,
                 env=None, shared_fs=True, use_cache=True, init_command='',
                 worker_executable=None, container_image=None,
                 autolabel=True, autocategory=True, should_transfer_worker_stdout=False,
                 worker_options=None, factory_options=None):
        """
        Work Queue executor for dynamic resource management.

        Parameters:
        - port: Port for Work Queue master (default: 9123)
        - project_name: Project name for worker discovery
        - shared_fs: Whether workers share a filesystem (default: True)
        - use_cache: Enable result caching (default: True)
        - container_image: Container image for workers
        - autolabel: Enable automatic worker labeling
        - autocategory: Enable automatic task categorization
        """
```

**Usage Example:**

```python
from parsl.executors import WorkQueueExecutor

# Work Queue with dynamic workers
wq_exec = WorkQueueExecutor(
    label='work_queue',
    port=9123,
    project_name='my_parsl_project',
    shared_fs=False,  # No shared filesystem; Work Queue stages files
    autolabel=True,
    autocategory=True
)
```

### MPIExecutor

Simplified interface for HighThroughputExecutor tuned for executing multi-node (e.g., MPI) tasks. Places a single pool of workers on the first node of a block, which can then make system calls using MPI launchers.

```python { .api }
class MPIExecutor:
    def __init__(self, label='MPIExecutor', provider=None, launch_cmd=None,
                 interchange_launch_cmd=None, address=None, loopback_address='127.0.0.1',
                 worker_ports=None, worker_port_range=(54000, 55000),
                 interchange_port_range=(55000, 56000), storage_access=None,
                 working_dir=None, worker_debug=False, max_workers_per_block=1,
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 drain_period=None, poll_period=10, address_probe_timeout=None,
                 worker_logdir_root=None, mpi_launcher='mpiexec',
                 block_error_handler=True, encrypted=False):
        """
        MPI executor for multi-node parallel applications.

        Parameters:
        - label: Executor label for task targeting (default: 'MPIExecutor')
        - provider: ExecutionProvider for resource management (default: LocalProvider)
        - max_workers_per_block: Maximum MPI applications per block (default: 1)
        - mpi_launcher: MPI launcher type ('mpiexec', 'srun', 'aprun') (default: 'mpiexec')
        - block_error_handler: Enable automatic block error handling (default: True)
        - encrypted: Enable encrypted communication (default: False)
        - All other parameters are inherited from HighThroughputExecutor
        """
```

**Usage Example:**

```python
from parsl.executors import MPIExecutor
from parsl.providers import LocalProvider
from parsl.launchers import SimpleLauncher

# Local MPI execution (requires SimpleLauncher)
mpi_exec = MPIExecutor(
    label='local_mpi',
    max_workers_per_block=1,
    mpi_launcher='mpiexec',
    provider=LocalProvider(
        launcher=SimpleLauncher()  # Required for MPI mode
    )
)

# HPC cluster MPI execution
from parsl.providers import SlurmProvider

mpi_hpc = MPIExecutor(
    label='cluster_mpi',
    max_workers_per_block=1,
    mpi_launcher='srun',
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=4,
        launcher=SimpleLauncher(),  # Must use SimpleLauncher
        walltime='01:00:00'
    )
)
```

### FluxExecutor

Executor that uses Flux to schedule and run jobs, wrapping every callable into a Flux job. Excellent for jobs with large resource requirements and varying resource needs, but not suitable for large numbers of small, fast jobs (Flux is capped at ~50 jobs per second).

```python { .api }
class FluxExecutor:
    def __init__(self, provider=None, working_dir=None, label='FluxExecutor',
                 flux_executor_kwargs={}, flux_path=None, launch_cmd=None):
        """
        Flux executor for advanced resource management with per-task resource specifications.

        Parameters:
        - provider: ExecutionProvider for compute resources (default: LocalProvider)
        - working_dir: Directory for executor files (auto-generated if None)
        - label: Executor label for task targeting (default: 'FluxExecutor')
        - flux_executor_kwargs: Keyword arguments passed to flux.job.FluxExecutor
        - flux_path: Path to the Flux installation (searches PATH if None)
        - launch_cmd: Command for launching the executor backend (has a reasonable default)
        """
```

**Resource Specification Support:**

FluxExecutor supports detailed per-task resource specifications:

```python
# Supported resource specification keys:
resource_spec = {
    'num_tasks': 1,       # Number of tasks (MPI ranks)
    'cores_per_task': 1,  # Cores per task
    'gpus_per_task': 0,   # GPUs per task
    'num_nodes': 0        # Distribute across N nodes if > 0
}
```

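Before submission it can be convenient to sanity-check a specification dict against this key set. The helper below is hypothetical (it is not part of the Parsl or Flux API), shown only to make the supported keys concrete; it runs on plain Python:

```python
# Hypothetical validator for Flux-style resource specifications;
# not part of the Parsl API, shown only to make the key set concrete.
SUPPORTED_KEYS = {'num_tasks', 'cores_per_task', 'gpus_per_task', 'num_nodes'}

def validate_resource_spec(spec):
    """Return the spec unchanged if valid, otherwise raise ValueError."""
    unknown = set(spec) - SUPPORTED_KEYS
    if unknown:
        raise ValueError(f"Unsupported resource keys: {sorted(unknown)}")
    for key, value in spec.items():
        if not isinstance(value, int) or value < 0:
            raise ValueError(f"{key} must be a non-negative integer, got {value!r}")
    return spec

# validate_resource_spec({'num_tasks': 8, 'cores_per_task': 2})  # OK
# validate_resource_spec({'cores': 8})  # raises ValueError ('cores' is not a Flux key)
```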
**Usage Examples:**

```python
from parsl import python_app
from parsl.executors import FluxExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.launchers import SrunLauncher

# Local Flux execution
flux_exec = FluxExecutor(
    label='local_flux',
    provider=LocalProvider()
)

# HPC cluster Flux execution
flux_hpc = FluxExecutor(
    label='cluster_flux',
    provider=SlurmProvider(
        partition='compute',
        account='my_account',
        launcher=SrunLauncher(overrides='--mpibind=off'),
        nodes_per_block=1,
        walltime='00:30:00'
    )
)

# Submit tasks with resource specifications
@python_app(executors=['cluster_flux'])
def compute_task(data, parsl_resource_specification={}):
    # Task requiring specific resources
    return process_data(data)

# Execute with 4 cores per task
future = compute_task(
    large_dataset,
    parsl_resource_specification={'cores_per_task': 4}
)

# MPI task with multiple ranks across nodes
@python_app(executors=['cluster_flux'])
def mpi_task(data, parsl_resource_specification={}):
    # MPI-aware computation
    return mpi_computation(data)

mpi_future = mpi_task(
    mpi_data,
    parsl_resource_specification={
        'num_tasks': 8,       # 8 MPI ranks
        'cores_per_task': 2,  # 2 cores per rank
        'num_nodes': 2        # Distribute across 2 nodes
    }
)
```

### RadicalPilotExecutor

Integration with the RADICAL-Pilot framework for large-scale distributed computing with advanced resource management.

```python { .api }
class RadicalPilotExecutor:
    def __init__(self, label='RadicalPilot', resource_config=None,
                 bulk_mode=True, launch_cmd=None):
        """
        RADICAL-Pilot executor for large-scale distributed computing.

        Parameters:
        - label: Executor label for task targeting
        - resource_config: ResourceConfig for RADICAL-Pilot setup
        - bulk_mode: Enable bulk task submission (default: True)
        - launch_cmd: Custom launch command
        """
```

**Usage Example:**

```python
from parsl.executors.radical import RadicalPilotExecutor, ResourceConfig

# RADICAL-Pilot configuration
resource_config = ResourceConfig(
    resource='local.localhost',
    walltime=60,  # minutes
    cpus=4,
    gpus=0,
    project='my_project'
)

rp_exec = RadicalPilotExecutor(
    label='radical_pilot',
    resource_config=resource_config,
    bulk_mode=True
)
```

## Executor Selection and Task Targeting

Control which executors run specific tasks using executor labels:

```python
from parsl import python_app

@python_app(executors=['local_threads'])
def light_task():
    return "completed on threads"

@python_app(executors=['cluster_htex'])
def heavy_task():
    import time
    time.sleep(60)  # Heavy computation
    return "completed on cluster"

@python_app(executors=['mpi_tasks'])
def mpi_task():
    # MPI-aware code
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return f"MPI rank {comm.Get_rank()}"
```

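A common pattern is to choose the executor label programmatically from an estimate of the task's cost. The routing helper below is hypothetical (not a Parsl API); the labels simply mirror the examples above:

```python
# Hypothetical routing helper: pick an executor label from an estimated
# task duration. The labels mirror the examples above; adjust as needed.
def pick_executor(estimated_seconds, needs_mpi=False):
    if needs_mpi:
        return 'mpi_tasks'
    if estimated_seconds < 1.0:
        return 'local_threads'   # cheap, short tasks stay local
    return 'cluster_htex'        # expensive tasks go to the cluster

# Labels are fixed at decoration time, so the choice happens once per app:
# @python_app(executors=[pick_executor(30)])
# def task(): ...
```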
## Resource Specification

Specify resource requirements for tasks:

```python
@python_app
def resource_intensive_task(data, parsl_resource_specification={}):
    """Task with specific resource needs."""
    # Process data requiring specific resources
    return processed_data

# Execute with resource requirements
future = resource_intensive_task(
    large_dataset,
    parsl_resource_specification={
        'cores': 8,
        'memory': '16GB',
        'disk': '100GB',
        'walltime': '02:00:00'
    }
)
```

## Executor Lifecycle Management

Executors are managed automatically by the DataFlowKernel:

```python
from parsl.config import Config
import parsl

# Executors start when the configuration is loaded
config = Config(executors=[htex, threads_exec, wq_exec])
parsl.load(config)

# Submit tasks to different executors
futures = []
for i in range(100):
    if i % 2 == 0:
        futures.append(light_task())  # -> threads_exec
    else:
        futures.append(heavy_task())  # -> htex

# Wait for completion
results = [f.result() for f in futures]

# Executors shut down when the DFK is cleared
parsl.clear()
```

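Parsl app futures follow the standard `concurrent.futures.Future` interface, so the usual waiting patterns apply beyond the simple list comprehension above. The sketch below uses the stdlib `ThreadPoolExecutor` as a stand-in so it runs without Parsl; `square` is a placeholder task:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

# Stand-in for Parsl app futures: any concurrent.futures.Future behaves the same
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square, i) for i in range(10)]

    # Consume results as they finish rather than in submission order
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The same `as_completed` loop works unchanged on futures returned by Parsl apps, which is useful when tasks finish at very different rates.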
## Executor Error Handling

Handle executor-specific errors and failures:

```python
from parsl.executors.errors import ExecutorError, ScalingError

try:
    parsl.load(config)
except ExecutorError as e:
    print(f"Executor initialization failed: {e}")
except ScalingError as e:
    print(f"Resource scaling error: {e}")

# Monitor executor status
dfk = parsl.dfk()
for executor in dfk.executors.values():
    print(f"Executor {executor.label}: {executor.status()}")
```