# Groupby and Aggregation Operations

Optimized groupby operations leveraging GPU parallelism for high-performance aggregations, with support for custom aggregation functions and advanced grouping patterns.

## Capabilities

### Legacy Groupby Aggregation

High-performance groupby aggregation function, available in legacy mode, with GPU-optimized implementations of common aggregation operations.
```python { .api }
def groupby_agg(df, by, agg_dict, split_out=None, split_every=None, **kwargs):
    """
    Perform an optimized groupby aggregation with GPU acceleration.

    Available only when DASK_DATAFRAME__QUERY_PLANNING=False (legacy mode).
    Provides significant performance improvements for supported aggregations.

    Parameters:
    - df: DataFrame - Input DataFrame to group
    - by: str, list, or callable - Grouping key(s)
    - agg_dict: dict or str - Aggregation specification
        - str: single aggregation function name
        - dict: {column: aggregation} mapping
    - split_out: int, optional - Number of output partitions
    - split_every: int, optional - Tree-reduction branching factor
    - **kwargs: Additional arguments passed to the groupby operation

    Returns:
    DataFrame - Aggregated results

    Supported optimized aggregations:
    - 'count': count non-null values
    - 'mean': arithmetic mean
    - 'std': standard deviation
    - 'var': variance
    - 'sum': sum of values
    - 'min': minimum value
    - 'max': maximum value
    - 'first': first non-null value
    - 'last': last non-null value
    - list: collect values into lists

    Notes:
    - Deprecated in expression mode; use the standard DataFrame.groupby() instead
    - GPU-optimized implementations provide significant speedups
    - Custom aggregation functions are supported, falling back to a slower,
      non-optimized path
    """
```

### Optimized Aggregations Constant

The optimized aggregation operations that leverage GPU acceleration for maximum performance.

```python { .api }
OPTIMIZED_AGGS = (
    'count',
    'mean',
    'std',
    'var',
    'sum',
    'min',
    'max',
    list,
    'first',
    'last',
)
"""
Tuple of aggregation functions optimized for GPU execution.

These aggregations are specially optimized in dask-cudf's legacy groupby
implementation to leverage cuDF's high-performance GPU operations.
"""
```
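
A spec whose every aggregation appears in `OPTIMIZED_AGGS` stays entirely on the optimized path. The `is_fully_optimized` helper below is hypothetical (not part of dask-cudf) — a minimal pure-Python sketch of such a check:

```python
# Hypothetical helper (not part of dask-cudf): recursively check whether
# every aggregation in a spec appears in OPTIMIZED_AGGS.
OPTIMIZED_AGGS = ('count', 'mean', 'std', 'var', 'sum', 'min', 'max',
                  list, 'first', 'last')

def is_fully_optimized(agg_spec):
    """Return True if every aggregation in the spec is GPU-optimized."""
    if isinstance(agg_spec, dict):
        # {column: aggregation} mapping - check each column's spec
        return all(is_fully_optimized(v) for v in agg_spec.values())
    if isinstance(agg_spec, (list, tuple)):
        # Several aggregations for one column
        return all(is_fully_optimized(v) for v in agg_spec)
    # Single aggregation: a name like 'mean', or the list builtin
    return agg_spec in OPTIMIZED_AGGS

print(is_fully_optimized({'value': 'mean', 'amount': ['sum', 'count']}))  # True
print(is_fully_optimized({'value': 'median'}))                            # False
```

A spec that fails this check still works with `groupby_agg`, but the unsupported aggregations take the slower, non-optimized path.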

### Expression-Based Groupby (Modern API)

Modern groupby operations use the expression-based API, with automatic optimization and query planning.

```python { .api }
class DataFrame:
    def groupby(self, by, **kwargs):
        """
        Group a DataFrame using the expression-based API.

        Returns a DataFrameGroupBy object that supports the same aggregation
        methods as pandas/Dask, but with GPU acceleration.

        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments

        Returns:
        DataFrameGroupBy - Groupby object for aggregations

        Examples:
        df.groupby('category').sum()
        df.groupby(['region', 'category']).agg({'value': 'mean', 'count': 'sum'})
        """

class Series:
    def groupby(self, by, **kwargs):
        """
        Group a Series using the expression-based API.

        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments

        Returns:
        SeriesGroupBy - Groupby object for aggregations
        """
```

## Usage Examples

### Legacy Groupby Aggregation

```python
import os

# Ensure legacy mode is enabled
# (must be set before dask.dataframe is first imported to take effect)
os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'False'

import cudf
import dask_cudf

# Create sample data
df = cudf.DataFrame({
    'category': ['A', 'B', 'A', 'B', 'A', 'B'] * 1000,
    'value': range(6000),
    'amount': [1.1, 2.2, 3.3, 4.4, 5.5, 6.6] * 1000
})

ddf = dask_cudf.from_cudf(df, npartitions=4)

# Single aggregation, applied to all non-grouping columns
result1 = dask_cudf.groupby_agg(ddf, 'category', 'sum')

# Multiple aggregations
result2 = dask_cudf.groupby_agg(
    ddf,
    'category',
    {'value': 'mean', 'amount': ['sum', 'count']}
)

print(result1.compute())
print(result2.compute())
```
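
The `split_every` parameter controls the branching factor of the tree reduction that combines per-partition partial results. A pure-Python sketch of that reduction shape (illustrative only — no GPU or Dask involved):

```python
from functools import reduce

def tree_reduce(parts, combine, split_every=2):
    """Combine partial results in rounds of `split_every`-way merges --
    the reduction shape that groupby_agg's split_every controls."""
    while len(parts) > 1:
        parts = [
            reduce(combine, parts[i:i + split_every])
            for i in range(0, len(parts), split_every)
        ]
    return parts[0]

# Eight per-partition partial sums, merged pairwise over three rounds
partial_sums = [sum(range(i * 100, (i + 1) * 100)) for i in range(8)]
print(tree_reduce(partial_sums, lambda a, b: a + b, split_every=2))  # 319600
```

A larger `split_every` means fewer rounds but more work per merge task; a smaller one gives a deeper tree with cheaper individual merges.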

### Modern Expression-Based Groupby

```python
import cudf
import dask_cudf

# Expression mode is the default in newer versions; to force it:
# os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'True'

# Create sample data
df = cudf.DataFrame({
    'region': ['North', 'South', 'East', 'West'] * 2500,
    'category': ['X', 'Y'] * 5000,
    'sales': range(10000),
    'profit': [x * 0.1 for x in range(10000)]
})

ddf = dask_cudf.from_cudf(df, npartitions=8)

# Standard groupby operations
by_region = ddf.groupby('region')['sales'].sum()
by_category = ddf.groupby('category').agg({
    'sales': 'mean',
    'profit': ['sum', 'std']
})

# Multi-level groupby
multi_group = ddf.groupby(['region', 'category']).agg({
    'sales': 'sum',
    'profit': 'mean'
})

# Compute results
print("By Region:")
print(by_region.compute())

print("\nBy Category:")
print(by_category.compute())

print("\nMulti-level:")
print(multi_group.compute())
```

### Performance Comparison Example

```python
import time

import cudf
import dask.dataframe as dd
import dask_cudf

# Create a large dataset
n_rows = 1_000_000
df_cudf = cudf.DataFrame({
    'group': ['A', 'B', 'C', 'D'] * (n_rows // 4),
    'value1': range(n_rows),
    'value2': [x * 0.5 for x in range(n_rows)]
})

df_pandas = df_cudf.to_pandas()

# Dask-cuDF version
ddf_cudf = dask_cudf.from_cudf(df_cudf, npartitions=10)

# Dask-pandas version
ddf_pandas = dd.from_pandas(df_pandas, npartitions=10)

# Time the GPU-accelerated groupby
start_time = time.perf_counter()
result_gpu = ddf_cudf.groupby('group').agg({
    'value1': 'sum',
    'value2': 'mean'
}).compute()
gpu_time = time.perf_counter() - start_time

# Time the CPU groupby
start_time = time.perf_counter()
result_cpu = ddf_pandas.groupby('group').agg({
    'value1': 'sum',
    'value2': 'mean'
}).compute()
cpu_time = time.perf_counter() - start_time

print(f"GPU time: {gpu_time:.2f}s")
print(f"CPU time: {cpu_time:.2f}s")
print(f"Speedup: {cpu_time / gpu_time:.1f}x")
```
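
One-shot timings like the above fold one-time costs (CUDA context creation, kernel compilation, scheduler startup) into the measurement. A small GPU-free harness that repeats the work and keeps the best wall-clock time gives more stable numbers — `best_of` here is an illustrative helper, not a dask-cudf API:

```python
import time

def best_of(fn, repeats=3):
    """Run fn several times and return the fastest wall-clock time,
    so one-time warm-up costs don't dominate the measurement."""
    best = float('inf')
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - t0)
    return best

# Example with a cheap stand-in workload; in the comparison above you would
# pass e.g. lambda: ddf_cudf.groupby('group').agg({'value1': 'sum'}).compute()
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.4f}s")
```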

### Custom Aggregations

```python
# Continuing with the `ddf` from the expression-based example above

# Custom aggregation function
def custom_agg(series):
    """Custom aggregation: ratio of max to min."""
    if len(series) == 0:
        return 0
    return series.max() / series.min() if series.min() != 0 else float('inf')

# Apply the custom aggregation (it is not in OPTIMIZED_AGGS, so it takes
# the slower, non-optimized code path)
result = ddf.groupby('category')['sales'].agg(custom_agg)
print(result.compute())

# Multiple custom aggregations
result = ddf.groupby('region').agg({
    'sales': [custom_agg, 'sum'],
    'profit': ['mean', lambda x: x.std() / x.mean()]  # coefficient of variation
})
print(result.compute())
```
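
The custom aggregations above are easy to sanity-check against plain-Python reference implementations on small groups — a sketch assuming ordinary numeric lists, independent of cuDF:

```python
import statistics

def max_min_ratio(values):
    """Reference implementation of custom_agg on a plain list."""
    if not values:
        return 0
    lo, hi = min(values), max(values)
    return hi / lo if lo != 0 else float('inf')

def coeff_of_variation(values):
    """Sample standard deviation over the mean
    (the lambda in the example above)."""
    return statistics.stdev(values) / statistics.mean(values)

print(max_min_ratio([2.0, 4.0, 8.0]))            # 4.0
print(round(coeff_of_variation([10, 20, 30]), 3))  # 0.5
```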