Parallel scripting library for executing workflows across diverse computing resources
Parsl's configuration system provides comprehensive control over workflow execution, resource management, monitoring, and checkpointing through the Config class.
The main configuration class that binds together executors, monitoring, checkpointing, and execution policies.
class Config:
def __init__(self, executors=None, app_cache=True, checkpoint_files=None,
checkpoint_mode=None, checkpoint_period=None,
dependency_resolver=None, exit_mode='cleanup',
garbage_collect=True, internal_tasks_max_threads=10,
retries=0, retry_handler=None, run_dir='runinfo',
std_autopath=None, strategy='simple', strategy_period=5,
max_idletime=120.0, monitoring=None, usage_tracking=0,
project_name=None, initialize_logging=True):
"""
Parsl configuration specification.
Parameters:
- executors: List of ParslExecutor instances (default: [ThreadPoolExecutor()])
- app_cache: Enable app result caching (default: True)
- checkpoint_files: List of checkpoint file paths to load
- checkpoint_mode: 'dfk_exit', 'task_exit', 'periodic', 'manual', or None
- checkpoint_period: Time interval for periodic checkpointing (HH:MM:SS)
- dependency_resolver: Custom dependency resolver plugin
- exit_mode: Context manager exit behavior ('cleanup', 'skip', 'wait')
- garbage_collect: Enable task garbage collection (default: True)
- internal_tasks_max_threads: Max threads for internal operations (default: 10)
- retries: Default retry count for failed tasks (default: 0)
- retry_handler: Custom retry handler function
- run_dir: Directory for Parsl runtime information (default: 'runinfo')
- std_autopath: Standard output auto-path function
- strategy: Task execution strategy ('simple', 'htex_auto_scale', 'none')
- strategy_period: Strategy evaluation period in seconds (default: 5)
- max_idletime: Max idle time before cleanup in seconds (default: 120.0)
- monitoring: MonitoringHub instance for workflow monitoring
- usage_tracking: Usage tracking level 0-3 (default: 0)
- project_name: Project name for usage tracking identification
- initialize_logging: Set up logging automatically (default: True)
"""
@property
def executors(self):
"""Read-only property returning tuple of configured executors."""
def validate_usage_tracking(self, level):
"""Validate usage tracking level is between 0 and 3."""
def get_usage_information(self):
"""Get configuration usage information for tracking."""

Basic Configuration Example:
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor, HighThroughputExecutor
from parsl.providers import LocalProvider
# Simple local configuration
config = Config(
executors=[
ThreadPoolExecutor(
max_threads=4,
label='local_threads'
)
]
)
# More complex configuration with multiple executors
config = Config(
executors=[
ThreadPoolExecutor(max_threads=2, label='light_tasks'),
HighThroughputExecutor(
label='heavy_compute',
max_workers=8,
provider=LocalProvider(
init_blocks=1,
max_blocks=2,
)
)
],
app_cache=True,
retries=2,
checkpoint_mode='task_exit'
)

Configure automatic checkpointing to enable workflow restart and recovery.
# Checkpoint modes:
# - 'dfk_exit': Checkpoint when DataFlowKernel exits
# - 'task_exit': Checkpoint after each task completion
# - 'periodic': Checkpoint at regular intervals
# - 'manual': Only checkpoint when explicitly called
# - None: Disable checkpointing

Checkpointing Example:
from parsl.utils import get_all_checkpoints
# Configuration with checkpointing
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
checkpoint_mode='task_exit', # Checkpoint after each task
checkpoint_files=get_all_checkpoints('checkpoints/'), # Load existing
run_dir='workflow_run_001'
)
# Periodic checkpointing
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
checkpoint_mode='periodic',
checkpoint_period='00:10:00' # Every 10 minutes
)

Configure workflow monitoring for performance tracking and resource usage analysis.
from parsl.monitoring import MonitoringHub
monitoring_config = MonitoringHub(
hub_address='localhost',
hub_port=55055,
monitoring_debug=False,
resource_monitoring_interval=30, # seconds
logging_endpoint=None,
logdir='monitoring_logs'
)

Monitoring Example:
from parsl.monitoring import MonitoringHub
config = Config(
executors=[HighThroughputExecutor(max_workers=4)],
monitoring=MonitoringHub(
hub_address='localhost',
hub_port=55055,
resource_monitoring_interval=10,
logdir='parsl_monitoring'
)
)

Control Parsl's optional anonymous usage tracking for development insights.
from parsl.usage_tracking.levels import DISABLED, LEVEL_1, LEVEL_2, LEVEL_3
# Usage tracking levels:
# - DISABLED: No tracking
# - LEVEL_1: Basic usage statistics
# - LEVEL_2: Configuration and executor info
# - LEVEL_3: Detailed performance metrics

Usage Tracking Example:
from parsl.usage_tracking.levels import DISABLED
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
usage_tracking=DISABLED # Disable usage tracking
)

Advanced options for specialized workflow requirements and performance tuning.
# Retry and failure handling
def custom_retry_handler(exception, task_record):
"""Custom logic for determining retry behavior."""
return True # or False
config = Config(
executors=[...],
retries=3,
retry_handler=custom_retry_handler,
max_idletime=300.0, # 5 minutes before cleanup
garbage_collect=True,
internal_tasks_max_threads=20
)

Control behavior when using Parsl as a context manager via `with parsl.load(config):`.
# Exit modes:
# - 'cleanup': Cleanup DFK on exit without waiting
# - 'skip': Skip all shutdown behavior
# - 'wait': Wait for tasks when exiting normally, exit immediately on exception

Context Manager Example:
import parsl
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
exit_mode='wait' # Wait for completion on normal exit
)
with parsl.load(config):
# Submit tasks
futures = [my_app(i) for i in range(10)]
# Tasks will complete before exiting context

Load configuration and manage DataFlowKernel lifecycle:
import parsl
# Load configuration
parsl.load(config)
# Check current configuration state
current_dfk = parsl.dfk() # Get current DataFlowKernel
# Wait for all tasks to complete
parsl.wait_for_current_tasks()
# Clear configuration and shutdown
parsl.clear()

Parsl provides pre-configured templates for common computing environments:
# Local configurations
from parsl.configs.htex_local import config as htex_local
from parsl.configs.local_threads import config as local_threads
# HPC system configurations
from parsl.configs.stampede2 import config as stampede2_config
from parsl.configs.frontera import config as frontera_config
from parsl.configs.summit import config as summit_config
# Cloud configurations
from parsl.configs.ec2 import config as ec2_config
from parsl.configs.kubernetes import config as k8s_config
# Use pre-built config
parsl.load(htex_local)

Common configuration validation patterns and error handling:
from parsl.errors import ConfigurationError
try:
parsl.load(config)
except ConfigurationError as e:
print(f"Configuration error: {e}")
# Handle configuration issues
# Validate executor labels are unique
executor_labels = [ex.label for ex in config.executors]
if len(executor_labels) != len(set(executor_labels)):
raise ConfigurationError("Executor labels must be unique")

Install with Tessl CLI
npx tessl i tessl/pypi-parsl