Parallel PyData with task scheduling for distributed analytics and computing.
npx @tessl/cli install tessl/pypi-dask@2025.7.0

Dask is a flexible parallel computing library for analytics that enables performance at scale for the PyData ecosystem. It provides parallel implementations of popular data science tools and allows scaling from single machines to distributed clusters with minimal code changes.
pip install dask

Use pip install "dask[complete]" to install all optional features.

The top-level import:

import dask

Common imports for specific functionality:
import dask.array as da
import dask.dataframe as dd
import dask.bag as db
from dask.delayed import delayed
from dask import compute, persist, visualize

A quick example combining arrays and DataFrames:

import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Create a large array and perform parallel operations
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = (x + x.T).mean(axis=0)
# Work with larger-than-memory DataFrames
df = dd.read_csv('large-dataset-*.csv')
grouped = df.groupby('category').value.mean()
# Trigger execution; dask.compute evaluates both lazy results together
result_array, result_df = dask.compute(result, grouped)
# Create delayed computations
@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def process_data(df):
    return df.groupby('key').value.sum()
# Build computation graph
files = ['file1.csv', 'file2.csv', 'file3.csv']
loaded = [load_data(f) for f in files]
processed = [process_data(df) for df in loaded]
final_result = delayed(pd.concat)(processed)
# Execute computation
result = final_result.compute()

Dask operates on task graphs: directed acyclic graphs representing computations. The key components are the collections (Array, DataFrame, Bag, and Delayed) that build task graphs through familiar APIs, the task graphs themselves, and the schedulers that execute those graphs on threads, processes, or a distributed cluster.
This design enables users to work with familiar APIs while Dask handles parallelization, memory management, and distributed computing behind the scenes.
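To make this concrete, a task graph can be written by hand as a plain dict that maps keys to values or to (callable, *args) tuples, then run with the synchronous scheduler. A minimal sketch (the key names are arbitrary):

import dask
from operator import add, mul

# A task graph: each key maps to a literal value or a (callable, *args) task
dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),   # z = x + y
    "w": (mul, "z", 10),    # w = z * 10
}

# Run the graph with the single-threaded synchronous scheduler
print(dask.get(dsk, "w"))  # 30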
Essential functions for executing, optimizing, and managing Dask computations across all collection types.
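A minimal sketch of how these functions combine on an array collection (the shapes, chunk sizes, and priority annotation are illustrative; visualize also needs the optional graphviz dependency):

import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + x.T).sum(axis=1)

print(dask.is_dask_collection(y))   # True: y is still lazy
(y_opt,) = dask.optimize(y)         # simplify the task graph
(y_mem,) = dask.persist(y_opt)      # keep intermediate results in memory

with dask.annotate(priority=10):    # attach annotations to tasks built in this block
    z = (y_mem * 2).mean()

(total,) = dask.compute(z)          # execute and return a concrete result
# dask.visualize(z, filename="graph.png")  # optional: render the task graph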
def compute(*collections, scheduler=None, **kwargs):
    """Compute multiple dask collections synchronously."""

def persist(*collections, scheduler=None, **kwargs):
    """Persist collections in memory for repeated use."""

def optimize(*collections, **kwargs):
    """Optimize task graphs before computation."""

def visualize(*collections, filename=None, **kwargs):
    """Visualize task graphs and dependencies."""

def delayed(func=None, *, pure=None, nout=None, **kwargs):
    """Create delayed objects for building task graphs."""

def is_dask_collection(obj):
    """Check if object is a Dask collection."""

def annotate(**annotations):
    """Context manager for adding annotations to task graphs."""

def get_annotations():
    """Get current task graph annotations."""

NumPy-compatible arrays for parallel and out-of-core computation with automatic chunking and distributed processing.
class Array:
    """N-dimensional distributed array with NumPy interface."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def rechunk(self, chunks=None, **kwargs): ...

def from_array(x, chunks=None, name=None, **kwargs): ...
def array(object, dtype=None, chunks=None, **kwargs): ...
def arange(start, stop=None, step=None, chunks=None, **kwargs): ...
def linspace(start, stop, num=50, chunks=None, **kwargs): ...

Pandas-compatible DataFrames for larger-than-memory datasets with parallel processing and familiar DataFrame operations.
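A short sketch of the DataFrame workflow (the column names and the commented file pattern are placeholders):

import pandas as pd
import dask.dataframe as dd

# Partition an in-memory pandas DataFrame, or read many files lazily
pdf = pd.DataFrame({"category": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})
df = dd.from_pandas(pdf, npartitions=2)
# df = dd.read_parquet("events-*.parquet")  # hypothetical file pattern

# Familiar pandas operations stay lazy until compute()
means = df.groupby("category")["value"].mean()
print(df.head(2))        # head() computes a small sample by default
print(means.compute())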
class DataFrame:
    """Distributed pandas-like DataFrame."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def head(self, n=5, npartitions=1, compute=True): ...

class Series:
    """Distributed pandas-like Series."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...

def read_csv(path, **kwargs): ...
def read_parquet(path, **kwargs): ...
def from_pandas(data, npartitions=None, chunksize=None, **kwargs): ...

Distributed list-like collections for processing semi-structured and unstructured data with functional programming patterns.
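A brief sketch of Bag usage on semi-structured records (the records are made up):

import dask.bag as db

records = [
    {"user": "alice", "clicks": 3},
    {"user": "bob", "clicks": 7},
    {"user": "alice", "clicks": 2},
]

bag = db.from_sequence(records, partition_size=2)

# Functional-style transforms stay lazy until compute()
clicks = bag.filter(lambda r: r["clicks"] > 1).map(lambda r: r["clicks"])
print(clicks.sum().compute())  # 12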
class Bag:
    """Distributed list-like collection."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def map(self, func): ...
    def filter(self, predicate): ...

def from_sequence(seq, partition_size=None, **kwargs): ...
def read_text(path, encoding='utf-8', **kwargs): ...
def from_url(urls, **kwargs): ...

Build custom task graphs with lazy evaluation for any Python function, enabling flexible parallel workflows.
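In addition to the decorator form shown in the quick-start example, delayed can wrap existing callables on the fly; a small sketch with trivial placeholder functions:

from dask import delayed

def double(x):
    return 2 * x

def combine(x, y):
    return x + y

# Wrap plain functions without modifying them
lazy_parts = [delayed(double)(i) for i in range(5)]
total = delayed(combine)(delayed(sum)(lazy_parts), 100)

print(total.compute())  # 2*(0+1+2+3+4) + 100 = 120
# total.visualize(filename="delayed-graph.png")  # optional, requires graphviz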
class Delayed:
    """Lazy evaluation wrapper for building task graphs."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def visualize(self, filename=None, **kwargs): ...

def delayed(func=None, *, pure=None, nout=None, **kwargs):
    """Decorator to create delayed functions."""

System for configuring Dask behavior, schedulers, and optimization settings across all computations.
def get(key, default=None): ...
def set(config=None, **kwargs): ...
def update(config, **kwargs): ...
def collect(paths=None): ...

Tools for monitoring performance, resource usage, and debugging distributed computations.
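A short sketch using the local diagnostics as context managers (ResourceProfiler additionally requires psutil; the sampling interval dt is illustrative):

import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

x = da.random.random((4000, 4000), chunks=(1000, 1000))

with ProgressBar(), Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
    (x @ x.T).mean().compute()

print(len(prof.results))   # one record per executed task
print(len(rprof.results))  # periodic CPU and memory samples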
class ProgressBar:
    """Display computation progress."""
    def __enter__(self): ...
    def __exit__(self, *args): ...

class Profiler:
    """Profile task execution times."""
    def __enter__(self): ...
    def __exit__(self, *args): ...

class ResourceProfiler:
    """Monitor system resource usage."""
    def __enter__(self): ...
    def __exit__(self, *args): ...

from typing import Any, Dict, List, Optional, Union, Callable
from collections.abc import Sequence, Mapping
# Core types
DaskCollection = Union['Array', 'DataFrame', 'Series', 'Bag', 'Delayed']
Scheduler = Union[str, Callable]
Graph = Dict[str, Any]
Key = Union[str, tuple]
# Chunk specifications
Chunks = Union[int, str, Sequence[int], Dict[int, int]]
# Configuration types
Config = Dict[str, Any]
ConfigValue = Union[str, int, float, bool, None, List, Dict]