CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-parsl

Parallel scripting library for executing workflows across diverse computing resources

Pending
Overview
Eval results
Files

executors.mddocs/

Executors

Parsl executors are the execution backends that run parallel tasks on different computing resources. Each executor type is optimized for specific use cases, from local parallel execution to large-scale distributed computing on HPC systems and cloud platforms.

Capabilities

HighThroughputExecutor

Scalable executor designed for high-throughput parallel workflows using an interchange process and worker pools. Ideal for running many tasks across multiple nodes.

class HighThroughputExecutor:
    def __init__(self, label='HighThroughputExecutor', provider=None,
                 launch_cmd=None, address=None, worker_ports=None,
                 worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000),
                 storage_access=None, working_dir=None, worker_debug=False,
                 cores_per_worker=1, mem_per_worker=None, max_workers=float('inf'),
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 poll_period=10, address_probe_timeout=30, worker_logdir_root=None,
                 container_image=None, encrypted=None, cert_dir=None):
        """
        High-throughput executor for scalable parallel execution.
        
        Parameters:
        - label: Executor label for task targeting
        - provider: ExecutionProvider for resource management
        - cores_per_worker: CPU cores per worker process (default: 1)
        - max_workers: Maximum number of workers (default: unlimited)
        - mem_per_worker: Memory per worker in MB
        - prefetch_capacity: Number of tasks to prefetch per worker
        - heartbeat_threshold: Worker heartbeat timeout in seconds
        - worker_debug: Enable worker debugging logs
        - container_image: Container image for containerized execution
        - encrypted: Enable encrypted communication
        """

Usage Example:

from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider

# Local high-throughput execution
htex = HighThroughputExecutor(
    label='local_htex',
    cores_per_worker=2,
    max_workers=8,  
    provider=LocalProvider(
        init_blocks=1,
        max_blocks=2
    )
)

# HPC cluster execution
htex_hpc = HighThroughputExecutor(
    label='cluster_htex',
    cores_per_worker=4,
    mem_per_worker=4000,  # 4GB per worker
    max_workers=100,
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=2,
        init_blocks=1,
        max_blocks=10,
        walltime='02:00:00'
    )
)

ThreadPoolExecutor

Local thread-based executor for lightweight parallel tasks that don't require distributed execution. Best for I/O-bound tasks and quick local parallelism.

class ThreadPoolExecutor:
    def __init__(self, max_threads=2, thread_name_prefix='', label='threads'):
        """
        Thread pool executor for local parallel execution.
        
        Parameters:
        - max_threads: Maximum number of concurrent threads (default: 2)
        - thread_name_prefix: Prefix for thread names
        - label: Executor label for task targeting
        """

Usage Example:

from parsl.executors import ThreadPoolExecutor

# Light parallel tasks
threads_exec = ThreadPoolExecutor(
    max_threads=4,
    label='local_threads'
)

# I/O intensive tasks
io_exec = ThreadPoolExecutor(
    max_threads=10,
    label='io_tasks'  
)

WorkQueueExecutor

Integration with the Work Queue distributed computing system, enabling dynamic resource allocation and fault tolerance across diverse computing resources.

class WorkQueueExecutor:
    def __init__(self, label='WorkQueue', port=9123, project_name=None,
                 env=None, shared_fs=True, use_cache=True, init_command='',
                 worker_executable=None, container_image=None,
                 autolabel=True, autocategory=True, should_transfer_worker_stdout=False,
                 worker_options=None, factory_options=None):
        """
        Work Queue executor for dynamic resource management.
        
        Parameters:
        - port: Port for Work Queue master (default: 9123)
        - project_name: Project name for worker discovery
        - shared_fs: Whether workers share filesystem (default: True)  
        - use_cache: Enable result caching (default: True)
        - container_image: Container image for workers
        - autolabel: Enable automatic worker labeling
        - autocategory: Enable automatic task categorization
        """

Usage Example:

from parsl.executors import WorkQueueExecutor

# Work Queue with dynamic workers
wq_exec = WorkQueueExecutor(
    label='work_queue',
    port=9123,
    project_name='my_parsl_project',
    shared_fs=False,  # Handle file transfers
    autolabel=True,
    autocategory=True
)

