# Workflow Management

Parsl's workflow management system provides functions for loading configurations, managing execution state, controlling task execution, and handling the DataFlowKernel lifecycle.

## Capabilities

### Configuration Loading and Management

Load Parsl configurations and manage the DataFlowKernel that orchestrates workflow execution.

```python { .api }
def load(config, close_bad_executors=True, disable_cleanup=False):
    """
    Load a Parsl configuration and initialize the DataFlowKernel.

    Parameters:
    - config: Config object specifying executors and execution policies
    - close_bad_executors: Close executors that fail to start (default: True)
    - disable_cleanup: Disable automatic cleanup on exit (default: False)

    Returns:
        DataFlowKernel: The initialized workflow execution engine

    Raises:
        RuntimeError: If a DataFlowKernel is already loaded
        ConfigurationError: If configuration is invalid
    """

def clear():
    """
    Clear the current DataFlowKernel and shut down all executors.

    This function blocks until all running tasks complete and resources
    are properly cleaned up.
    """

def dfk():
    """
    Access the current DataFlowKernel instance.

    Returns:
        DataFlowKernel: Current DFK instance

    Raises:
        NoDataFlowKernelError: If no DFK is currently loaded
    """
```

**Basic Workflow Management:**

```python
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Load configuration
config = Config(executors=[ThreadPoolExecutor(max_threads=4)])
dfk = parsl.load(config)

# Access current DataFlowKernel
current_dfk = parsl.dfk()
print(f"DFK ID: {current_dfk.run_id}")

# Submit tasks
futures = [my_app(i) for i in range(10)]

# Clean shutdown
parsl.clear()
```
### Task Execution Control

Control and monitor task execution across the workflow.

```python { .api }
def wait_for_current_tasks():
    """
    Wait for all currently submitted tasks to complete.

    This function blocks until all tasks submitted to the current
    DataFlowKernel have finished executing (successfully or with errors).
    """
```

**Task Execution Examples:**

```python
import parsl
from parsl import python_app

@python_app
def compute_task(x):
    import time
    time.sleep(x)  # Simulate work
    return x ** 2

# Load configuration
parsl.load(config)

# Submit batch of tasks
futures = []
for i in range(1, 6):
    future = compute_task(i)
    futures.append(future)
    print(f"Submitted task {i}")

# Wait for all current tasks to complete
print("Waiting for all tasks...")
parsl.wait_for_current_tasks()
print("All tasks completed")

# Collect results
results = [f.result() for f in futures]
print(f"Results: {results}")

parsl.clear()
```
### Context Manager Usage

Use Parsl as a context manager for automatic resource management.

```python { .api }
# Context manager syntax:
# with parsl.load(config):
#     # Submit and execute tasks
#     # Automatic cleanup on exit based on config.exit_mode
```

**Context Manager Examples:**

```python
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

# Basic context manager usage
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    exit_mode='wait'  # Wait for tasks on normal exit
)

with parsl.load(config):
    # Submit tasks within context
    futures = [compute_task(i) for i in range(5)]

    # Tasks automatically complete before exiting context
    results = [f.result() for f in futures]
    print(f"Completed: {results}")

# DFK automatically cleared when exiting context

# Exception handling with context manager
try:
    with parsl.load(config):
        # Submit tasks
        futures = [risky_task(i) for i in range(10)]

        # If exception occurs, behavior depends on exit_mode
        raise ValueError("Something went wrong")

except ValueError as e:
    print(f"Workflow failed: {e}")
    # DFK still cleaned up properly
```
### DataFlowKernel Access

Direct access to the DataFlowKernel for advanced workflow control and monitoring.

```python { .api }
class DataFlowKernel:
    """
    Core workflow execution engine managing task scheduling and dependencies.

    Key properties:
    - run_id: Unique identifier for this DFK instance
    - executors: Dictionary of configured executors
    - config: Current configuration object
    - task_count: Number of tasks submitted
    - tasks: Dictionary of task records
    """

    def cleanup(self):
        """Clean up DFK resources and shut down executors."""

    def wait_for_current_tasks(self):
        """Wait for all current tasks to complete."""

    def submit(self, func, *args, **kwargs):
        """Submit a task for execution (internal use)."""
```

**Advanced DFK Usage:**

```python
import parsl

# Load configuration and access DFK
parsl.load(config)
dfk = parsl.dfk()

# Monitor workflow state
print(f"Run ID: {dfk.run_id}")
print(f"Executors: {list(dfk.executors.keys())}")
print(f"Tasks submitted: {dfk.task_count}")

# Submit tasks and monitor
futures = [compute_task(i) for i in range(10)]

# Check task states
for task_id, task_record in dfk.tasks.items():
    print(f"Task {task_id}: {task_record.status}")

# Wait for completion
dfk.wait_for_current_tasks()

parsl.clear()
```
### Workflow State Management

Manage workflow execution state, including checkpointing and recovery.

