Parallel scripting library for executing workflows across diverse computing resources
—
Parsl's data management system handles file dependencies and data transfers across distributed computing resources through the File class and automatic staging mechanisms. It supports local files, remote files, and various transfer protocols including Globus.
The core data management class representing files with global and local paths, supporting various URI schemes and automatic staging.
class File:
    """Represent a local or remote file in Parsl's data management system.

    A File carries a global URI (``url``) plus its parsed components, and
    may additionally be annotated with a local staged path by Parsl's
    staging machinery.
    """

    def __init__(self, url):
        """
        Create a File object representing a local or remote file.

        Parameters:
        - url: File path or URI (str or PathLike)

        Examples:
        - 'input.txt' (local file)
        - pathlib.Path('data/input.txt') (PathLike)
        - 'file:///scratch/data/input.txt' (file URI)
        - 'globus://endpoint-uuid/path/to/file' (Globus transfer)
        - 'https://example.com/data.csv' (HTTP URL)
        """

    @property
    def filepath(self):
        """
        Get the resolved file path for local access.

        Returns:
            str: Local file path where the file can be accessed

        Raises:
            ValueError: If no local path is available for remote files
        """

    def cleancopy(self):
        """
        Create a clean copy without local staging information.

        Returns:
            File: New File object with only global URI information
        """

    def __str__(self):
        """Return filepath for string conversion."""

    def __fspath__(self):
        """Support for os.PathLike interface."""

    # Properties for URI components
    url: str       # Original URL/path
    scheme: str    # URI scheme (file, globus, https, etc.)
    netloc: str    # Network location component
    path: str      # Path component
    filename: str  # Base filename
    local_path: str  # Local staged path (set by staging system)

Basic File Usage:
from parsl.data_provider.files import File
# Local files
input_file = File('data/input.txt')
output_file = File('results/output.txt')
# Remote files with staging
remote_file = File('globus://endpoint-id/remote/data.txt')
web_file = File('https://example.com/dataset.csv')
# Use in apps
@bash_app
def process_file(inputs=[], outputs=[]):
return f'process {inputs[0]} > {outputs[0]}'
future = process_file(
inputs=[input_file],
outputs=[output_file]
)

Automatic dependency management through inputs and outputs parameters in app functions.
# App functions can specify file dependencies:
# - inputs: List of input File objects that must be available before execution
# - outputs: List of output File objects that will be produced by execution
# - stdout: File object or string for stdout redirection
# - stderr: File object or string for stderr redirection

File Dependency Examples:
from parsl import python_app, bash_app
from parsl.data_provider.files import File


@bash_app
def preprocess_data(input_file, output_file, inputs=[], outputs=[]):
    """Preprocess data file."""
    return f'sort {inputs[0]} | uniq > {outputs[0]}'


@python_app
def analyze_data(input_file, inputs=[]):
    """Analyze preprocessed data."""
    with open(inputs[0], 'r') as f:
        lines = f.readlines()
    return len(lines)


@bash_app
def generate_report(analysis_result, output_file, outputs=[], stdout=None):
    """Generate analysis report."""
    return f'echo "Analysis found {analysis_result} unique items" > {outputs[0]}'


# Create file dependency chain
raw_data = File('raw_data.txt')
clean_data = File('clean_data.txt')
report_file = File('report.txt')
log_file = File('analysis.log')

# Execute with automatic dependency resolution
preprocess_future = preprocess_data(
    raw_data, clean_data,
    inputs=[raw_data],
    outputs=[clean_data]
)
analyze_future = analyze_data(
    clean_data,
    inputs=[clean_data]  # Waits for preprocess_future to complete
)
report_future = generate_report(
    analyze_future,  # Waits for analyze_future result
    report_file,
    outputs=[report_file],
    stdout=log_file
)
result = report_future.result()

Automatic staging system for handling file transfers between submit node and execution nodes.
# File staging modes:
# - Automatic: Files are staged based on scheme and executor configuration
# - Manual: Explicit staging control through staging directives
# - Shared filesystem: Direct file access without staging

