Parallel scripting library for executing workflows across diverse computing resources
---
Parsl executors are the execution backends that run parallel tasks on different computing resources. Each executor type is optimized for specific use cases, from local parallel execution to large-scale distributed computing on HPC systems and cloud platforms.
Scalable executor designed for high-throughput parallel workflows using an interchange process and worker pools. Ideal for running many tasks across multiple nodes.
class HighThroughputExecutor:
    def __init__(self, label='HighThroughputExecutor', provider=None,
                 launch_cmd=None, address=None, worker_ports=None,
                 worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000),
                 storage_access=None, working_dir=None, worker_debug=False,
                 cores_per_worker=1, mem_per_worker=None, max_workers=float('inf'),
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 poll_period=10, address_probe_timeout=30, worker_logdir_root=None,
                 container_image=None, encrypted=None, cert_dir=None):
        """
        High-throughput executor for scalable parallel execution.

        Parameters:
        - label: Executor label for task targeting
        - provider: ExecutionProvider for resource management
        - launch_cmd: Command used to launch worker pools (library default if None)
        - address: Address workers use to reach the interchange (auto-detected if None)
        - worker_ports: Explicit ports for worker connections (overrides worker_port_range)
        - worker_port_range: Port range for worker connections (default: (54000, 55000))
        - interchange_port_range: Port range for the interchange process (default: (55000, 56000))
        - storage_access: Data staging / storage access schemes
        - working_dir: Working directory for executor files
        - cores_per_worker: CPU cores per worker process (default: 1)
        - max_workers: Maximum number of workers (default: unlimited)
        - mem_per_worker: Memory per worker in MB
        - prefetch_capacity: Number of tasks to prefetch per worker
        - heartbeat_threshold: Worker heartbeat timeout in seconds
        - heartbeat_period: Interval between worker heartbeats in seconds
        - poll_period: Task-queue poll period (milliseconds -- TODO confirm unit)
        - address_probe_timeout: Seconds to wait when probing for a reachable address
        - worker_logdir_root: Root directory for worker log files
        - worker_debug: Enable worker debugging logs
        - container_image: Container image for containerized execution
        - encrypted: Enable encrypted communication
        - cert_dir: Directory holding certificates when encryption is enabled
        """

# Usage Example:
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider
# Local high-throughput execution
htex = HighThroughputExecutor(
label='local_htex',
cores_per_worker=2,
max_workers=8,
provider=LocalProvider(
init_blocks=1,
max_blocks=2
)
)
# HPC cluster execution
htex_hpc = HighThroughputExecutor(
label='cluster_htex',
cores_per_worker=4,
mem_per_worker=4000, # 4GB per worker
max_workers=100,
provider=SlurmProvider(
partition='compute',
nodes_per_block=2,
init_blocks=1,
max_blocks=10,
walltime='02:00:00'
)
)

Local thread-based executor for lightweight parallel tasks that don't require distributed execution. Best for I/O-bound tasks and quick local parallelism.
class ThreadPoolExecutor:
    def __init__(self, max_threads=2, thread_name_prefix='', label='threads'):
        """
        Thread pool executor for local parallel execution.

        Parameters:
        - max_threads: Maximum number of concurrent threads (default: 2)
        - thread_name_prefix: Prefix for thread names
        - label: Executor label for task targeting (default: 'threads')
        """

# Usage Example:
from parsl.executors import ThreadPoolExecutor
# Light parallel tasks
threads_exec = ThreadPoolExecutor(
max_threads=4,
label='local_threads'
)
# I/O intensive tasks
io_exec = ThreadPoolExecutor(
max_threads=10,
label='io_tasks'
)

Integration with the Work Queue distributed computing system, enabling dynamic resource allocation and fault tolerance across diverse computing resources.
class WorkQueueExecutor:
    def __init__(self, label='WorkQueue', port=9123, project_name=None,
                 env=None, shared_fs=True, use_cache=True, init_command='',
                 worker_executable=None, container_image=None,
                 autolabel=True, autocategory=True, should_transfer_worker_stdout=False,
                 worker_options=None, factory_options=None):
        """
        Work Queue executor for dynamic resource management.

        Parameters:
        - label: Executor label for task targeting (default: 'WorkQueue')
        - port: Port for Work Queue master (default: 9123)
        - project_name: Project name for worker discovery
        - env: Environment variables passed to workers
        - shared_fs: Whether workers share filesystem (default: True)
        - use_cache: Enable result caching (default: True)
        - init_command: Command run before each task (default: '')
        - worker_executable: Path to the worker executable (library default if None)
        - container_image: Container image for workers
        - autolabel: Enable automatic worker labeling
        - autocategory: Enable automatic task categorization
        - should_transfer_worker_stdout: Transfer worker stdout back (default: False)
        - worker_options: Extra options passed to workers
        - factory_options: Options for the worker factory
        """

