
# Core Functions

Essential functions for executing, optimizing, and managing Dask computations across all collection types. These functions provide the fundamental operations needed to work with any Dask collection.

## Imports

```python
import dask
from dask import compute, persist, optimize, visualize, delayed
from dask import is_dask_collection, get_annotations, annotate
from dask import tokenize, normalize_token
from dask.core import istask
from dask.local import get_sync as get
```

## Capabilities

### Computation Execution

Execute Dask collections to get concrete results, with support for different schedulers and optimization strategies.

```python { .api }
def compute(*collections, scheduler=None, get=None, optimize_graph=True,
            pool=None, chunksize=None, **kwargs):
    """
    Compute multiple dask collections, returning concrete results.

    Parameters:
    - *collections: Dask collections to compute
    - scheduler: Scheduler to use ('threads', 'processes', 'single-threaded')
    - get: Custom scheduler function
    - optimize_graph: Whether to optimize task graphs before execution
    - pool: Thread/process pool for computation
    - chunksize: Chunk size for multiprocessing scheduler
    - **kwargs: Additional scheduler arguments

    Returns:
    Tuple of computed results in same order as input collections
    """

def persist(*collections, scheduler=None, get=None, optimize_graph=True,
            pool=None, chunksize=None, **kwargs):
    """
    Persist collections in memory for repeated use.

    Parameters:
    - *collections: Dask collections to persist
    - scheduler: Scheduler to use for persistence
    - get: Custom scheduler function
    - optimize_graph: Whether to optimize before persisting
    - pool: Thread/process pool for computation
    - chunksize: Chunk size for multiprocessing scheduler
    - **kwargs: Additional scheduler arguments

    Returns:
    Tuple of persisted collections maintaining lazy interface
    """
```
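
As a quick illustration, a minimal sketch of picking a scheduler at call time; the array shape and `num_workers` value are arbitrary choices for the example:

```python
import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(250, 250))

# Single-threaded scheduler: slower, but easy to step through with pdb
(total,) = dask.compute(x.sum(), scheduler='single-threaded')

# Threaded scheduler with a bounded worker count
(total,) = dask.compute(x.sum(), scheduler='threads', num_workers=4)
```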

### Graph Optimization

Optimize task graphs for better performance through culling, fusion, and other transformations.

```python { .api }
def optimize(*collections, **kwargs):
    """
    Optimize task graphs of collections.

    Parameters:
    - *collections: Collections to optimize
    - **kwargs: Optimization parameters

    Returns:
    Optimized collections with the same interface
    """
```
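
A small sketch of calling `optimize` directly; the graph-size comparison is illustrative only, since exact task counts vary by Dask version:

```python
import dask
import dask.array as da

x = da.ones((100, 100), chunks=(25, 25))
y = (x + 1).sum()

# Returns equivalent lazy collections with rewritten task graphs
(y_opt,) = dask.optimize(y)

# Both compute the same result; y_opt usually has a smaller graph
print(len(dict(y.__dask_graph__())), len(dict(y_opt.__dask_graph__())))
print(y_opt.compute())
```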

### Visualization

Visualize task graphs and their dependencies for debugging and understanding computation structure.

```python { .api }
def visualize(*collections, filename=None, format=None, optimize_graph=False,
              color='order', **kwargs):
    """
    Visualize task graphs of collections.

    Parameters:
    - *collections: Collections to visualize
    - filename: Output file path (None shows in notebook/browser)
    - format: Output format ('png', 'pdf', 'svg', etc.)
    - optimize_graph: Whether to optimize graph before visualization
    - color: Node coloring scheme ('order', 'tasks', etc.)
    - **kwargs: Additional graphviz parameters

    Returns:
    Graphviz object or None if filename specified
    """
```
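
A short sketch contrasting raw and optimized graph renderings; it assumes the optional `graphviz` dependency is installed:

```python
import dask
import dask.array as da

x = da.ones((100, 100), chunks=(50, 50))
y = (x + x.T).mean()

# Render the graph as generated
dask.visualize(y, filename='raw.svg', optimize_graph=False)

# Render the graph after optimization (usually far fewer nodes)
dask.visualize(y, filename='optimized.svg', optimize_graph=True)
```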

### Collection Utilities

Utility functions for working with Dask collections and determining their properties.

```python { .api }
def is_dask_collection(obj):
    """
    Check if object is a Dask collection.

    Parameters:
    - obj: Object to check

    Returns:
    bool: True if object is a Dask collection
    """

def get_annotations():
    """
    Get current task graph annotations.

    Returns:
    dict: Current annotation dictionary
    """

def annotate(**annotations):
    """
    Context manager for adding annotations to task graphs.

    Parameters:
    - **annotations: Key-value pairs to add as annotations

    Returns:
    Context manager for annotation scope
    """
```
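
A brief sketch of the three utilities together; the `priority` key is an arbitrary example, and whether a scheduler acts on it depends on the deployment:

```python
import dask
import dask.array as da

x = da.ones(10, chunks=5)
print(dask.is_dask_collection(x))       # True
print(dask.is_dask_collection([1, 2]))  # False: plain Python list

# Annotations attach to tasks created inside the context
with dask.annotate(priority=10):
    y = x + 1
    print(dask.get_annotations())  # shows the annotations active here
```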

### Delayed Computation

Create delayed objects for building custom task graphs with lazy evaluation.

```python { .api }
def delayed(func=None, *, pure=None, nout=None, name=None, traverse=True):
    """
    Decorator to create delayed functions or wrap objects.

    Parameters:
    - func: Function to make delayed (None for decorator use)
    - pure: Whether function is pure (no side effects)
    - nout: Number of outputs for functions returning multiple values
    - name: Custom name for task in graph
    - traverse: Whether to traverse and delay nested collections

    Returns:
    Delayed function or object
    """
```
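
The walkthrough under Usage Examples covers the plain decorator form; here is a small sketch of the `nout` and `pure` parameters specifically:

```python
from dask import delayed

# nout=2 lets the single Delayed result be unpacked into two
@delayed(nout=2)
def min_max(seq):
    return min(seq), max(seq)

lo, hi = min_max([3, 1, 4, 1, 5])
print(lo.compute(), hi.compute())  # 1 5

# pure=True gives identical calls identical task keys,
# so repeated calls share one task in the graph
@delayed(pure=True)
def square(x):
    return x * x

assert square(3).key == square(3).key
```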

### Task Graph Access

Execute and inspect the raw task graphs that underlie collections.

```python { .api }
def get_sync(dsk, keys, **kwargs):
    """
    Synchronous scheduler for immediate local execution.

    Parameters:
    - dsk: Task graph (dict mapping keys to values or tasks)
    - keys: Key or list of keys to compute
    - **kwargs: Additional scheduler arguments

    Returns:
    Computed results using single-threaded execution
    """

def istask(obj):
    """
    Check if object is a Dask task.

    Parameters:
    - obj: Object to check

    Returns:
    bool: True if object is a task tuple
    """
```
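
A minimal sketch of a raw task graph; recent Dask releases use richer task objects internally, but the classic dict-of-tuples form shown here has long been accepted by the low-level schedulers:

```python
from operator import add

from dask.core import istask
from dask.local import get_sync

# A task is a tuple whose first element is callable
dsk = {'x': 1, 'y': 2, 'z': (add, 'x', 'y')}

print(istask(dsk['z']))    # True: (add, 'x', 'y') is a task
print(istask(dsk['x']))    # False: plain data
print(get_sync(dsk, 'z'))  # 3
```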

### Tokenization

Generate deterministic tokens for objects to enable caching and task identification.

```python { .api }
def tokenize(*args, **kwargs):
    """
    Generate deterministic token for objects.

    Parameters:
    - *args: Objects to tokenize
    - **kwargs: Additional objects to include in token

    Returns:
    str: Unique token string
    """

def normalize_token(obj):
    """
    Normalize an object to a token representation.

    Parameters:
    - obj: Object to normalize

    Returns:
    Normalized token representation
    """
```
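
A quick sketch of this determinism in practice; equal inputs yield equal tokens, which is what lets Dask deduplicate identical tasks (imported from `dask.base`, the long-standing location):

```python
from dask.base import tokenize

a = tokenize([1, 2, 3], key='value')
b = tokenize([1, 2, 3], key='value')
print(a == b)  # True: equal inputs produce the same token

c = tokenize([1, 2, 4], key='value')
print(a == c)  # False: any change to the input changes the token
```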

## Usage Examples

### Basic Computation

```python
import dask.array as da
import dask

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = (x + x.T).sum()

# Compute result
result = dask.compute(y)[0]

# Or compute directly on collection
result = y.compute()
```

### Multiple Collections

```python
import dask
import dask.array as da
import dask.dataframe as dd

# Create multiple computations
arr = da.random.random((5000, 5000), chunks=(1000, 1000))
df = dd.read_csv('data.csv')

result_arr = arr.mean()
result_df = df.value.sum()

# Compute both together (shares work, more efficient)
arr_mean, df_sum = dask.compute(result_arr, result_df)
```

### Persistence for Reuse

```python
import dask
import dask.array as da

# Create expensive computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x.rechunk((500, 500))  # Expensive rechunking

# Persist for reuse
y_persisted = dask.persist(y)[0]

# Use multiple times without recomputation
result1 = y_persisted.sum().compute()
result2 = y_persisted.mean().compute()
```

### Custom Delayed Functions

```python
from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def process_data(df):
    return df.groupby('category').value.mean()

@delayed
def combine_results(*results):
    return pd.concat(results)

# Build computation graph
files = ['file1.csv', 'file2.csv', 'file3.csv']
loaded = [load_data(f) for f in files]
processed = [process_data(df) for df in loaded]
final = combine_results(*processed)

# Execute
result = final.compute()
```

### Visualization and Debugging

```python
import dask
import dask.array as da

# Create computation
x = da.random.random((1000, 1000), chunks=(100, 100))
y = (x + x.T).sum(axis=0)

# Visualize task graph
dask.visualize(y, filename='computation.png')

# With annotations for debugging
with dask.annotate(priority='high', resources={'memory': '4GB'}):
    z = y * 2

# Visualize annotated graph
dask.visualize(z, filename='annotated.png')
```