
# Delayed

Build custom task graphs with lazy evaluation for any Python function. Delayed enables flexible parallel workflows by wrapping functions and values to create task graphs that execute only when explicitly computed.
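
For orientation, a minimal sketch of the model: wrapped calls record tasks instead of running, and nothing executes until `.compute()` is called.

```python
from dask.delayed import delayed

def inc(x):
    return x + 1

def add(x, y):
    return x + y

# Each call records a task; nothing runs yet
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

print(total.compute())  # 5; inc(1) and inc(2) may run in parallel
```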

## Capabilities

### Delayed Function Creation

Create delayed versions of functions and wrap values for lazy evaluation.

```python { .api }
def delayed(func=None, *, pure=None, nout=None, name=None, traverse=True):
    """
    Decorator to create delayed functions or wrap objects.

    Parameters:
    - func: Function or object to make delayed (None when used as a
      decorator factory, e.g. @delayed(pure=True))
    - pure: Whether the function is pure (no side effects); pure calls
      with the same arguments share a deterministic task key
    - nout: Number of outputs, for functions returning multiple values
    - name: Custom name for the task in the graph
    - traverse: Whether to traverse nested collections and delay their
      contents

    Returns:
    Delayed function or Delayed object
    """
```
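
The three call forms in practice: decorating a function, wrapping an existing callable, and wrapping a plain value. A short sketch:

```python
from dask.delayed import delayed

# As a decorator
@delayed
def double(x):
    return 2 * x

# As a wrapper around an existing callable
triple = delayed(lambda x: 3 * x)

# Wrapping a plain value
five = delayed(5)

result = (double(five) + triple(2)).compute()  # 2*5 + 3*2 = 16
```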

### Core Delayed Class

The Delayed class represents lazy computations in task graphs.

```python { .api }
class Delayed:
    """
    Lazy evaluation wrapper for building task graphs.

    Properties:
    - key: str - Unique identifier for this computation
    - dask: dict-like - Task graph mapping keys to tasks
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute the delayed object and return the concrete result.

        Parameters:
        - scheduler: Scheduler to use ('threads', 'processes',
          'synchronous', or a distributed client)
        - **kwargs: Additional scheduler arguments

        Returns:
        Computed result of the delayed operation
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist the delayed object's result in memory for reuse.

        Parameters:
        - scheduler: Scheduler to use for persistence
        - **kwargs: Additional scheduler arguments

        Returns:
        dask.delayed.Delayed: Delayed object backed by the computed result
        """

    def visualize(self, filename=None, format=None, optimize_graph=False,
                  **kwargs):
        """
        Render the task graph for this delayed object (requires graphviz).

        Parameters:
        - filename: Output file path
        - format: Output format ('png', 'svg', 'pdf', etc.)
        - optimize_graph: Whether to optimize the graph before rendering
        - **kwargs: Additional graphviz parameters

        Returns:
        Graphviz object, or None if a filename is specified
        """

    def __call__(self, *args, **kwargs):
        """Call the delayed object as a function; returns a new Delayed."""

    def __getattr__(self, attr):
        """Lazily access an attribute of the underlying object."""

    def __getitem__(self, key):
        """Lazily index into the underlying object."""

    def __setitem__(self, key, value):
        """Not supported: Delayed objects are immutable."""

    def __iter__(self):
        """Iterate over the delayed object; requires a known length
        (e.g. created with nout)."""

    def __len__(self):
        """Length of the delayed object; requires a known length
        (e.g. created with nout)."""

    def __bool__(self):
        """Not supported: the truth value of a Delayed is unknown
        until computed."""
```
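
Typical use of the core methods, assuming only dask itself (visualize additionally needs the optional graphviz dependency):

```python
from dask.delayed import delayed

@delayed
def square(x):
    return x * x

node = square(4)
print(node.key)  # unique task identifier, e.g. 'square-<token>'

result = node.compute()                       # 16, on the default scheduler
threaded = node.compute(scheduler="threads")  # explicit scheduler choice

held = node.persist()  # a Delayed backed by the computed result
# node.visualize(filename="graph.png")  # writes the task graph (needs graphviz)
```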

### Delayed Operations

Mathematical, comparison, and bitwise operations on delayed objects; each builds a new node in the task graph.

```python { .api }
# Arithmetic operations create new delayed objects
# delayed_obj + other  -> Delayed
# delayed_obj - other  -> Delayed
# delayed_obj * other  -> Delayed
# delayed_obj / other  -> Delayed
# delayed_obj // other -> Delayed
# delayed_obj % other  -> Delayed
# delayed_obj ** other -> Delayed

# Comparison operations
# delayed_obj == other -> Delayed
# delayed_obj != other -> Delayed
# delayed_obj < other  -> Delayed
# delayed_obj <= other -> Delayed
# delayed_obj > other  -> Delayed
# delayed_obj >= other -> Delayed

# Bitwise (element-wise logical) operations
# delayed_obj & other -> Delayed
# delayed_obj | other -> Delayed
# delayed_obj ^ other -> Delayed
# ~delayed_obj        -> Delayed

# Unary operations
# -delayed_obj     -> Delayed
# +delayed_obj     -> Delayed
# abs(delayed_obj) -> Delayed
```
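
Because every operator returns a new Delayed, arbitrary expressions can be composed before anything runs:

```python
from dask.delayed import delayed

x = delayed(10)
y = delayed(3)

# Each operator adds a node to the graph
expr = (x + y) * 2 - x % y
print(expr.compute())  # (10 + 3) * 2 - 10 % 3 = 25

# Comparisons and bitwise operators are lazy too
flag = (x > y) & (y >= 1)
print(flag.compute())  # True
```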

### Task Graph Construction

Build complex task graphs by composing delayed operations.

```python { .api }
@delayed
def custom_function(arg1, arg2, **kwargs):
    """
    Custom delayed function example.

    Any function can be made delayed using the @delayed decorator.
    The function will not execute until .compute() is called.
    """
    # Function implementation (placeholder)
    result = arg1
    return result

# Delayed wrapper for existing functions
delayed_func = delayed(existing_function)

# Delayed values
delayed_value = delayed(some_value)

# Function composition creates task graphs
@delayed
def step1(data):
    return process_data(data)

@delayed
def step2(processed_data):
    return analyze_data(processed_data)

@delayed
def step3(analysis_result):
    return generate_report(analysis_result)

# Build the computation pipeline; note delayed(load_data)() rather than
# delayed(load_data()), which would run load_data eagerly
data = delayed(load_data)()
processed = step1(data)
analyzed = step2(processed)
report = step3(analyzed)

# Execute the entire pipeline
final_result = report.compute()
```
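
Several nodes of one graph can also be evaluated together with `dask.compute`, which merges their graphs so shared tasks run only once. A minimal sketch (the `load`, `total`, and `count` functions here are illustrative):

```python
import dask
from dask.delayed import delayed

@delayed
def load():
    return list(range(10))

@delayed
def total(xs):
    return sum(xs)

@delayed
def count(xs):
    return len(xs)

data = load()
# Both results come from one merged graph, so load() executes only once
s, n = dask.compute(total(data), count(data))
print(s / n)  # 4.5
```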

### Multiple Return Values

Handle functions that return multiple values.

```python { .api }
@delayed(nout=2)
def function_with_multiple_returns(data):
    """
    Function returning multiple values.

    The nout parameter specifies the number of return values,
    enabling proper unpacking of delayed results.
    """
    result1 = process_part1(data)
    result2 = process_part2(data)
    return result1, result2

# Unpack the multiple returns
data = delayed(load_data)()
part1, part2 = function_with_multiple_returns(data)

# Use each part independently
analysis1 = delayed(analyze)(part1)
analysis2 = delayed(analyze)(part2)

# Combine the results
final = delayed(combine)(analysis1, analysis2)
result = final.compute()
```

### Pure Functions and Caching

Mark functions without side effects as pure so that repeated calls with the same arguments are deduplicated.

```python { .api }
@delayed(pure=True)
def expensive_pure_function(data):
    """
    Pure function with no side effects.

    Setting pure=True derives the task key deterministically from the
    function and its arguments, so repeated calls with the same arguments
    collapse into a single task within a graph.
    """
    # Expensive computation (placeholder)
    return expensive_result

# Identical calls share one task key and are computed once per graph
data = delayed(load_data)()
result1 = expensive_pure_function(data)
result2 = expensive_pure_function(data)  # same task key as result1

combined = delayed(combine)(result1, result2)
final = combined.compute()
```
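
A quick way to see what `pure=True` buys you is to compare task keys; this sketch assumes nothing beyond dask itself:

```python
from dask.delayed import delayed

def slow_square(x):
    return x * x

pure = delayed(slow_square, pure=True)
impure = delayed(slow_square)  # default: a fresh key for every call

# Pure calls with equal arguments share one deterministic key,
# so they are deduplicated inside a graph; impure calls never are.
assert pure(3).key == pure(3).key
assert impure(3).key != impure(3).key
```

Note that this is deduplication within a single graph, not a persistent cache across separate `compute()` calls.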

### Integration with Collections

Convert between delayed objects and other Dask collections.

```python { .api }
# Convert collections to delayed
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

array = da.random.random((1000, 1000), chunks=(100, 100))
delayed_array = array.to_delayed()  # array of Delayed, one per chunk

dataframe = dd.read_csv('data.csv')
delayed_dataframe = dataframe.to_delayed()  # list of Delayed, one per partition

# Convert delayed to collections
delayed_values = [delayed(load_partition)(i) for i in range(10)]
bag = db.from_delayed(delayed_values)

delayed_dfs = [delayed(load_dataframe)(f) for f in files]
combined_df = dd.from_delayed(delayed_dfs)
```
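
Delayed objects can also be promoted back into arrays with `da.from_delayed`, declaring the chunk's shape and dtype up front. A small self-contained sketch:

```python
import numpy as np
import dask.array as da
from dask.delayed import delayed

@delayed(pure=True)
def make_chunk(n):
    return np.arange(n, dtype="int64")

# shape and dtype must be declared, since the data does not exist yet
chunk = make_chunk(5)
arr = da.from_delayed(chunk, shape=(5,), dtype="int64")
print(arr.sum().compute())  # 10
```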

### Error Handling

Handle errors in delayed computations.

```python { .api }
def risky_function(data):
    """Plain function that might raise exceptions."""
    if data is None:
        raise ValueError("Data cannot be None")
    return process_data(data)

@delayed
def safe_wrapper(data):
    """Delayed wrapper with error handling.

    The try/except must live inside the delayed function: exceptions
    are raised only when the task actually runs, during compute().
    """
    try:
        return risky_function(data)
    except ValueError as e:
        return f"Error: {e}"

# Errors surface (or are handled) during compute()
data = delayed(None)
result = safe_wrapper(data)
output = result.compute()  # "Error: Data cannot be None"
```

## Usage Examples

### Basic Delayed Workflow

```python
from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def clean_data(df):
    return df.dropna().reset_index(drop=True)

@delayed
def analyze_data(df):
    return df.describe()

@delayed
def save_results(analysis, filename):
    analysis.to_csv(filename)
    return f"Saved to {filename}"

# Build the computation graph
filename = 'data.csv'
raw_data = load_data(filename)
cleaned = clean_data(raw_data)
analysis = analyze_data(cleaned)
save_status = save_results(analysis, 'results.csv')

# Execute the computation
result = save_status.compute()
print(result)
```

### Parallel Processing Pipeline

```python
from dask.delayed import delayed
import pandas as pd

@delayed
def process_file(filename):
    """Process a single file."""
    df = pd.read_csv(filename)
    # Complex processing logic
    return df.groupby('category')['value'].sum()

@delayed
def combine_results(results):
    """Combine results from multiple files."""
    return pd.concat(results, axis=1).fillna(0)

# Process multiple files in parallel
files = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
processed = [process_file(f) for f in files]

# Combine all results
combined = combine_results(processed)
final_result = combined.compute()
```
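
The same graph can run on different schedulers via the `scheduler` keyword; which one wins depends largely on whether the work releases the GIL. Continuing with `combined` from the example above:

```python
# Threads: low overhead; good when pandas/NumPy release the GIL
final_result = combined.compute(scheduler="threads")

# Processes: sidesteps the GIL at the cost of serializing data
final_result = combined.compute(scheduler="processes")

# Synchronous: single-threaded, the easiest to debug with pdb
final_result = combined.compute(scheduler="synchronous")
```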

### Dynamic Task Graphs

```python
from dask.delayed import delayed

@delayed
def generate_data(seed, size):
    """Generate synthetic data."""
    import numpy as np
    np.random.seed(seed)
    return np.random.random(size)

@delayed
def process_batch(data_batch, method='mean'):
    """Process a batch of data."""
    if method == 'mean':
        return data_batch.mean()
    elif method == 'sum':
        return data_batch.sum()
    else:
        return data_batch.std()

@delayed
def aggregate_results(results):
    """Aggregate all batch results."""
    return sum(results) / len(results)

# The shape of the graph depends on runtime parameters
n_batches = 5
batch_size = 1000
method = 'mean'

# Generate data batches
data_batches = [generate_data(i, batch_size) for i in range(n_batches)]

# Process each batch
batch_results = [process_batch(batch, method) for batch in data_batches]

# Aggregate the final result
final_result = aggregate_results(batch_results)
answer = final_result.compute()
```

### Custom Class with Delayed Methods

```python
from dask.delayed import delayed

class DelayedProcessor:
    """Class whose methods are wrapped with delayed at call time.

    Decorating methods directly with @delayed does not bind self, so the
    methods stay ordinary and bound methods are wrapped when building
    the graph.
    """

    def __init__(self, params):
        self.params = params

    def load_data(self, source):
        """Load data from source (placeholder implementation)."""
        ...

    def preprocess(self, data):
        """Preprocess the data, using self.params for configuration."""
        ...

    def train_model(self, data):
        """Train a model on the data."""
        ...

    def evaluate(self, model, test_data):
        """Evaluate model performance."""
        ...

# Wrap bound methods to build the pipeline lazily
processor = DelayedProcessor({'param1': 10, 'param2': 'setting'})

raw_data = delayed(processor.load_data)('data_source')
clean_data = delayed(processor.preprocess)(raw_data)
model = delayed(processor.train_model)(clean_data)

test_data = delayed(processor.load_data)('test_source')
clean_test = delayed(processor.preprocess)(test_data)
metrics = delayed(processor.evaluate)(model, clean_test)

# Execute the entire pipeline
final_metrics = metrics.compute()
```

### Integration with Other Collections

```python
import dask.array as da
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def custom_analysis(array_data, df_data):
    """Custom analysis combining an array chunk and a dataframe partition."""
    array_stats = {
        'mean': array_data.mean(),
        'std': array_data.std()
    }

    df_stats = {
        'row_count': len(df_data),
        'col_count': len(df_data.columns)
    }

    return {'array': array_stats, 'dataframe': df_stats}

# Create collections
array = da.random.random((10000, 100), chunks=(1000, 100))
dataframe = dd.read_csv('large_file.csv')

# Convert to delayed for custom processing: take the first chunk/partition
delayed_array = array.to_delayed().flatten()[0]
delayed_df = dataframe.to_delayed()[0]

# Custom analysis runs on the concrete chunk and partition at compute time
analysis = custom_analysis(delayed_array, delayed_df)
result = analysis.compute()
```