# Usage Example:
from parsl.executors import WorkQueueExecutor
# Work Queue with dynamic workers
wq_exec = WorkQueueExecutor(
label='work_queue',
port=9123,
project_name='my_parsl_project',
shared_fs=False, # Handle file transfers
autolabel=True,
autocategory=True
)

Simplified interface for HighThroughputExecutor tuned for executing multi-node (e.g., MPI) tasks. Places a single pool of workers on the first node of a block, which can then make system calls using MPI launchers.
class MPIExecutor:
    def __init__(self, label='MPIExecutor', provider=None, launch_cmd=None,
                 interchange_launch_cmd=None, address=None, loopback_address='127.0.0.1',
                 worker_ports=None, worker_port_range=(54000, 55000),
                 interchange_port_range=(55000, 56000), storage_access=None,
                 working_dir=None, worker_debug=False, max_workers_per_block=1,
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 drain_period=None, poll_period=10, address_probe_timeout=None,
                 worker_logdir_root=None, mpi_launcher='mpiexec',
                 block_error_handler=True, encrypted=False):
        """
        MPI executor for multi-node parallel applications.

        Parameters:
        - label: Executor label for task targeting (default: 'MPIExecutor')
        - provider: ExecutionProvider for resource management (default: LocalProvider)
        - interchange_launch_cmd: Command used to launch the interchange process
        - loopback_address: Loopback address for local communication (default: '127.0.0.1')
        - max_workers_per_block: Maximum MPI applications per block (default: 1)
        - drain_period: Seconds before a block is drained of work (None disables draining)
        - mpi_launcher: MPI launcher type ('mpiexec', 'srun', 'aprun') (default: 'mpiexec')
        - block_error_handler: Enable automatic block error handling (default: True)
        - encrypted: Enable encrypted communication (default: False)
        - All other parameters inherited from HighThroughputExecutor
        """

# Usage Example:
from parsl.executors import MPIExecutor
from parsl.providers import LocalProvider
from parsl.launchers import SimpleLauncher
# Local MPI execution (requires SimpleLauncher)
mpi_exec = MPIExecutor(
label='local_mpi',
max_workers_per_block=1,
mpi_launcher='mpiexec',
provider=LocalProvider(
launcher=SimpleLauncher() # Required for MPI mode
)
)
# HPC cluster MPI execution
from parsl.providers import SlurmProvider
mpi_hpc = MPIExecutor(
label='cluster_mpi',
max_workers_per_block=1,
mpi_launcher='srun',
provider=SlurmProvider(
partition='compute',
nodes_per_block=4,
launcher=SimpleLauncher(), # Must use SimpleLauncher
walltime='01:00:00'
)
)

Executor that uses Flux to schedule and run jobs, wrapping every callable into a Flux job. Excellent for jobs with large resource requirements and varying resource needs, but not suitable for large numbers of small, fast jobs (Flux is capped at ~50 jobs per second).
class FluxExecutor:
    # NOTE(review): the mutable {} default for flux_executor_kwargs mirrors the
    # published Parsl API and is kept for interface fidelity; callers should not
    # mutate it, since mutable defaults are shared across calls.
    def __init__(self, provider=None, working_dir=None, label='FluxExecutor',
                 flux_executor_kwargs={}, flux_path=None, launch_cmd=None):
        """
        Flux executor for advanced resource management with per-task resource
        specifications.

        Parameters:
        - provider: ExecutionProvider for compute resources (default: LocalProvider)
        - working_dir: Directory for executor files (auto-generated if None)
        - label: Executor label for task targeting (default: 'FluxExecutor')
        - flux_executor_kwargs: Keyword arguments passed to flux.job.FluxExecutor
        - flux_path: Path to flux installation (searches PATH if None)
        - launch_cmd: Command for launching executor backend (has reasonable default)
        """

