# Dask

Dask is a flexible parallel computing library for analytics that enables performance at scale for the PyData ecosystem. It provides parallel implementations of popular data science tools and allows scaling from single machines to distributed clusters with minimal code changes.

## Package Information

- **Package Name**: dask
- **Language**: Python
- **Installation**: `pip install dask`
- **Optional Components**: `pip install "dask[complete]"` for all features

## Core Imports

```python
import dask
```

Common imports for specific functionality:

```python
import dask.array as da
import dask.dataframe as dd
import dask.bag as db
from dask.delayed import delayed
from dask import compute, persist, visualize
```

## Basic Usage

```python
import dask
import dask.array as da
import dask.dataframe as dd
import pandas as pd
from dask import delayed

# Create a large array and perform parallel operations
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = (x + x.T).mean(axis=0)

# Work with larger-than-memory DataFrames
df = dd.read_csv('large-dataset-*.csv')
grouped = df.groupby('category').value.mean()

# Evaluate both lazy results in a single shared pass
result_array, result_df = dask.compute(result, grouped)

# Create delayed computations
@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed
def process_data(df):
    return df.groupby('key').value.sum()

# Build the computation graph without executing anything
files = ['file1.csv', 'file2.csv', 'file3.csv']
loaded = [load_data(f) for f in files]
processed = [process_data(df) for df in loaded]
final_result = delayed(pd.concat)(processed)

# Execute the graph
result = final_result.compute()
```

## Architecture

Dask operates on **task graphs**, directed acyclic graphs representing computations. The key components are:

- **Collections**: High-level user interfaces (Array, DataFrame, Bag, Delayed)
- **Schedulers**: Execute task graphs on various hardware (threads, processes, clusters)
- **Graph Optimization**: Automatically optimize task graphs for performance
- **Chunking**: Break large datasets into manageable pieces

This design enables users to work with familiar APIs while Dask handles parallelization, memory management, and distributed computing behind the scenes.
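
As a minimal illustration of this model, every collection exposes its graph through the `__dask_graph__` protocol (the small array below is purely for demonstration):

```python
import dask.array as da

# Build a small lazy computation; nothing executes here
x = da.ones((4, 4), chunks=(2, 2))
y = (x + 1).sum()

# Each key in the graph is one chunk-level task
graph = dict(y.__dask_graph__())
print(len(graph))   # number of tasks in the DAG
print(y.compute())  # 32.0, produced by executing the graph
```
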
## Capabilities

### Core Computation Functions

Essential functions for executing, optimizing, and managing Dask computations across all collection types.

```python { .api }
def compute(*collections, scheduler=None, **kwargs):
    """Compute multiple dask collections synchronously."""

def persist(*collections, scheduler=None, **kwargs):
    """Persist collections in memory for repeated use."""

def optimize(*collections, **kwargs):
    """Optimize task graphs before computation."""

def visualize(*collections, filename=None, **kwargs):
    """Visualize task graphs and dependencies."""

def delayed(func=None, *, pure=None, nout=None, **kwargs):
    """Create delayed objects for building task graphs."""

def is_dask_collection(obj):
    """Check if object is a Dask collection."""

def annotate(**annotations):
    """Context manager for adding annotations to task graphs."""

def get_annotations():
    """Get current task graph annotations."""
```
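
A short sketch of how these compose (the array here is arbitrary example data):

```python
import dask
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))
y = (x + x.T).mean()

# compute() evaluates one or more collections in a single shared pass
(mean,) = dask.compute(y)

# persist() materializes results in memory but keeps the Dask object
(x,) = dask.persist(x)

# annotate() attaches metadata to tasks created inside its scope
with dask.annotate(priority=10):
    z = (x ** 2).sum()

print(dask.is_dask_collection(z))  # True
```
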
[Core Functions](./core-functions.md)
### Distributed Arrays

NumPy-compatible arrays for parallel and out-of-core computation with automatic chunking and distributed processing.

```python { .api }
class Array:
    """N-dimensional distributed array with NumPy interface."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def rechunk(self, chunks=None, **kwargs): ...

def from_array(x, chunks=None, name=None, **kwargs): ...
def array(object, dtype=None, chunks=None, **kwargs): ...
def arange(start, stop=None, step=None, chunks=None, **kwargs): ...
def linspace(start, stop, num=50, chunks=None, **kwargs): ...
```
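
For example (shapes and chunk sizes below are chosen arbitrarily):

```python
import numpy as np
import dask.array as da

# Wrap an in-memory NumPy array; chunks sets the block size per axis
x = da.from_array(np.arange(1_000_000).reshape(1000, 1000), chunks=(250, 250))

# NumPy-style expressions build a lazy graph over the blocks
y = (x - x.mean(axis=0)) / x.std(axis=0)

# rechunk() changes the block layout, e.g. before an axis-heavy operation
z = y.rechunk((500, 500))

print(z.chunks)             # ((500, 500), (500, 500))
print(z[:2, :2].compute())  # materialize just a small slice
```
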
[Arrays](./arrays.md)
### Distributed DataFrames

Pandas-compatible DataFrames for larger-than-memory datasets with parallel processing and familiar DataFrame operations.

```python { .api }
class DataFrame:
    """Distributed pandas-like DataFrame."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def head(self, n=5, npartitions=1, compute=True): ...

class Series:
    """Distributed pandas-like Series."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...

def read_csv(path, **kwargs): ...
def read_parquet(path, **kwargs): ...
def from_pandas(data, npartitions=None, chunksize=None, **kwargs): ...
```
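
A minimal end-to-end sketch (the toy frame is for illustration only):

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"category": ["a", "b", "a", "b"],
                    "value": [1.0, 2.0, 3.0, 4.0]})

# from_pandas splits an in-memory frame into npartitions pieces
df = dd.from_pandas(pdf, npartitions=2)

# Familiar pandas operations; nothing runs until compute()
result = df.groupby("category")["value"].mean()
print(result.compute())  # a -> 2.0, b -> 3.0

# head() evaluates only the first partition, so it stays cheap
print(df.head(2))
```
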
[DataFrames](./dataframes.md)
### Bag Collections

Distributed list-like collections for processing semi-structured and unstructured data with functional programming patterns.

```python { .api }
class Bag:
    """Distributed list-like collection."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def map(self, func): ...
    def filter(self, predicate): ...

def from_sequence(seq, partition_size=None, **kwargs): ...
def read_text(path, encoding='utf-8', **kwargs): ...
def from_url(urls, **kwargs): ...
```
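
For instance (the sequence and functions are placeholders):

```python
import dask.bag as db

# from_sequence partitions any iterable for parallel processing
b = db.from_sequence(range(10), partition_size=5)

# map/filter compose lazily into a single task graph
total = b.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).sum()

print(total.compute())  # 0 + 4 + 16 + 36 + 64 = 120
```
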
[Bags](./bags.md)
### Delayed Computations

Build custom task graphs with lazy evaluation for any Python function, enabling flexible parallel workflows.

```python { .api }
class Delayed:
    """Lazy evaluation wrapper for building task graphs."""
    def compute(self, scheduler=None, **kwargs): ...
    def persist(self, scheduler=None, **kwargs): ...
    def visualize(self, filename=None, **kwargs): ...

def delayed(func=None, *, pure=None, nout=None, **kwargs):
    """Decorator to create delayed functions."""
```
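
A small sketch of the pattern (function names are arbitrary):

```python
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

# Calling delayed functions records tasks instead of running them
a = inc(1)
b = inc(2)
total = add(a, b)

print(total.compute())  # 5
# total.visualize("graph.png") renders the DAG if graphviz is installed
```
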
[Delayed](./delayed.md)
### Configuration Management

System for configuring Dask behavior, schedulers, and optimization settings across all computations.

```python { .api }
def get(key, default=None): ...
def set(config=None, **kwargs): ...
def update(config, **kwargs): ...
def collect(paths=None): ...
```
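
These functions are reached through the `dask.config` namespace; a brief sketch (the keys shown are standard Dask options):

```python
import dask

# Read a value from the merged configuration, with a fallback default
print(dask.config.get("scheduler", default="threads"))

# Set a value globally...
dask.config.set(scheduler="processes")

# ...or temporarily, since set() also works as a context manager
with dask.config.set({"array.slicing.split_large_chunks": True}):
    pass  # the setting applies only inside this block
```
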
[Configuration](./configuration.md)
### Diagnostics and Profiling

Tools for monitoring performance and resource usage and for debugging parallel computations.

```python { .api }
class ProgressBar:
    """Display computation progress."""
    def __enter__(self): ...
    def __exit__(self, *args): ...

class Profiler:
    """Profile task execution times."""
    def __enter__(self): ...
    def __exit__(self, *args): ...

class ResourceProfiler:
    """Monitor system resource usage."""
    def __enter__(self): ...
    def __exit__(self, *args): ...
```
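
These are context managers from `dask.diagnostics`, intended for the single-machine schedulers; a usage sketch:

```python
import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

x = da.random.random((4000, 4000), chunks=(1000, 1000))

# The profilers record information while the graph executes
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    x.dot(x.T).mean().compute()

print(len(prof.results))  # one record per executed task
# dask.diagnostics.visualize([prof, rprof]) renders a Bokeh plot if bokeh is installed
```
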
[Diagnostics](./diagnostics.md)
## Types

```python { .api }
from typing import Any, Dict, List, Optional, Union, Callable
from collections.abc import Sequence, Mapping

# Core types
DaskCollection = Union['Array', 'DataFrame', 'Series', 'Bag', 'Delayed']
Scheduler = Union[str, Callable]
Graph = Dict[str, Any]
Key = Union[str, tuple]

# Chunk specifications
Chunks = Union[int, str, Sequence[int], Dict[int, int]]

# Configuration types
Config = Dict[str, Any]
ConfigValue = Union[str, int, float, bool, None, List, Dict]
```
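
The `Chunks` forms accepted by the array constructors can be seen side by side (sizes below are arbitrary):

```python
import dask.array as da

# Equivalent ways to write a chunk specification
a = da.ones((1000, 1000), chunks=500)               # one size for every axis
b = da.ones((1000, 1000), chunks=(500, 250))        # a size per axis
c = da.ones((1000, 1000), chunks={0: 500, 1: 250})  # dict keyed by axis index
d = da.ones((1000, 1000), chunks="auto")            # let Dask pick block sizes

print(b.chunks)  # ((500, 500), (250, 250, 250, 250))
```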