Dask cuDF - A GPU Backend for Dask DataFrame providing GPU-accelerated parallel and larger-than-memory DataFrame computing
—
Core functionality for creating, manipulating, and converting GPU-accelerated DataFrames with seamless Dask DataFrame API compatibility and distributed computing support.
Create Dask-cuDF collections from cuDF objects with automatic partitioning and memory-efficient data distribution across GPU workers.
def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
    """
    Create a Dask-cuDF collection from an in-memory cuDF object.

    Parameters:
    - data: cudf.DataFrame, cudf.Series, or cudf.Index - Source cuDF object
      to partition across the collection
    - npartitions: int, optional - Number of partitions to create
    - chunksize: int, optional - Approximate size of each partition in rows;
      give either npartitions or chunksize, not both
    - sort: bool, default True - Sort the input by index first so that
      sorted divisions can be computed for the output collection
    - name: str, optional - Key-name prefix for the collection in the
      task graph

    Returns:
    DataFrame, Series, or Index - Dask-cuDF collection matching the type
    of ``data``

    Raises:
    - NotImplementedError: If data has a MultiIndex (not supported)

    Notes:
    - Either npartitions or chunksize should be specified
    - Thin wrapper that uses Dask's from_pandas internally with the cuDF
      backend
"""Primary collection types providing distributed GPU-accelerated data structures with Dask DataFrame compatibility.
class DataFrame:
    """
    A distributed Dask DataFrame backed by per-partition cuDF DataFrames.

    Provides GPU-accelerated operations with a pandas-like API; most Dask
    DataFrame operations are supported with GPU acceleration.
    """

    def __init__(self, dsk, name, meta, divisions):
        """
        Initialize the DataFrame collection.

        Parameters:
        - dsk: dict - Task graph mapping keys to partition-producing tasks
        - name: str - Collection name identifying keys in the graph
        - meta: cudf.DataFrame - Metadata object describing columns/dtypes
        - divisions: tuple - Index divisions between partitions
        """
class Series:
    """
    A distributed Dask Series backed by per-partition cuDF Series.

    Provides GPU-accelerated operations for single-column data.
    """

    def __init__(self, dsk, name, meta, divisions):
        """
        Initialize the Series collection.

        Parameters:
        - dsk: dict - Task graph mapping keys to partition-producing tasks
        - name: str - Collection name identifying keys in the graph
        - meta: cudf.Series - Metadata object describing the dtype
        - divisions: tuple - Index divisions between partitions
        """
class Index:
    """
    A distributed Dask Index backed by per-partition cuDF Index objects.
    Provides GPU-accelerated index operations.
    """
def __init__(self, dsk, name, meta, divisions):
    """
    Initialize the Index collection.

    Parameters:
    - dsk: dict - Task graph mapping keys to partition-producing tasks
    - name: str - Collection name identifying keys in the graph
    - meta: cudf.Index - Metadata object describing the result index
    - divisions: tuple - Index divisions between partitions
"""Combine multiple DataFrames along specified axes with GPU acceleration and optimized memory management.
def concat(dfs, axis=0, join='outer', ignore_index=False, **kwargs):
    """
    Concatenate Dask-cuDF objects along an axis.

    This is an alias to dask.dataframe.concat that works with the cuDF
    backend.

    Parameters:
    - dfs: sequence - Objects to concatenate
    - axis: int, default 0 - Axis to concatenate along
    - join: str, default 'outer' - How to handle indexes on the other axis
    - ignore_index: bool, default False - Reset the index in the result
    - **kwargs: Additional arguments passed through to cudf.concat

    Returns:
    DataFrame or Series - Concatenated result
    """
def from_delayed(dasks, meta=None, divisions=None, prefix='from-delayed', verify_meta=True):
    """
    Create a Dask-cuDF collection from a list of Dask delayed objects.

    This function is a thin wrapper around dask.dataframe.from_delayed,
    creating Dask-cuDF collections from delayed tasks that return cuDF
    objects.

    Parameters:
    - dasks: list of Delayed objects - Tasks that each return one cuDF
      DataFrame/Series partition
    - meta: DataFrame or Series, optional - Metadata defining the column
      structure and dtypes of the result
    - divisions: sequence, optional - Index divisions for the result
    - prefix: str, default 'from-delayed' - Task name prefix in the graph
    - verify_meta: bool, default True - Verify each partition against
      ``meta`` for consistency

    Returns:
    DataFrame or Series - Dask-cuDF collection backed by the delayed tasks

    Notes:
    - Automatically uses the cuDF backend for delayed task results
    - Tasks should return cuDF DataFrame or Series objects
    - The meta parameter should match the structure of the delayed task
      outputs
"""import cudf
import dask_cudf

# Example 1: convert a cuDF DataFrame into a two-partition Dask-cuDF
# collection and inspect its structure.
cudf_df = cudf.DataFrame({
'a': [1, 2, 3, 4, 5, 6],
'b': [10, 20, 30, 40, 50, 60],
'c': ['x', 'y', 'z', 'x', 'y', 'z']
})
# Convert to Dask-cuDF with 2 partitions
ddf = dask_cudf.from_cudf(cudf_df, npartitions=2)
# Access partition information
print(f"Partitions: {ddf.npartitions}")
print(f"Columns: {list(ddf.columns)}")

# Example 2: create a cuDF Series, distribute it, and reduce it.
cudf_series = cudf.Series([1, 4, 9, 16, 25, 36], name='squares')
# Convert to Dask-cuDF Series
dseries = dask_cudf.from_cudf(cudf_series, npartitions=3)
# Perform operations; .compute() materializes the lazy result
result = dseries.sum().compute()
print(f"Sum of squares: {result}")

# Example 3: concatenate two single-partition Dask-cuDF DataFrames.
df1 = cudf.DataFrame({'x': [1, 2], 'y': [3, 4]})
df2 = cudf.DataFrame({'x': [5, 6], 'y': [7, 8]})
# Convert to Dask-cuDF
ddf1 = dask_cudf.from_cudf(df1, npartitions=1)
ddf2 = dask_cudf.from_cudf(df2, npartitions=1)
# Concatenate; the result carries the partitions of both inputs
combined = dask_cudf.concat([ddf1, ddf2])
result = combined.compute()
### Creating DataFrame from Delayed Tasks
```python
import dask
import cudf
import dask_cudf


# Each delayed task builds one 100-row partition of the final DataFrame.
@dask.delayed
def load_partition(i):
    # Simulate loading one data partition from some source.
    return cudf.DataFrame({
        'id': range(i*100, (i+1)*100),
        'value': range(i*100, (i+1)*100)
    })


# Five lazy tasks -> five partitions.
delayed_tasks = [load_partition(i) for i in range(5)]

# Assemble into a Dask-cuDF DataFrame; ``meta`` pins column names/dtypes
# so the collection's structure is known before any task executes.
ddf = dask_cudf.from_delayed(
    delayed_tasks,
    meta=cudf.DataFrame({'id': [], 'value': []}, dtype='int64')
)
print(f"Created DataFrame with {ddf.npartitions} partitions")
print(ddf.compute().head())
```

### Install with Tessl CLI

    npx tessl i tessl/pypi-dask-cudf