# Data Management

Parsl's data management system handles file dependencies and data transfers across distributed computing resources through the `File` class and automatic staging mechanisms. It supports local files, remote files, and several transfer protocols, including Globus.

## Capabilities

### File Class

The core data management class, representing a file with global and local paths and supporting several URI schemes and automatic staging.
```python { .api }
class File:
    def __init__(self, url):
        """
        Create a File object representing a local or remote file.

        Parameters:
        - url: File path or URI (str or PathLike)

        Examples:
        - 'input.txt' (local file)
        - pathlib.Path('data/input.txt') (PathLike)
        - 'file:///scratch/data/input.txt' (file URI)
        - 'globus://endpoint-uuid/path/to/file' (Globus transfer)
        - 'https://example.com/data.csv' (HTTP URL)
        """

    @property
    def filepath(self):
        """
        Get the resolved file path for local access.

        Returns:
        str: Local file path where the file can be accessed

        Raises:
        ValueError: If no local path is available for remote files
        """

    def cleancopy(self):
        """
        Create a clean copy without local staging information.

        Returns:
        File: New File object with only global URI information
        """

    def __str__(self):
        """Return filepath for string conversion."""

    def __fspath__(self):
        """Support for os.PathLike interface."""

    # Properties for URI components
    url: str         # Original URL/path
    scheme: str      # URI scheme (file, globus, https, etc.)
    netloc: str      # Network location component
    path: str        # Path component
    filename: str    # Base filename
    local_path: str  # Local staged path (set by staging system)
```
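The URI component properties above follow standard URL decomposition. A quick sketch with the standard library's `urlparse` (an illustration of the split, not Parsl internals) shows how a Globus URL breaks down:

```python
from urllib.parse import urlparse
import os

# How a URL such as the ones above decomposes into components
# (illustrative; Parsl's File performs a similar split internally)
url = 'globus://endpoint-uuid/research/data.txt'
parsed = urlparse(url)

scheme = parsed.scheme             # 'globus'
netloc = parsed.netloc             # 'endpoint-uuid'
path = parsed.path                 # '/research/data.txt'
filename = os.path.basename(path)  # 'data.txt'
```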
**Basic File Usage:**

```python
from parsl import bash_app
from parsl.data_provider.files import File

# Local files
input_file = File('data/input.txt')
output_file = File('results/output.txt')

# Remote files with staging
remote_file = File('globus://endpoint-id/remote/data.txt')
web_file = File('https://example.com/dataset.csv')

# Use in apps
@bash_app
def process_file(inputs=[], outputs=[]):
    return f'process {inputs[0]} > {outputs[0]}'

future = process_file(
    inputs=[input_file],
    outputs=[output_file]
)
```
### File Dependencies in Apps

Automatic dependency management through `inputs` and `outputs` parameters in app functions.

```python { .api }
# App functions can specify file dependencies:
# - inputs: List of input File objects that must be available before execution
# - outputs: List of output File objects that will be produced by execution
# - stdout: File object or string for stdout redirection
# - stderr: File object or string for stderr redirection
```
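The gating behavior described above, where a task does not start until its input futures resolve, can be modeled with plain standard-library futures. This is a toy stand-in, not Parsl's scheduler; `submit_with_deps` is a made-up helper for illustration:

```python
from concurrent.futures import ThreadPoolExecutor, Future

# Toy model of dependency-driven execution: a task submitted with
# future-valued inputs blocks until those inputs resolve, then runs.
def submit_with_deps(pool, fn, inputs):
    def wrapper():
        resolved = [x.result() if isinstance(x, Future) else x for x in inputs]
        return fn(resolved)
    return pool.submit(wrapper)

with ThreadPoolExecutor(max_workers=4) as pool:
    stage1 = submit_with_deps(pool, lambda ins: ins[0] + 1, [41])
    stage2 = submit_with_deps(pool, lambda ins: ins[0] * 2, [stage1])  # waits on stage1
    answer = stage2.result()

print(answer)  # 84
```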
**File Dependency Examples:**

```python
from parsl import python_app, bash_app
from parsl.data_provider.files import File

@bash_app
def preprocess_data(inputs=[], outputs=[]):
    """Preprocess data file."""
    return f'sort {inputs[0]} | uniq > {outputs[0]}'

@python_app
def analyze_data(inputs=[]):
    """Analyze preprocessed data."""
    with open(inputs[0], 'r') as f:
        lines = f.readlines()
    return len(lines)

@bash_app
def generate_report(analysis_result, outputs=[], stdout=None):
    """Generate analysis report."""
    return f'echo "Analysis found {analysis_result} unique items" > {outputs[0]}'

# Create file dependency chain
raw_data = File('raw_data.txt')
clean_data = File('clean_data.txt')
report_file = File('report.txt')
log_file = File('analysis.log')

# Execute with automatic dependency resolution
preprocess_future = preprocess_data(
    inputs=[raw_data],
    outputs=[clean_data]
)

analyze_future = analyze_data(
    # A DataFuture from the producing app's outputs expresses the
    # dependency; it waits for preprocess_data to complete
    inputs=[preprocess_future.outputs[0]]
)

report_future = generate_report(
    analyze_future,  # Waits for analyze_data result
    outputs=[report_file],
    stdout=log_file
)

result = report_future.result()
```
### File Staging and Transfer

Automatic staging system for handling file transfers between the submit node and execution nodes.

```python { .api }
# File staging modes:
# - Automatic: Files are staged based on scheme and executor configuration
# - Manual: Explicit staging control through staging providers
# - Shared filesystem: Direct file access without staging
```
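The difference between copying and linking files into a task's working directory, two strategies a staging implementation might use on a shared filesystem, can be sketched with stdlib operations (`stage_in` is a hypothetical helper, not a Parsl API):

```python
import os
import shutil
import tempfile

# Toy stand-in for stage-in: place a source file in a task working
# directory either by copying (independent copy) or symlinking
# (zero-copy, valid only when both sides see the same filesystem).
def stage_in(src, workdir, strategy='copy'):
    dest = os.path.join(workdir, os.path.basename(src))
    if strategy == 'copy':
        shutil.copy(src, dest)
    elif strategy == 'symlink':
        os.symlink(os.path.abspath(src), dest)
    else:
        raise ValueError(f'unknown strategy: {strategy}')
    return dest

with tempfile.TemporaryDirectory() as srcdir, tempfile.TemporaryDirectory() as work:
    src = os.path.join(srcdir, 'input.txt')
    with open(src, 'w') as f:
        f.write('payload')
    staged = stage_in(src, work, strategy='copy')
    with open(staged) as f:
        content = f.read()
```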
**Staging Examples:**

```python
from parsl import bash_app
from parsl.data_provider.files import File

# Automatic staging for remote files
@bash_app
def process_remote_data(inputs=[], outputs=[]):
    # File automatically staged to worker before execution
    return f'analyze {inputs[0]} > {outputs[0]}'

remote_input = File('globus://remote-endpoint/large-dataset.dat')
local_output = File('analysis-results.txt')

future = process_remote_data(
    inputs=[remote_input],   # Automatically staged in
    outputs=[local_output]   # Automatically staged out
)

# Explicit staging control: attach staging providers to an executor's
# storage_access list; each provider handles the URI schemes it recognizes
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.http import HTTPInTaskStaging

storage_access = [
    NoOpFileStaging(),    # file:// paths on a shared filesystem, no transfer
    HTTPInTaskStaging(),  # http(s):// files downloaded within the task
]
```
188
189
### Globus Data Transfer
190
191
Integration with Globus for high-performance data transfer between research institutions and computing facilities.
192
193
```python { .api }
194
# Globus URI format:
195
# globus://endpoint-uuid/path/to/file
196
# globus://endpoint-name#endpoint-uuid/path/to/file
197
198
# Authentication through parsl-globus-auth command-line tool
199
```
200
201
**Globus Transfer Example:**
202
203
```python
204
# Configure Globus endpoints
205
source_endpoint = "globus://university-cluster-uuid"
206
dest_endpoint = "globus://supercomputer-uuid"
207
208
# Create Globus file references
209
input_dataset = File(f'{source_endpoint}/research/dataset.h5')
210
results_file = File(f'{dest_endpoint}/scratch/results.out')
211
212
@python_app
213
def analyze_large_dataset(inputs=[], outputs=[]):
214
"""Analyze large dataset transferred via Globus."""
215
import h5py
216
217
# File automatically transferred and available locally
218
with h5py.File(inputs[0], 'r') as f:
219
data = f['dataset'][:]
220
result = data.mean()
221
222
# Write results to output file
223
with open(outputs[0], 'w') as f:
224
f.write(f"Mean value: {result}\n")
225
226
return result
227
228
# Execute with automatic Globus transfer
229
future = analyze_large_dataset(
230
inputs=[input_dataset], # Transferred from university cluster
231
outputs=[results_file] # Transferred to supercomputer storage
232
)
233
234
result = future.result()
235
```
### Data Staging Configuration

Configure data staging behavior through executor settings: each executor accepts a `storage_access` list of staging providers, and each provider serves the URI schemes it understands.

```python { .api }
# Staging providers (attach via an executor's storage_access list):
from parsl.data_provider.file_noop import NoOpFileStaging  # file:// and plain paths, no transfer
from parsl.data_provider.http import HTTPInTaskStaging     # http(s):// stage-in within the task
from parsl.data_provider.globus import GlobusStaging       # globus:// transfers

GlobusStaging(
    endpoint_uuid,       # UUID of the Globus endpoint local to the executor
    endpoint_path=None,  # Path on the endpoint corresponding to local_path
    local_path=None      # Local directory that maps onto endpoint_path
)
```
**Advanced Staging Configuration:**

```python
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.data_provider.globus import GlobusStaging

# Configure executor with Globus staging
htex = HighThroughputExecutor(
    label='data_intensive',
    provider=SlurmProvider(
        partition='gpu',
        nodes_per_block=1,
        max_blocks=10
    ),
    storage_access=[GlobusStaging('endpoint-uuid')],  # Apply staging configuration
    working_dir='/tmp/parsl_work'  # Local working directory
)

# Shared filesystem configuration (no staging)
shared_fs_executor = HighThroughputExecutor(
    label='shared_storage',
    provider=SlurmProvider(partition='shared'),
    # No storage_access specified - direct file access assumed
)
```
### File Pattern Matching

Support for file globbing and pattern matching in workflows. Note that the glob runs on the worker, so the pattern must be valid on the worker's filesystem.

```python
from parsl import python_app
from parsl.data_provider.files import File

@python_app
def process_multiple_files(pattern, outputs=[]):
    """Process multiple files matching a pattern."""
    import glob  # Import inside the app so it is available on the worker
    files = glob.glob(pattern)

    results = []
    for filepath in files:
        # Count lines in each file
        with open(filepath, 'r') as f:
            results.append(len(f.readlines()))

    # Write aggregated results
    with open(outputs[0], 'w') as f:
        for i, count in enumerate(results):
            f.write(f"File {i}: {count} lines\n")

    return sum(results)

# Process all CSV files in a directory
output_summary = File('file_summary.txt')
future = process_multiple_files(
    'data/*.csv',
    outputs=[output_summary]
)

total_lines = future.result()
```
### Error Handling

Handle file-related errors and staging failures. Staging errors surface when the app future's result is requested.

```python
from parsl import bash_app
from parsl.data_provider.files import File

remote_file = File('globus://invalid-endpoint/missing-file.dat')

@bash_app
def process_file(inputs=[], outputs=[]):
    return f'process {inputs[0]} > {outputs[0]}'

try:
    future = process_file(
        inputs=[remote_file],
        outputs=[File('result.out')]
    )

    # Staging and execution errors are raised here
    result = future.result()

except FileNotFoundError as e:
    print(f"File not found: {e}")
    # Handle missing file error

except Exception as e:
    print(f"Task or staging failed: {e}")
    # Handle staging error (retry, use alternative file, etc.)

# Check file accessibility before use
if remote_file.scheme == 'globus':
    # Verify the Globus endpoint is accessible
    print(f"Using Globus endpoint: {remote_file.netloc}")
```
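One common way to handle a transient staging failure is a small backoff wrapper around submission and result collection (`retry_result` is a hypothetical helper, not part of Parsl):

```python
import time

# Retry a submit-and-wait callable a few times with exponential backoff;
# useful when transfers fail transiently (endpoint briefly unreachable, etc.).
def retry_result(submit_fn, attempts=3, base_delay=1.0):
    last_exc = None
    for i in range(attempts):
        try:
            return submit_fn()
        except Exception as exc:
            last_exc = exc
            if i < attempts - 1:
                time.sleep(base_delay * (2 ** i))
    raise last_exc

# Usage sketch (resubmits the whole app on failure):
# result = retry_result(lambda: process_file(inputs=[remote_file],
#                                            outputs=[File('result.out')]).result())
```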