MPIExecutor

Simplified interface for HighThroughputExecutor tuned for executing multi-node (e.g., MPI) tasks. Places a single pool of workers on the first node of a block, which can then make system calls using MPI launchers.

class MPIExecutor:
    def __init__(self, label='MPIExecutor', provider=None, launch_cmd=None,
                 interchange_launch_cmd=None, address=None, loopback_address='127.0.0.1',
                 worker_ports=None, worker_port_range=(54000, 55000),
                 interchange_port_range=(55000, 56000), storage_access=None,
                 working_dir=None, worker_debug=False, max_workers_per_block=1,
                 prefetch_capacity=0, heartbeat_threshold=120, heartbeat_period=30,
                 drain_period=None, poll_period=10, address_probe_timeout=None,
                 worker_logdir_root=None, mpi_launcher='mpiexec',
                 block_error_handler=True, encrypted=False):
        """
        MPI executor for multi-node parallel applications.
        
        Parameters:
        - label: Executor label for task targeting (default: 'MPIExecutor')
        - provider: ExecutionProvider for resource management (default: LocalProvider)
        - max_workers_per_block: Maximum MPI applications per block (default: 1)
        - mpi_launcher: MPI launcher type ('mpiexec', 'srun', 'aprun') (default: 'mpiexec')
        - block_error_handler: Enable automatic block error handling (default: True)
        - encrypted: Enable encrypted communication (default: False)
        - All other parameters inherited from HighThroughputExecutor
        """

Usage Example:

from parsl.executors import MPIExecutor
from parsl.providers import LocalProvider
from parsl.launchers import SimpleLauncher

# Local MPI execution (requires SimpleLauncher)
mpi_exec = MPIExecutor(
    label='local_mpi',
    max_workers_per_block=1,
    mpi_launcher='mpiexec',
    provider=LocalProvider(
        launcher=SimpleLauncher()  # Required for MPI mode
    )
)

# HPC cluster MPI execution
from parsl.providers import SlurmProvider

mpi_hpc = MPIExecutor(
    label='cluster_mpi',
    max_workers_per_block=1,
    mpi_launcher='srun',
    provider=SlurmProvider(
        partition='compute',
        nodes_per_block=4,
        launcher=SimpleLauncher(),  # Must use SimpleLauncher
        walltime='01:00:00'
    )
)

FluxExecutor

Executor that uses Flux to schedule and run jobs, wrapping every callable into a Flux job. Excellent for jobs with large resource requirements and varying resource needs, but not suitable for large numbers of small, fast jobs (Flux is capped at ~50 jobs per second).

class FluxExecutor:
    def __init__(self, provider=None, working_dir=None, label='FluxExecutor',
                 flux_executor_kwargs={}, flux_path=None, launch_cmd=None):
        """
        Flux executor for advanced resource management with per-task resource specifications.
        
        Parameters:
        - provider: ExecutionProvider for compute resources (default: LocalProvider)
        - working_dir: Directory for executor files (auto-generated if None)
        - label: Executor label for task targeting (default: 'FluxExecutor')
        - flux_executor_kwargs: Keyword arguments passed to flux.job.FluxExecutor
        - flux_path: Path to flux installation (searches PATH if None)
        - launch_cmd: Command for launching executor backend (has reasonable default)
        """

Resource Specification Support:

FluxExecutor supports detailed per-task resource specifications:

# Supported resource specification keys:
resource_spec = {
    'num_tasks': 1,        # Number of tasks (MPI ranks)
    'cores_per_task': 1,   # Cores per task
    'gpus_per_task': 0,    # GPUs per task
    'num_nodes': 0         # Distribute across N nodes if > 0
}

Usage Examples:

from parsl.executors import FluxExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.launchers import SrunLauncher

# Local Flux execution
flux_exec = FluxExecutor(
    label='local_flux',
    provider=LocalProvider()
)

# HPC cluster Flux execution
flux_hpc = FluxExecutor(
    label='cluster_flux',
    provider=SlurmProvider(
        partition='compute',
        account='my_account',
        launcher=SrunLauncher(overrides='--mpibind=off'),
        nodes_per_block=1,
        walltime='00:30:00'
    )
)