# Resource Specification Support:
FluxExecutor supports detailed per-task resource specifications:
# Supported resource specification keys:
resource_spec = {
'num_tasks': 1, # Number of tasks (MPI ranks)
'cores_per_task': 1, # Cores per task
'gpus_per_task': 0, # GPUs per task
'num_nodes': 0 # Distribute across N nodes if > 0
}

Usage Examples:
from parsl.executors import FluxExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.launchers import SrunLauncher
# Local Flux execution
flux_exec = FluxExecutor(
label='local_flux',
provider=LocalProvider()
)
# HPC cluster Flux execution
flux_hpc = FluxExecutor(
label='cluster_flux',
provider=SlurmProvider(
partition='compute',
account='my_account',
launcher=SrunLauncher(overrides='--mpibind=off'),
nodes_per_block=1,
walltime='00:30:00'
)
)
# Submit tasks with resource specifications
@python_app(executors=['cluster_flux'])
def compute_task(data, parsl_resource_specification={}):
# Task requiring specific resources
return process_data(data)
# Execute with 4 cores per task
future = compute_task(
large_dataset,
parsl_resource_specification={'cores_per_task': 4}
)
# MPI task with multiple ranks across nodes
@python_app(executors=['cluster_flux'])
def mpi_task(data, parsl_resource_specification={}):
# MPI-aware computation
return mpi_computation(data)
mpi_future = mpi_task(
mpi_data,
parsl_resource_specification={
'num_tasks': 8, # 8 MPI ranks
'cores_per_task': 2, # 2 cores per rank
'num_nodes': 2 # Distribute across 2 nodes
}
)

Integration with the RADICAL Pilot framework for large-scale distributed computing with advanced resource management.
class RadicalPilotExecutor:
    def __init__(self, label='RadicalPilot', resource_config=None,
                 bulk_mode=True, launch_cmd=None):
        """
        RADICAL Pilot executor for large-scale distributed computing.

        Parameters:
        - label: Executor label for task targeting (default: 'RadicalPilot')
        - resource_config: ResourceConfig for RADICAL Pilot setup
        - bulk_mode: Enable bulk task submission (default: True)
        - launch_cmd: Custom launch command
        """

# Usage Example:
from parsl.executors.radical import RadicalPilotExecutor, ResourceConfig
# RADICAL Pilot configuration
resource_config = ResourceConfig(
resource='local.localhost',
walltime=60, # minutes
cpus=4,
gpus=0,
project='my_project'
)
rp_exec = RadicalPilotExecutor(
label='radical_pilot',
resource_config=resource_config,
bulk_mode=True
)

Control which executors run specific tasks using executor labels:
from parsl import python_app
@python_app(executors=['local_threads'])
def light_task():
return "completed on threads"
@python_app(executors=['cluster_htex'])
def heavy_task():
import time
time.sleep(60) # Heavy computation
return "completed on cluster"
@python_app(executors=['mpi_tasks'])
def mpi_task():
# MPI-aware code
from mpi4py import MPI
comm = MPI.COMM_WORLD
return f"MPI rank {comm.Get_rank()}"

Specify resource requirements for tasks:
@python_app
def resource_intensive_task(data, parsl_resource_specification={}):
"""Task with specific resource needs."""
# Process data requiring specific resources
return processed_data
# Execute with resource requirements
future = resource_intensive_task(
large_dataset,
parsl_resource_specification={
'cores': 8,
'memory': '16GB',
'disk': '100GB',
'walltime': '02:00:00'
}
)

Executors are managed automatically by the DataFlowKernel:
from parsl.config import Config
import parsl
# Executors start when configuration is loaded
config = Config(executors=[htex, threads_exec, wq_exec])
parsl.load(config)
# Submit tasks to different executors
futures = []
for i in range(100):
if i % 2 == 0:
futures.append(light_task()) # -> threads_exec
else:
futures.append(heavy_task()) # -> htex
# Wait for completion
results = [f.result() for f in futures]
# Executors shutdown when DFK is cleared
parsl.clear()

Handle executor-specific errors and failures:
from parsl.executors.errors import ExecutorError, ScalingError
try:
parsl.load(config)
except ExecutorError as e:
print(f"Executor initialization failed: {e}")
except ScalingError as e:
print(f"Resource scaling error: {e}")
# Monitor executor status
dfk = parsl.dfk()
for executor in dfk.executors.values():
print(f"Executor {executor.label}: {executor.status()}")

Install with Tessl CLI
npx tessl i tessl/pypi-parsl