tessl/pypi-parsl

Parallel scripting library for executing workflows across diverse computing resources

Configuration

Parsl is configured through the Config class, which controls executors, retries, checkpointing, monitoring, usage tracking, and shutdown behavior for a workflow.

Capabilities

Configuration Class

The main configuration class that binds together executors, monitoring, checkpointing, and execution policies.

class Config:
    def __init__(self, executors=None, app_cache=True, checkpoint_files=None,
                 checkpoint_mode=None, checkpoint_period=None,
                 dependency_resolver=None, exit_mode='cleanup',
                 garbage_collect=True, internal_tasks_max_threads=10,
                 retries=0, retry_handler=None, run_dir='runinfo',
                 std_autopath=None, strategy='simple', strategy_period=5,
                 max_idletime=120.0, monitoring=None, usage_tracking=0,
                 project_name=None, initialize_logging=True):
        """
        Parsl configuration specification.
        
        Parameters:
        - executors: List of ParslExecutor instances (default: [ThreadPoolExecutor()])
        - app_cache: Enable app result caching (default: True)
        - checkpoint_files: List of checkpoint file paths to load
        - checkpoint_mode: 'dfk_exit', 'task_exit', 'periodic', 'manual', or None
        - checkpoint_period: Time interval for periodic checkpointing (HH:MM:SS)
        - dependency_resolver: Custom dependency resolver plugin
        - exit_mode: Context manager exit behavior ('cleanup', 'skip', 'wait')
        - garbage_collect: Enable task garbage collection (default: True)
        - internal_tasks_max_threads: Max threads for internal operations (default: 10)
        - retries: Default retry count for failed tasks (default: 0)
        - retry_handler: Custom retry handler function
        - run_dir: Directory for Parsl runtime information (default: 'runinfo')
        - std_autopath: Standard output auto-path function
        - strategy: Task execution strategy ('simple', 'htex_auto_scale', 'none')
        - strategy_period: Strategy evaluation period in seconds (default: 5)
        - max_idletime: Max idle time before cleanup in seconds (default: 120.0)
        - monitoring: MonitoringHub instance for workflow monitoring
        - usage_tracking: Usage tracking level 0-3 (default: 0)
        - project_name: Project name for usage tracking identification
        - initialize_logging: Set up logging automatically (default: True)
        """
        
    @property
    def executors(self):
        """Read-only property returning tuple of configured executors."""
        
    def validate_usage_tracking(self, level):
        """Validate usage tracking level is between 0 and 3."""
        
    def get_usage_information(self):
        """Get configuration usage information for tracking."""

Basic Configuration Example:

from parsl.config import Config
from parsl.executors import ThreadPoolExecutor, HighThroughputExecutor
from parsl.providers import LocalProvider

# Simple local configuration
config = Config(
    executors=[
        ThreadPoolExecutor(
            max_threads=4,
            label='local_threads'
        )
    ]
)

# More complex configuration with multiple executors
config = Config(
    executors=[
        ThreadPoolExecutor(max_threads=2, label='light_tasks'),
        HighThroughputExecutor(
            label='heavy_compute',
            max_workers_per_node=8,
            provider=LocalProvider(
                init_blocks=1,
                max_blocks=2,
            )
        )
    ],
    app_cache=True,
    retries=2,
    checkpoint_mode='task_exit'
)

Checkpointing Configuration

Configure automatic checkpointing to enable workflow restart and recovery.

# Checkpoint modes:
# - 'dfk_exit': Checkpoint when DataFlowKernel exits
# - 'task_exit': Checkpoint after each task completion  
# - 'periodic': Checkpoint at regular intervals
# - 'manual': Only checkpoint when explicitly called
# - None: Disable checkpointing

Checkpointing Example:

from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl.utils import get_all_checkpoints

# Configuration with checkpointing
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_mode='task_exit',  # Checkpoint after each task
    # Load checkpoints written by earlier runs under the same run directory
    checkpoint_files=get_all_checkpoints('workflow_run_001'),
    run_dir='workflow_run_001'
)

# Periodic checkpointing
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_mode='periodic',
    checkpoint_period='00:10:00'  # Every 10 minutes
)
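To make the checkpoint_period format concrete, the following is a minimal sketch of how an HH:MM:SS string maps to a number of seconds. The helper period_to_seconds is hypothetical and written only for illustration; it is not part of Parsl's API, and Parsl's own parsing may differ in detail.

```python
def period_to_seconds(period: str) -> int:
    """Convert an 'HH:MM:SS' string into a total number of seconds.

    Hypothetical helper for illustration; not a Parsl function.
    """
    parts = period.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected HH:MM:SS, got {period!r}")
    hours, minutes, seconds = (int(p) for p in parts)
    return hours * 3600 + minutes * 60 + seconds


# The value used in the periodic checkpointing example
interval = period_to_seconds("00:10:00")
print(interval)  # 600
```

A quick check like this can help catch a malformed period string before it is passed to Config.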

Monitoring Configuration

Configure workflow monitoring for performance tracking and resource usage analysis.

from parsl.monitoring import MonitoringHub

monitoring_config = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    monitoring_debug=False,
    resource_monitoring_interval=30,  # seconds
    logging_endpoint=None,
    logdir='monitoring_logs'
)

Monitoring Example:

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.monitoring import MonitoringHub

config = Config(
    executors=[HighThroughputExecutor(max_workers_per_node=4)],
    monitoring=MonitoringHub(
        hub_address='localhost',
        hub_port=55055,
        resource_monitoring_interval=10,
        logdir='parsl_monitoring'
    )
)

Usage Tracking Configuration

Control Parsl's optional anonymous usage tracking for development insights.

from parsl.usage_tracking.levels import DISABLED, LEVEL_1, LEVEL_2, LEVEL_3

# Usage tracking levels:
# - DISABLED (0): No tracking (default)
# - LEVEL_1 (1): Basic usage statistics
# - LEVEL_2 (2): Configuration and executor info
# - LEVEL_3 (3): Detailed performance metrics

Usage Tracking Example:

from parsl.usage_tracking.levels import DISABLED

config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    usage_tracking=DISABLED  # Disable usage tracking
)
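The range rule described for validate_usage_tracking (a level between 0 and 3) can be sketched in plain Python. The constants below are local stand-ins, assumed to mirror the integers 0 through 3 used by parsl.usage_tracking.levels, and check_usage_level is a hypothetical helper, not Parsl's implementation. It raises ValueError where Parsl may raise its own error type.

```python
# Local stand-ins for the constants in parsl.usage_tracking.levels
# (assumed here to be the integers 0 through 3).
DISABLED, LEVEL_1, LEVEL_2, LEVEL_3 = 0, 1, 2, 3


def check_usage_level(level: int) -> int:
    """Return the level unchanged if it is within 0..3, otherwise raise.

    Hypothetical helper for illustration; not a Parsl function.
    """
    if not DISABLED <= level <= LEVEL_3:
        raise ValueError(
            f"usage tracking level must be between 0 and 3, got {level}"
        )
    return level


check_usage_level(LEVEL_2)  # accepted

try:
    check_usage_level(4)    # out of range, rejected
except ValueError as err:
    print(err)
```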

Advanced Configuration Options

Advanced options for specialized workflow requirements and performance tuning.

# Retry handling
def custom_retry_handler(exception, task_record):
    """Return the cost this failure adds to the task's retry budget."""
    return 1  # each failure consumes one of the configured retries

config = Config(
    executors=[...],
    retries=3,
    retry_handler=custom_retry_handler,
    max_idletime=300.0,  # idle blocks may be scaled in after 5 minutes
    garbage_collect=True,
    internal_tasks_max_threads=20
)
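A retry handler can weight failures differently depending on the exception. The sketch below assumes that the handler's return value is treated as a numeric cost added to a per-task total, and that a task is retried while that total does not exceed retries. That reading of the retry budget is an assumption worth checking against the Parsl version in use. The names retry_cost and attempts_before_giving_up are hypothetical, and the loop is a plain-Python simulation rather than Parsl code.

```python
def retry_cost(exception, task_record):
    """Return how much of the retry budget this failure should consume."""
    if isinstance(exception, (TimeoutError, ConnectionError)):
        return 0.5           # transient: charge half, so more attempts fit
    if isinstance(exception, (ValueError, TypeError)):
        return float("inf")  # likely a bug in the task: exhaust the budget at once
    return 1.0               # default: one failure costs one retry


def attempts_before_giving_up(exception, retries):
    """Simulate a task that always fails with the same exception.

    Assumed budget rule: keep retrying while accumulated cost <= retries.
    """
    fail_cost = 0.0
    attempts = 0
    while True:
        attempts += 1
        fail_cost += retry_cost(exception, task_record={})
        if fail_cost > retries:
            return attempts


print(attempts_before_giving_up(RuntimeError("boom"), retries=3))     # 4
print(attempts_before_giving_up(TimeoutError("slow"), retries=3))     # 7
print(attempts_before_giving_up(ValueError("bad input"), retries=3))  # 1
```

Under these assumptions, a function shaped like retry_cost could be passed as retry_handler so that transient errors get more attempts and programming errors fail fast.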

Exit Mode Configuration

Control what happens on exit when Parsl is used as a context manager via a with parsl.load(config): block.

# Exit modes:
# - 'cleanup': Cleanup DFK on exit without waiting
# - 'skip': Skip all shutdown behavior
# - 'wait': Wait for tasks when exiting normally, exit immediately on exception

Context Manager Example:

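import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

@python_app
def my_app(x):
    return x * 2

config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    exit_mode='wait'  # Wait for completion on normal exit
)

with parsl.load(config):
    # Submit tasks
    futures = [my_app(i) for i in range(10)]
    # Outstanding tasks complete before the context exits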
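The three exit modes can be summarized as a small decision table. The sketch below is a plain-Python illustration of the behavior described above, not Parsl's implementation. The function exit_actions and the step names are hypothetical, and it assumes that the 'wait' mode still performs cleanup when the block exits because of an exception.

```python
from typing import List


def exit_actions(exit_mode: str, exception_raised: bool) -> List[str]:
    """List the shutdown steps implied by an exit mode, in order.

    Hypothetical helper for illustration; not a Parsl function.
    """
    if exit_mode == "skip":
        return []                          # no shutdown behavior at all
    if exit_mode == "cleanup":
        return ["cleanup"]                 # clean up without waiting for tasks
    if exit_mode == "wait":
        if exception_raised:
            return ["cleanup"]             # assumed: exit promptly, no waiting
        return ["wait_for_tasks", "cleanup"]
    raise ValueError(f"unknown exit_mode: {exit_mode!r}")


for mode in ("cleanup", "skip", "wait"):
    print(mode, exit_actions(mode, exception_raised=False))
```

With 'cleanup' or 'skip', futures that are still running when the block ends are not waited for, so results should be collected inside the block.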

Configuration Loading

Load configuration and manage DataFlowKernel lifecycle:

import parsl

# Load configuration
parsl.load(config)

# Check current configuration state
current_dfk = parsl.dfk()  # Get current DataFlowKernel

# Wait for all tasks to complete
parsl.wait_for_current_tasks()

# Shut down the DataFlowKernel, then clear it so another config can be loaded
parsl.dfk().cleanup()
parsl.clear()

Pre-built Configuration Templates

Parsl provides pre-configured templates for common computing environments:

import parsl

# Local configurations
from parsl.configs.htex_local import config as htex_local
from parsl.configs.local_threads import config as local_threads

# HPC system configurations  
from parsl.configs.stampede2 import config as stampede2_config
from parsl.configs.frontera import config as frontera_config
from parsl.configs.summit import config as summit_config

# Cloud configurations
from parsl.configs.ec2 import config as ec2_config
from parsl.configs.kubernetes import config as k8s_config

# Use pre-built config
parsl.load(htex_local)

Configuration Validation

Common configuration validation patterns and error handling:

from parsl.errors import ConfigurationError

try:
    parsl.load(config)
except ConfigurationError as e:
    print(f"Configuration error: {e}")
    # Handle configuration issues
    
# Validate executor labels are unique
executor_labels = [ex.label for ex in config.executors]
if len(executor_labels) != len(set(executor_labels)):
    raise ConfigurationError("Executor labels must be unique")
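When the uniqueness check fails, it helps to report which labels collide. The following is a minimal sketch using only the standard library. The helper duplicate_labels is hypothetical and works on a plain list of label strings, so it can be tried without constructing any executors.

```python
from collections import Counter
from typing import Iterable, List


def duplicate_labels(labels: Iterable[str]) -> List[str]:
    """Return the labels that appear more than once, sorted alphabetically.

    Hypothetical helper for illustration; not a Parsl function.
    """
    counts = Counter(labels)
    return sorted(label for label, n in counts.items() if n > 1)


labels = ["light_tasks", "heavy_compute", "light_tasks"]
print(duplicate_labels(labels))  # ['light_tasks']
```

In the check above, the result could be included in the error message, for example by passing [ex.label for ex in config.executors] to the helper.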
