# DataFrames

Pandas-compatible distributed DataFrames for larger-than-memory datasets. Dask DataFrames partition data across multiple pandas DataFrames, enabling familiar pandas operations on datasets that don't fit in memory.
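
A minimal sketch of the partitioning model, using `from_pandas` on a small in-memory frame (the column names are illustrative):

```python
import pandas as pd
import dask.dataframe as dd

# One logical DataFrame backed by 4 pandas DataFrames (partitions)
pdf = pd.DataFrame({'x': range(1000), 'y': range(1000)})
ddf = dd.from_pandas(pdf, npartitions=4)

print(ddf.npartitions)        # 4
print(ddf.divisions)          # index boundaries between partitions
print(ddf.x.sum().compute())  # operations stay lazy until .compute()
```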

## Capabilities

### DataFrame Creation

Create Dask DataFrames from various sources including files, pandas objects, and other collections.
```python { .api }
def from_pandas(df, npartitions=None, chunksize=None, sort=True, name=None):
    """
    Create a Dask DataFrame from a pandas DataFrame.

    Parameters:
    - df: pandas DataFrame or Series
    - npartitions: Number of partitions to create
    - chunksize: Approximate number of rows per partition
    - sort: Whether to sort by index
    - name: Custom name for the collection

    Returns:
    dask.dataframe.DataFrame or Series: Dask collection
    """

def from_array(x, columns=None, index=None, meta=None):
    """
    Create a DataFrame from a Dask array.

    Parameters:
    - x: Array to convert (1D yields a Series, 2D a DataFrame)
    - columns: Column names
    - index: Index for the DataFrame
    - meta: Metadata DataFrame

    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def from_dict(data, npartitions, orient='columns', dtype=None, columns=None):
    """
    Create a DataFrame from a dictionary of sequences.

    Parameters:
    - data: Dictionary of array-like values
    - npartitions: Number of partitions
    - orient: Data orientation ('columns' or 'index')
    - dtype: Data type
    - columns: Column names

    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def from_delayed(dfs, meta=None, divisions=None, prefix='from-delayed',
                 verify_meta=True):
    """
    Create a DataFrame from delayed objects.

    Parameters:
    - dfs: List of delayed pandas DataFrame objects
    - meta: Metadata DataFrame for type inference
    - divisions: Index divisions between partitions
    - prefix: Name prefix for task keys
    - verify_meta: Check metadata consistency

    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """
```
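
A short sketch of the constructors above; the delayed loader and file names are hypothetical stand-ins:

```python
import pandas as pd
import dask.dataframe as dd
from dask import delayed

# From an in-memory pandas object
ddf = dd.from_pandas(pd.DataFrame({'a': range(100)}), npartitions=4)

# From a dict of sequences (from_dict is available in recent Dask releases)
ddf2 = dd.from_dict({'x': range(10), 'y': range(10)}, npartitions=2)

# From delayed objects -- useful when a custom loader yields pandas frames.
# load_chunk and the paths below are hypothetical.
@delayed
def load_chunk(path):
    return pd.read_csv(path)

parts = [load_chunk(p) for p in ['part0.csv', 'part1.csv']]
meta = pd.DataFrame({'a': pd.Series(dtype='int64')})  # schema hint
ddf3 = dd.from_delayed(parts, meta=meta)
```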

### File I/O Operations

Read and write DataFrames in various file formats.

```python { .api }
def read_csv(urlpath, blocksize=None, collection=True, **kwargs):
    """
    Read CSV files into a Dask DataFrame.

    Parameters:
    - urlpath: File path or glob pattern (supports wildcards)
    - blocksize: Size of each partition in bytes
    - collection: Return a DataFrame (True) or delayed objects (False)
    - **kwargs: Additional pandas.read_csv arguments

    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def read_parquet(path, columns=None, filters=None, categories=None,
                 index=None, storage_options=None, **kwargs):
    """
    Read Parquet files into a Dask DataFrame.

    Parameters:
    - path: File path or directory
    - columns: Columns to read
    - filters: Row filters to apply
    - categories: Columns to treat as categorical
    - index: Column to use as index
    - storage_options: Storage backend options
    - **kwargs: Additional arguments

    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def read_json(urlpath, orient='records', lines=None, **kwargs):
    """Read JSON files into a Dask DataFrame."""

def read_hdf(pattern, key, start=None, stop=None, **kwargs):
    """Read HDF5 files into a Dask DataFrame."""

def read_sql_table(table, uri, index_col, divisions=None,
                   npartitions=None, **kwargs):
    """Read a SQL table into a Dask DataFrame."""

def read_sql_query(sql, uri, index_col, divisions=None,
                   npartitions=None, **kwargs):
    """Read SQL query results into a Dask DataFrame."""

def to_csv(df, filename, **kwargs):
    """Write a DataFrame to CSV files (one file per partition)."""

def to_parquet(df, path, **kwargs):
    """Write a DataFrame to Parquet format."""

def to_json(df, filename, **kwargs):
    """Write a DataFrame to JSON files."""

def to_hdf(df, path, key, **kwargs):
    """Write a DataFrame to HDF5 format."""

def to_sql(df, name, uri, **kwargs):
    """Write a DataFrame to a SQL database."""
```
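
A sketch of typical reader/writer usage; paths and column names are placeholders:

```python
import dask.dataframe as dd

# One partition per ~64 MB block; glob patterns read many files at once
df = dd.read_csv('data/*.csv', blocksize='64MB')

# Column pruning and row filters push down into the Parquet reader
events = dd.read_parquet('events.parquet',
                         columns=['id', 'value'],
                         filters=[('value', '>', 0)])

# Writers emit one file per partition by default
df.to_parquet('out/')        # out/part.0.parquet, out/part.1.parquet, ...
df.to_csv('out/part-*.csv')  # '*' is replaced by the partition number
```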

### Core DataFrame Class

The main DataFrame class, with a pandas-compatible interface.

```python { .api }
class DataFrame:
    """
    Distributed pandas-like DataFrame.

    Properties:
    - columns: Index - Column names
    - dtypes: Series - Data types of columns
    - index: Index - Row index
    - shape: tuple - Shape (the row count is lazy until computed)
    - ndim: int - Number of dimensions (always 2)
    - npartitions: int - Number of partitions
    - divisions: tuple - Index divisions between partitions
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute the DataFrame and return a pandas result.

        Returns:
        pandas.DataFrame: Computed DataFrame
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist the DataFrame in memory for reuse.

        Returns:
        dask.dataframe.DataFrame: Persisted DataFrame
        """

    def head(self, n=5, npartitions=1, compute=True):
        """
        Return the first n rows.

        Parameters:
        - n: Number of rows to return
        - npartitions: Number of partitions to search
        - compute: Whether to compute the result

        Returns:
        pandas.DataFrame or dask.dataframe.DataFrame: First n rows
        """

    def tail(self, n=5, compute=True):
        """Return the last n rows."""

    def __getitem__(self, key):
        """Column selection and boolean indexing."""

    def __setitem__(self, key, value):
        """Column assignment."""

    @property
    def loc(self):
        """Label-based indexing; uses divisions to prune partitions."""

    @property
    def iloc(self):
        """Integer position-based indexing (column selection only)."""
```
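
A sketch of the lazy workflow these methods imply (the data is illustrative):

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({'x': range(100), 'y': range(100)}), npartitions=4)

print(ddf.head())                # eager: looks at the first partition only
sub = ddf[ddf.x > 10]            # lazy: just extends the task graph
sub = sub.persist()              # run the graph, keep partitions in memory
print(sub.loc[20:40].compute())  # label slicing prunes partitions via divisions
print(sub.compute().shape)       # materialize the full pandas DataFrame
```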

### Series Class

Distributed Series for single-column operations.

```python { .api }
class Series:
    """
    Distributed pandas-like Series.

    Properties:
    - dtype: numpy.dtype - Data type
    - index: Index - Row index
    - name: str - Series name
    - shape: tuple - Shape (the row count is lazy until computed)
    - ndim: int - Number of dimensions (always 1)
    - npartitions: int - Number of partitions
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute the Series and return a pandas result.

        Returns:
        pandas.Series: Computed Series
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist the Series in memory.

        Returns:
        dask.dataframe.Series: Persisted Series
        """

    def head(self, n=5, npartitions=1, compute=True):
        """Return the first n values."""

    def tail(self, n=5, compute=True):
        """Return the last n values."""

    def value_counts(self, normalize=False, sort=True, ascending=False,
                     split_every=None):
        """Count occurrences of unique values."""

    def unique(self, split_every=None):
        """Return unique values."""
```
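
A small sketch of the Series reductions above (values are illustrative):

```python
import pandas as pd
import dask.dataframe as dd

s = dd.from_pandas(
    pd.Series(['a', 'b', 'a', 'c', 'b', 'a'], name='tag'), npartitions=2)

print(s.value_counts().compute())  # counts per distinct value
print(s.unique().compute())        # distinct values
print(s.dtype, s.npartitions)      # metadata is known without computing
```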

### DataFrame Operations

Data manipulation and transformation functions.

```python { .api }
def concat(dfs, axis=0, join='outer', ignore_index=False,
           interleave_partitions=None):
    """
    Concatenate DataFrames along an axis.

    Parameters:
    - dfs: List of DataFrames to concatenate
    - axis: Axis to concatenate along (0=rows, 1=columns)
    - join: How to handle non-matching columns
    - ignore_index: Reset the index in the result
    - interleave_partitions: Interleave partitions when axis=0

    Returns:
    dask.dataframe.DataFrame: Concatenated DataFrame
    """

def merge(left, right, how='inner', on=None, left_on=None, right_on=None,
          left_index=False, right_index=False, suffixes=('_x', '_y'),
          npartitions=None, shuffle=None):
    """
    Merge DataFrames with database-style joins.

    Parameters:
    - left, right: DataFrames to merge
    - how: Type of join ('inner', 'outer', 'left', 'right')
    - on: Column names to join on
    - left_on, right_on: Columns to join on for each DataFrame
    - left_index, right_index: Use the index as the join key
    - suffixes: Suffixes for overlapping column names
    - npartitions: Number of output partitions
    - shuffle: Shuffling method

    Returns:
    dask.dataframe.DataFrame: Merged DataFrame
    """

def merge_asof(left, right, on=None, left_on=None, right_on=None,
               left_index=False, right_index=False, by=None,
               left_by=None, right_by=None, suffixes=('_x', '_y'),
               tolerance=None, allow_exact_matches=True, direction='backward'):
    """Perform an asof merge (temporal/ordered merge)."""

def pivot_table(df, index=None, columns=None, values=None, aggfunc='mean'):
    """Create a pivot table."""

def melt(df, id_vars=None, value_vars=None, var_name=None, value_name='value'):
    """Unpivot a DataFrame from wide to long format."""

def get_dummies(data, prefix=None, prefix_sep='_', dummy_na=False,
                columns=None, sparse=False, drop_first=False, dtype=None):
    """Convert categorical variables to dummy/indicator variables."""
```
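
A sketch of joining and reshaping with these functions, on tiny illustrative frames:

```python
import pandas as pd
import dask.dataframe as dd

left = dd.from_pandas(
    pd.DataFrame({'key': [1, 2, 3], 'x': ['a', 'b', 'c']}), npartitions=2)
right = dd.from_pandas(
    pd.DataFrame({'key': [2, 3, 4], 'y': [0.1, 0.2, 0.3]}), npartitions=2)

joined = dd.merge(left, right, on='key', how='inner')  # may shuffle partitions
stacked = dd.concat([left, left])                      # row-wise concat is cheap

# get_dummies needs known categories, hence categorize() first
wide = dd.get_dummies(left.categorize(columns=['x']), columns=['x'])

print(joined.compute())
```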

### Groupby Operations

Group-based operations and aggregations.

```python { .api }
class DataFrameGroupBy:
    """DataFrame groupby operations."""

    def aggregate(self, func, **kwargs):
        """Apply aggregation functions."""

    def apply(self, func, **kwargs):
        """Apply a function to each group."""

    def size(self):
        """Size of each group."""

    def count(self):
        """Count non-null values in each group."""

    def mean(self, **kwargs):
        """Mean of each group."""

    def sum(self, **kwargs):
        """Sum of each group."""

    def min(self, **kwargs):
        """Minimum of each group."""

    def max(self, **kwargs):
        """Maximum of each group."""

    def std(self, **kwargs):
        """Standard deviation of each group."""

    def var(self, **kwargs):
        """Variance of each group."""

class SeriesGroupBy:
    """Series groupby operations; supports the same reductions as
    DataFrameGroupBy."""

    def aggregate(self, func, **kwargs):
        """Apply aggregation functions."""

    def apply(self, func, **kwargs):
        """Apply a function to each group."""
```
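
A short groupby sketch (group labels and values are illustrative):

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({'g': ['a', 'b', 'a', 'b'], 'v': [1.0, 2.0, 3.0, 4.0]}),
    npartitions=2)

gb = df.groupby('g')
print(gb.v.mean().compute())                      # single reduction
print(gb.agg({'v': ['sum', 'count']}).compute())  # several aggregations at once
```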

### Data Processing

Data cleaning, transformation, and processing functions.

```python { .api }
def map_partitions(func, *args, meta=None, **kwargs):
    """
    Apply a function to each partition.

    Parameters:
    - func: Function applied to each partition (receives pandas objects)
    - *args: DataFrames and other arguments
    - meta: Metadata describing the output schema
    - **kwargs: Additional arguments passed to func

    Returns:
    dask.dataframe.DataFrame or Series: Result of applying func
    """

def repartition(df, divisions=None, npartitions=None, partition_size=None,
                freq=None, force=False):
    """
    Change DataFrame partitioning.

    Parameters:
    - df: DataFrame to repartition
    - divisions: New index divisions
    - npartitions: Target number of partitions
    - partition_size: Target partition size
    - freq: Frequency for time-based partitioning (requires a datetime index)
    - force: Force repartitioning even if expensive

    Returns:
    dask.dataframe.DataFrame: Repartitioned DataFrame
    """

def to_datetime(arg, **kwargs):
    """Convert an argument to datetime."""

def to_numeric(arg, errors='raise', **kwargs):
    """Convert an argument to a numeric type."""

def to_timedelta(arg, unit=None, **kwargs):
    """Convert an argument to timedelta."""

def isna(df):
    """Detect missing values."""
```
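
A sketch of `map_partitions` with an explicit `meta`; the helper function is hypothetical:

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({'x': range(10)}), npartitions=5)

# func sees plain pandas DataFrames; meta tells Dask the output schema
# so the graph can be built without running func eagerly
def add_double(part):
    return part.assign(x2=part.x * 2)

out = df.map_partitions(add_double, meta={'x': 'int64', 'x2': 'int64'})

# Coarsen partitioning, e.g. before writing many small files
out = out.repartition(npartitions=2)
print(out.npartitions, len(out.compute()))
```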

### Index Operations

Index and division management for DataFrames.

```python { .api }
class Index:
    """Distributed index for DataFrames and Series."""

    def compute(self, scheduler=None, **kwargs):
        """Compute index values."""

    def persist(self, scheduler=None, **kwargs):
        """Persist the index in memory."""

def set_index(df, other, divisions=None, sorted=None, npartitions=None,
              shuffle=None, compute=None):
    """Set the DataFrame index (shuffles data unless sorted=True)."""

def reset_index(df, drop=False):
    """Reset the DataFrame index."""
```
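
A sketch of index management; note that `set_index` generally sorts and shuffles, which is what makes divisions known:

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({'t': [3, 1, 2, 4], 'v': [30., 10., 20., 40.]}),
    npartitions=2)

indexed = df.set_index('t')     # sort + shuffle; divisions become known
print(indexed.divisions)        # e.g. (1, 3, 4)
print(indexed.loc[2].compute()) # lookups can now prune partitions

flat = indexed.reset_index()    # move the index back into a column
```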

### Aggregation Specifications

Custom aggregation definitions for groupby operations.

```python { .api }
class Aggregation:
    """
    Specification for custom groupby aggregations.

    Parameters:
    - name: str - Name of the aggregation
    - chunk: callable - Applied to the grouped data in each partition
    - agg: callable - Combines the per-partition results
    - finalize: callable - Produces the final result (optional)
    """

    def __init__(self, name, chunk, agg, finalize=None):
        """Initialize the aggregation specification."""
```
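
A sketch of a custom aggregation built from the three hooks, following the count-and-sum mean pattern (the name `custom_mean` is illustrative):

```python
import pandas as pd
import dask.dataframe as dd

# chunk runs on the grouped data in each partition; agg combines the
# per-partition pieces; finalize computes the final value
custom_mean = dd.Aggregation(
    name='custom_mean',
    chunk=lambda s: (s.count(), s.sum()),
    agg=lambda count, total: (count.sum(), total.sum()),
    finalize=lambda count, total: total / count,
)

df = dd.from_pandas(
    pd.DataFrame({'g': ['a', 'a', 'b'], 'v': [1.0, 2.0, 4.0]}), npartitions=2)
print(df.groupby('g').v.agg(custom_mean).compute())
```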

## Usage Examples

### Basic DataFrame Operations

```python
import dask.dataframe as dd

# Read a large CSV file
df = dd.read_csv('large_dataset.csv')

# Basic operations (all lazy until computed)
filtered = df[df.value > 100]
grouped = df.groupby('category').value.mean()
sorted_df = df.sort_values('timestamp')

# Compute results
result = grouped.compute()
```

### Multi-file Processing

```python
import dask.dataframe as dd

# Read multiple files with a glob pattern
df = dd.read_csv('data/year=*/month=*/day=*.csv')

# Process with familiar pandas operations
daily_stats = (df.groupby(['date', 'category'])
                 .agg({'value': ['mean', 'sum', 'count']})
                 .compute())
```

### Custom Aggregations

```python
import dask.dataframe as dd
from dask.dataframe import Aggregation

# Approximate median: take each partition's per-group median, then the
# median of those per-partition medians. chunk and agg both receive
# grouped data, so pandas groupby reductions apply directly.
def chunk_median(grouped):
    return grouped.median()

def combine_medians(grouped):
    return grouped.median()

median_agg = Aggregation(
    name='median',
    chunk=chunk_median,
    agg=combine_medians
)

# Use the custom aggregation
df = dd.read_csv('data.csv')
result = df.groupby('category').value.agg(median_agg).compute()
```

### Memory-Efficient Processing

```python
import dask.dataframe as dd

# Read and process without loading the full dataset
df = dd.read_parquet('large_dataset.parquet')

# Chain operations efficiently; nlargest avoids a full sort and keeps
# the whole chain lazy until the final compute()
result = (df.query('value > 0')
            .groupby('category')
            .value.sum()
            .nlargest(10)
            .compute())

# Persist intermediate results for reuse
df_clean = df.dropna().persist()
stat1 = df_clean.value.mean().compute()
stat2 = df_clean.value.std().compute()
```

### Time Series Operations

```python
import dask.dataframe as dd

# Read time series data
df = dd.read_csv('timeseries.csv', parse_dates=['timestamp'])
df = df.set_index('timestamp').repartition(freq='1D')

# Time-based operations
daily_avg = df.resample('D').mean()
rolling_mean = df.value.rolling('7D').mean()

# Compute both results together
results = dd.compute(daily_avg, rolling_mean)
```