or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

app-decorators.md configuration.md data-management.md executors.md index.md launchers.md monitoring.md providers.md workflow-management.md

docs/workflow-management.md

0

# Workflow Management

1

2

Parsl's workflow management system provides functions for loading configurations, managing execution state, controlling task execution, and handling the DataFlowKernel lifecycle.

3

4

## Capabilities

5

6

### Configuration Loading and Management

7

8

Load Parsl configurations and manage the DataFlowKernel that orchestrates workflow execution.

9

10

```python { .api }

11

def load(config, close_bad_executors=True, disable_cleanup=False):

12

"""

13

Load a Parsl configuration and initialize the DataFlowKernel.

14

15

Parameters:

16

- config: Config object specifying executors and execution policies

17

- close_bad_executors: Close executors that fail to start (default: True)

18

- disable_cleanup: Disable automatic cleanup on exit (default: False)

19

20

Returns:

21

DataFlowKernel: The initialized workflow execution engine

22

23

Raises:

24

RuntimeError: If a DataFlowKernel is already loaded

25

ConfigurationError: If configuration is invalid

26

"""

27

28

def clear():

29

"""

30

Clear the current DataFlowKernel and shut down all executors.

31

32

This function blocks until all running tasks complete and resources

33

are properly cleaned up.

34

"""

35

36

def dfk():

38

"""

39

Access the current DataFlowKernel instance.

40

41

Returns:

42

DataFlowKernel: Current DFK instance

43

44

Raises:

45

NoDataFlowKernelError: If no DFK is currently loaded

46

"""

47

```

48

49

**Basic Workflow Management:**

50

51

```python

52

import parsl

53

from parsl.config import Config

54

from parsl.executors import ThreadPoolExecutor

55

56

# Load configuration

57

config = Config(executors=[ThreadPoolExecutor(max_threads=4)])

58

dfk = parsl.load(config)

59

60

# Access current DataFlowKernel

61

current_dfk = parsl.dfk()

62

print(f"DFK ID: {current_dfk.run_id}")

63

64

# Submit tasks

65

futures = [my_app(i) for i in range(10)]

66

67

# Clean shutdown

68

parsl.clear()

69

```

70

71

### Task Execution Control

72

73

Control and monitor task execution across the workflow.

74

75

```python { .api }

76

def wait_for_current_tasks():

77

"""

78

Wait for all currently submitted tasks to complete.

79

80

This function blocks until all tasks submitted to the current

81

DataFlowKernel have finished executing (successfully or with errors).

82

"""

83

```

84

85

**Task Execution Examples:**

86

87

```python

88

import parsl

89

from parsl import python_app

90

91

@python_app

92

def compute_task(x):

93

import time

94

time.sleep(x) # Simulate work

95

return x ** 2

96

97

# Load configuration

98

parsl.load(config)

99

100

# Submit batch of tasks

101

futures = []

102

for i in range(1, 6):

103

future = compute_task(i)

104

futures.append(future)

105

print(f"Submitted task {i}")

106

107

# Wait for all current tasks to complete

108

print("Waiting for all tasks...")

109

parsl.wait_for_current_tasks()

110

print("All tasks completed")

111

112

# Collect results

113

results = [f.result() for f in futures]

114

print(f"Results: {results}")

115

116

parsl.clear()

117

```

118

119

### Context Manager Usage

120

121

Use Parsl as a context manager for automatic resource management.

122

123

```python { .api }

124

# Context manager syntax:

125

# with parsl.load(config):

126

# # Submit and execute tasks

127

# # Automatic cleanup on exit based on config.exit_mode

128

```

129

130

**Context Manager Examples:**

131

132

