CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration

System for configuring Dask behavior, schedulers, and optimization settings. The configuration system allows fine-tuning of performance, resource usage, and execution strategies across all Dask operations.

Capabilities

Configuration Management

Core functions for getting, setting, and managing configuration values.

def get(key, default=None):
    """
    Get configuration value.
    
    Parameters:
    - key: Configuration key (dot-separated path)
    - default: Default value if key not found
    
    Returns:
    Configuration value or default
    """

def set(config=None, **kwargs):
    """
    Set configuration values temporarily.
    
    Parameters:
    - config: Dictionary of configuration values
    - **kwargs: Key-value pairs to set
    
    Returns:
    Context manager for temporary configuration
    """

def update(config=None, **kwargs):
    """
    Update configuration permanently.
    
    Parameters:
    - config: Dictionary of configuration values
    - **kwargs: Key-value pairs to update
    
    Returns:
    None
    """

def clear():
    """
    Clear all configuration values.
    
    Returns:
    None
    """

def collect(paths=None):
    """
    Collect configuration from files and environment.
    
    Parameters:
    - paths: List of paths to search for config files
    
    Returns:
    dict: Collected configuration
    """

def refresh():
    """
    Refresh configuration from all sources.
    
    Returns:
    None
    """

Configuration Context

Context managers for temporary configuration changes.

def config_context(**kwargs):
    """
    Context manager for temporary configuration.
    
    Parameters:
    - **kwargs: Configuration key-value pairs
    
    Returns:
    Context manager
    """

# Global configuration dictionary
config: dict

Scheduler Configuration

Configure task execution schedulers and their parameters.

# Scheduler selection
# dask.config.set(scheduler='threads')      # Threaded scheduler
# dask.config.set(scheduler='processes')    # Process-based scheduler  
# dask.config.set(scheduler='single-threaded')  # Single-threaded
# dask.config.set(scheduler='distributed')  # Distributed scheduler

# Thread scheduler settings
# dask.config.set({'num_workers': 4})           # Number of worker threads
# dask.config.set({'pool': custom_pool})        # Custom thread pool

# Process scheduler settings  
# dask.config.set({'num_workers': 2})           # Number of worker processes
# dask.config.set({'chunksize': 1})             # Tasks per process call

# Memory and resource limits
# dask.config.set({'temporary_directory': '/tmp/dask'})
# dask.config.set({'local_directory': '/tmp/dask-worker'})

Array Configuration

Configure array operations, chunking, and optimization.

# Array chunk size defaults
# dask.config.set({'array.chunk-size': '128MB'})
# dask.config.set({'array.chunk-size': (1000, 1000)})

# Optimization settings
# dask.config.set({'array.optimize_graph': True})
# dask.config.set({'array.slicing.split_large_chunks': True})

# Rechunking behavior
# dask.config.set({'array.rechunk.method': 'tasks'})
# dask.config.set({'array.rechunk-threshold': 4})

# Query planning (expression-based optimization)
# dask.config.set({'array.query-planning': True})

DataFrame Configuration

Configure DataFrame operations, I/O, and query planning.

# Query planning system
# dask.config.set({'dataframe.query-planning': True})

# I/O settings
# dask.config.set({'dataframe.parquet.minimum-partition-size': '100MB'})
# dask.config.set({'dataframe.csv.chunk_size': '50MB'})

# Index and partitioning
# dask.config.set({'dataframe.shuffle.method': 'tasks'})
# dask.config.set({'dataframe.shuffle.compression': 'lz4'})

# Backend configuration
# dask.config.set({'dataframe.backend': 'pandas'})
# dask.config.set({'dataframe.convert-string': True})

Optimization Configuration

Configure graph optimization strategies and performance tuning.

# Graph optimization
# dask.config.set({'optimization.fuse': {}})          # Enable fusion
# dask.config.set({'optimization.inline': {}})        # Enable inlining
# dask.config.set({'optimization.inline_functions': True})

# Caching configuration  
# dask.config.set({'cache': 'memory'})                # Memory cache
# dask.config.set({'cache': 'disk'})                  # Disk cache
# dask.config.set({'cache.disk.directory': '/cache'}) # Cache directory

# Tokenization (affects caching)
# dask.config.set({'tokenize.function': 'sha1'})      # Hash function

Distributed Computing Configuration

Configure distributed scheduler connection and behavior.

# Distributed scheduler
# dask.config.set({'distributed.scheduler-address': 'tcp://scheduler:8786'})
# dask.config.set({'distributed.dashboard.link': 'http://scheduler:8787'})

# Worker configuration
# dask.config.set({'distributed.worker.memory.target': 0.6})
# dask.config.set({'distributed.worker.memory.spill': 0.7})
# dask.config.set({'distributed.worker.memory.pause': 0.8})
# dask.config.set({'distributed.worker.memory.terminate': 0.95})

# Network and communication
# dask.config.set({'distributed.comm.compression': 'lz4'})
# dask.config.set({'distributed.comm.timeouts.connect': '10s'})

Diagnostics Configuration

Configure profiling, logging, and diagnostic output.

# Progress reporting
# dask.config.set({'diagnostics.progress.enabled': True})
# dask.config.set({'diagnostics.progress.minimum': 1.0})  # Minimum time

