Parallel scripting library for executing workflows across diverse computing resources
—
Parsl's workflow management system provides functions for loading configurations, managing execution state, controlling task execution, and handling the DataFlowKernel lifecycle.
Load Parsl configurations and manage the DataFlowKernel that orchestrates workflow execution.
def load(config, close_bad_executors=True, disable_cleanup=False):
"""
Load a Parsl configuration and initialize the DataFlowKernel.
Parameters:
- config: Config object specifying executors and execution policies
- close_bad_executors: Close executors that fail to start (default: True)
- disable_cleanup: Disable automatic cleanup on exit (default: False)
Returns:
DataFlowKernel: The initialized workflow execution engine
Raises:
RuntimeError: If a DataFlowKernel is already loaded
ConfigurationError: If configuration is invalid
"""
def clear():
"""
Clear the current DataFlowKernel and shut down all executors.
This function blocks until all running tasks complete and resources
are properly cleaned up.
"""
@property
def dfk():
"""
Access the current DataFlowKernel instance.
Returns:
DataFlowKernel: Current DFK instance
Raises:
NoDataFlowKernelError: If no DFK is currently loaded
"""Basic Workflow Management:
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
# Load configuration
config = Config(executors=[ThreadPoolExecutor(max_threads=4)])
dfk = parsl.load(config)
# Access current DataFlowKernel
current_dfk = parsl.dfk()
print(f"DFK ID: {current_dfk.run_id}")
# Submit tasks
futures = [my_app(i) for i in range(10)]
# Clean shutdown
parsl.clear()

Control and monitor task execution across the workflow.
def wait_for_current_tasks():
"""
Wait for all currently submitted tasks to complete.
This function blocks until all tasks submitted to the current
DataFlowKernel have finished executing (successfully or with errors).
"""Task Execution Examples:
import parsl
from parsl import python_app
@python_app
def compute_task(x):
import time
time.sleep(x) # Simulate work
return x ** 2
# Load configuration
parsl.load(config)
# Submit batch of tasks
futures = []
for i in range(1, 6):
future = compute_task(i)
futures.append(future)
print(f"Submitted task {i}")
# Wait for all current tasks to complete
print("Waiting for all tasks...")
parsl.wait_for_current_tasks()
print("All tasks completed")
# Collect results
results = [f.result() for f in futures]
print(f"Results: {results}")
parsl.clear()

Use Parsl as a context manager for automatic resource management.
# Context manager syntax:
# with parsl.load(config):
# # Submit and execute tasks
# # Automatic cleanup on exit based on config.exit_mode

Context Manager Examples:
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
# Basic context manager usage
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
exit_mode='wait' # Wait for tasks on normal exit
)
with parsl.load(config):
# Submit tasks within context
futures = [compute_task(i) for i in range(5)]
# Tasks automatically complete before exiting context
results = [f.result() for f in futures]
print(f"Completed: {results}")
# DFK automatically cleared when exiting context
# Exception handling with context manager
try:
with parsl.load(config):
# Submit tasks
futures = [risky_task(i) for i in range(10)]
# If exception occurs, behavior depends on exit_mode
raise ValueError("Something went wrong")
except ValueError as e:
print(f"Workflow failed: {e}")
# DFK still cleaned up properly

Direct access to the DataFlowKernel for advanced workflow control and monitoring.
class DataFlowKernel:
"""
Core workflow execution engine managing task scheduling and dependencies.
Key properties:
- run_id: Unique identifier for this DFK instance
- executors: Dictionary of configured executors
- config: Current configuration object
- task_count: Number of tasks submitted
- tasks: Dictionary of task records
"""
def cleanup(self):
"""Clean up DFK resources and shut down executors."""
def wait_for_current_tasks(self):
"""Wait for all current tasks to complete."""
def submit(self, func, *args, **kwargs):
"""Submit a task for execution (internal use)."""Advanced DFK Usage:
import parsl
# Load configuration and access DFK
parsl.load(config)
dfk = parsl.dfk()
# Monitor workflow state
print(f"Run ID: {dfk.run_id}")
print(f"Executors: {list(dfk.executors.keys())}")
print(f"Tasks submitted: {dfk.task_count}")
# Submit tasks and monitor
futures = [compute_task(i) for i in range(10)]
# Check task states
for task_id, task_record in dfk.tasks.items():
print(f"Task {task_id}: {task_record.status}")
# Wait for completion
dfk.wait_for_current_tasks()
parsl.clear()

Manage workflow execution state, including checkpointing and recovery.
# Checkpoint and recovery functions (accessed through config):
# - checkpoint_mode: When to create checkpoints
# - checkpoint_files: Previous checkpoints to load
# - checkpoint_period: Interval for periodic checkpointing

