CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/pypi-parsl

Parallel scripting library for executing workflows across diverse computing resources

Pending
Overview
Eval results
Files

docs/workflow-management.md

Workflow Management

Parsl's workflow management system provides functions for loading configurations, managing execution state, controlling task execution, and handling the DataFlowKernel lifecycle.

Capabilities

Configuration Loading and Management

Load Parsl configurations and manage the DataFlowKernel that orchestrates workflow execution.

def load(config, close_bad_executors=True, disable_cleanup=False):
    """
    Load a Parsl configuration and initialize the DataFlowKernel.
    
    Parameters:
    - config: Config object specifying executors and execution policies
    - close_bad_executors: Close executors that fail to start (default: True)
    - disable_cleanup: Disable automatic cleanup on exit (default: False)
    
    Returns:
    DataFlowKernel: The initialized workflow execution engine
    
    Raises:
    RuntimeError: If a DataFlowKernel is already loaded
    ConfigurationError: If configuration is invalid
    """

def clear():
    """
    Clear the current DataFlowKernel and shut down all executors.
    
    This function blocks until all running tasks complete and resources
    are properly cleaned up.
    """

def dfk():
    """
    Access the current DataFlowKernel instance.
    
    Returns:
    DataFlowKernel: Current DFK instance
    
    Raises:
    NoDataFlowKernelError: If no DFK is currently loaded
    """

Basic Workflow Management:

import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Load configuration
config = Config(executors=[ThreadPoolExecutor(max_threads=4)])
dfk = parsl.load(config)

# Access current DataFlowKernel
current_dfk = parsl.dfk()
print(f"DFK ID: {current_dfk.run_id}")

# Submit tasks
futures = [my_app(i) for i in range(10)]

# Clean shutdown
parsl.clear()

Task Execution Control

Control and monitor task execution across the workflow.

def wait_for_current_tasks():
    """
    Wait for all currently submitted tasks to complete.
    
    This function blocks until all tasks submitted to the current
    DataFlowKernel have finished executing (successfully or with errors).
    """

Task Execution Examples:

import parsl
from parsl import python_app

@python_app
def compute_task(x):
    import time
    time.sleep(x)  # Simulate work
    return x ** 2

# Load configuration
parsl.load(config)

# Submit batch of tasks
futures = []
for i in range(1, 6):
    future = compute_task(i)
    futures.append(future)
    print(f"Submitted task {i}")

# Wait for all current tasks to complete
print("Waiting for all tasks...")
parsl.wait_for_current_tasks()
print("All tasks completed")

# Collect results
results = [f.result() for f in futures]
print(f"Results: {results}")

parsl.clear()

Context Manager Usage

Use Parsl as a context manager for automatic resource management.

# Context manager syntax:
# with parsl.load(config):
#     # Submit and execute tasks
#     # Automatic cleanup on exit based on config.exit_mode

Context Manager Examples:

import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Basic context manager usage
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    exit_mode='wait'  # Wait for tasks on normal exit
)

with parsl.load(config):
    # Submit tasks within context
    futures = [compute_task(i) for i in range(5)]
    
    # Tasks automatically complete before exiting context
    results = [f.result() for f in futures]
    print(f"Completed: {results}")

# DFK automatically cleared when exiting context

# Exception handling with context manager
try:
    with parsl.load(config):
        # Submit tasks
        futures = [risky_task(i) for i in range(10)]
        
        # If exception occurs, behavior depends on exit_mode
        raise ValueError("Something went wrong")
        
except ValueError as e:
    print(f"Workflow failed: {e}")
    # DFK still cleaned up properly

DataFlowKernel Access

Direct access to the DataFlowKernel for advanced workflow control and monitoring.

class DataFlowKernel:
    """
    Core workflow execution engine managing task scheduling and dependencies.
    
    Key properties:
    - run_id: Unique identifier for this DFK instance
    - executors: Dictionary of configured executors
    - config: Current configuration object
    - task_count: Number of tasks submitted
    - tasks: Dictionary of task records
    """
    
    def cleanup(self):
        """Clean up DFK resources and shut down executors."""
        
    def wait_for_current_tasks(self):
        """Wait for all current tasks to complete."""
        
    def submit(self, func, *args, **kwargs):
        """Submit a task for execution (internal use)."""

Advanced DFK Usage:

import parsl

# Load configuration and access DFK
parsl.load(config)
dfk = parsl.dfk()

# Monitor workflow state
print(f"Run ID: {dfk.run_id}")
print(f"Executors: {list(dfk.executors.keys())}")
print(f"Tasks submitted: {dfk.task_count}")

# Submit tasks and monitor
futures = [compute_task(i) for i in range(10)]

# Check task states
for task_id, task_record in dfk.tasks.items():
    print(f"Task {task_id}: {task_record.status}")

# Wait for completion
dfk.wait_for_current_tasks()

parsl.clear()

Workflow State Management

Manage workflow execution state, including checkpointing and recovery.

