docs/data-management.md

# Data Management

Parsl's data management system handles file dependencies and data transfers across distributed computing resources through the `File` class and automatic staging mechanisms. It supports local files, remote files, and various transfer protocols including Globus.

## Capabilities

### File Class

The core data management class representing files with global and local paths, supporting various URI schemes and automatic staging.

```python { .api }
class File:
    def __init__(self, url):
        """
        Create a File object representing a local or remote file.

        Parameters:
        - url: File path or URI (str or PathLike)

        Examples:
        - 'input.txt' (local file)
        - pathlib.Path('data/input.txt') (PathLike)
        - 'file:///scratch/data/input.txt' (file URI)
        - 'globus://endpoint-uuid/path/to/file' (Globus transfer)
        - 'https://example.com/data.csv' (HTTP URL)
        """

    @property
    def filepath(self):
        """
        Get the resolved file path for local access.

        Returns:
            str: Local file path where the file can be accessed

        Raises:
            ValueError: If no local path is available for remote files
        """

    def cleancopy(self):
        """
        Create a clean copy without local staging information.

        Returns:
            File: New File object with only global URI information
        """

    def __str__(self):
        """Return filepath for string conversion."""

    def __fspath__(self):
        """Support for os.PathLike interface."""

    # Properties for URI components
    url: str         # Original URL/path
    scheme: str      # URI scheme (file, globus, https, etc.)
    netloc: str      # Network location component
    path: str        # Path component
    filename: str    # Base filename
    local_path: str  # Local staged path (set by staging system)
```

**Basic File Usage:**

```python
from parsl.data_provider.files import File

# Local files
input_file = File('data/input.txt')
output_file = File('results/output.txt')

# Remote files with staging
remote_file = File('globus://endpoint-id/remote/data.txt')
web_file = File('https://example.com/dataset.csv')

# Use in apps
@bash_app
def process_file(inputs=[], outputs=[]):
    return f'process {inputs[0]} > {outputs[0]}'

future = process_file(
    inputs=[input_file],
    outputs=[output_file]
)
```

### File Dependencies in Apps

Automatic dependency management through `inputs` and `outputs` parameters in app functions.

```python { .api }
# App functions can specify file dependencies:
# - inputs: List of input File objects that must be available before execution
# - outputs: List of output File objects that will be produced by execution
# - stdout: File object or string for stdout redirection
# - stderr: File object or string for stderr redirection
```

**File Dependency Examples:**

```python
from parsl import python_app, bash_app
from parsl.data_provider.files import File

@bash_app
def preprocess_data(input_file, output_file, inputs=[], outputs=[]):
    """Preprocess data file."""
    return f'sort {inputs[0]} | uniq > {outputs[0]}'

@python_app
def analyze_data(input_file, inputs=[]):
    """Analyze preprocessed data."""
    with open(inputs[0], 'r') as f:
        lines = f.readlines()
    return len(lines)

@bash_app
def generate_report(analysis_result, output_file, outputs=[], stdout=None):
    """Generate analysis report."""
    return f'echo "Analysis found {analysis_result} unique items" > {outputs[0]}'

# Create file dependency chain
raw_data = File('raw_data.txt')
clean_data = File('clean_data.txt')
report_file = File('report.txt')
log_file = File('analysis.log')

# Execute with automatic dependency resolution
preprocess_future = preprocess_data(
    raw_data, clean_data,
    inputs=[raw_data],
    outputs=[clean_data]
)

analyze_future = analyze_data(
    clean_data,
    inputs=[clean_data]  # Waits for preprocess_future to complete
)

report_future = generate_report(
    analyze_future,  # Waits for analyze_future result
    report_file,
    outputs=[report_file],
    stdout=log_file
)

result = report_future.result()
```

### File Staging and Transfer

Automatic staging system for handling file transfers between submit node and execution nodes.

```python { .api }
# File staging modes:
# - Automatic: Files are staged based on scheme and executor configuration
# - Manual: Explicit staging control through staging directives
# - Shared filesystem: Direct file access without staging
```

**Staging Examples:**

```python
# Automatic staging for remote files
@bash_app
def process_remote_data(inputs=[], outputs=[]):
    # File automatically staged to worker before execution
    return f'analyze {inputs[0]} > {outputs[0]}'

remote_input = File('globus://remote-endpoint/large-dataset.dat')
local_output = File('analysis-results.txt')

future = process_remote_data(
    inputs=[remote_input],   # Automatically staged in
    outputs=[local_output]   # Automatically staged out
)

# Explicit staging control
from parsl.data_provider.staging import Staging

@bash_app
def custom_staging_app(inputs=[], outputs=[], staging=None):
    return f'process {inputs[0]} > {outputs[0]}'

staging_config = Staging(
    input_staging=['symlink'],  # Create symlinks for inputs
    output_staging=['move']     # Move outputs to final location
)
```