Checkpointing Example:
from parsl.config import Config
from parsl.utils import get_all_checkpoints, get_last_checkpoint
# Configuration with checkpointing
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
checkpoint_mode='task_exit', # Checkpoint after each task
checkpoint_files=get_all_checkpoints('workflow_checkpoints/'),
run_dir='workflow_run_20240101'
)
with parsl.load(config):
# Submit long-running workflow
futures = [long_running_task(i) for i in range(100)]
# Tasks are checkpointed automatically
# Workflow can be resumed if interrupted
results = [f.result() for f in futures]
# Recovery from checkpoint
recovery_config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
checkpoint_files=[get_last_checkpoint('workflow_checkpoints/')],
run_dir='workflow_run_20240101_recovery'
)
# Resume from last checkpoint
with parsl.load(recovery_config):
# Only incomplete tasks will be re-executed
print("Resumed from checkpoint")Handle workflow-level errors and exceptions.
from parsl.errors import NoDataFlowKernelError, ConfigurationError
# Common workflow errors:
# - NoDataFlowKernelError: No DFK loaded when required
# - ConfigurationError: Invalid configuration
# - ExecutorError: Executor initialization or operation failure

Error Handling Examples:
import parsl
from parsl.errors import NoDataFlowKernelError, ConfigurationError
# Handle missing DataFlowKernel
try:
# Attempt to access DFK without loading
dfk = parsl.dfk()
except NoDataFlowKernelError:
print("No DataFlowKernel loaded. Loading configuration...")
parsl.load(config)
dfk = parsl.dfk()
# Handle configuration errors
try:
invalid_config = Config(executors=[]) # Empty executor list
parsl.load(invalid_config)
except ConfigurationError as e:
print(f"Configuration error: {e}")
# Load valid configuration instead
parsl.load(valid_config)
# Handle executor failures
try:
parsl.load(config)
except Exception as e:
print(f"Failed to initialize executors: {e}")
# Try alternative configuration
fallback_config = Config(executors=[ThreadPoolExecutor(max_threads=2)])
parsl.load(fallback_config)
# Graceful workflow shutdown
def safe_shutdown():
"""Safely shutdown Parsl workflow."""
try:
# Wait for tasks with timeout
parsl.wait_for_current_tasks()
except KeyboardInterrupt:
print("Interrupt received, shutting down...")
finally:
# Always clear DFK
try:
parsl.clear()
except:
pass # Already cleared or never loaded
# Use in main workflow
if __name__ == "__main__":
try:
parsl.load(config)
# Execute workflow
futures = [my_task(i) for i in range(100)]
results = [f.result() for f in futures]
except KeyboardInterrupt:
print("Workflow interrupted by user")
except Exception as e:
print(f"Workflow failed: {e}")
finally:
        safe_shutdown()

Common patterns for structuring Parsl workflows.
Sequential Workflow:
@python_app
def step1(input_data):
return process_step1(input_data)
@python_app
def step2(step1_result):
return process_step2(step1_result)
@python_app
def step3(step2_result):
return process_step3(step2_result)
# Sequential execution with dependencies
with parsl.load(config):
result1 = step1("initial_data")
result2 = step2(result1) # Waits for step1
result3 = step3(result2) # Waits for step2
    final_result = result3.result()

Parallel Workflow:
@python_app
def parallel_task(item):
return process_item(item)
@python_app
def aggregate_results(futures_list):
"""Aggregate results from parallel tasks."""
results = [f.result() for f in futures_list]
return combine_results(results)
# Parallel execution with aggregation
with parsl.load(config):
# Launch parallel tasks
futures = [parallel_task(item) for item in data_items]
# Aggregate results
final_result = aggregate_results(futures)
print(f"Aggregated result: {final_result.result()}")Map-Reduce Workflow:
@python_app
def map_task(data_chunk):
"""Map operation on data chunk."""
return [transform(item) for item in data_chunk]
@python_app
def reduce_task(mapped_results):
"""Reduce operation on mapped results."""
flattened = [item for sublist in mapped_results for item in sublist]
return aggregate(flattened)
# Map-reduce pattern
with parsl.load(config):
# Split data into chunks
data_chunks = split_data(large_dataset, num_chunks=10)
# Map phase - parallel processing
map_futures = [map_task(chunk) for chunk in data_chunks]
# Reduce phase - aggregate results
final_result = reduce_task(map_futures)
print(f"Final result: {final_result.result()}")Install with Tessl CLI
npx tessl i tessl/pypi-parsl