# Checkpoint and recovery functions (accessed through config):
# - checkpoint_mode: When to create checkpoints
# - checkpoint_files: Previous checkpoints to load
# - checkpoint_period: Interval for periodic checkpointing

Checkpointing Example:

from parsl.config import Config
from parsl.utils import get_all_checkpoints, get_last_checkpoint

# Configuration with checkpointing
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_mode='task_exit',  # Checkpoint after each task
    checkpoint_files=get_all_checkpoints('workflow_checkpoints/'),
    run_dir='workflow_run_20240101'
)

with parsl.load(config):
    # Submit long-running workflow
    futures = [long_running_task(i) for i in range(100)]
    
    # Tasks are checkpointed automatically
    # Workflow can be resumed if interrupted
    
    results = [f.result() for f in futures]

# Recovery from checkpoint
recovery_config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_files=[get_last_checkpoint('workflow_checkpoints/')],
    run_dir='workflow_run_20240101_recovery'
)

# Resume from last checkpoint
with parsl.load(recovery_config):
    # Only incomplete tasks will be re-executed
    print("Resumed from checkpoint")

Workflow Error Handling

Handle workflow-level errors and exceptions.

from parsl.errors import NoDataFlowKernelError, ConfigurationError

# Common workflow errors:
# - NoDataFlowKernelError: No DFK loaded when required
# - ConfigurationError: Invalid configuration
# - ExecutorError: Executor initialization or operation failure

Error Handling Examples:

import parsl
from parsl.errors import NoDataFlowKernelError, ConfigurationError

# Handle missing DataFlowKernel
try:
    # Attempt to access DFK without loading
    dfk = parsl.dfk()
except NoDataFlowKernelError:
    print("No DataFlowKernel loaded. Loading configuration...")
    parsl.load(config)
    dfk = parsl.dfk()

# Handle configuration errors
try:
    invalid_config = Config(executors=[])  # Empty executor list
    parsl.load(invalid_config)
except ConfigurationError as e:
    print(f"Configuration error: {e}")
    # Load valid configuration instead
    parsl.load(valid_config)

# Handle executor failures
try:
    parsl.load(config)
except Exception as e:
    print(f"Failed to initialize executors: {e}")
    # Try alternative configuration
    fallback_config = Config(executors=[ThreadPoolExecutor(max_threads=2)])
    parsl.load(fallback_config)

# Graceful workflow shutdown
def safe_shutdown():
    """Safely shutdown Parsl workflow."""
    try:
        # Wait for tasks with timeout
        parsl.wait_for_current_tasks()
    except KeyboardInterrupt:
        print("Interrupt received, shutting down...")
    finally:
        # Always clear DFK
        try:
            parsl.clear()
        except:
            pass  # Already cleared or never loaded

# Use in main workflow
if __name__ == "__main__":
    try:
        parsl.load(config)
        
        # Execute workflow
        futures = [my_task(i) for i in range(100)]
        results = [f.result() for f in futures]
        
    except KeyboardInterrupt:
        print("Workflow interrupted by user")
    except Exception as e:
        print(f"Workflow failed: {e}")
    finally:
        safe_shutdown()

Workflow Patterns

Common patterns for structuring Parsl workflows.

Sequential Workflow:

@python_app
def step1(input_data):
    return process_step1(input_data)

@python_app  
def step2(step1_result):
    return process_step2(step1_result)

@python_app
def step3(step2_result):
    return process_step3(step2_result)

# Sequential execution with dependencies
with parsl.load(config):
    result1 = step1("initial_data")
    result2 = step2(result1)  # Waits for step1
    result3 = step3(result2)  # Waits for step2
    
    final_result = result3.result()

Parallel Workflow:

@python_app
def parallel_task(item):
    return process_item(item)

@python_app
def aggregate_results(futures_list):
    """Aggregate results from parallel tasks."""
    results = [f.result() for f in futures_list]
    return combine_results(results)

# Parallel execution with aggregation
with parsl.load(config):
    # Launch parallel tasks
    futures = [parallel_task(item) for item in data_items]
    
    # Aggregate results
    final_result = aggregate_results(futures)
    print(f"Aggregated result: {final_result.result()}")

Map-Reduce Workflow:

@python_app
def map_task(data_chunk):
    """Map operation on data chunk."""
    return [transform(item) for item in data_chunk]

@python_app
def reduce_task(mapped_results):
    """Reduce operation on mapped results."""
    flattened = [item for sublist in mapped_results for item in sublist]
    return aggregate(flattened)

# Map-reduce pattern
with parsl.load(config):
    # Split data into chunks
    data_chunks = split_data(large_dataset, num_chunks=10)
    
    # Map phase - parallel processing
    map_futures = [map_task(chunk) for chunk in data_chunks]
    
    # Reduce phase - aggregate results
    final_result = reduce_task(map_futures)
    print(f"Final result: {final_result.result()}")

Install with Tessl CLI

npx tessl i tessl/pypi-parsl

docs

app-decorators.md

configuration.md

data-management.md

executors.md

index.md

launchers.md

monitoring.md

providers.md

workflow-management.md

tile.json