or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

arrays.md · bags.md · configuration.md · core-functions.md · dataframes.md · delayed.md · diagnostics.md · index.md

docs/configuration.md

0

# Configuration

1

2

The configuration system controls Dask behavior, schedulers, and optimization settings. It allows fine-tuning of performance, resource usage, and execution strategies across all Dask operations.

3

4

## Capabilities

5

6

### Configuration Management

7

8

Core functions for getting, setting, and managing configuration values.

9

10

```python { .api }

11

def get(key, default=None):

12

"""

13

Get configuration value.

14

15

Parameters:

16

- key: Configuration key (dot-separated path)

17

- default: Default value if key not found

18

19

Returns:

20

Configuration value or default

21

"""

22

23

def set(config=None, **kwargs):

24

"""

25

Set configuration values, either globally (plain call) or temporarily (when used as a context manager).

26

27

Parameters:

28

- config: Dictionary of configuration values

29

- **kwargs: Key-value pairs to set

30

31

Returns:

32

Context manager for temporary configuration

33

"""

34

35

def update(config=None, **kwargs):

36

"""

37

Update configuration permanently.

38

39

Parameters:

40

- config: Dictionary of configuration values

41

- **kwargs: Key-value pairs to update

42

43

Returns:

44

None

45

"""

46

47

def clear():

48

"""

49

Clear all configuration values.

50

51

Returns:

52

None

53

"""

54

55

def collect(paths=None):

56

"""

57

Collect configuration from files and environment.

58

59

Parameters:

60

- paths: List of paths to search for config files

61

62

Returns:

63

dict: Collected configuration

64

"""

65

66

def refresh():

67

"""

68

Refresh configuration from all sources.

69

70

Returns:

71

None

72

"""

73

```

74

75

### Configuration Context

76

77

Context managers for temporary configuration changes.

78

79

```python { .api }

80

def config_context(**kwargs):

81

"""

82

Context manager for temporary configuration.

83

84

Parameters:

85

- **kwargs: Configuration key-value pairs

86

87

Returns:

88

Context manager

89

"""

90

91

# Global configuration dictionary

92

config: dict

93

```

94

95

### Scheduler Configuration

96

97

Configure task execution schedulers and their parameters.

98

99

```python { .api }

100

# Scheduler selection

101

# dask.config.set(scheduler='threads') # Threaded scheduler

102

# dask.config.set(scheduler='processes') # Process-based scheduler

103

# dask.config.set(scheduler='single-threaded') # Single-threaded

104

# dask.config.set(scheduler='distributed') # Distributed scheduler

105

106

# Thread scheduler settings

107

# dask.config.set({'num_workers': 4}) # Number of worker threads

108

# dask.config.set({'pool': custom_pool}) # Custom thread pool

109

110

# Process scheduler settings

111

# dask.config.set({'num_workers': 2}) # Number of worker processes

112

# dask.config.set({'chunksize': 1}) # Tasks per process call

113

114

# Memory and resource limits

115

# dask.config.set({'temporary_directory': '/tmp/dask'})

116

# dask.config.set({'local_directory': '/tmp/dask-worker'})

117

```

118

119

### Array Configuration

120

121

Configure array operations, chunking, and optimization.

122

123

```python { .api }

124

# Array chunk size defaults

125

# dask.config.set({'array.chunk-size': '128MB'})

126

# dask.config.set({'array.chunk-size': (1000, 1000)})

127

128

# Optimization settings

129

# dask.config.set({'array.optimize_graph': True})

130

# dask.config.set({'array.slicing.split_large_chunks': True})

131

132

# Rechunking behavior

133

# dask.config.set({'array.rechunk.method': 'tasks'})

134

# dask.config.set({'array.rechunk-threshold': 4})

135

136

# Query planning (expression-based optimization)

137

# dask.config.set({'array.query-planning': True})

138

```

139

140

### DataFrame Configuration

141

142

Configure DataFrame operations, I/O, and query planning.

143

144

```python { .api }

145

# Query planning system

146

# dask.config.set({'dataframe.query-planning': True})

147

148

# I/O settings

149

# dask.config.set({'dataframe.parquet.minimum-partition-size': '100MB'})

150

# dask.config.set({'dataframe.csv.chunk_size': '50MB'})

151

152

# Index and partitioning

153

# dask.config.set({'dataframe.shuffle.method': 'tasks'})

154

# dask.config.set({'dataframe.shuffle.compression': 'lz4'})

155

156

# Backend configuration

157

# dask.config.set({'dataframe.backend': 'pandas'})

158

# dask.config.set({'dataframe.convert-string': True})

159

```

160

161

### Optimization Configuration

162

163

Configure graph optimization strategies and performance tuning.

164

165

```python { .api }

166

# Graph optimization

167

# dask.config.set({'optimization.fuse': {}}) # Enable fusion

168

# dask.config.set({'optimization.inline': {}}) # Enable inlining

169

# dask.config.set({'optimization.inline_functions': True})

170

171

# Caching configuration

172

# dask.config.set({'cache': 'memory'}) # Memory cache

173

# dask.config.set({'cache': 'disk'}) # Disk cache

174

# dask.config.set({'cache.disk.directory': '/cache'}) # Cache directory

175

176

# Tokenization (affects caching)

177

# dask.config.set({'tokenize.function': 'sha1'}) # Hash function

178

```

179

180

### Distributed Computing Configuration

181

182

Configure distributed scheduler connection and behavior.

183

184

```python { .api }

185

# Distributed scheduler

186

# dask.config.set({'distributed.scheduler-address': 'tcp://scheduler:8786'})

187

# dask.config.set({'distributed.dashboard.link': 'http://scheduler:8787'})

188

189

# Worker configuration

190

# dask.config.set({'distributed.worker.memory.target': 0.6})

191

# dask.config.set({'distributed.worker.memory.spill': 0.7})

192

# dask.config.set({'distributed.worker.memory.pause': 0.8})

193

# dask.config.set({'distributed.worker.memory.terminate': 0.95})

194

195

# Network and communication

196

# dask.config.set({'distributed.comm.compression': 'lz4'})

197

# dask.config.set({'distributed.comm.timeouts.connect': '10s'})

198

```

199

200

### Diagnostics Configuration

201

202

Configure profiling, logging, and diagnostic output.

203

204

```python { .api }

205

# Progress reporting

206

# dask.config.set({'diagnostics.progress.enabled': True})

207

# dask.config.set({'diagnostics.progress.minimum': 1.0}) # Minimum time

208

209

# Profiling

210

# dask.config.set({'diagnostics.profile.enabled': True})

211

# dask.config.set({'diagnostics.profile.interval': '10ms'})

212

213

# Logging configuration

214

# dask.config.set({'logging.distributed': 'INFO'})

215

# dask.config.set({'logging.distributed.worker': 'WARNING'})

216

```

217

218

## Usage Examples

219

220

### Basic Configuration

221

222

```python

223

import dask

224

import dask.config

225

226

# Get current configuration

227

current_scheduler = dask.config.get('scheduler')

228

print(f"Current scheduler: {current_scheduler}")

229

230

# Set configuration globally (stays in effect for the rest of the process, until changed)

231

dask.config.set(scheduler='threads')

232

dask.config.set(num_workers=4)

233

234

# Set multiple values

235

dask.config.set({

236

'scheduler': 'processes',

237

'num_workers': 2,

238

'temporary_directory': '/tmp/dask'

239

})

240

```

241

242

### Temporary Configuration

243

244

```python

245

import dask

246

import dask.array as da

247

248

# Create computation

249

x = da.random.random((10000, 10000), chunks=(1000, 1000))

250

251

# Compute with temporary configuration

252

with dask.config.set(scheduler='processes', num_workers=8):

253

result1 = x.sum().compute()

254

255

# Configuration automatically reverts

256

with dask.config.set(scheduler='single-threaded'):

257

result2 = x.mean().compute()

258

259

# Passing a dictionary (required for dotted or hyphenated keys such as 'array.chunk-size')

260

with dask.config.set({'array.chunk-size': '64MB'}):

261

y = da.random.random((5000, 5000)) # Uses new chunk size

262

```

263

264

### Performance Tuning

265

266

```python

267

import dask

268

import dask.array as da

269

270

# Optimize for memory-constrained environment

271

dask.config.set({

272

'array.chunk-size': '32MB', # Smaller chunks

273

'num_workers': 2, # Fewer workers

274

'scheduler': 'threads' # Shared memory

275

})

276

277

# Optimize for CPU-intensive tasks

278

dask.config.set({

279

'scheduler': 'processes', # Avoid GIL

280

'num_workers': 8, # More processes

281

'optimization.fuse': {} # Enable fusion

282

})

283

284

# Large dataset configuration

285

dask.config.set({

286

'array.chunk-size': '256MB', # Larger chunks

287

'temporary_directory': '/fast-ssd/tmp',

288

'distributed.worker.memory.target': 0.7

289

})

290

```

291

292

### File and Environment Configuration

293

294

```python

295

import dask.config

296

import os

297

298

# Load from YAML file

299

# Create ~/.config/dask/dask.yaml:

300

"""

301

scheduler: processes

302

num_workers: 4

303

array:

304

chunk-size: "128MB"

305

optimize_graph: true

306

dataframe:

307

query-planning: true

308

"""

309

310

# Refresh configuration from files

311

dask.config.refresh()

312

313

# Environment variable configuration

314

os.environ['DASK_SCHEDULER'] = 'threads'

315

os.environ['DASK_NUM_WORKERS'] = '6'

316

317

# Collect configuration from config files and environment variables

318

config_from_env = dask.config.collect()

319

```

320

321

### Distributed Computing Setup

322

323

```python

324

import dask

325

from dask.distributed import Client

326

327

# Configure for distributed computing

328

dask.config.set({

329

'distributed.scheduler-address': 'tcp://10.0.0.100:8786',

330

'distributed.dashboard.link': 'http://10.0.0.100:8787/status',

331

'distributed.worker.memory.target': 0.6,

332

'distributed.worker.memory.spill': 0.7,

333

'distributed.comm.compression': 'lz4'

334

})

335

336

# Connect to cluster

337

client = Client() # Uses configured address

338

339

# Verify configuration

340

print(f"Dashboard: {client.dashboard_link}")

341

```

342

343

### Advanced Optimization

344

345

```python

346

import dask

347

import dask.array as da

348

349

# Fine-tune optimization strategies

350

optimization_config = {

351

'optimization.fuse': {},

352

'optimization.inline': {},

353

'optimization.inline_functions': True,

354

'array.optimize_graph': True,

355

'array.rechunk-threshold': 4,

356

'array.slicing.split_large_chunks': True

357

}

358

359

with dask.config.set(optimization_config):

360

# Complex computation with optimization

361

x = da.random.random((50000, 50000), chunks=(5000, 5000))

362

y = da.random.random((50000, 50000), chunks=(5000, 5000))

363

364

# Chain operations benefit from optimization

365

result = ((x + y).T @ (x - y)).sum(axis=0).compute()

366

```

367

368

### Configuration Inspection

369

370

```python

371

import dask.config

372

import pprint

373

374

# View all current configuration

375

current_config = dict(dask.config.config)

376

pprint.pprint(current_config)

377

378

# View specific sections

379

array_config = {k: v for k, v in current_config.items()

380

if k.startswith('array')}

381

print("Array configuration:")

382

pprint.pprint(array_config)

383

384

# Check configuration sources

385

config_paths = dask.config.paths

386

print(f"Configuration paths: {config_paths}")

387

388

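# Validate configuration (NOTE: dask.config.set does not validate values itself; an unknown scheduler name is typically rejected only when a computation runs)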

389

try:

390

dask.config.set(scheduler='invalid_scheduler')

391

except ValueError as e:

392

print(f"Invalid configuration: {e}")

393

```

394

395

### Dynamic Configuration

396

397

```python

398

import dask

399

import dask.array as da

400

401

def adaptive_scheduler_config(data_size_gb):

402

"""Choose optimal configuration based on data size."""

403

if data_size_gb < 1:

404

return {

405

'scheduler': 'single-threaded',

406

'array.chunk-size': '32MB'

407

}

408

elif data_size_gb < 10:

409

return {

410

'scheduler': 'threads',

411

'num_workers': 4,

412

'array.chunk-size': '64MB'

413

}

414

else:

415

return {

416

'scheduler': 'processes',

417

'num_workers': 8,

418

'array.chunk-size': '128MB'

419

}

420

421

# Apply configuration based on workload

422

data_size = 5.0 # GB

423

config = adaptive_scheduler_config(data_size)

424

425

with dask.config.set(config):

426

# Process data with optimal configuration

427

x = da.random.random((25000, 25000), chunks='auto')

428

result = x.mean(axis=0).compute()

429

```