# Core Functions

Essential functions for executing, optimizing, and managing Dask computations across all collection types. These functions provide the fundamental operations needed to work with any Dask collection.

## Imports

```python
import dask
from dask import compute, persist, optimize, visualize, delayed
from dask import is_dask_collection, get_annotations, annotate
from dask.base import tokenize, normalize_token
from dask.core import istask
from dask.local import get_sync
```

## Capabilities

### Computation Execution

Execute Dask collections to get concrete results, with support for different schedulers and optimization strategies.

```python { .api }
def compute(*collections, traverse=True, optimize_graph=True,
            scheduler=None, **kwargs):
    """
    Compute multiple Dask collections at once, returning concrete results.

    Parameters:
    - *collections: Dask collections to compute
    - traverse: Whether to search nested containers for Dask objects
    - optimize_graph: Whether to optimize task graphs before execution
    - scheduler: Scheduler to use ('threads', 'processes', 'single-threaded')
    - **kwargs: Extra scheduler arguments (e.g. num_workers, pool, or
      chunksize for the multiprocessing scheduler)

    Returns:
    Tuple of computed results in the same order as the input collections
    """

def persist(*collections, traverse=True, optimize_graph=True,
            scheduler=None, **kwargs):
    """
    Persist collections into memory, keeping the lazy collection interface.

    Parameters:
    - *collections: Dask collections to persist
    - traverse: Whether to search nested containers for Dask objects
    - optimize_graph: Whether to optimize task graphs before persisting
    - scheduler: Scheduler to use for the computation
    - **kwargs: Extra scheduler arguments, as for compute

    Returns:
    Tuple of persisted collections backed by in-memory results
    """
```

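For collections, `compute` can be called with an explicit scheduler. A minimal sketch (the array shape and worker count here are arbitrary):

```python
import dask
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))
total = x.sum()

# Arrays default to the threaded scheduler
result = dask.compute(total)[0]

# 'processes' sidesteps the GIL for Python-heavy tasks;
# 'single-threaded' runs everything in the calling thread, which
# makes debugging with pdb and ordinary tracebacks much easier
result = dask.compute(total, scheduler='processes')[0]
result = dask.compute(total, scheduler='single-threaded')[0]

# Extra scheduler arguments pass through **kwargs
result = dask.compute(total, scheduler='threads', num_workers=4)[0]
```
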
### Graph Optimization

Optimize task graphs for better performance through task fusion, culling of unneeded tasks, and other transformations.

```python { .api }
def optimize(*collections, traverse=True, **kwargs):
    """
    Optimize the task graphs of one or more collections together.

    Parameters:
    - *collections: Collections to optimize
    - traverse: Whether to search nested containers for Dask objects
    - **kwargs: Extra parameters forwarded to the optimization passes

    Returns:
    Tuple of equivalent lazy collections sharing a single optimized graph
    """
```

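To see the effect, compare graph sizes before and after optimization. A small sketch; exact task counts vary with the Dask version and the optimization passes in effect:

```python
import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + 1) * 2

# optimize returns an equivalent lazy collection; task fusion typically
# shrinks the graph by merging chained elementwise operations
(y_opt,) = dask.optimize(y)
print(len(y.__dask_graph__()), len(y_opt.__dask_graph__()))

# The computed results are identical
assert (y.compute() == y_opt.compute()).all()
```
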
### Visualization

Visualize task graphs and their dependencies for debugging and understanding computation structure.

```python { .api }
def visualize(*collections, filename='mydask', format=None,
              optimize_graph=False, color=None, **kwargs):
    """
    Render the task graphs of collections using graphviz.

    Parameters:
    - *collections: Collections to visualize
    - filename: Output file path; None displays inline in notebooks
    - format: Output format ('png', 'pdf', 'svg', etc.)
    - optimize_graph: Whether to optimize the graph before rendering
    - color: Node coloring scheme (e.g. 'order' colors by execution order)
    - **kwargs: Additional graphviz parameters

    Returns:
    Graphviz display object, or None when output goes only to a file
    """
```

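For example, to compare the raw and optimized graphs side by side (a sketch; rendering requires the optional graphviz dependency):

```python
import dask
import dask.array as da

x = da.random.random((400, 400), chunks=(100, 100))
y = (x + x.T).mean(axis=0)

# Raw task graph, rendered as SVG
dask.visualize(y, filename='graph', format='svg')

# Optimized graph, with nodes colored by estimated execution order
dask.visualize(y, filename='graph_opt', format='svg',
               optimize_graph=True, color='order')
```
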
### Collection Utilities

Utility functions for working with Dask collections and determining their properties.

```python { .api }
def is_dask_collection(obj):
    """
    Check whether an object is a Dask collection.

    Parameters:
    - obj: Object to check

    Returns:
    bool: True if the object implements the Dask collection protocol
    """

def get_annotations():
    """
    Get the annotations active in the current context.

    Returns:
    dict: Current annotation dictionary (empty outside any annotate block)
    """

def annotate(**annotations):
    """
    Context manager that attaches annotations to tasks created within it.

    Annotations (e.g. priority, retries, resources) are metadata consumed
    by schedulers that support them, such as the distributed scheduler.

    Parameters:
    - **annotations: Key-value pairs to attach to new tasks

    Returns:
    Context manager delimiting the annotation scope
    """
```

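Together these look roughly like the sketch below. Note that `get_annotations` requires a recent Dask release, and annotations only take effect with schedulers that read them (notably the distributed scheduler):

```python
import dask
import dask.array as da

x = da.ones((100,), chunks=(10,))

# Distinguish lazy collections from concrete values
print(dask.is_dask_collection(x))            # True
print(dask.is_dask_collection(x.compute()))  # False: a NumPy array

# Tasks built inside the block carry the annotations
with dask.annotate(retries=3, priority=10):
    print(dask.get_annotations())  # e.g. {'retries': 3, 'priority': 10}
    y = x + 1

print(dask.get_annotations())  # {} outside any annotate block
```
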
### Delayed Computation

Create delayed objects for building custom task graphs with lazy evaluation.

```python { .api }
def delayed(obj, name=None, pure=None, nout=None, traverse=True):
    """
    Wrap a function or object to build lazy task graphs.

    Parameters:
    - obj: Function to defer, or value to wrap (also usable as a decorator)
    - name: Custom key for the task in the graph
    - pure: Whether the function is pure (enables deterministic keys and caching)
    - nout: Number of outputs for functions returning tuples
    - traverse: Whether to traverse nested containers and wrap their contents

    Returns:
    Delayed function or Delayed object
    """
```

### Task Graph Access

Work directly with the underlying task-graph representation: run raw graphs synchronously and identify task tuples.

```python { .api }
def get_sync(dsk, keys, **kwargs):
    """
    Synchronous (single-threaded) scheduler for a raw task graph.

    Useful for debugging, since tasks run in the calling thread and
    errors surface with ordinary tracebacks.

    Parameters:
    - dsk: Task graph (dict mapping keys to tasks)
    - keys: Key or nested list of keys to compute
    - **kwargs: Additional scheduler arguments

    Returns:
    Computed result(s) for the requested keys
    """

def istask(obj):
    """
    Check if object is a Dask task.

    Parameters:
    - obj: Object to check

    Returns:
    bool: True if obj is a tuple whose first element is callable
    """
```

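Both are easiest to see on a hand-built graph in Dask's dict-of-tasks format. A minimal sketch:

```python
from operator import add

from dask.core import istask
from dask.local import get_sync

# A task is a tuple whose first element is callable
dsk = {
    'a': 1,
    'b': 2,
    'c': (add, 'a', 'b'),
    'd': (sum, ['a', 'b', 'c']),
}

print(istask(dsk['c']))  # True
print(istask(dsk['a']))  # False: a plain value, not a task

# get_sync takes the graph plus the key(s) to compute
print(get_sync(dsk, 'c'))         # 3
print(get_sync(dsk, ['c', 'd']))  # results for both keys, in order
```
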
### Tokenization

Generate deterministic tokens for objects to enable caching and task identification.

```python { .api }
def tokenize(*args, **kwargs):
    """
    Generate a deterministic token for the given objects.

    Equal inputs yield equal tokens, which Dask uses for task keys
    and for deduplicating identical work.

    Parameters:
    - *args: Objects to tokenize
    - **kwargs: Additional named objects to include in the token

    Returns:
    str: Deterministic hash string
    """

def normalize_token(obj):
    """
    Normalize an object into a hashable representation for tokenize.

    Implemented as a dispatch table; support custom types by registering
    a normalizer with @normalize_token.register(MyType).

    Parameters:
    - obj: Object to normalize

    Returns:
    Normalized, hashable token representation
    """
```

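A short sketch of both functions; the `Point` class and its normalizer are illustrative, not part of Dask:

```python
from dask.base import tokenize, normalize_token

# Equal inputs produce equal tokens; values and order both matter
assert tokenize([1, 2, 3], x=1) == tokenize([1, 2, 3], x=1)
assert tokenize([1, 2, 3]) != tokenize([3, 2, 1])

# Hypothetical custom type: register a normalizer so tokenize can
# hash instances deterministically
class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

@normalize_token.register(Point)
def normalize_point(p):
    return ('Point', p.x, p.y)

assert tokenize(Point(1, 2)) == tokenize(Point(1, 2))
assert tokenize(Point(1, 2)) != tokenize(Point(2, 1))
```
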
## Usage Examples

### Basic Computation

```python
import dask
import dask.array as da

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = (x + x.T).sum()

# Compute result
result = dask.compute(y)[0]

# Or compute directly on the collection
result = y.compute()
```

232
233
### Multiple Collections
234
235
```python
import dask
import dask.array as da
import dask.dataframe as dd

# Create multiple computations
arr = da.random.random((5000, 5000), chunks=(1000, 1000))
df = dd.read_csv('data.csv')  # assumes a CSV with a 'value' column

result_arr = arr.mean()
result_df = df.value.sum()

# Compute both together so shared work and scheduler setup happen once
arr_mean, df_sum = dask.compute(result_arr, result_df)
```

249
250
### Persistence for Reuse
251
252
```python
import dask
import dask.array as da

# Create an expensive computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x.rechunk((500, 500))  # expensive rechunking

# Persist: run the work once and keep the chunks in memory
y_persisted = dask.persist(y)[0]

# Reuse the persisted data without recomputation
result1 = y_persisted.sum().compute()
result2 = y_persisted.mean().compute()
```

266
267
### Custom Delayed Functions
268
269
```python
from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def process_data(df):
    return df.groupby('category').value.mean()

@delayed
def combine_results(*results):
    return pd.concat(results)

# Build computation graph
files = ['file1.csv', 'file2.csv', 'file3.csv']
loaded = [load_data(f) for f in files]
processed = [process_data(df) for df in loaded]
final = combine_results(*processed)

# Execute
result = final.compute()
```

294
295
### Visualization and Debugging
296
297
```python
import dask
import dask.array as da

# Create computation
x = da.random.random((1000, 1000), chunks=(100, 100))
y = (x + x.T).sum(axis=0)

# Render the task graph to a file (requires graphviz)
dask.visualize(y, filename='computation.png')

# Attach annotations, e.g. hints for the distributed scheduler
with dask.annotate(priority='high', resources={'memory': '4GB'}):
    z = y * 2

# Visualize the annotated graph
dask.visualize(z, filename='annotated.png')
```