Distributed DataFrames for Multimodal Data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.
npx @tessl/cli install tessl/pypi-daft@0.6.0

Daft is a distributed query engine for large-scale data processing that provides both a Python DataFrame API and a SQL interface, implemented in Rust for high performance. It specializes in multimodal data types, including images, URLs, tensors, and complex nested data structures, and is built on Apache Arrow for seamless interchange and record-setting I/O performance with cloud storage systems like S3.
pip install daft
pip install 'daft[aws,azure,gcp,ray,pandas,sql,iceberg,deltalake,unity]'

import daft

For common operations:
from daft import DataFrame, col, lit, when, coalesce
import daft.functions as F

For specific functionality:
from daft import (
# Data conversion functions
from_pydict, from_pandas, from_arrow, from_ray_dataset, from_dask_dataframe,
# Data I/O functions
read_parquet, read_csv, read_json, read_deltalake, read_iceberg,
read_sql, read_lance, read_video_frames, read_warc, read_mcap,
read_huggingface, from_glob_path,
# Session and catalog management
current_session, set_catalog, attach_catalog, list_tables,
# SQL interface
sql, sql_expr,
# UDF creation
func, udf,
# Configuration
set_execution_config, set_planning_config,
# Types and utilities
DataType, Schema, Window, ResourceRequest,
ImageFormat, ImageMode, TimeUnit
)

import daft
from daft import col
# Create DataFrame from Python data
df = daft.from_pydict({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"city": ["New York", "London", "Tokyo"]
})
# Basic operations
result = (df
.filter(col("age") > 28)
.select("name", "city", (col("age") + 1).alias("next_age"))
.collect()
)
# SQL interface
df2 = daft.sql("SELECT name, city FROM df WHERE age > 28")
# Read from various formats
parquet_df = daft.read_parquet("s3://bucket/data/*.parquet")
csv_df = daft.read_csv("data.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")

Daft follows a distributed, lazy-evaluation architecture optimized for modern data workloads: operations assemble a logical query plan, and nothing executes until results are materialized.
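A minimal sketch of that laziness in action: transformations only build a plan, and computation happens at a materializing call such as collect() (the data here is illustrative).

import daft
from daft import col

df = daft.from_pydict({"x": [1, 2, 3], "y": [10, 20, 30]})
plan = df.filter(col("x") > 1).select(col("y") * 2)  # builds a logical plan; nothing runs yet
result = plan.collect()                              # execution happens here
print(result.to_pydict())                            # {'y': [40, 60]}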
Core DataFrame functionality including creation, selection, filtering, grouping, aggregation, and joining operations. Supports both lazy and eager evaluation with distributed processing.
class DataFrame:
def select(*columns: ColumnInputType, **projections: Expression) -> DataFrame: ...
def filter(predicate: Union[Expression, str]) -> DataFrame: ...
def groupby(*group_by: ManyColumnsInputType) -> GroupedDataFrame: ...
def collect(num_preview_rows: Optional[int] = 8) -> DataFrame: ...
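A short sketch of the grouping, aggregation, and join surface described above (aggregation helpers like .sum() follow Daft's expression API; the data is illustrative).

import daft
from daft import col

sales = daft.from_pydict({"region": ["east", "west", "east"], "amount": [10, 20, 30]})
regions = daft.from_pydict({"region": ["east", "west"], "manager": ["Ann", "Bo"]})

# group, aggregate, then enrich with a join before materializing
totals = sales.groupby("region").agg(col("amount").sum().alias("total"))
report = totals.join(regions, on="region").collect()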
Reading and writing data from multiple formats including CSV, Parquet, JSON, Delta Lake, Apache Iceberg, Hudi, Lance, and databases. Optimized for cloud storage with support for AWS S3, Azure Blob, and Google Cloud Storage.

def read_parquet(path: Union[str, List[str]], **kwargs) -> DataFrame: ...
def read_csv(path: Union[str, List[str]], **kwargs) -> DataFrame: ...
def read_deltalake(table_uri: str, **kwargs) -> DataFrame: ...
def read_iceberg(table: str, **kwargs) -> DataFrame: ...
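A sketch of a typical read-filter-write round trip against cloud storage (the bucket paths are placeholders; df.write_parquet is Daft's writer counterpart, though it is not listed in the stubs above).

import daft
from daft import col

events = daft.read_parquet("s3://bucket/events/*.parquet")  # placeholder path
recent = events.filter(col("year") >= 2023)
recent.write_parquet("s3://bucket/events-recent/")          # distributed write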
Column expressions for data transformation, computation, and manipulation. Includes mathematical operations, string processing, date/time handling, and conditional logic.

def col(name: str) -> Expression: ...
def lit(value: Any) -> Expression: ...
def coalesce(*exprs: Expression) -> Expression: ...
def when(predicate: Expression) -> Expression: ...
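These helpers compose into ordinary column arithmetic and null handling; for example, coalesce picks the first non-null value across columns (a sketch using only the signatures declared here).

import daft
from daft import col, lit, coalesce

df = daft.from_pydict({"name": ["Alice", None], "nickname": [None, "Bo"]})
display = df.select(
    coalesce(col("nickname"), col("name"), lit("unknown")).alias("display_name")
)
print(display.collect().to_pydict())  # {'display_name': ['Alice', 'Bo']}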
Support for custom Python functions with three execution modes: row-wise (1-to-1), async row-wise, and generator (1-to-many). Functions can be decorated to work seamlessly with DataFrame operations.

@daft.func
def custom_function(input: str) -> str: ...
@daft.func
async def async_function(input: str) -> str: ...
@daft.func
def generator_function(input: str) -> Iterator[str]: ...
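A sketch of the row-wise mode: @daft.func lifts a plain Python function into an expression, inferring the return type from the annotation (the function itself is illustrative).

import daft
from daft import col

@daft.func
def shout(s: str) -> str:
    return s.upper() + "!"

df = daft.from_pydict({"word": ["hi", "ok"]})
print(df.select(shout(col("word"))).collect().to_pydict())  # values become 'HI!', 'OK!'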
Execute SQL queries directly on DataFrames and registered tables. Supports standard SQL syntax with extensions for multimodal data operations.

def sql(query: str) -> DataFrame: ...
def sql_expr(expression: str) -> Expression: ...
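sql_expr is useful for embedding a SQL fragment as an expression inside DataFrame code (a minimal sketch).

import daft

df = daft.from_pydict({"age": [25, 30, 35]})
adults = df.filter(daft.sql_expr("age > 28"))  # SQL fragment compiled to an Expression
print(adults.collect().to_pydict())            # {'age': [30, 35]}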
Built-in functions for AI and machine learning workflows including text embeddings, LLM generation, and model inference operations.

def embed_text(text: Expression, model: str) -> Expression: ...
def llm_generate(prompt: Expression, model: str) -> Expression: ...
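A hedged sketch of the embedding helper: the model identifier is a placeholder, the import path is assumed from the daft.functions convention above, and these functions may require provider-specific dependencies or credentials.

import daft
from daft import col
from daft.functions import embed_text  # assumed import path

docs = daft.from_pydict({"text": ["hello world", "daft is fast"]})
# model name is a placeholder; choose one supported by your configured provider
embedded = docs.with_column("embedding", embed_text(col("text"), model="all-MiniLM-L6-v2"))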
Integration with data catalogs for metadata management, table discovery, and governance. Supports Unity Catalog, Apache Iceberg, AWS Glue, and custom catalog implementations.

class Catalog:
def list_tables(pattern: Optional[str] = None) -> List[Identifier]: ...
def get_table(identifier: Union[Identifier, str]) -> Table: ...
def create_table(identifier: Union[Identifier, str], source: Union[Schema, DataFrame]) -> Table: ...
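A sketch against the Catalog surface above; how you obtain the catalog object is backend-specific (Unity, Glue, Iceberg, ...), and Table.read() returning a DataFrame is an assumption here.

# `catalog` is a previously attached Catalog instance (backend-specific setup omitted)
for ident in catalog.list_tables("analytics.*"):  # pattern filter per the stub above
    print(ident)

table = catalog.get_table("analytics.events")     # placeholder identifier
df = table.read()                                 # assumed: materialize the table as a DataFrame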
Session-based configuration and resource management for distributed computing. Handles catalog connections, temporary tables, and execution settings.

def set_execution_config(config: ExecutionConfig) -> None: ...
def set_planning_config(config: PlanningConfig) -> None: ...
def current_session() -> Session: ...

Column-level data container and operations.
class Series:
@property
def name(self) -> str:
"""Get series name."""
def rename(self, name: str) -> Series:
"""Rename series."""
def datatype(self) -> DataType:
"""Get data type."""
def __len__(self) -> int:
"""Get length."""
def to_arrow(self) -> "pyarrow.Array":
"""Convert to Apache Arrow array."""
def to_pylist(self) -> List[Any]:
"""Convert to Python list."""
def cast(self, dtype: DataType) -> Series:
"""Cast to different data type."""
def filter(self, mask: Series) -> Series:
"""Filter by boolean mask."""
def take(self, idx: Series) -> Series:
"""Take values by indices."""
def slice(self, start: int, end: int) -> Series:
"""Slice series."""File metadata and operations.
File metadata and operations.

class File:
"""File handling and metadata operations."""
@property
def path(self) -> str:
"""Get file path."""
@property
def size(self) -> int:
"""Get file size in bytes."""
def read(self) -> bytes:
"""Read file contents."""Schema definitions for DataFrames.
class Schema:
"""Schema definition for DataFrame structure."""
def column_names(self) -> List[str]:
"""Get column names."""
def to_pydict(self) -> Dict[str, DataType]:
"""Convert to Python dictionary."""class DataType:
@staticmethod
def int8() -> DataType: ...
@staticmethod
def int16() -> DataType: ...
@staticmethod
def int32() -> DataType: ...
@staticmethod
def int64() -> DataType: ...
@staticmethod
def uint8() -> DataType: ...
@staticmethod
def uint16() -> DataType: ...
@staticmethod
def uint32() -> DataType: ...
@staticmethod
def uint64() -> DataType: ...
@staticmethod
def float32() -> DataType: ...
@staticmethod
def float64() -> DataType: ...
@staticmethod
def bool() -> DataType: ...
@staticmethod
def string() -> DataType: ...
@staticmethod
def binary() -> DataType: ...
@staticmethod
def date() -> DataType: ...
@staticmethod
def timestamp(unit: TimeUnit) -> DataType: ...
@staticmethod
def list(inner: DataType) -> DataType: ...
@staticmethod
def struct(fields: Dict[str, DataType]) -> DataType: ...
@staticmethod
def image(mode: Optional[ImageMode] = None) -> DataType: ...
@staticmethod
def tensor(dtype: DataType) -> DataType: ...
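These constructors compose to describe nested and multimodal schemas, for example (a sketch using only the constructors above):

from daft import DataType

record = DataType.struct({
    "id": DataType.int64(),
    "tags": DataType.list(DataType.string()),
    "thumbnail": DataType.image(),                   # mode left unspecified
    "features": DataType.tensor(DataType.float32()),
})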
enum TimeUnit:
Nanoseconds
Microseconds
Milliseconds
Seconds
enum ImageMode:
L # 8-bit grayscale
LA # 8-bit grayscale with alpha
RGB # 8-bit RGB
RGBA # 8-bit RGB with alpha
enum ImageFormat:
PNG
JPEG
TIFF
GIF
BMP

class ResourceRequest:
"""Resource allocation specification for distributed tasks."""
def __init__(
self,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory_bytes: Optional[int] = None
): ...
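A minimal sketch constructing a request per the signature above; how it attaches to work (for example through a UDF's resource options) varies by Daft version.

from daft import ResourceRequest

# ask the scheduler for 2 CPUs, 1 GPU, and 4 GiB of memory per task (values illustrative)
request = ResourceRequest(num_cpus=2.0, num_gpus=1.0, memory_bytes=4 * 1024**3)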
def refresh_logger() -> None:
"""Refresh Daft's internal rust logging to current python log level."""def register_viz_hook(hook_fn: Callable) -> None:
"""Register custom visualization hook for DataFrame display."""