0
# Configuration
1
2
Dask's configuration system controls library behavior, scheduler selection, and optimization settings. It allows fine-tuning of performance, resource usage, and execution strategies across all Dask operations.
3
4
## Capabilities
5
6
### Configuration Management
7
8
Core functions for getting, setting, and managing configuration values.
9
10
```python { .api }
11
def get(key, default=None):
12
"""
13
Get configuration value.
14
15
Parameters:
16
- key: Configuration key (dot-separated path)
17
- default: Default value if key not found
18
19
Returns:
20
Configuration value or default
21
"""
22
23
def set(config=None, **kwargs):
24
"""
25
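Set configuration values. When called directly the change applies globally; when used in a `with` statement the change is temporary and is reverted on exit.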
26
27
Parameters:
28
- config: Dictionary of configuration values
29
- **kwargs: Key-value pairs to set
30
31
Returns:
32
Context manager for temporary configuration
33
"""
34
35
def update(config=None, **kwargs):
36
"""
37
Update configuration permanently.
38
39
Parameters:
40
- config: Dictionary of configuration values
41
- **kwargs: Key-value pairs to update
42
43
Returns:
44
None
45
"""
46
47
def clear():
48
"""
49
Clear all configuration values.
50
51
Returns:
52
None
53
"""
54
55
def collect(paths=None):
56
"""
57
Collect configuration from files and environment.
58
59
Parameters:
60
- paths: List of paths to search for config files
61
62
Returns:
63
dict: Collected configuration
64
"""
65
66
def refresh():
67
"""
68
Refresh configuration from all sources.
69
70
Returns:
71
None
72
"""
73
```
74
75
### Configuration Context
76
77
Context managers for temporary configuration changes.
78
79
```python { .api }
80
def config_context(**kwargs):
81
"""
82
Context manager for temporary configuration.
83
84
Parameters:
85
- **kwargs: Configuration key-value pairs
86
87
Returns:
88
Context manager
89
"""
90
91
# Global configuration dictionary
92
config: dict
93
```
94
95
### Scheduler Configuration
96
97
Configure task execution schedulers and their parameters.
98
99
```python { .api }
100
# Scheduler selection
101
# dask.config.set(scheduler='threads') # Threaded scheduler
102
# dask.config.set(scheduler='processes') # Process-based scheduler
103
# dask.config.set(scheduler='single-threaded') # Single-threaded
104
# dask.config.set(scheduler='distributed') # Distributed scheduler
105
106
# Thread scheduler settings
107
# dask.config.set({'num_workers': 4}) # Number of worker threads
108
# dask.config.set({'pool': custom_pool}) # Custom thread pool
109
110
# Process scheduler settings
111
# dask.config.set({'num_workers': 2}) # Number of worker processes
112
# dask.config.set({'chunksize': 1}) # Tasks per process call
113
114
# Memory and resource limits
115
# dask.config.set({'temporary_directory': '/tmp/dask'})
116
# dask.config.set({'local_directory': '/tmp/dask-worker'})
117
```
118
119
### Array Configuration
120
121
Configure array operations, chunking, and optimization.
122
123
```python { .api }
124
# Array chunk size defaults
125
# dask.config.set({'array.chunk-size': '128MB'})
126
# dask.config.set({'array.chunk-size': (1000, 1000)})
127
128
# Optimization settings
129
# dask.config.set({'array.optimize_graph': True})
130
# dask.config.set({'array.slicing.split_large_chunks': True})
131
132
# Rechunking behavior
133
# dask.config.set({'array.rechunk.method': 'tasks'})
134
# dask.config.set({'array.rechunk-threshold': 4})
135
136
# Query planning (expression-based optimization)
137
# dask.config.set({'array.query-planning': True})
138
```
139
140
### DataFrame Configuration
141
142
Configure DataFrame operations, I/O, and query planning.
143
144
```python { .api }
145
# Query planning system
146
# dask.config.set({'dataframe.query-planning': True})
147
148
# I/O settings
149
# dask.config.set({'dataframe.parquet.minimum-partition-size': '100MB'})
150
# dask.config.set({'dataframe.csv.chunk_size': '50MB'})
151
152
# Index and partitioning
153
# dask.config.set({'dataframe.shuffle.method': 'tasks'})
154
# dask.config.set({'dataframe.shuffle.compression': 'lz4'})
155
156
# Backend configuration
157
# dask.config.set({'dataframe.backend': 'pandas'})
158
# dask.config.set({'dataframe.convert-string': True})
159
```
160
161
### Optimization Configuration
162
163
Configure graph optimization strategies and performance tuning.
164
165
```python { .api }
166
# Graph optimization
167
# dask.config.set({'optimization.fuse': {}}) # Enable fusion
168
# dask.config.set({'optimization.inline': {}}) # Enable inlining
169
# dask.config.set({'optimization.inline_functions': True})
170
171
# Caching configuration
172
# dask.config.set({'cache': 'memory'}) # Memory cache
173
# dask.config.set({'cache': 'disk'}) # Disk cache
174
# dask.config.set({'cache.disk.directory': '/cache'}) # Cache directory
175
176
# Tokenization (affects caching)
177
# dask.config.set({'tokenize.function': 'sha1'}) # Hash function
178
```
179
180
### Distributed Computing Configuration
181
182
Configure distributed scheduler connection and behavior.
183
184
```python { .api }
185
# Distributed scheduler
186
# dask.config.set({'distributed.scheduler-address': 'tcp://scheduler:8786'})
187
# dask.config.set({'distributed.dashboard.link': 'http://scheduler:8787'})
188
189
# Worker configuration
190
# dask.config.set({'distributed.worker.memory.target': 0.6})
191
# dask.config.set({'distributed.worker.memory.spill': 0.7})
192
# dask.config.set({'distributed.worker.memory.pause': 0.8})
193
# dask.config.set({'distributed.worker.memory.terminate': 0.95})
194
195
# Network and communication
196
# dask.config.set({'distributed.comm.compression': 'lz4'})
197
# dask.config.set({'distributed.comm.timeouts.connect': '10s'})
198
```
199
200
### Diagnostics Configuration
201
202
Configure profiling, logging, and diagnostic output.
203
204
```python { .api }
205
# Progress reporting
206
# dask.config.set({'diagnostics.progress.enabled': True})
207
# dask.config.set({'diagnostics.progress.minimum': 1.0}) # Minimum time
208
209
# Profiling
210
# dask.config.set({'diagnostics.profile.enabled': True})
211
# dask.config.set({'diagnostics.profile.interval': '10ms'})
212
213
# Logging configuration
214
# dask.config.set({'logging.distributed': 'INFO'})
215
# dask.config.set({'logging.distributed.worker': 'WARNING'})
216
```
217
218
## Usage Examples
219
220
### Basic Configuration
221
222
```python
223
import dask
224
import dask.config
225
226
# Get current configuration
227
current_scheduler = dask.config.get('scheduler')
228
print(f"Current scheduler: {current_scheduler}")
229
230
# Set configuration globally (lasts for the rest of the process; not written to disk)
231
dask.config.set(scheduler='threads')
232
dask.config.set(num_workers=4)
233
234
# Set multiple values
235
dask.config.set({
236
'scheduler': 'processes',
237
'num_workers': 2,
238
'temporary_directory': '/tmp/dask'
239
})
240
```
241
242
### Temporary Configuration
243
244
```python
245
import dask
246
import dask.array as da
247
248
# Create computation
249
x = da.random.random((10000, 10000), chunks=(1000, 1000))
250
251
# Compute with temporary configuration
252
with dask.config.set(scheduler='processes', num_workers=8):
253
result1 = x.sum().compute()
254
255
# Configuration automatically reverts when each `with` block exits
256
with dask.config.set(scheduler='single-threaded'):
257
result2 = x.mean().compute()
258
259
# Passing a dictionary (required for dotted or hyphenated keys such as 'array.chunk-size')
260
with dask.config.set({'array.chunk-size': '64MB'}):
261
y = da.random.random((5000, 5000)) # Uses new chunk size
262
```
263
264
### Performance Tuning
265
266
```python
267
import dask
268
import dask.array as da
269
270
# Optimize for memory-constrained environment
271
dask.config.set({
272
'array.chunk-size': '32MB', # Smaller chunks
273
'num_workers': 2, # Fewer workers
274
'scheduler': 'threads' # Shared memory
275
})
276
277
# Optimize for CPU-intensive tasks
278
dask.config.set({
279
'scheduler': 'processes', # Avoid GIL
280
'num_workers': 8, # More processes
281
'optimization.fuse': {} # Enable fusion
282
})
283
284
# Large dataset configuration
285
dask.config.set({
286
'array.chunk-size': '256MB', # Larger chunks
287
'temporary_directory': '/fast-ssd/tmp',
288
'distributed.worker.memory.target': 0.7
289
})
290
```
291
292
### File and Environment Configuration
293
294
```python
295
import dask.config
296
import os
297
298
# Load from YAML file
299
# Create ~/.config/dask/dask.yaml:
300
"""
301
scheduler: processes
302
num_workers: 4
303
array:
304
chunk-size: "128MB"
305
optimize_graph: true
306
dataframe:
307
query-planning: true
308
"""
309
310
# Refresh configuration from files
311
dask.config.refresh()
312
313
# Environment variable configuration
314
os.environ['DASK_SCHEDULER'] = 'threads'
315
os.environ['DASK_NUM_WORKERS'] = '6'
316
317
# Collect configuration from config files and DASK_* environment variables
318
config_from_env = dask.config.collect()
319
```
320
321
### Distributed Computing Setup
322
323
```python
324
import dask
325
from dask.distributed import Client
326
327
# Configure for distributed computing
328
dask.config.set({
329
'distributed.scheduler-address': 'tcp://10.0.0.100:8786',
330
'distributed.dashboard.link': 'http://10.0.0.100:8787/status',
331
'distributed.worker.memory.target': 0.6,
332
'distributed.worker.memory.spill': 0.7,
333
'distributed.comm.compression': 'lz4'
334
})
335
336
# Connect to cluster
337
client = Client() # Uses configured address
338
339
# Verify configuration
340
print(f"Dashboard: {client.dashboard_link}")
341
```
342
343
### Advanced Optimization
344
345
```python
346
import dask
347
import dask.array as da
348
349
# Fine-tune optimization strategies
350
optimization_config = {
351
'optimization.fuse': {},
352
'optimization.inline': {},
353
'optimization.inline_functions': True,
354
'array.optimize_graph': True,
355
'array.rechunk-threshold': 4,
356
'array.slicing.split_large_chunks': True
357
}
358
359
with dask.config.set(optimization_config):
360
# Complex computation with optimization
361
x = da.random.random((50000, 50000), chunks=(5000, 5000))
362
y = da.random.random((50000, 50000), chunks=(5000, 5000))
363
364
# Chain operations benefit from optimization
365
result = ((x + y).T @ (x - y)).sum(axis=0).compute()
366
```
367
368
### Configuration Inspection
369
370
```python
371
import dask.config
372
import pprint
373
374
# View all current configuration
375
current_config = dict(dask.config.config)
376
pprint.pprint(current_config)
377
378
# View specific sections
379
array_config = {k: v for k, v in current_config.items()
380
if k.startswith('array')}
381
print("Array configuration:")
382
pprint.pprint(array_config)
383
384
# Check configuration sources
385
config_paths = dask.config.paths
386
print(f"Configuration paths: {config_paths}")
387
388
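# Note: dask.config.set() does not validate values; an unknown scheduler name is only rejected (with ValueError) when a computation is run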
389
try:
390
dask.config.set(scheduler='invalid_scheduler')
391
except ValueError as e:
392
print(f"Invalid configuration: {e}")
393
```
394
395
### Dynamic Configuration
396
397
```python
398
import dask
399
import dask.array as da
400
401
def adaptive_scheduler_config(data_size_gb):
402
"""Choose optimal configuration based on data size."""
403
if data_size_gb < 1:
404
return {
405
'scheduler': 'single-threaded',
406
'array.chunk-size': '32MB'
407
}
408
elif data_size_gb < 10:
409
return {
410
'scheduler': 'threads',
411
'num_workers': 4,
412
'array.chunk-size': '64MB'
413
}
414
else:
415
return {
416
'scheduler': 'processes',
417
'num_workers': 8,
418
'array.chunk-size': '128MB'
419
}
420
421
# Apply configuration based on workload
422
data_size = 5.0 # GB
423
config = adaptive_scheduler_config(data_size)
424
425
with dask.config.set(config):
426
# Process data with optimal configuration
427
x = da.random.random((25000, 25000), chunks='auto')
428
result = x.mean(axis=0).compute()
429
```