Distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.
—
Core DataFrame functionality for distributed data processing. DataFrames are the primary data structure in Daft, providing lazy evaluation, distributed processing, and rich transformation capabilities.
Create DataFrames from various Python data structures and external sources.
def from_pydict(data: Dict[str, List[Any]]) -> DataFrame:
"""
Create DataFrame from Python dictionary.
Parameters:
- data: Dictionary with column names as keys and lists of values
Returns:
DataFrame: New DataFrame instance
"""
def from_pylist(data: List[Dict[str, Any]]) -> DataFrame:
"""
Create DataFrame from list of dictionaries.
Parameters:
- data: List of dictionaries representing rows
Returns:
DataFrame: New DataFrame instance
"""
def from_pandas(df: "pandas.DataFrame") -> DataFrame:
"""
Create DataFrame from pandas DataFrame.
Parameters:
- df: pandas DataFrame to convert
Returns:
DataFrame: New DataFrame instance
"""
def from_arrow(table: "pyarrow.Table") -> DataFrame:
"""
Create DataFrame from Apache Arrow table.
Parameters:
- table: pyarrow Table to convert
Returns:
DataFrame: New DataFrame instance
"""
def from_ray_dataset(ds: "ray.data.Dataset") -> DataFrame:
"""
Create DataFrame from Ray dataset.
Parameters:
- ds: Ray dataset to convert
Returns:
DataFrame: New DataFrame instance
"""
def from_dask_dataframe(ddf: "dask.DataFrame") -> DataFrame:
"""
Create DataFrame from Dask DataFrame.
Parameters:
- ddf: Dask DataFrame to convert
Returns:
DataFrame: New DataFrame instance
"""

Select, rename, and transform columns in DataFrames.
class DataFrame:
def select(*columns: ColumnInputType, **projections: Expression) -> DataFrame:
"""
Select columns and create new projections.
Parameters:
- columns: Column names or expressions to select
- projections: Named expressions for new columns
Returns:
DataFrame: New DataFrame with selected columns
"""
def exclude(*names: str) -> DataFrame:
"""
Exclude columns by name.
Parameters:
- names: Column names to exclude
Returns:
DataFrame: New DataFrame without excluded columns
"""
def with_column_renamed(existing: str, new: str) -> DataFrame:
"""
Rename a single column.
Parameters:
- existing: Current column name
- new: New column name
Returns:
DataFrame: New DataFrame with renamed column
"""
def with_columns_renamed(cols_map: Dict[str, str]) -> DataFrame:
"""
Rename multiple columns.
Parameters:
- cols_map: Dictionary mapping old names to new names
Returns:
DataFrame: New DataFrame with renamed columns
"""

Filter rows based on conditions and slice DataFrames.
class DataFrame:
def filter(predicate: Union[Expression, str]) -> DataFrame:
"""
Filter rows by condition.
Parameters:
- predicate: Boolean expression or SQL WHERE clause
Returns:
DataFrame: New DataFrame with filtered rows
"""
def where(predicate: Union[Expression, str]) -> DataFrame:
"""
Alias for filter().
Parameters:
- predicate: Boolean expression or SQL WHERE clause
Returns:
DataFrame: New DataFrame with filtered rows
"""
def limit(num: int) -> DataFrame:
"""
Limit to first N rows.
Parameters:
- num: Maximum number of rows to return
Returns:
DataFrame: New DataFrame with limited rows
"""
def offset(num: int) -> DataFrame:
"""
Skip first N rows.
Parameters:
- num: Number of rows to skip
Returns:
DataFrame: New DataFrame starting from offset
"""

Remove duplicates, null values, and NaN values.
class DataFrame:
def drop_duplicates(*subset: ColumnInputType) -> DataFrame:
"""
Remove duplicate rows.
Parameters:
- subset: Column names to consider for duplicates (all columns if empty)
Returns:
DataFrame: New DataFrame without duplicates
"""
def distinct(*on: ColumnInputType) -> DataFrame:
"""
Get distinct rows.
Parameters:
- on: Column names to consider for distinctness (all columns if empty)
Returns:
DataFrame: New DataFrame with distinct rows
"""
def drop_null(*cols: ColumnInputType) -> DataFrame:
"""
Drop rows with null values.
Parameters:
- cols: Column names to check for nulls (all columns if empty)
Returns:
DataFrame: New DataFrame without null rows
"""
def drop_nan(*cols: ColumnInputType) -> DataFrame:
"""
Drop rows with NaN values.
Parameters:
- cols: Column names to check for NaN (all columns if empty)
Returns:
DataFrame: New DataFrame without NaN rows
"""

Group data and perform aggregation operations.
class DataFrame:
def groupby(*group_by: ManyColumnsInputType) -> GroupedDataFrame:
"""
Group DataFrame by columns.
Parameters:
- group_by: Column names or expressions to group by
Returns:
GroupedDataFrame: Grouped DataFrame for aggregation
"""
def sum(*cols: ColumnInputType) -> DataFrame:
"""
Sum numeric columns.
Parameters:
- cols: Column names to sum (all numeric columns if empty)
Returns:
DataFrame: DataFrame with sum aggregation
"""
def mean(*cols: ColumnInputType) -> DataFrame:
"""
Calculate mean of numeric columns.
Parameters:
- cols: Column names to average (all numeric columns if empty)
Returns:
DataFrame: DataFrame with mean aggregation
"""
def count(*cols: ColumnInputType) -> DataFrame:
"""
Count non-null values.
Parameters:
- cols: Column names to count (all columns if empty)
Returns:
DataFrame: DataFrame with count aggregation
"""
def agg(*to_agg: Union[Expression, Iterable[Expression]]) -> DataFrame:
"""
General aggregation with expressions.
Parameters:
- to_agg: Aggregation expressions
Returns:
DataFrame: DataFrame with custom aggregations
"""
class GroupedDataFrame:
def sum(*cols: ColumnInputType) -> DataFrame:
"""Sum within groups."""
def mean(*cols: ColumnInputType) -> DataFrame:
"""Mean within groups."""
def count(*cols: ColumnInputType) -> DataFrame:
"""Count within groups."""
def agg(*to_agg: Union[Expression, Iterable[Expression]]) -> DataFrame:
"""Custom aggregation within groups."""

