CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dask-cudf

Dask cuDF - A GPU Backend for Dask DataFrame providing GPU-accelerated parallel and larger-than-memory DataFrame computing

Pending
Overview
Eval results
Files

groupby-operations.mddocs/

Groupby and Aggregation Operations

Optimized groupby operations leveraging GPU parallelism for high-performance aggregations with support for custom aggregation functions and advanced grouping patterns.

Capabilities

Legacy Groupby Aggregation

High-performance groupby aggregation function available in legacy mode with GPU-optimized implementations for common aggregation operations.

def groupby_agg(df, by, agg_dict, split_out=None, split_every=None, **kwargs):
    """
    Perform optimized groupby aggregation with GPU acceleration.
    
    Available only when DASK_DATAFRAME__QUERY_PLANNING=False (legacy mode).
    Provides significant performance improvements for supported aggregations.
    
    Parameters:
    - df: DataFrame - Input DataFrame to group
    - by: str, list, or callable - Grouping key(s)
    - agg_dict: dict or str - Aggregation specification
        - str: Single aggregation function name
        - dict: {column: aggregation} mapping
    - split_out: int, optional - Number of output partitions
    - split_every: int, optional - Tree reduction branching factor
    - **kwargs: Additional arguments for groupby operation
    
    Returns:
    DataFrame - Aggregated results
    
    Supported Optimized Aggregations:
    - 'count': Count non-null values
    - 'mean': Arithmetic mean
    - 'std': Standard deviation
    - 'var': Variance
    - 'sum': Sum of values
    - 'min': Minimum value
    - 'max': Maximum value
    - 'first': First non-null value
    - 'last': Last non-null value
    - list: Collect values into lists
    
    Notes:
    - Deprecated in expression mode - use standard DataFrame.groupby()
    - GPU-optimized implementations provide significant speedup
    - Supports custom aggregation functions with fallback to CPU
    """

Optimized Aggregations Constant

Available optimized aggregation operations that leverage GPU acceleration for maximum performance.

OPTIMIZED_AGGS = (
    'count',
    'mean', 
    'std',
    'var',
    'sum',
    'min',
    'max',
    list,
    'first',
    'last'
)
"""
Tuple of aggregation functions optimized for GPU execution.

These aggregations are specially optimized in dask-cudf's legacy groupby
implementation to leverage cuDF's high-performance GPU operations.
"""

Expression-Based Groupby (Modern API)

Modern groupby operations using the expression-based API with automatic optimization and query planning.

class DataFrame:
    def groupby(self, by, **kwargs):
        """
        Group DataFrame using expression-based API.
        
        Returns a DataFrameGroupBy object that supports the same aggregation
        methods as pandas/Dask but with GPU acceleration.
        
        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments
        
        Returns:
        DataFrameGroupBy - Groupby object for aggregations
        
        Examples:
        df.groupby('category').sum()
        df.groupby(['region', 'category']).agg({'value': 'mean', 'count': 'sum'})
        """

class Series:
    def groupby(self, by, **kwargs):
        """
        Group Series using expression-based API.
        
        Parameters:
        - by: str, list, or callable - Grouping specification
        - **kwargs: Additional groupby arguments
        
        Returns:
        SeriesGroupBy - Groupby object for aggregations
        """

Usage Examples

Legacy Groupby Aggregation

import dask_cudf
import cudf
import os

# Ensure legacy mode is enabled
os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'False'

# Create sample data
df = cudf.DataFrame({
    'category': ['A', 'B', 'A', 'B', 'A', 'B'] * 1000,
    'value': range(6000),
    'amount': [1.1, 2.2, 3.3, 4.4, 5.5, 6.6] * 1000
})

ddf = dask_cudf.from_cudf(df, npartitions=4)

# Single aggregation
result1 = dask_cudf.groupby_agg(ddf, 'category', 'sum')

# Multiple aggregations
result2 = dask_cudf.groupby_agg(
    ddf, 
    'category',
    {'value': 'mean', 'amount': ['sum', 'count']}
)

print(result1.compute())
print(result2.compute())

Modern Expression-Based Groupby

import dask
import dask_cudf
import cudf

# Ensure expression mode (default in newer versions)
# os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = 'True'

# Create sample data
df = cudf.DataFrame({
    'region': ['North', 'South', 'East', 'West'] * 2500,
    'category': ['X', 'Y'] * 5000,
    'sales': range(10000),
    'profit': [x * 0.1 for x in range(10000)]
})

ddf = dask_cudf.from_cudf(df, npartitions=8)

# Standard groupby operations
by_region = ddf.groupby('region')['sales'].sum()
by_category = ddf.groupby('category').agg({
    'sales': 'mean',
    'profit': ['sum', 'std']
})

# Multi-level groupby
multi_group = ddf.groupby(['region', 'category']).agg({
    'sales': 'sum',
    'profit': 'mean'
})

# Compute results
print("By Region:")
print(by_region.compute())

print("\nBy Category:")
print(by_category.compute())

print("\nMulti-level:")
print(multi_group.compute())

Performance Comparison Example

import time
import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd

# Create large dataset
n_rows = 1_000_000
df_cudf = cudf.DataFrame({
    'group': ['A', 'B', 'C', 'D'] * (n_rows // 4),
    'value1': range(n_rows),
    'value2': [x * 0.5 for x in range(n_rows)]
})

df_pandas = df_cudf.to_pandas()

# Dask-cuDF version
ddf_cudf = dask_cudf.from_cudf(df_cudf, npartitions=10)

# Dask-pandas version  
ddf_pandas = dd.from_pandas(df_pandas, npartitions=10)

# Time GPU-accelerated groupby
start_time = time.time()
result_gpu = ddf_cudf.groupby('group').agg({
    'value1': 'sum',
    'value2': 'mean'
}).compute()
gpu_time = time.time() - start_time

# Time CPU groupby
start_time = time.time()
result_cpu = ddf_pandas.groupby('group').agg({
    'value1': 'sum', 
    'value2': 'mean'
}).compute()
cpu_time = time.time() - start_time

print(f"GPU time: {gpu_time:.2f}s")
print(f"CPU time: {cpu_time:.2f}s")
print(f"Speedup: {cpu_time/gpu_time:.1f}x")

Custom Aggregations

# Custom aggregation function
def custom_agg(series):
    """Custom aggregation: ratio of max to min"""
    if len(series) == 0:
        return 0
    return series.max() / series.min() if series.min() != 0 else float('inf')

# Apply custom aggregation
result = ddf.groupby('category')['value'].agg(custom_agg)
print(result.compute())

# Multiple custom aggregations
result = ddf.groupby('region').agg({
    'sales': [custom_agg, 'sum'],
    'profit': ['mean', lambda x: x.std() / x.mean()]  # Coefficient of variation
})
print(result.compute())

Install with Tessl CLI

npx tessl i tessl/pypi-dask-cudf

docs

core-operations.md

data-io.md

data-type-accessors.md

groupby-operations.md

index.md

tile.json