```python

133

import parsl

134

from parsl.config import Config

135

from parsl.executors import ThreadPoolExecutor

136

137

# Basic context manager usage

138

config = Config(

139

executors=[ThreadPoolExecutor(max_threads=4)],

140

exit_mode='wait' # Wait for tasks on normal exit

141

)

142

143

with parsl.load(config):

144

# Submit tasks within context

145

futures = [compute_task(i) for i in range(5)]

146

147

# Tasks automatically complete before exiting context

148

results = [f.result() for f in futures]

149

print(f"Completed: {results}")

150

151

# DFK automatically cleared when exiting context

152

153

# Exception handling with context manager

154

try:

155

with parsl.load(config):

156

# Submit tasks

157

futures = [risky_task(i) for i in range(10)]

158

159

# If exception occurs, behavior depends on exit_mode

160

raise ValueError("Something went wrong")

161

162

except ValueError as e:

163

print(f"Workflow failed: {e}")

164

# DFK still cleaned up properly

165

```

166

167

### DataFlowKernel Access

168

169

Direct access to the DataFlowKernel for advanced workflow control and monitoring.

170

171

```python { .api }

172

class DataFlowKernel:

173

"""

174

Core workflow execution engine managing task scheduling and dependencies.

175

176

Key properties:

177

- run_id: Unique identifier for this DFK instance

178

- executors: Dictionary of configured executors

179

- config: Current configuration object

180

- task_count: Number of tasks submitted

181

- tasks: Dictionary of task records

182

"""

183

184

def cleanup(self):

185

"""Clean up DFK resources and shut down executors."""

186

187

def wait_for_current_tasks(self):

188

"""Wait for all current tasks to complete."""

189

190

def submit(self, func, *args, **kwargs):

191

"""Submit a task for execution (internal use)."""

192

```

193

194

**Advanced DFK Usage:**

195

196

```python

197

import parsl

198

199

# Load configuration and access DFK

200

parsl.load(config)

201

dfk = parsl.dfk()

202

203

# Monitor workflow state

204

print(f"Run ID: {dfk.run_id}")

205

print(f"Executors: {list(dfk.executors.keys())}")

206

print(f"Tasks submitted: {dfk.task_count}")

207

208

# Submit tasks and monitor

209

futures = [compute_task(i) for i in range(10)]

210

211

# Check task states

212

for task_id, task_record in dfk.tasks.items():

213

print(f"Task {task_id}: {task_record.status}")

214

215

# Wait for completion

216

dfk.wait_for_current_tasks()

217

218

parsl.clear()

219

```

220

221

### Workflow State Management

222

223

Manage workflow execution state, including checkpointing and recovery.

224

225

```python { .api }

226

# Checkpoint and recovery functions (accessed through config):

227

# - checkpoint_mode: When to create checkpoints

228

# - checkpoint_files: Previous checkpoints to load

229

# - checkpoint_period: Interval for periodic checkpointing

230

```

231

232

**Checkpointing Example:**

233

234

```python

235

from parsl.config import Config

236

from parsl.utils import get_all_checkpoints, get_last_checkpoint

237

238

# Configuration with checkpointing

239

config = Config(

240

executors=[ThreadPoolExecutor(max_threads=4)],

241

checkpoint_mode='task_exit', # Checkpoint after each task

242

checkpoint_files=get_all_checkpoints('workflow_checkpoints/'),

243

run_dir='workflow_run_20240101'

244

)

245

246

with parsl.load(config):

247

# Submit long-running workflow

248

futures = [long_running_task(i) for i in range(100)]

249

250

# Tasks are checkpointed automatically

251

# Workflow can be resumed if interrupted

252

253

results = [f.result() for f in futures]

254

255

# Recovery from checkpoint

256

recovery_config = Config(

257

executors=[ThreadPoolExecutor(max_threads=4)],

258

checkpoint_files=[get_last_checkpoint('workflow_checkpoints/')],

259

run_dir='workflow_run_20240101_recovery'

260

)

261

262

# Resume from last checkpoint

263

with parsl.load(recovery_config):

264

# Only incomplete tasks will be re-executed

265

print("Resumed from checkpoint")

266

```

267

268

### Workflow Error Handling

269

270

Handle workflow-level errors and exceptions.

271

272

```python { .api }

273

from parsl.errors import NoDataFlowKernelError, ConfigurationError

274

275

# Common workflow errors:

276

# - NoDataFlowKernelError: No DFK loaded when required

277

# - ConfigurationError: Invalid configuration

278

# - ExecutorError: Executor initialization or operation failure

279

```