### Globus Data Transfer

Integration with Globus for high-performance data transfer between research institutions and computing facilities.

```python { .api }
# Globus URI format:
# globus://endpoint-uuid/path/to/file
# globus://endpoint-name#endpoint-uuid/path/to/file

# Authentication through parsl-globus-auth command-line tool
```

**Globus Transfer Example:**

```python
# Configure Globus endpoints
source_endpoint = "globus://university-cluster-uuid"
dest_endpoint = "globus://supercomputer-uuid"

# Create Globus file references
input_dataset = File(f'{source_endpoint}/research/dataset.h5')
results_file = File(f'{dest_endpoint}/scratch/results.out')

@python_app
def analyze_large_dataset(inputs=[], outputs=[]):
    """Analyze large dataset transferred via Globus."""
    import h5py

    # File automatically transferred and available locally
    with h5py.File(inputs[0], 'r') as f:
        data = f['dataset'][:]
        result = data.mean()

    # Write results to output file
    with open(outputs[0], 'w') as f:
        f.write(f"Mean value: {result}\n")

    return result

# Execute with automatic Globus transfer
future = analyze_large_dataset(
    inputs=[input_dataset],   # Transferred from university cluster
    outputs=[results_file]    # Transferred to supercomputer storage
)

result = future.result()
```

### Data Staging Configuration

Configure data staging behavior through executor and provider settings.

```python { .api }
# Executor staging configuration
from parsl.data_provider.staging import Staging

staging_config = Staging(
    # Input staging strategies: 'copy', 'symlink', 'move', 'none'
    input_staging=['copy'],

    # Output staging strategies: 'copy', 'symlink', 'move', 'none'
    output_staging=['move'],

    # Stage-in timeout in seconds
    stage_in_timeout=300,

    # Stage-out timeout in seconds
    stage_out_timeout=300
)
```

**Advanced Staging Configuration:**

```python
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.data_provider.staging import Staging

# Configure executor with custom staging
htex = HighThroughputExecutor(
    label='data_intensive',
    provider=SlurmProvider(
        partition='gpu',
        nodes_per_block=1,
        max_blocks=10
    ),
    storage_access=[staging_config],  # Apply staging configuration
    working_dir='/tmp/parsl_work'     # Local working directory
)

# Shared filesystem configuration (no staging)
shared_fs_executor = HighThroughputExecutor(
    label='shared_storage',
    provider=SlurmProvider(partition='shared'),
    # No staging configuration - direct file access assumed
)
```

### File Pattern Matching

Support for file globbing and pattern matching in workflows.

```python
import glob
from parsl.data_provider.files import File

@python_app
def process_multiple_files(pattern, outputs=[]):
    """Process multiple files matching a pattern."""
    import glob
    files = glob.glob(pattern)

    results = []
    for filepath in files:
        # Process each file
        with open(filepath, 'r') as f:
            results.append(len(f.readlines()))

    # Write aggregated results
    with open(outputs[0], 'w') as f:
        for i, count in enumerate(results):
            f.write(f"File {i}: {count} lines\n")

    return sum(results)

# Process all CSV files in directory
output_summary = File('file_summary.txt')
future = process_multiple_files(
    'data/*.csv',
    outputs=[output_summary]
)

total_lines = future.result()
```

### Error Handling

Handle file-related errors and staging failures.

```python
from parsl.data_provider.files import File
from parsl.executors.errors import FileStagingError

try:
    # Attempt to access remote file
    remote_file = File('globus://invalid-endpoint/missing-file.dat')

    @bash_app
    def process_file(inputs=[], outputs=[]):
        return f'process {inputs[0]} > {outputs[0]}'

    future = process_file(
        inputs=[remote_file],
        outputs=[File('result.out')]
    )

    result = future.result()

except FileStagingError as e:
    print(f"File staging failed: {e}")
    # Handle staging error (retry, use alternative file, etc.)

except FileNotFoundError as e:
    print(f"File not found: {e}")
    # Handle missing file error

# Check file accessibility before use
if remote_file.scheme == 'globus':
    # Verify Globus endpoint is accessible
    print(f"Using Globus endpoint: {remote_file.netloc}")
```