Dask cuDF — a GPU backend for Dask DataFrame, providing GPU-accelerated, parallel, and larger-than-memory DataFrame computing.

It offers optimized groupby operations that leverage GPU parallelism for high-performance aggregations, with support for custom aggregation functions and advanced grouping patterns. A high-performance groupby aggregation function is available in legacy mode, with GPU-optimized implementations of common aggregation operations.
def groupby_agg(df, by, agg_dict, split_out=None, split_every=None, **kwargs):
    """Perform optimized groupby aggregation with GPU acceleration.

    Available only when DASK_DATAFRAME__QUERY_PLANNING=False (legacy mode).
    Provides significant performance improvements for supported aggregations.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame to group.
    by : str, list, or callable
        Grouping key(s).
    agg_dict : dict or str
        Aggregation specification. A str names a single aggregation
        function; a dict maps {column: aggregation}.
    split_out : int, optional
        Number of output partitions.
    split_every : int, optional
        Tree reduction branching factor.
    **kwargs
        Additional arguments for the groupby operation.

    Returns
    -------
    DataFrame
        Aggregated results.

    Notes
    -----
    Supported optimized aggregations: 'count', 'mean', 'std', 'var',
    'sum', 'min', 'max', 'first', 'last', and ``list`` (collect values
    into lists).

    - Deprecated in expression mode - use standard DataFrame.groupby()
    - GPU-optimized implementations provide significant speedup
    - Supports custom aggregation functions with fallback to CPU
    """


# Available optimized aggregation operations that leverage GPU acceleration
# for maximum performance.
# Tuple of aggregation functions optimized for GPU execution.
# These aggregations are specially optimized in dask-cudf's legacy groupby
# implementation to leverage cuDF's high-performance GPU operations.
OPTIMIZED_AGGS = (
    'count',
    'mean',
    'std',
    'var',
    'sum',
    'min',
    'max',
    list,  # collect grouped values into per-group lists
    'first',
    'last',
)

# Modern groupby operations use the expression-based API with automatic
# optimization and query planning.
class DataFrame:
    def groupby(self, by, **kwargs):
        """Group DataFrame using the expression-based API.

        Returns a DataFrameGroupBy object that supports the same
        aggregation methods as pandas/Dask but with GPU acceleration.

        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments

        Returns:
        DataFrameGroupBy - Groupby object for aggregations

        Examples:
        df.groupby('category').sum()
        df.groupby(['region', 'category']).agg({'value': 'mean', 'count': 'sum'})
        """
class Series:
    def groupby(self, by, **kwargs):
        """Group Series using the expression-based API.

        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments

        Returns:
        SeriesGroupBy - Groupby object for aggregations
        """


import dask_cudf
import cudf
import os

# Ensure legacy mode is enabled.
# NOTE(review): this env var is normally read when dask.dataframe is first
# imported — confirm it takes effect when set after the import above.
os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'False'

# Create sample data on the GPU: 6000 rows across two categories.
df = cudf.DataFrame({
    'category': ['A', 'B', 'A', 'B', 'A', 'B'] * 1000,
    'value': range(6000),
    'amount': [1.1, 2.2, 3.3, 4.4, 5.5, 6.6] * 1000
})
ddf = dask_cudf.from_cudf(df, npartitions=4)

# Single aggregation: one function name applied across columns.
result1 = dask_cudf.groupby_agg(ddf, 'category', 'sum')

# Multiple aggregations: per-column specification.
result2 = dask_cudf.groupby_agg(
    ddf,
    'category',
    {'value': 'mean', 'amount': ['sum', 'count']}
)
print(result1.compute())
print(result2.compute())

import dask
import dask_cudf
import cudf

# Ensure expression mode (default in newer versions).
# os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'True'

# Create sample data: 10000 rows across 4 regions and 2 categories.
df = cudf.DataFrame({
    'region': ['North', 'South', 'East', 'West'] * 2500,
    'category': ['X', 'Y'] * 5000,
    'sales': range(10000),
    'profit': [x * 0.1 for x in range(10000)]
})
ddf = dask_cudf.from_cudf(df, npartitions=8)

# Standard groupby operations via the expression-based API.
by_region = ddf.groupby('region')['sales'].sum()
by_category = ddf.groupby('category').agg({
    'sales': 'mean',
    'profit': ['sum', 'std']
})

# Multi-level groupby on two keys.
multi_group = ddf.groupby(['region', 'category']).agg({
    'sales': 'sum',
    'profit': 'mean'
})

# Compute results (graphs are lazy until .compute()).
print("By Region:")
print(by_region.compute())
print("\nBy Category:")
print(by_category.compute())
print("\nMulti-level:")
print(multi_group.compute())

import time
import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd

# Benchmark: identical groupby-aggregate on GPU (dask-cudf) vs CPU (dask).
n_rows = 1_000_000
df_cudf = cudf.DataFrame({
    'group': ['A', 'B', 'C', 'D'] * (n_rows // 4),
    'value1': range(n_rows),
    'value2': [x * 0.5 for x in range(n_rows)],
})
df_pandas = df_cudf.to_pandas()  # CPU mirror of the same data

# Partition both frames identically so the comparison is fair.
ddf_cudf = dask_cudf.from_cudf(df_cudf, npartitions=10)
ddf_pandas = dd.from_pandas(df_pandas, npartitions=10)

# Same aggregation spec for both backends.
agg_spec = {'value1': 'sum', 'value2': 'mean'}

# Time the GPU-accelerated groupby.
start_time = time.time()
result_gpu = ddf_cudf.groupby('group').agg(agg_spec).compute()
gpu_time = time.time() - start_time

# Time the CPU groupby.
start_time = time.time()
result_cpu = ddf_pandas.groupby('group').agg(agg_spec).compute()
cpu_time = time.time() - start_time

print(f"GPU time: {gpu_time:.2f}s")
print(f"CPU time: {cpu_time:.2f}s")
print(f"Speedup: {cpu_time/gpu_time:.1f}x")

# Custom aggregation function
def custom_agg(series):
    """Custom aggregation: ratio of the series max to its min.

    Returns 0 for an empty series, and +inf when the minimum is 0
    (avoiding ZeroDivisionError).
    """
    if len(series) == 0:
        return 0
    # Compute min() once instead of twice as before — it can be an
    # expensive reduction on a distributed/GPU-backed series.
    lo = series.min()
    if lo == 0:
        return float('inf')
    return series.max() / lo
# Apply the custom aggregation to a single column.
# NOTE(review): `ddf` comes from an earlier example — the 'value' column
# exists on the first example's frame, while 'region'/'sales'/'profit'
# below belong to the second; confirm which ddf is intended.
result = ddf.groupby('category')['value'].agg(custom_agg)
print(result.compute())

# Multiple custom aggregations mixed with built-in ones.
result = ddf.groupby('region').agg({
    'sales': [custom_agg, 'sum'],
    'profit': ['mean', lambda x: x.std() / x.mean()]  # Coefficient of variation
})
print(result.compute())

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dask-cudf