Data Management

Parsl's data management system handles file dependencies and data transfers across distributed computing resources through the File class and automatic staging mechanisms. It supports local files, remote files, and several transfer protocols, including HTTP, FTP, and Globus.
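
The examples below assume a Parsl configuration has already been loaded; a minimal sketch of that setup, using the thread pool executor for illustration:

import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Load a minimal configuration before invoking any apps
parsl.load(Config(executors=[ThreadPoolExecutor()]))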

Capabilities

File Class

The core data management class representing files with global and local paths, supporting various URI schemes and automatic staging.

class File:
    def __init__(self, url):
        """
        Create a File object representing a local or remote file.
        
        Parameters:
        - url: File path or URI (str or PathLike)
          Examples:
          - 'input.txt' (local file)
          - pathlib.Path('data/input.txt') (PathLike)
          - 'file:///scratch/data/input.txt' (file URI)
          - 'globus://endpoint-uuid/path/to/file' (Globus transfer)
          - 'https://example.com/data.csv' (HTTP URL)
        """
        
    @property
    def filepath(self):
        """
        Get the resolved file path for local access.
        
        Returns:
        str: Local file path where the file can be accessed
        
        Raises:
        ValueError: If no local path is available for remote files
        """
        
    def cleancopy(self):
        """
        Create a clean copy without local staging information.
        
        Returns:
        File: New File object with only global URI information
        """
        
    def __str__(self):
        """Return filepath for string conversion."""
        
    def __fspath__(self):
        """Support for os.PathLike interface."""
        
    # Properties for URI components
    url: str          # Original URL/path
    scheme: str       # URI scheme (file, globus, https, etc.)
    netloc: str       # Network location component
    path: str         # Path component  
    filename: str     # Base filename
    local_path: str   # Local staged path (set by staging system)
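
Inspecting a File's URI components (a minimal sketch; 'endpoint-uuid' is a placeholder):

from parsl.data_provider.files import File

f = File('globus://endpoint-uuid/project/data.txt')
print(f.scheme)    # 'globus'
print(f.netloc)    # 'endpoint-uuid'
print(f.path)      # '/project/data.txt'
print(f.filename)  # 'data.txt'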

Basic File Usage:

from parsl import bash_app
from parsl.data_provider.files import File

# Local files
input_file = File('data/input.txt')
output_file = File('results/output.txt')

# Remote files with staging
remote_file = File('globus://endpoint-id/remote/data.txt')
web_file = File('https://example.com/dataset.csv')

# Use in apps
@bash_app
def process_file(inputs=[], outputs=[]):
    return f'process {inputs[0]} > {outputs[0]}'

future = process_file(
    inputs=[input_file],
    outputs=[output_file]
)
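
Each app invocation returns an AppFuture whose outputs attribute holds one DataFuture per output file. Waiting on a DataFuture blocks until that file has actually been produced (and staged out), which is often more useful than waiting on the task's return value:

# Wait for the output file itself rather than the app's return value
output_future = future.outputs[0]     # DataFuture for output_file
staged_file = output_future.result()  # blocks until results/output.txt exists
print(staged_file.filepath)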

File Dependencies in Apps

Automatic dependency management through inputs and outputs parameters in app functions.

# App functions can declare file dependencies:
# - inputs: list of input File objects that must be available before execution
# - outputs: list of output File objects that the app will produce
# - stdout: path (str) or (path, mode) tuple for stdout redirection
# - stderr: path (str) or (path, mode) tuple for stderr redirection

File Dependency Examples:

from parsl import python_app, bash_app
from parsl.data_provider.files import File

@bash_app
def preprocess_data(inputs=[], outputs=[]):
    """Preprocess a data file."""
    return f'sort {inputs[0]} | uniq > {outputs[0]}'

@python_app
def analyze_data(inputs=[]):
    """Analyze preprocessed data."""
    with open(inputs[0], 'r') as f:
        lines = f.readlines()
    return len(lines)

@bash_app
def generate_report(analysis_result, outputs=[], stdout=None):
    """Generate an analysis report."""
    return f'echo "Analysis found {analysis_result} unique items" > {outputs[0]}'

# Create the file dependency chain
raw_data = File('raw_data.txt')
clean_data = File('clean_data.txt')
report_file = File('report.txt')

# Execute with automatic dependency resolution
preprocess_future = preprocess_data(
    inputs=[raw_data],
    outputs=[clean_data]
)

analyze_future = analyze_data(
    # Passing the DataFuture (not the bare File) makes this task wait
    # until preprocess_data has produced clean_data
    inputs=[preprocess_future.outputs[0]]
)

report_future = generate_report(
    analyze_future,  # waits for analyze_data's result
    outputs=[report_file],
    stdout='analysis.log'
)

result = report_future.result()

File Staging and Transfer

Automatic staging system for handling file transfers between the submit node and execution nodes.

# How staging is decided:
# - Each File's URI scheme is matched against the executor's storage_access
#   list of staging providers
# - Remote schemes (http, ftp, globus) are staged in and out automatically
# - On a shared filesystem, file:// URIs and plain paths are accessed
#   directly, with no copy

Staging Examples:

# Automatic staging for remote files
@bash_app
def process_remote_data(inputs=[], outputs=[]):
    # File automatically staged to worker before execution
    return f'analyze {inputs[0]} > {outputs[0]}'

remote_input = File('globus://remote-endpoint/large-dataset.dat')
local_output = File('analysis-results.txt')

future = process_remote_data(
    inputs=[remote_input],   # Automatically staged in
    outputs=[local_output]   # Automatically staged out
)

# Explicit staging control: choose which staging providers an executor
# uses via its storage_access list. Providers are subclasses of the
# abstract base class parsl.data_provider.staging.Staging.
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging

storage_access = [
    NoOpFileStaging(),    # pass file:// and plain paths through untouched
    HTTPInTaskStaging(),  # fetch http(s):// inputs inside the task
    FTPInTaskStaging(),   # fetch ftp:// inputs inside the task
]

Globus Data Transfer

Integration with Globus for high-performance data transfer between research institutions and computing facilities. To use globus:// files, the executor running the task must be configured with a GlobusStaging provider (see Data Staging Configuration below).

# Globus URI format:
# globus://endpoint-uuid/path/to/file

# Authentication through parsl-globus-auth command-line tool

Globus Transfer Example:

# Configure Globus endpoints
source_endpoint = "globus://university-cluster-uuid"
dest_endpoint = "globus://supercomputer-uuid"

# Create Globus file references
input_dataset = File(f'{source_endpoint}/research/dataset.h5')
results_file = File(f'{dest_endpoint}/scratch/results.out')

@python_app
def analyze_large_dataset(inputs=[], outputs=[]):
    """Analyze large dataset transferred via Globus.""" 
    import h5py
    
    # File automatically transferred and available locally
    with h5py.File(inputs[0], 'r') as f:
        data = f['dataset'][:]
        result = data.mean()
    
    # Write results to output file
    with open(outputs[0], 'w') as f:
        f.write(f"Mean value: {result}\n")
    
    return result

# Execute with automatic Globus transfer
future = analyze_large_dataset(
    inputs=[input_dataset],    # Transferred from university cluster
    outputs=[results_file]     # Transferred to supercomputer storage
)

result = future.result()

Data Staging Configuration

Configure data staging behavior through each executor's storage_access list of staging providers.

# Staging providers are attached per executor via storage_access.
# Built-in providers cover local files, HTTP, FTP, and Globus.
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.globus import GlobusStaging

storage_access = [
    NoOpFileStaging(),    # direct access for local and shared-filesystem files
    HTTPInTaskStaging(),  # stage in http(s):// files
    FTPInTaskStaging(),   # stage in ftp:// files
    GlobusStaging(
        endpoint_uuid='endpoint-uuid',   # Globus endpoint serving the workers
        endpoint_path='/scratch/parsl',  # root of that endpoint's filesystem
        local_path='/scratch/parsl'      # the same location as seen by workers
    ),
]

Advanced Staging Configuration:

from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider

# Configure an executor with the staging providers defined above
htex = HighThroughputExecutor(
    label='data_intensive',
    provider=SlurmProvider(
        partition='gpu',
        nodes_per_block=1,
        max_blocks=10
    ),
    storage_access=storage_access,   # staging providers for this executor
    working_dir='/tmp/parsl_work'    # directory used for staged files
)

# Shared filesystem configuration (no transfer needed)
shared_fs_executor = HighThroughputExecutor(
    label='shared_storage',
    provider=SlurmProvider(partition='shared'),
    storage_access=[NoOpFileStaging()]  # direct access on the shared filesystem
)

File Pattern Matching

Support for file globbing and pattern matching in workflows. Note that a glob inside a python_app is evaluated on the worker where the app runs.

from parsl.data_provider.files import File

@python_app
def process_multiple_files(pattern, outputs=[]):
    """Process multiple files matching a pattern."""
    import glob
    files = glob.glob(pattern)
    
    results = []
    for filepath in files:
        # Process each file
        with open(filepath, 'r') as f:
            results.append(len(f.readlines()))
    
    # Write aggregated results
    with open(outputs[0], 'w') as f:
        for i, count in enumerate(results):
            f.write(f"File {i}: {count} lines\n")
    
    return sum(results)

# Process all CSV files in directory
output_summary = File('file_summary.txt')
future = process_multiple_files(
    'data/*.csv',
    outputs=[output_summary]
)

total_lines = future.result()
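
An alternative, when the matching files are visible on the submit side, is to glob there and fan out one task per file (a sketch; count_lines is a hypothetical helper):

import glob
from parsl import python_app
from parsl.data_provider.files import File

@python_app
def count_lines(inputs=[]):
    """Count lines in a single input file."""
    with open(inputs[0]) as f:
        return sum(1 for _ in f)

# One task per matching file; results are gathered from the futures
futures = [count_lines(inputs=[File(p)]) for p in glob.glob('data/*.csv')]
total = sum(f.result() for f in futures)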

Error Handling

Handle file-related errors and staging failures. Errors raised during staging or task execution propagate to the caller when the AppFuture's result() is invoked.

from parsl.data_provider.files import File
from parsl.app.errors import MissingOutputs
from parsl.dataflow.errors import DependencyError

remote_file = File('globus://invalid-endpoint/missing-file.dat')

@bash_app
def process_file(inputs=[], outputs=[]):
    return f'process {inputs[0]} > {outputs[0]}'

future = process_file(
    inputs=[remote_file],
    outputs=[File('result.out')]
)

try:
    result = future.result()
except MissingOutputs as e:
    print(f"Declared outputs were not produced: {e}")
    # Handle missing outputs (retry, inspect logs, etc.)
except DependencyError as e:
    print(f"An upstream dependency, such as a staging task, failed: {e}")
except FileNotFoundError as e:
    print(f"File not found: {e}")

# Inspect the file before use
if remote_file.scheme == 'globus':
    print(f"Using Globus endpoint: {remote_file.netloc}")