# Profiling
# dask.config.set({'diagnostics.profile.enabled': True})
# dask.config.set({'diagnostics.profile.interval': '10ms'})

# Logging configuration
# dask.config.set({'logging.distributed': 'INFO'})
# dask.config.set({'logging.distributed.worker': 'WARNING'})

Usage Examples

Basic Configuration

import dask
import dask.config

# Get current configuration
current_scheduler = dask.config.get('scheduler')
print(f"Current scheduler: {current_scheduler}")

# Set configuration permanently
dask.config.set(scheduler='threads')
dask.config.set(num_workers=4)

# Set multiple values
dask.config.set({
    'scheduler': 'processes', 
    'num_workers': 2,
    'temporary_directory': '/tmp/dask'
})

Temporary Configuration

import dask
import dask.array as da

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Compute with temporary configuration
with dask.config.set(scheduler='processes', num_workers=8):
    result1 = x.sum().compute()

# Configuration automatically reverts
with dask.config.set(scheduler='single-threaded'):
    result2 = x.mean().compute()

# Using context manager syntax
with dask.config.set({'array.chunk-size': '64MB'}):
    y = da.random.random((5000, 5000))  # Uses new chunk size

Performance Tuning

import dask
import dask.array as da

# Optimize for memory-constrained environment  
dask.config.set({
    'array.chunk-size': '32MB',        # Smaller chunks
    'num_workers': 2,                  # Fewer workers
    'scheduler': 'threads'             # Shared memory
})

# Optimize for CPU-intensive tasks
dask.config.set({
    'scheduler': 'processes',          # Avoid GIL
    'num_workers': 8,                  # More processes
    'optimization.fuse': {}            # Enable fusion
})

# Large dataset configuration
dask.config.set({
    'array.chunk-size': '256MB',       # Larger chunks
    'temporary_directory': '/fast-ssd/tmp',
    'distributed.worker.memory.target': 0.7
})

File and Environment Configuration

import dask.config
import os

# Load from YAML file
# Create ~/.config/dask/dask.yaml:
"""
scheduler: processes
num_workers: 4
array:
  chunk-size: "128MB"
  optimize_graph: true
dataframe:
  query-planning: true
"""

# Refresh configuration from files
dask.config.refresh()

# Environment variable configuration
os.environ['DASK_SCHEDULER'] = 'threads'
os.environ['DASK_NUM_WORKERS'] = '6'

# Collect configuration from environment
config_from_env = dask.config.collect()

Distributed Computing Setup

import dask
from dask.distributed import Client

# Configure for distributed computing
dask.config.set({
    'distributed.scheduler-address': 'tcp://10.0.0.100:8786',
    'distributed.dashboard.link': 'http://10.0.0.100:8787/status',
    'distributed.worker.memory.target': 0.6,
    'distributed.worker.memory.spill': 0.7,
    'distributed.comm.compression': 'lz4'
})

# Connect to cluster
client = Client()  # Uses configured address

# Verify configuration
print(f"Dashboard: {client.dashboard_link}")

Advanced Optimization

import dask
import dask.array as da

# Fine-tune optimization strategies
optimization_config = {
    'optimization.fuse': {},
    'optimization.inline': {},
    'optimization.inline_functions': True,
    'array.optimize_graph': True,
    'array.rechunk-threshold': 4,
    'array.slicing.split_large_chunks': True
}

with dask.config.set(optimization_config):
    # Complex computation with optimization
    x = da.random.random((50000, 50000), chunks=(5000, 5000))
    y = da.random.random((50000, 50000), chunks=(5000, 5000))
    
    # Chain operations benefit from optimization
    result = ((x + y).T @ (x - y)).sum(axis=0).compute()

Configuration Inspection

import dask.config
import pprint

# View all current configuration
current_config = dict(dask.config.config)
pprint.pprint(current_config)

# View specific sections
array_config = {k: v for k, v in current_config.items() 
                if k.startswith('array')}
print("Array configuration:")
pprint.pprint(array_config)

# Check configuration sources
config_paths = dask.config.paths
print(f"Configuration paths: {config_paths}")

# Validate configuration
try:
    dask.config.set(scheduler='invalid_scheduler')
except ValueError as e:
    print(f"Invalid configuration: {e}")

Dynamic Configuration

import dask
import dask.array as da

def adaptive_scheduler_config(data_size_gb):
    """Choose optimal configuration based on data size."""
    if data_size_gb < 1:
        return {
            'scheduler': 'single-threaded',
            'array.chunk-size': '32MB'
        }
    elif data_size_gb < 10:
        return {
            'scheduler': 'threads',
            'num_workers': 4,
            'array.chunk-size': '64MB'
        }
    else:
        return {
            'scheduler': 'processes',
            'num_workers': 8, 
            'array.chunk-size': '128MB'
        }

# Apply configuration based on workload
data_size = 5.0  # GB
config = adaptive_scheduler_config(data_size)

with dask.config.set(config):
    # Process data with optimal configuration  
    x = da.random.random((25000, 25000), chunks='auto')
    result = x.mean(axis=0).compute()

Install with Tessl CLI

npx tessl i tessl/pypi-dask

docs

arrays.md

bags.md

configuration.md

core-functions.md

dataframes.md

delayed.md

diagnostics.md

index.md

tile.json