Parallel scripting library for executing workflows across diverse computing resources
Parsl's app decorators transform ordinary Python functions into parallel apps that can be executed across distributed computing resources. These decorators handle dependency management, data flow, and asynchronous execution automatically.
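Parsl's AppFuture follows the standard concurrent.futures.Future interface, so the submit-then-result calling pattern used throughout this page can be previewed with a stdlib-only sketch (the function below is illustrative, not part of Parsl):

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative stand-in for a @python_app-decorated function; Parsl's
# AppFuture exposes the same result()/done() calls as this stdlib
# Future, so the calling pattern carries over directly.
def double_all(data, multiplier=2):
    return [x * multiplier for x in data]

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(double_all, [1, 2, 3])  # returns immediately
    print(future.result())                       # blocks: [2, 4, 6]
```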
Converts Python functions into parallel apps that execute on remote workers with automatic dependency tracking and data management.
def python_app(function=None, data_flow_kernel=None, cache=False,
               executors='all', ignore_for_cache=None):
    """
    Decorator for making Python functions into parallel apps.

    Parameters:
    - function: The function to decorate (automatically provided)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - executors: List of executor labels or 'all' (default: 'all')
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns an AppFuture when called
    """

Usage Example:
@python_app
def process_data(data, multiplier=2):
    """Process data by multiplying each element."""
    return [x * multiplier for x in data]

@python_app(executors=['compute_nodes'], cache=True)
def expensive_computation(n):
    """CPU-intensive computation with caching enabled."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

# Execute and get futures
future1 = process_data([1, 2, 3, 4])
future2 = expensive_computation(1000000)

# Get results (blocks until complete)
processed = future1.result()  # [2, 4, 6, 8]
computed = future2.result()   # computed value

Converts functions that return bash command strings into parallel apps that execute shell commands on remote workers.
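Because a bash_app's return value is handed to a shell, paths containing spaces or shell metacharacters need quoting. A stdlib sketch of building such a command string safely (the helper below is hypothetical, not part of Parsl):

```python
import shlex

# Hypothetical helper mirroring a bash_app body: the returned string is
# executed by a shell, so shlex.quote() guards against spaces and
# metacharacters in file paths.
def sort_command(input_path, output_path):
    return f'sort {shlex.quote(input_path)} > {shlex.quote(output_path)}'

print(sort_command('my data.txt', 'sorted.txt'))
# sort 'my data.txt' > sorted.txt
```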
def bash_app(function=None, data_flow_kernel=None, cache=False,
             executors='all', ignore_for_cache=None):
    """
    Decorator for making bash command functions into parallel apps.

    Parameters:
    - function: The function to decorate (returns bash command string)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - executors: List of executor labels or 'all' (default: 'all')
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns an AppFuture when called
    """

Usage Example:
from parsl.data_provider.files import File

@bash_app
def process_file(input_file, output_file, inputs=[], outputs=[]):
    """Process a file using bash commands."""
    return f'sort {inputs[0]} > {outputs[0]}'

@bash_app
def compile_code(source_file, executable, inputs=[], outputs=[]):
    """Compile source code."""
    return f'gcc {inputs[0]} -o {outputs[0]}'

# Execute with file dependencies
input_file = File('data.txt')
output_file = File('sorted_data.txt')

future = process_file(
    input_file, output_file,
    inputs=[input_file],
    outputs=[output_file]
)

# Wait for completion
future.result()  # Returns exit code

Special decorator for join operations that need to run on the submit side rather than on remote workers, typically used for aggregating results from multiple parallel tasks. Join apps can return Future objects that are awaited without blocking workers, preventing deadlocks.
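The deadlock risk that motivates join apps can be seen with stdlib futures: a task that blocks on result() for a peer task queued to the same fully-occupied pool can starve it. Aggregating on the submit side, as a join app does on "_parsl_internal", sidesteps this. A stdlib sketch of the safe fan-out/fan-in shape:

```python
from concurrent.futures import ThreadPoolExecutor

def generate(size):
    return list(range(size))

with ThreadPoolExecutor(max_workers=2) as pool:
    # Fan out: workers run generate() in parallel.
    futures = [pool.submit(generate, 10) for _ in range(5)]
    # Fan in on the submit side (analogous to a join app running on
    # "_parsl_internal"): no worker thread ever blocks on a peer task.
    merged = sorted(x for f in futures for x in f.result())

print(len(merged))  # 50
```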
def join_app(function=None, data_flow_kernel=None, cache=False,
             ignore_for_cache=None):
    """
    Decorator for join apps that run on the submit-side internal executor.

    Parameters:
    - function: The function to decorate (automatically provided)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns an AppFuture when called

    Note: Always executes on the "_parsl_internal" executor
    """

Relationship to python_app:
- join_app is functionally equivalent to python_app(join=True, executors=["_parsl_internal"])
- The python_app decorator currently does not expose the join parameter
- Use @join_app for apps that need to wait for Futures without blocking workers

Usage Example:
@python_app
def generate_data(size):
    """Generate data chunk."""
    return list(range(size))

@join_app
def aggregate_results(futures_list):
    """Aggregate results from multiple parallel tasks."""
    results = []
    for future in futures_list:
        results.extend(future.result())
    return sorted(results)

# Create multiple parallel tasks
futures = [generate_data(100) for _ in range(5)]

# Aggregate on the submit side
aggregated = aggregate_results(futures)
final_result = aggregated.result()

App-decorated functions can accept special parameters that control execution behavior:
# These parameters are automatically handled by Parsl:
# - inputs: List of input File objects
# - outputs: List of output File objects
# - stdout: File object or string for stdout redirection
# - stderr: File object or string for stderr redirection
# - walltime: Maximum execution time in seconds
# - parsl_resource_specification: Resource requirements dict

Example with execution control:
@bash_app
def long_running_task(input_data, outputs=[], stdout=None, stderr=None, walltime=3600):
    """Long-running task with execution control."""
    return f'process_data {input_data} > {outputs[0]}'

@python_app
def cpu_intensive_task(data, parsl_resource_specification={}):
    """Task with specific resource requirements."""
    # parsl_resource_specification can specify cores, memory, etc.
    return sum(x**2 for x in data)

# Execute with output redirection and a walltime limit
future = long_running_task(
    'input.dat',
    outputs=[File('output.dat')],
    stdout='task.out',
    stderr='task.err',
    walltime=1800  # 30 minutes
)
future.result()  # Blocks until completion and returns the value

# Execute with a resource specification (accepted by cpu_intensive_task above)
cpu_future = cpu_intensive_task(
    list(range(1000)),
    parsl_resource_specification={'cores': 4, 'memory': '8GB'}
)

Apps automatically handle dependencies through Future objects:
@python_app
def step1():
    return "step1_result"

@python_app
def step2(input_data):
    return f"step2_with_{input_data}"

@python_app
def step3(data1, data2):
    return f"final_{data1}_{data2}"

# Create dependency chain
future1 = step1()
future2 = step2(future1)           # Waits for future1
future3 = step3(future1, future2)  # Waits for both

result = future3.result()  # "final_step1_result_step2_with_step1_result"

Install with Tessl CLI
npx tessl i tessl/pypi-parsl