Parallel PyData with task scheduling for distributed analytics and computing.
—
System for configuring Dask behavior, schedulers, and optimization settings. The configuration system allows fine-tuning of performance, resource usage, and execution strategies across all Dask operations.
Core functions for getting, setting, and managing configuration values.
def get(key, default=None):
"""
Get configuration value.
Parameters:
- key: Configuration key (dot-separated path)
- default: Default value if key not found
Returns:
Configuration value or default
"""
def set(config=None, **kwargs):
"""
Set configuration values temporarily.
Parameters:
- config: Dictionary of configuration values
- **kwargs: Key-value pairs to set
Returns:
Context manager for temporary configuration
"""
def update(config=None, **kwargs):
"""
Update configuration permanently.
Parameters:
- config: Dictionary of configuration values
- **kwargs: Key-value pairs to update
Returns:
None
"""
def clear():
"""
Clear all configuration values.
Returns:
None
"""
def collect(paths=None):
"""
Collect configuration from files and environment.
Parameters:
- paths: List of paths to search for config files
Returns:
dict: Collected configuration
"""
def refresh():
"""
Refresh configuration from all sources.
Returns:
None
"""

Context managers for temporary configuration changes.
def config_context(**kwargs):
"""
Context manager for temporary configuration.
Parameters:
- **kwargs: Configuration key-value pairs
Returns:
Context manager
"""
# Global configuration dictionary
config: dict

Configure task execution schedulers and their parameters.
# Scheduler selection
# dask.config.set(scheduler='threads') # Threaded scheduler
# dask.config.set(scheduler='processes') # Process-based scheduler
# dask.config.set(scheduler='single-threaded') # Single-threaded
# dask.config.set(scheduler='distributed') # Distributed scheduler
# Thread scheduler settings
# dask.config.set({'num_workers': 4}) # Number of worker threads
# dask.config.set({'pool': custom_pool}) # Custom thread pool
# Process scheduler settings
# dask.config.set({'num_workers': 2}) # Number of worker processes
# dask.config.set({'chunksize': 1}) # Tasks per process call
# Memory and resource limits
# dask.config.set({'temporary_directory': '/tmp/dask'})
# dask.config.set({'local_directory': '/tmp/dask-worker'})

Configure array operations, chunking, and optimization.
# Array chunk size defaults
# dask.config.set({'array.chunk-size': '128MB'})
# dask.config.set({'array.chunk-size': (1000, 1000)})
# Optimization settings
# dask.config.set({'array.optimize_graph': True})
# dask.config.set({'array.slicing.split_large_chunks': True})
# Rechunking behavior
# dask.config.set({'array.rechunk.method': 'tasks'})
# dask.config.set({'array.rechunk-threshold': 4})
# Query planning (expression-based optimization)
# dask.config.set({'array.query-planning': True})

Configure DataFrame operations, I/O, and query planning.
# Query planning system
# dask.config.set({'dataframe.query-planning': True})
# I/O settings
# dask.config.set({'dataframe.parquet.minimum-partition-size': '100MB'})
# dask.config.set({'dataframe.csv.chunk_size': '50MB'})
# Index and partitioning
# dask.config.set({'dataframe.shuffle.method': 'tasks'})
# dask.config.set({'dataframe.shuffle.compression': 'lz4'})
# Backend configuration
# dask.config.set({'dataframe.backend': 'pandas'})
# dask.config.set({'dataframe.convert-string': True})

Configure graph optimization strategies and performance tuning.
# Graph optimization
# dask.config.set({'optimization.fuse': {}}) # Enable fusion
# dask.config.set({'optimization.inline': {}}) # Enable inlining
# dask.config.set({'optimization.inline_functions': True})
# Caching configuration
# dask.config.set({'cache': 'memory'}) # Memory cache
# dask.config.set({'cache': 'disk'}) # Disk cache
# dask.config.set({'cache.disk.directory': '/cache'}) # Cache directory
# Tokenization (affects caching)
# dask.config.set({'tokenize.function': 'sha1'}) # Hash function

Configure distributed scheduler connection and behavior.
# Distributed scheduler
# dask.config.set({'distributed.scheduler-address': 'tcp://scheduler:8786'})
# dask.config.set({'distributed.dashboard.link': 'http://scheduler:8787'})
# Worker configuration
# dask.config.set({'distributed.worker.memory.target': 0.6})
# dask.config.set({'distributed.worker.memory.spill': 0.7})
# dask.config.set({'distributed.worker.memory.pause': 0.8})
# dask.config.set({'distributed.worker.memory.terminate': 0.95})
# Network and communication
# dask.config.set({'distributed.comm.compression': 'lz4'})
# dask.config.set({'distributed.comm.timeouts.connect': '10s'})

Configure profiling, logging, and diagnostic output.
# Progress reporting
# dask.config.set({'diagnostics.progress.enabled': True})
# dask.config.set({'diagnostics.progress.minimum': 1.0}) # Minimum time
# Profiling
# dask.config.set({'diagnostics.profile.enabled': True})
# dask.config.set({'diagnostics.profile.interval': '10ms'})
# Logging configuration
# dask.config.set({'logging.distributed': 'INFO'})
# dask.config.set({'logging.distributed.worker': 'WARNING'})