Staging Examples:
# Automatic staging for remote files
@bash_app
def process_remote_data(inputs=[], outputs=[]):
# File automatically staged to worker before execution
return f'analyze {inputs[0]} > {outputs[0]}'
remote_input = File('globus://remote-endpoint/large-dataset.dat')
local_output = File('analysis-results.txt')
future = process_remote_data(
inputs=[remote_input], # Automatically staged in
outputs=[local_output] # Automatically staged out
)
# Explicit staging control
from parsl.data_provider.staging import Staging
@bash_app
def custom_staging_app(inputs=[], outputs=[], staging=None):
return f'process {inputs[0]} > {outputs[0]}'
staging_config = Staging(
input_staging=['symlink'], # Create symlinks for inputs
output_staging=['move'] # Move outputs to final location
)

Integration with Globus for high-performance data transfer between research institutions and computing facilities.
# Globus URI format:
# globus://endpoint-uuid/path/to/file
# globus://endpoint-name#endpoint-uuid/path/to/file
# Authentication through parsl-globus-auth command-line tool

Globus Transfer Example:
# Configure Globus endpoints
source_endpoint = "globus://university-cluster-uuid"
dest_endpoint = "globus://supercomputer-uuid"

# Create Globus file references
input_dataset = File(f'{source_endpoint}/research/dataset.h5')
results_file = File(f'{dest_endpoint}/scratch/results.out')


@python_app
def analyze_large_dataset(inputs=[], outputs=[]):
    """Analyze large dataset transferred via Globus."""
    import h5py

    # File automatically transferred and available locally
    with h5py.File(inputs[0], 'r') as f:
        data = f['dataset'][:]
        result = data.mean()

    # Write results to output file
    with open(outputs[0], 'w') as f:
        f.write(f"Mean value: {result}\n")
    return result


# Execute with automatic Globus transfer
future = analyze_large_dataset(
    inputs=[input_dataset],  # Transferred from university cluster
    outputs=[results_file]   # Transferred to supercomputer storage
)
result = future.result()

Configure data staging behavior through executor and provider settings.
# Executor staging configuration
from parsl.data_provider.staging import Staging
staging_config = Staging(
# Input staging strategies: 'copy', 'symlink', 'move', 'none'
input_staging=['copy'],
# Output staging strategies: 'copy', 'symlink', 'move', 'none'
output_staging=['move'],
# Stage-in timeout in seconds
stage_in_timeout=300,
# Stage-out timeout in seconds
stage_out_timeout=300
)

Advanced Staging Configuration:
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.data_provider.staging import Staging
# Configure executor with custom staging
htex = HighThroughputExecutor(
label='data_intensive',
provider=SlurmProvider(
partition='gpu',
nodes_per_block=1,
max_blocks=10
),
storage_access=[staging_config], # Apply staging configuration
working_dir='/tmp/parsl_work' # Local working directory
)
# Shared filesystem configuration (no staging)
shared_fs_executor = HighThroughputExecutor(
label='shared_storage',
provider=SlurmProvider(partition='shared'),
# No staging configuration - direct file access assumed
)

Support for file globbing and pattern matching in workflows.
import glob
from parsl.data_provider.files import File


@python_app
def process_multiple_files(pattern, outputs=[]):
    """Process multiple files matching a pattern."""
    import glob

    files = glob.glob(pattern)
    results = []
    for filepath in files:
        # Process each file
        with open(filepath, 'r') as f:
            results.append(len(f.readlines()))

    # Write aggregated results
    with open(outputs[0], 'w') as f:
        for i, count in enumerate(results):
            f.write(f"File {i}: {count} lines\n")
    return sum(results)


# Process all CSV files in directory
output_summary = File('file_summary.txt')
future = process_multiple_files(
    'data/*.csv',
    outputs=[output_summary]
)
total_lines = future.result()

Handle file-related errors and staging failures.
from parsl.data_provider.files import File
from parsl.executors.errors import FileStagingError
try:
# Attempt to access remote file
remote_file = File('globus://invalid-endpoint/missing-file.dat')
@bash_app
def process_file(inputs=[], outputs=[]):
return f'process {inputs[0]} > {outputs[0]}'
future = process_file(
inputs=[remote_file],
outputs=[File('result.out')]
)
result = future.result()
except FileStagingError as e:
print(f"File staging failed: {e}")
# Handle staging error (retry, use alternative file, etc.)
except FileNotFoundError as e:
print(f"File not found: {e}")
# Handle missing file error
# Check file accessibility before use
if remote_file.scheme == 'globus':
# Verify Globus endpoint is accessible
    print(f"Using Globus endpoint: {remote_file.netloc}")

Install with Tessl CLI
npx tessl i tessl/pypi-parsl