Combine DataFrames using set operations.
class DataFrame:
def union(other: DataFrame) -> DataFrame:
"""
Union with another DataFrame (removes duplicates).
Parameters:
- other: DataFrame to union with
Returns:
DataFrame: Combined DataFrame without duplicates
"""
def union_all(other: DataFrame) -> DataFrame:
"""
Union all rows with another DataFrame (keeps duplicates).
Parameters:
- other: DataFrame to union with
Returns:
DataFrame: Combined DataFrame with all rows
"""
def intersect(other: DataFrame) -> DataFrame:
"""
Intersection with another DataFrame.
Parameters:
- other: DataFrame to intersect with
Returns:
DataFrame: DataFrame with common rows
"""
def except_distinct(other: DataFrame) -> DataFrame:
"""
Rows in this DataFrame but not in other (distinct).
Parameters:
- other: DataFrame to subtract
Returns:
DataFrame: DataFrame with difference
"""

Apply complex transformations and manipulations.
class DataFrame:
def explode(*columns: ColumnInputType) -> DataFrame:
"""
Explode array/list columns into separate rows.
Parameters:
- columns: Array/list column names to explode
Returns:
DataFrame: DataFrame with exploded columns
"""
def transform(func: Callable[..., DataFrame], *args: Any, **kwargs: Any) -> DataFrame:
"""
Apply transformation function to DataFrame.
Parameters:
- func: Function that takes DataFrame and returns DataFrame
- args: Positional arguments to pass to function
- kwargs: Keyword arguments to pass to function
Returns:
DataFrame: Transformed DataFrame
"""

Execute lazy operations and materialize results.
class DataFrame:
def collect(num_preview_rows: Optional[int] = 8) -> DataFrame:
"""
Execute lazy operations and collect results.
Parameters:
- num_preview_rows: Number of rows to preview (for display)
Returns:
DataFrame: Materialized DataFrame
"""
def show(n: int = 8) -> None:
"""
Display first N rows of DataFrame.
Parameters:
- n: Number of rows to display
"""
def count_rows() -> int:
"""
Count total number of rows (materializes data).
Returns:
int: Total row count
"""

Control data distribution and partitioning.
class DataFrame:
def repartition(num: Optional[int], *partition_by: ColumnInputType) -> DataFrame:
"""
Repartition DataFrame.
Parameters:
- num: Target number of partitions (if None, the current number of partitions is kept)
- partition_by: Columns to partition by
Returns:
DataFrame: Repartitioned DataFrame
"""
def into_partitions(num: int) -> DataFrame:
"""
Distribute into specified number of partitions.
Parameters:
- num: Number of partitions
Returns:
DataFrame: DataFrame with specified partitions
"""

Convert DataFrames to other formats.
class DataFrame:
def to_pandas(coerce_temporal_nanoseconds: bool = False) -> "pandas.DataFrame":
"""
Convert to pandas DataFrame.
Parameters:
- coerce_temporal_nanoseconds: Whether to coerce temporal (date/timestamp) columns to nanosecond precision in the resulting pandas DataFrame
Returns:
pandas.DataFrame: Converted DataFrame
"""
def to_arrow() -> "pyarrow.Table":
"""
Convert to Apache Arrow table.
Returns:
pyarrow.Table: Arrow representation
"""
def to_pydict() -> Dict[str, List[Any]]:
"""
Convert to Python dictionary.
Returns:
Dict: Dictionary with column names as keys
"""
def to_pylist() -> List[Dict[str, Any]]:
"""
Convert to list of dictionaries.
Returns:
List: List of row dictionaries
"""

import daft
from daft import col
# Create DataFrame
df = daft.from_pydict({
"name": ["Alice", "Bob", "Charlie", "Diana"],
"age": [25, 30, 35, 25],
"salary": [50000, 75000, 85000, 60000],
"department": ["Engineering", "Sales", "Engineering", "Marketing"]
})
# Filter and select
result = (df
.filter(col("age") >= 30)
.select("name", "department", (col("salary") * 1.1).alias("new_salary"))
.collect()
)
# Group and aggregate
dept_stats = (df
.groupby("department")
.agg(
col("salary").mean().alias("avg_salary"),
col("age").max().alias("max_age"),
col("name").count().alias("employee_count")
)
.collect()
)

# Remove duplicates and null values, then transform
cleaned_df = (df
.drop_duplicates("name", "age")
.drop_null("salary")
.with_column_renamed("department", "dept")
.filter(col("salary") > 0)
.collect()
)

ColumnInputType = Union[str, Expression]
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]

Install with Tessl CLI
npx tessl i tessl/pypi-daft