```python { .api }
# Checkpoint and recovery functions (accessed through config):
# - checkpoint_mode: When to create checkpoints
# - checkpoint_files: Previous checkpoints to load
# - checkpoint_period: Interval for periodic checkpointing
```

**Checkpointing Example:**

```python
from parsl.config import Config
from parsl.utils import get_all_checkpoints, get_last_checkpoint

# Configuration with checkpointing
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_mode='task_exit',  # Checkpoint after each task
    checkpoint_files=get_all_checkpoints('workflow_checkpoints/'),
    run_dir='workflow_run_20240101'
)

with parsl.load(config):
    # Submit long-running workflow
    futures = [long_running_task(i) for i in range(100)]

    # Tasks are checkpointed automatically
    # Workflow can be resumed if interrupted

    results = [f.result() for f in futures]

# Recovery from checkpoint
recovery_config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    checkpoint_files=[get_last_checkpoint('workflow_checkpoints/')],
    run_dir='workflow_run_20240101_recovery'
)

# Resume from last checkpoint
with parsl.load(recovery_config):
    # Only incomplete tasks will be re-executed
    print("Resumed from checkpoint")
```
### Workflow Error Handling

Handle workflow-level errors and exceptions.

```python { .api }
from parsl.errors import NoDataFlowKernelError, ConfigurationError

# Common workflow errors:
# - NoDataFlowKernelError: No DFK loaded when required
# - ConfigurationError: Invalid configuration
# - ExecutorError: Executor initialization or operation failure
```

**Error Handling Examples:**

```python
import parsl
from parsl.errors import NoDataFlowKernelError, ConfigurationError

# Handle missing DataFlowKernel
try:
    # Attempt to access DFK without loading
    dfk = parsl.dfk()
except NoDataFlowKernelError:
    print("No DataFlowKernel loaded. Loading configuration...")
    parsl.load(config)
    dfk = parsl.dfk()

# Handle configuration errors
try:
    invalid_config = Config(executors=[])  # Empty executor list
    parsl.load(invalid_config)
except ConfigurationError as e:
    print(f"Configuration error: {e}")
    # Load valid configuration instead
    parsl.load(valid_config)

# Handle executor failures
try:
    parsl.load(config)
except Exception as e:
    print(f"Failed to initialize executors: {e}")
    # Try alternative configuration
    fallback_config = Config(executors=[ThreadPoolExecutor(max_threads=2)])
    parsl.load(fallback_config)

# Graceful workflow shutdown
def safe_shutdown():
    """Safely shutdown Parsl workflow."""
    try:
        # Wait for tasks with timeout
        parsl.wait_for_current_tasks()
    except KeyboardInterrupt:
        print("Interrupt received, shutting down...")
    finally:
        # Always clear DFK
        try:
            parsl.clear()
        except Exception:
            pass  # Already cleared or never loaded

# Use in main workflow
if __name__ == "__main__":
    try:
        parsl.load(config)

        # Execute workflow
        futures = [my_task(i) for i in range(100)]
        results = [f.result() for f in futures]

    except KeyboardInterrupt:
        print("Workflow interrupted by user")
    except Exception as e:
        print(f"Workflow failed: {e}")
    finally:
        safe_shutdown()
```
### Workflow Patterns

Common patterns for structuring Parsl workflows.

**Sequential Workflow:**

```python
@python_app
def step1(input_data):
    return process_step1(input_data)

@python_app
def step2(step1_result):
    return process_step2(step1_result)

@python_app
def step3(step2_result):
    return process_step3(step2_result)

# Sequential execution with dependencies
with parsl.load(config):
    result1 = step1("initial_data")
    result2 = step2(result1)  # Waits for step1
    result3 = step3(result2)  # Waits for step2

    final_result = result3.result()
```

**Parallel Workflow:**

```python
@python_app
def parallel_task(item):
    return process_item(item)

@python_app
def aggregate_results(futures_list):
    """Aggregate results from parallel tasks."""
    results = [f.result() for f in futures_list]
    return combine_results(results)

# Parallel execution with aggregation
with parsl.load(config):
    # Launch parallel tasks
    futures = [parallel_task(item) for item in data_items]

    # Aggregate results
    final_result = aggregate_results(futures)
    print(f"Aggregated result: {final_result.result()}")
```

**Map-Reduce Workflow:**

```python
@python_app
def map_task(data_chunk):
    """Map operation on data chunk."""
    return [transform(item) for item in data_chunk]

@python_app
def reduce_task(mapped_results):
    """Reduce operation on mapped results."""
    flattened = [item for sublist in mapped_results for item in sublist]
    return aggregate(flattened)

# Map-reduce pattern
with parsl.load(config):
    # Split data into chunks
    data_chunks = split_data(large_dataset, num_chunks=10)

    # Map phase - parallel processing
    map_futures = [map_task(chunk) for chunk in data_chunks]

    # Reduce phase - aggregate results
    final_result = reduce_task(map_futures)
    print(f"Final result: {final_result.result()}")
```