# Submit tasks with resource specifications
@python_app(executors=['cluster_flux'])
def compute_task(data, parsl_resource_specification={}):
    # Task requiring specific resources
    return process_data(data)

# Execute with 4 cores per task
future = compute_task(
    large_dataset,
    parsl_resource_specification={'cores_per_task': 4}
)

# MPI task with multiple ranks across nodes
@python_app(executors=['cluster_flux'])
def mpi_task(data, parsl_resource_specification={}):
    # MPI-aware computation
    return mpi_computation(data)

mpi_future = mpi_task(
    mpi_data,
    parsl_resource_specification={
        'num_tasks': 8,        # 8 MPI ranks
        'cores_per_task': 2,   # 2 cores per rank
        'num_nodes': 2         # Distribute across 2 nodes
    }
)

RadicalPilotExecutor

Integration with the RADICAL Pilot framework for large-scale distributed computing with advanced resource management.

class RadicalPilotExecutor:
    def __init__(self, label='RadicalPilot', resource_config=None,
                 bulk_mode=True, launch_cmd=None):
        """
        RADICAL Pilot executor for large-scale distributed computing.
        
        Parameters:
        - label: Executor label for task targeting
        - resource_config: ResourceConfig for RADICAL Pilot setup
        - bulk_mode: Enable bulk task submission (default: True) 
        - launch_cmd: Custom launch command
        """

Usage Example:

from parsl.executors.radical import RadicalPilotExecutor, ResourceConfig

# RADICAL Pilot configuration
resource_config = ResourceConfig(
    resource='local.localhost',
    walltime=60,  # minutes
    cpus=4,
    gpus=0,
    project='my_project'
)

rp_exec = RadicalPilotExecutor(
    label='radical_pilot',
    resource_config=resource_config,
    bulk_mode=True
)

Executor Selection and Task Targeting

Control which executors run specific tasks using executor labels:

from parsl import python_app

@python_app(executors=['local_threads'])
def light_task():
    return "completed on threads"

@python_app(executors=['cluster_htex'])  
def heavy_task():
    import time
    time.sleep(60)  # Heavy computation
    return "completed on cluster"

@python_app(executors=['mpi_tasks'])
def mpi_task():
    # MPI-aware code
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return f"MPI rank {comm.Get_rank()}"

Resource Specification

Specify resource requirements for tasks:

@python_app
def resource_intensive_task(data, parsl_resource_specification={}):
    """Task with specific resource needs."""
    # Process data requiring specific resources
    return processed_data

# Execute with resource requirements
future = resource_intensive_task(
    large_dataset,
    parsl_resource_specification={
        'cores': 8,
        'memory': '16GB',
        'disk': '100GB',
        'walltime': '02:00:00'
    }
)

Executor Lifecycle Management

Executors are managed automatically by the DataFlowKernel:

from parsl.config import Config
import parsl

# Executors start when configuration is loaded
config = Config(executors=[htex, threads_exec, wq_exec])
parsl.load(config)

# Submit tasks to different executors
futures = []
for i in range(100):
    if i % 2 == 0:
        futures.append(light_task())  # -> threads_exec
    else:
        futures.append(heavy_task())  # -> htex

# Wait for completion
results = [f.result() for f in futures]

# Executors shutdown when DFK is cleared
parsl.clear()

Executor Error Handling

Handle executor-specific errors and failures:

from parsl.executors.errors import ExecutorError, ScalingError

try:
    parsl.load(config)
except ExecutorError as e:
    print(f"Executor initialization failed: {e}")
except ScalingError as e:
    print(f"Resource scaling error: {e}")

# Monitor executor status
dfk = parsl.dfk()
for executor in dfk.executors.values():
    print(f"Executor {executor.label}: {executor.status()}")

Install with Tessl CLI

npx tessl i tessl/pypi-parsl

docs

app-decorators.md

configuration.md

data-management.md

executors.md

index.md

launchers.md

monitoring.md

providers.md

workflow-management.md

tile.json