280

281

**Error Handling Examples:**

282

283

```python

284

import parsl

285

from parsl.errors import NoDataFlowKernelError, ConfigurationError

286

287

# Handle missing DataFlowKernel

288

try:

289

# Attempt to access DFK without loading

290

dfk = parsl.dfk()

291

except NoDataFlowKernelError:

292

print("No DataFlowKernel loaded. Loading configuration...")

293

parsl.load(config)

294

dfk = parsl.dfk()

295

296

# Handle configuration errors

297

try:

298

invalid_config = Config(executors=[]) # Empty executor list

299

parsl.load(invalid_config)

300

except ConfigurationError as e:

301

print(f"Configuration error: {e}")

302

# Load valid configuration instead

303

parsl.load(valid_config)

304

305

# Handle executor failures

306

try:

307

parsl.load(config)

308

except Exception as e:

309

print(f"Failed to initialize executors: {e}")

310

# Try alternative configuration

311

fallback_config = Config(executors=[ThreadPoolExecutor(max_threads=2)])

312

parsl.load(fallback_config)

313

314

# Graceful workflow shutdown

315

def safe_shutdown():

316

"""Safely shutdown Parsl workflow."""

317

try:

318

# Wait for tasks with timeout

319

parsl.wait_for_current_tasks()

320

except KeyboardInterrupt:

321

print("Interrupt received, shutting down...")

322

finally:

323

# Always clear DFK

324

try:

325

parsl.clear()

326

except Exception:

327

pass # Already cleared or never loaded

328

329

# Use in main workflow

330

if __name__ == "__main__":

331

try:

332

parsl.load(config)

333

334

# Execute workflow

335

futures = [my_task(i) for i in range(100)]

336

results = [f.result() for f in futures]

337

338

except KeyboardInterrupt:

339

print("Workflow interrupted by user")

340

except Exception as e:

341

print(f"Workflow failed: {e}")

342

finally:

343

safe_shutdown()

344

```

345

346

### Workflow Patterns

347

348

Common patterns for structuring Parsl workflows.

349

350

**Sequential Workflow:**

351

352

```python

353

@python_app

354

def step1(input_data):

355

return process_step1(input_data)

356

357

@python_app

358

def step2(step1_result):

359

return process_step2(step1_result)

360

361

@python_app

362

def step3(step2_result):

363

return process_step3(step2_result)

364

365

# Sequential execution with dependencies

366

with parsl.load(config):

367

result1 = step1("initial_data")

368

result2 = step2(result1) # Waits for step1

369

result3 = step3(result2) # Waits for step2

370

371

final_result = result3.result()

372

```

373

374

**Parallel Workflow:**

375

376

```python

377

@python_app

378

def parallel_task(item):

379

return process_item(item)

380

381

@python_app

382

def aggregate_results(futures_list):

383

"""Aggregate results from parallel tasks."""

384

results = [f.result() for f in futures_list]

385

return combine_results(results)

386

387

# Parallel execution with aggregation

388

with parsl.load(config):

389

# Launch parallel tasks

390

futures = [parallel_task(item) for item in data_items]

391

392

# Aggregate results

393

final_result = aggregate_results(futures)

394

print(f"Aggregated result: {final_result.result()}")

395

```

396

397

**Map-Reduce Workflow:**

398

399

```python

400

@python_app

401

def map_task(data_chunk):

402

"""Map operation on data chunk."""

403

return [transform(item) for item in data_chunk]

404

405

@python_app

406

def reduce_task(mapped_results):

407

"""Reduce operation on mapped results."""

408

flattened = [item for sublist in mapped_results for item in sublist]

409

return aggregate(flattened)

410

411

# Map-reduce pattern

412

with parsl.load(config):

413

# Split data into chunks

414

data_chunks = split_data(large_dataset, num_chunks=10)

415

416

# Map phase - parallel processing

417

map_futures = [map_task(chunk) for chunk in data_chunks]

418

419

# Reduce phase - aggregate results

420

final_result = reduce_task(map_futures)

421

print(f"Final result: {final_result.result()}")

422

```