# Delayed

Build custom task graphs with lazy evaluation for any Python function. Delayed enables flexible parallel workflows by wrapping functions and values to create task graphs that execute only when explicitly computed.

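For orientation, here is a minimal sketch of the pattern (the functions are illustrative): wrap ordinary functions, compose them into a graph, and trigger execution with `compute()`.

```python
from dask.delayed import delayed

@delayed
def inc(x):
    # Ordinary Python; runs only when the graph is computed
    return x + 1

@delayed
def add(x, y):
    return x + y

a = inc(1)           # no work happens yet; a is a Delayed object
b = inc(2)
total = add(a, b)    # composes a small task graph

print(total.compute())  # executes the graph -> 5
```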
## Capabilities

### Delayed Function Creation

Create delayed versions of functions and wrap values for lazy evaluation.

```python { .api }
def delayed(obj, name=None, pure=None, nout=None, traverse=True):
    """
    Wrap a function or object for lazy evaluation; usable as a decorator.

    Parameters:
    - obj: Function to make delayed, or object to wrap as a lazy value
    - name: Custom name (key) for the task in the graph
    - pure: Whether the function is pure (deterministic, no side effects);
      identical pure calls share a single task key
    - nout: Number of outputs for functions returning multiple values
    - traverse: Whether to traverse nested collections and delay their contents

    Returns:
    Delayed function or Delayed object
    """
```
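A brief usage sketch of the three calling styles (the wrapped names are placeholders):

```python
from dask.delayed import delayed

# As a decorator
@delayed
def add(x, y):
    return x + y

# Wrapping an existing function
lazy_sum = delayed(sum)

# Wrapping a plain value
lazy_data = delayed([1, 2, 3])

total = add(lazy_sum(lazy_data), 10)  # builds a graph; nothing runs yet
print(total.compute())                # 16
```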
### Core Delayed Class

The Delayed class represents lazy computations in task graphs.

```python { .api }
class Delayed:
    """
    Lazy evaluation wrapper for building task graphs.

    Properties:
    - key: str - Unique identifier for this computation
    - dask: Task graph (mapping of keys to tasks) backing this computation
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute the delayed object and return the result.

        Parameters:
        - scheduler: Scheduler to use ('threads', 'processes', 'synchronous', etc.)
        - **kwargs: Additional scheduler arguments

        Returns:
        Computed result of the delayed operation
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist the delayed object in memory for reuse.

        Parameters:
        - scheduler: Scheduler to use for persistence
        - **kwargs: Additional scheduler arguments

        Returns:
        dask.delayed.Delayed: Delayed object backed by the computed result
        """

    def visualize(self, filename='mydask', format=None, optimize_graph=False,
                  **kwargs):
        """
        Visualize the task graph for this delayed object.

        Parameters:
        - filename: Output file path (extension added from format)
        - format: Output format ('png', 'svg', 'pdf', etc.)
        - optimize_graph: Whether to optimize the graph before rendering
        - **kwargs: Additional graphviz parameters

        Returns:
        Display object for notebook rendering; the rendered file is written to disk
        """

    def __call__(self, *args, **kwargs):
        """Call the wrapped object lazily; returns a new Delayed."""

    def __getattr__(self, attr):
        """Attribute access returns a new Delayed for the deferred lookup."""

    def __getitem__(self, key):
        """Indexing returns a new Delayed for the deferred item access."""

    def __setitem__(self, key, value):
        """Not supported: Delayed objects are immutable (raises TypeError)."""

    def __iter__(self):
        """Iterate only when the length is known (e.g., created with nout);
        otherwise raises TypeError."""

    def __len__(self):
        """Length only when known (e.g., created with nout);
        otherwise raises TypeError."""

    def __bool__(self):
        """Not supported: truth would require computing the value
        (raises TypeError)."""
```
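These methods compose naturally with lazy attribute and item access; a short sketch (`visualize` needs the optional graphviz dependency, so it is left commented out):

```python
from dask.delayed import delayed

record = delayed({"name": "dask", "values": [1, 2, 3]})

values = record["values"]      # __getitem__ is lazy: returns a Delayed
total = delayed(sum)(values)   # still lazy

# total.visualize("graph")     # would write graph.png via graphviz
print(total.compute(scheduler="threads"))  # 6
```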
### Delayed Operations

Mathematical and logical operations on delayed objects return new delayed objects.

```python { .api }
# Arithmetic operations create new delayed objects
# delayed_obj + other -> Delayed
# delayed_obj - other -> Delayed
# delayed_obj * other -> Delayed
# delayed_obj / other -> Delayed
# delayed_obj // other -> Delayed
# delayed_obj % other -> Delayed
# delayed_obj ** other -> Delayed

# Comparison operations
# delayed_obj == other -> Delayed
# delayed_obj != other -> Delayed
# delayed_obj < other -> Delayed
# delayed_obj <= other -> Delayed
# delayed_obj > other -> Delayed
# delayed_obj >= other -> Delayed

# Bitwise/logical operations
# delayed_obj & other -> Delayed
# delayed_obj | other -> Delayed
# delayed_obj ^ other -> Delayed
# ~delayed_obj -> Delayed

# Unary operations
# -delayed_obj -> Delayed
# +delayed_obj -> Delayed
# abs(delayed_obj) -> Delayed
```
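Because every operator returns a new Delayed, expressions chain into a single deferred graph:

```python
from dask.delayed import delayed

x = delayed(10)
y = delayed(3)

expr = (x + y) * 2 - x % y   # a graph of deferred operations
flag = expr >= 20            # comparisons are deferred too

print(expr.compute())  # 25
print(flag.compute())  # True
```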
### Task Graph Construction

Build complex task graphs by composing delayed operations.

```python { .api }
@delayed
def custom_function(arg1, arg2, **kwargs):
    """
    Custom delayed function example.

    Any function can be made delayed using the @delayed decorator.
    The function will not execute until .compute() is called.
    """
    # Function implementation
    return result

# Delayed wrapper for existing functions
delayed_func = delayed(existing_function)

# Delayed values
delayed_value = delayed(some_value)

# Function composition creates task graphs
@delayed
def step1(data):
    return process_data(data)

@delayed
def step2(processed_data):
    return analyze_data(processed_data)

@delayed
def step3(analysis_result):
    return generate_report(analysis_result)

# Build computation pipeline; note delayed(load_data)() keeps the load lazy,
# whereas delayed(load_data()) would execute load_data immediately
data = delayed(load_data)()
processed = step1(data)
analyzed = step2(processed)
report = step3(analyzed)

# Execute entire pipeline
final_result = report.compute()
```
### Multiple Return Values

Handle functions that return multiple values.

```python { .api }
@delayed(nout=2)
def function_with_multiple_returns(data):
    """
    Function returning multiple values.

    The nout parameter specifies the number of return values,
    enabling proper unpacking of delayed results.
    """
    result1 = process_part1(data)
    result2 = process_part2(data)
    return result1, result2

# Unpack multiple returns
data = delayed(load_data)()
part1, part2 = function_with_multiple_returns(data)

# Use each part independently
analysis1 = delayed(analyze)(part1)
analysis2 = delayed(analyze)(part2)

# Combine results
final = delayed(combine)(analysis1, analysis2)
result = final.compute()
```
### Pure Functions and Caching

Mark pure functions (no side effects) so that identical calls share a single task in the graph.

```python { .api }
@delayed(pure=True)
def expensive_pure_function(data):
    """
    Pure function with no side effects.

    Setting pure=True makes calls with the same arguments share the
    same task key, so duplicate calls in one graph are computed only once.
    This deduplicates work within a graph; it is not a persistent cache
    across separate compute() calls.
    """
    # Expensive computation
    return expensive_result

# Both calls share one task key, so the work runs once per compute
data = delayed(load_data)()
result1 = expensive_pure_function(data)
result2 = expensive_pure_function(data)  # same task key as result1

combined = delayed(combine)(result1, result2)
final = combined.compute()
```
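The key sharing can be observed directly; a small sketch (by default, delayed calls are treated as impure):

```python
from dask.delayed import delayed

@delayed(pure=True)
def square(x):
    return x * x

a = square(4)
b = square(4)
print(a.key == b.key)  # True: identical pure calls share one task

def cube(x):
    return x ** 3

c = delayed(cube)(4)   # impure by default
d = delayed(cube)(4)
print(c.key == d.key)  # False: each impure call gets a fresh key
```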
### Integration with Collections

Convert between delayed objects and other Dask collections.

```python { .api }
# Convert collections to delayed
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

array = da.random.random((1000, 1000), chunks=(100, 100))
delayed_array = array.to_delayed()  # array of Delayed objects, one per chunk

dataframe = dd.read_csv('data.csv')
delayed_dataframe = dataframe.to_delayed()  # list of Delayed, one per partition

# Convert delayed to collections
delayed_values = [delayed(load_partition)(i) for i in range(10)]
bag = db.from_delayed(delayed_values)

delayed_dfs = [delayed(load_dataframe)(f) for f in files]
combined_df = dd.from_delayed(delayed_dfs)
```
### Error Handling

Handle errors in delayed computations.

```python { .api }
def risky_function(data):
    """Plain function that might raise exceptions."""
    if data is None:
        raise ValueError("Data cannot be None")
    return process_data(data)

@delayed
def safe_wrapper(data):
    """Delayed wrapper with error handling.

    Note risky_function is deliberately not delayed: if it were, calling
    it here would build a nested Delayed instead of raising, and the
    try/except would never trigger.
    """
    try:
        return risky_function(data)
    except ValueError as e:
        return f"Error: {e}"

# Exceptions are raised (or handled) during compute(), not at graph build time
result = safe_wrapper(None)
output = result.compute()  # "Error: Data cannot be None"
```
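Unhandled exceptions surface when `compute()` runs, so they can also be caught at the call site:

```python
from dask.delayed import delayed

@delayed
def parse(text):
    return int(text)  # raises ValueError for bad input

task = parse("not a number")  # building the graph never raises
try:
    task.compute()
except ValueError as e:
    print(f"Computation failed: {e}")
```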
## Usage Examples

### Basic Delayed Workflow

```python
from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def clean_data(df):
    return df.dropna().reset_index(drop=True)

@delayed
def analyze_data(df):
    return df.describe()

@delayed
def save_results(analysis, filename):
    analysis.to_csv(filename)
    return f"Saved to {filename}"

# Build computation graph
filename = 'data.csv'
raw_data = load_data(filename)
clean_data_result = clean_data(raw_data)
analysis = analyze_data(clean_data_result)
save_status = save_results(analysis, 'results.csv')

# Execute computation
result = save_status.compute()
print(result)
```
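When a workflow has several delayed outputs, `dask.compute` evaluates them together so shared upstream tasks run only once:

```python
import dask

# analysis and save_status share the load/clean steps; computing them
# together executes that shared work a single time
analysis_result, save_message = dask.compute(analysis, save_status)
```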
### Parallel Processing Pipeline

```python
from dask.delayed import delayed
import pandas as pd

@delayed
def process_file(filename):
    """Process a single file."""
    df = pd.read_csv(filename)
    # Complex processing logic
    return df.groupby('category')['value'].sum()

@delayed
def combine_results(results):
    """Combine results from multiple files."""
    return pd.concat(results, axis=1).fillna(0)

# Process multiple files in parallel
files = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
processed = [process_file(f) for f in files]

# Combine all results
combined = combine_results(processed)
final_result = combined.compute()
```
### Dynamic Task Graphs

```python
from dask.delayed import delayed

@delayed
def generate_data(seed, size):
    """Generate synthetic data."""
    import numpy as np
    np.random.seed(seed)
    return np.random.random(size)

@delayed
def process_batch(data_batch, method='mean'):
    """Process a batch of data."""
    if method == 'mean':
        return data_batch.mean()
    elif method == 'sum':
        return data_batch.sum()
    else:
        return data_batch.std()

@delayed
def aggregate_results(results):
    """Aggregate all batch results."""
    return sum(results) / len(results)

# Dynamic graph based on parameters
n_batches = 5
batch_size = 1000
method = 'mean'

# Generate data batches
data_batches = [generate_data(i, batch_size) for i in range(n_batches)]

# Process each batch
batch_results = [process_batch(batch, method) for batch in data_batches]

# Aggregate final result
final_result = aggregate_results(batch_results)
answer = final_result.compute()
```
### Custom Class with Delayed Methods

```python
from dask.delayed import delayed

class DelayedProcessor:
    """Class whose methods are wrapped with delayed at call time.

    Note: decorating methods with @delayed at class definition does not
    bind self correctly; wrapping each bound method per call does.
    """

    def __init__(self, params):
        self.params = params

    def load_data(self, source):
        """Load data from source."""
        # Implementation (placeholder)
        return loaded_data

    def preprocess(self, data):
        """Preprocess the data."""
        # Use self.params for configuration (placeholder)
        return preprocessed_data

    def train_model(self, data):
        """Train a model on the data."""
        return trained_model

    def evaluate(self, model, test_data):
        """Evaluate model performance."""
        return evaluation_metrics

# Wrap bound methods with delayed at the call site
processor = DelayedProcessor({'param1': 10, 'param2': 'setting'})

# Build computation pipeline
raw_data = delayed(processor.load_data)('data_source')
clean_data = delayed(processor.preprocess)(raw_data)
model = delayed(processor.train_model)(clean_data)

test_data = delayed(processor.load_data)('test_source')
clean_test = delayed(processor.preprocess)(test_data)
metrics = delayed(processor.evaluate)(model, clean_test)

# Execute entire pipeline
final_metrics = metrics.compute()
```
### Integration with Other Collections

```python
import dask.array as da
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def custom_analysis(array_data, df_data):
    """Custom analysis combining an array chunk and a dataframe partition."""
    array_stats = {
        'mean': array_data.mean(),
        'std': array_data.std()
    }

    df_stats = {
        'row_count': len(df_data),
        'col_count': len(df_data.columns)
    }

    return {'array': array_stats, 'dataframe': df_stats}

# Create collections
array = da.random.random((10000, 100), chunks=(1000, 100))
dataframe = dd.read_csv('large_file.csv')

# Take the first chunk and first partition as delayed objects
# for custom processing
delayed_array = array.to_delayed().flatten()[0]
delayed_df = dataframe.to_delayed()[0]

# Custom analysis
analysis = custom_analysis(delayed_array, delayed_df)
result = analysis.compute()
```