import dask
import dask.config
# Get current configuration
current_scheduler = dask.config.get('scheduler')
print(f"Current scheduler: {current_scheduler}")
# Set configuration permanently
dask.config.set(scheduler='threads')
dask.config.set(num_workers=4)
# Set multiple values
dask.config.set({
'scheduler': 'processes',
'num_workers': 2,
'temporary_directory': '/tmp/dask'
})

import dask
import dask.array as da
# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Compute with temporary configuration
with dask.config.set(scheduler='processes', num_workers=8):
result1 = x.sum().compute()
# Configuration automatically reverts
with dask.config.set(scheduler='single-threaded'):
result2 = x.mean().compute()
# Using context manager syntax
with dask.config.set({'array.chunk-size': '64MB'}):
y = da.random.random((5000, 5000)) # Uses new chunk size

import dask
import dask.array as da
# Optimize for memory-constrained environment
dask.config.set({
'array.chunk-size': '32MB', # Smaller chunks
'num_workers': 2, # Fewer workers
'scheduler': 'threads' # Shared memory
})
# Optimize for CPU-intensive tasks
dask.config.set({
'scheduler': 'processes', # Avoid GIL
'num_workers': 8, # More processes
'optimization.fuse': {} # Enable fusion
})
# Large dataset configuration
dask.config.set({
'array.chunk-size': '256MB', # Larger chunks
'temporary_directory': '/fast-ssd/tmp',
'distributed.worker.memory.target': 0.7
})

import dask.config
import os
# Load from YAML file
# Create ~/.config/dask/dask.yaml:
"""
scheduler: processes
num_workers: 4
array:
chunk-size: "128MB"
optimize_graph: true
dataframe:
query-planning: true
"""
# Refresh configuration from files
dask.config.refresh()
# Environment variable configuration
os.environ['DASK_SCHEDULER'] = 'threads'
os.environ['DASK_NUM_WORKERS'] = '6'
# Collect configuration from environment
config_from_env = dask.config.collect()

import dask
from dask.distributed import Client
# Configure for distributed computing
dask.config.set({
'distributed.scheduler-address': 'tcp://10.0.0.100:8786',
'distributed.dashboard.link': 'http://10.0.0.100:8787/status',
'distributed.worker.memory.target': 0.6,
'distributed.worker.memory.spill': 0.7,
'distributed.comm.compression': 'lz4'
})
# Connect to cluster
client = Client() # Uses configured address
# Verify configuration
print(f"Dashboard: {client.dashboard_link}")

import dask
import dask.array as da
# Fine-tune optimization strategies
optimization_config = {
'optimization.fuse': {},
'optimization.inline': {},
'optimization.inline_functions': True,
'array.optimize_graph': True,
'array.rechunk-threshold': 4,
'array.slicing.split_large_chunks': True
}
with dask.config.set(optimization_config):
# Complex computation with optimization
x = da.random.random((50000, 50000), chunks=(5000, 5000))
y = da.random.random((50000, 50000), chunks=(5000, 5000))
# Chain operations benefit from optimization
result = ((x + y).T @ (x - y)).sum(axis=0).compute()

import dask.config
import pprint
# View all current configuration
current_config = dict(dask.config.config)
pprint.pprint(current_config)
# View specific sections
array_config = {k: v for k, v in current_config.items()
if k.startswith('array')}
print("Array configuration:")
pprint.pprint(array_config)
# Check configuration sources
config_paths = dask.config.paths
print(f"Configuration paths: {config_paths}")
# Validate configuration
try:
dask.config.set(scheduler='invalid_scheduler')
except ValueError as e:
print(f"Invalid configuration: {e}")

import dask
import dask.array as da
def adaptive_scheduler_config(data_size_gb):
    """Choose optimal configuration based on data size.

    Returns a Dask configuration dict keyed on the workload size in GB:
    small workloads run single-threaded with small chunks, medium ones use
    the threaded scheduler, and anything larger uses processes.
    """
    # Ordered size tiers: (upper bound in GB, configuration to use below it).
    tiers = (
        (1, {
            'scheduler': 'single-threaded',
            'array.chunk-size': '32MB',
        }),
        (10, {
            'scheduler': 'threads',
            'num_workers': 4,
            'array.chunk-size': '64MB',
        }),
    )
    for upper_bound, settings in tiers:
        if data_size_gb < upper_bound:
            return dict(settings)
    # Largest workloads: process-based scheduler with bigger chunks.
    return {
        'scheduler': 'processes',
        'num_workers': 8,
        'array.chunk-size': '128MB',
    }
# Apply configuration based on workload
data_size = 5.0 # GB
config = adaptive_scheduler_config(data_size)
with dask.config.set(config):
# Process data with optimal configuration
x = da.random.random((25000, 25000), chunks='auto')
result = x.mean(axis=0).compute()

Install with Tessl CLI
npx tessl i tessl/pypi-dask