tessl/pypi-daft

Distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Describes: pkg:pypi/daft@0.6.x

To install, run:

npx @tessl/cli install tessl/pypi-daft@0.6.0

Daft

Daft is a distributed query engine for large-scale data processing that provides both a Python DataFrame API and a SQL interface, implemented in Rust for high performance. It specializes in multimodal data types, including Images, URLs, Tensors, and complex nested data structures, and is built on Apache Arrow for seamless interchange and record-setting I/O performance with cloud storage systems such as S3.

Package Information

  • Package Name: daft
  • Language: Python
  • Installation: pip install daft
  • Optional Dependencies: pip install 'daft[aws,azure,gcp,ray,pandas,sql,iceberg,deltalake,unity]'

Core Imports

import daft

For common operations:

from daft import DataFrame, col, lit, when, coalesce
import daft.functions as F

For specific functionality:

from daft import (
    # Data conversion functions
    from_pydict, from_pandas, from_arrow, from_ray_dataset, from_dask_dataframe,
    
    # Data I/O functions  
    read_parquet, read_csv, read_json, read_deltalake, read_iceberg,
    read_sql, read_lance, read_video_frames, read_warc, read_mcap,
    read_huggingface, from_glob_path,
    
    # Session and catalog management
    current_session, set_catalog, attach_catalog, list_tables,
    
    # SQL interface
    sql, sql_expr,
    
    # UDF creation
    func, udf,
    
    # Configuration
    set_execution_config, set_planning_config,
    
    # Types and utilities
    DataType, Schema, Window, ResourceRequest,
    ImageFormat, ImageMode, TimeUnit
)

Basic Usage

import daft
from daft import col

# Create DataFrame from Python data
df = daft.from_pydict({
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35],
    "city": ["New York", "London", "Tokyo"]
})

# Basic operations
result = (df
    .filter(col("age") > 28)
    .select("name", "city", (col("age") + 1).alias("next_age"))
    .collect()
)

# SQL interface
df2 = daft.sql("SELECT name, city FROM df WHERE age > 28")

# Read from various formats
parquet_df = daft.read_parquet("s3://bucket/data/*.parquet")
csv_df = daft.read_csv("data.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")

Architecture

Daft follows a distributed, lazy evaluation architecture optimized for modern data workloads (a short sketch of the lazy model follows this list):

  • DataFrames: Distributed data structures supporting both relational and multimodal operations
  • Expressions: Column-level computations with type safety and optimization
  • IO Layer: High-performance readers for 10+ data formats with cloud storage optimization
  • Query Engine: Rust-based execution with intelligent caching and predicate pushdown
  • Catalog Integration: Native support for data catalogs (Iceberg, Delta, Unity, Glue)
  • AI/ML Integration: Built-in functions for embeddings, LLM operations, and model inference
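
Daft queries are lazy: each transformation extends a logical plan, and nothing executes until a materializing call such as collect(). A minimal sketch of this model, where the S3 path and column name are placeholders, using explain() to print the plan without running it:

import daft
from daft import col

df = daft.read_parquet("s3://bucket/data/*.parquet")   # builds a scan node; no data is read yet
plan = df.filter(col("x") > 0).select("x")             # extends the logical plan, still lazy

plan.explain()            # print the planned query
result = plan.collect()   # triggers optimization and distributed execution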

Capabilities

DataFrame Operations

Core DataFrame functionality including creation, selection, filtering, grouping, aggregation, and joining operations. Supports both lazy and eager evaluation with distributed processing.

class DataFrame:
    def select(*columns: ColumnInputType, **projections: Expression) -> DataFrame: ...
    def filter(predicate: Union[Expression, str]) -> DataFrame: ...
    def groupby(*group_by: ManyColumnsInputType) -> GroupedDataFrame: ...
    def collect(num_preview_rows: Optional[int] = 8) -> DataFrame: ...
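
A short usage sketch of these operations on toy data (column and table contents are illustrative):

import daft
from daft import col

df = daft.from_pydict({"dept": ["eng", "eng", "ops"], "salary": [100, 120, 90]})

# Group, aggregate, then join against a second DataFrame
avg = df.groupby("dept").agg(col("salary").mean().alias("avg_salary"))
names = daft.from_pydict({"dept": ["eng", "ops"], "label": ["Engineering", "Operations"]})
result = avg.join(names, on="dept").collect()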

Data Input/Output

Reading and writing data from multiple formats including CSV, Parquet, JSON, Delta Lake, Apache Iceberg, Hudi, Lance, and databases. Optimized for cloud storage with support for AWS S3, Azure Blob, and Google Cloud Storage.

def read_parquet(path: Union[str, List[str]], **kwargs) -> DataFrame: ...
def read_csv(path: Union[str, List[str]], **kwargs) -> DataFrame: ...
def read_deltalake(table_uri: str, **kwargs) -> DataFrame: ...
def read_iceberg(table: str, **kwargs) -> DataFrame: ...
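
Reads return lazy DataFrames; writes materialize the plan. A minimal round-trip sketch with placeholder paths:

import daft

df = daft.read_csv("data.csv")      # lazy scan
df.write_parquet("output_dir/")     # executes the plan and writes Parquet files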

Expressions and Functions

Column expressions for data transformation, computation, and manipulation. Includes mathematical operations, string processing, date/time handling, and conditional logic.

def col(name: str) -> Expression: ...
def lit(value: Any) -> Expression: ...
def coalesce(*exprs: Expression) -> Expression: ...
def when(predicate: Expression) -> Expression: ...
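
A small sketch combining these expression builders; if_else is the Expression-level conditional, and the column values are illustrative:

import daft
from daft import col, lit, coalesce

df = daft.from_pydict({"a": [1, None, 3]})
result = df.select(
    coalesce(col("a"), lit(0)).alias("a_filled"),                         # first non-null value
    (col("a") > lit(1)).if_else(lit("big"), lit("small")).alias("size"),  # conditional expression
).collect()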

User-Defined Functions

Support for custom Python functions with three execution modes: row-wise (1-to-1), async row-wise, and generator (1-to-many). Functions can be decorated to work seamlessly with DataFrame operations.

@daft.func
def custom_function(input: str) -> str: ...

@daft.func  
async def async_function(input: str) -> str: ...

@daft.func
def generator_function(input: str) -> Iterator[str]: ...
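
A usage sketch based on the decorator stubs above: the decorated function becomes callable on expressions, producing a new column (the greeting function is a made-up example):

import daft
from daft import col

@daft.func
def greet(name: str) -> str:
    return f"Hello, {name}!"

df = daft.from_pydict({"name": ["Alice", "Bob"]})
df.select(greet(col("name"))).show()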

SQL Interface

Execute SQL queries directly on DataFrames and registered tables. Supports standard SQL syntax with extensions for multimodal data operations.

def sql(query: str) -> DataFrame: ...
def sql_expr(expression: str) -> Expression: ...
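
A minimal sketch: daft.sql can reference DataFrames bound in the surrounding scope, and sql_expr parses a single expression for use in DataFrame operations:

import daft

df = daft.from_pydict({"x": [1, 2, 3]})
doubled = daft.sql("SELECT x * 2 AS doubled FROM df").collect()

expr = daft.sql_expr("x + 1")
plus_one = df.select(expr.alias("x_plus_one")).collect()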

AI/ML Functions

Built-in functions for AI and machine learning workflows including text embeddings, LLM generation, and model inference operations.

def embed_text(text: Expression, model: str) -> Expression: ...
def llm_generate(prompt: Expression, model: str) -> Expression: ...
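
A hedged sketch based on the stubs above, assuming these functions are exposed via daft.functions (per the import section); the model name is a placeholder, and such functions typically require the corresponding model/provider dependencies to be installed:

import daft
from daft import col
import daft.functions as F

df = daft.from_pydict({"text": ["hello world", "daft is fast"]})
df = df.with_column("embedding", F.embed_text(col("text"), model="my-embedding-model"))  # placeholder model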

Data Catalog Integration

Integration with data catalogs for metadata management, table discovery, and governance. Supports Unity Catalog, Apache Iceberg, AWS Glue, and custom catalog implementations.

class Catalog:
    def list_tables(pattern: Optional[str] = None) -> List[Identifier]: ...
    def get_table(identifier: Union[Identifier, str]) -> Table: ...
    def create_table(identifier: Union[Identifier, str], source: Union[Schema, DataFrame]) -> Table: ...
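
A hedged sketch of attaching and browsing a catalog; my_catalog stands in for a real catalog handle (e.g. an Iceberg or Unity catalog object) and the alias is arbitrary:

import daft

daft.attach_catalog(my_catalog, alias="prod")   # my_catalog: hypothetical catalog handle
for ident in daft.list_tables():
    print(ident)                                # discover tables across attached catalogs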

Session Management

Session-based configuration and resource management for distributed computing. Handles catalog connections, temporary tables, and execution settings.

def set_execution_config(config: ExecutionConfig) -> None: ...
def set_planning_config(config: PlanningConfig) -> None: ...
def current_session() -> Session: ...
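
A minimal sketch; the session is the container for attached catalogs, temporary tables, and execution settings (the exact configuration surface varies by version, so only the session accessor is shown):

import daft

session = daft.current_session()
print(session)   # active session managing catalogs and execution state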

Core Data Types

Series

Column-level data container and operations.

class Series:
    @property
    def name(self) -> str:
        """Get series name."""
    
    def rename(self, name: str) -> Series:
        """Rename series."""
    
    def datatype(self) -> DataType:
        """Get data type."""
    
    def __len__(self) -> int:
        """Get length."""
    
    def to_arrow(self) -> "pyarrow.Array":
        """Convert to Apache Arrow array."""
    
    def to_pylist(self) -> List[Any]:
        """Convert to Python list."""
    
    def cast(self, dtype: DataType) -> Series:
        """Cast to different data type."""
    
    def filter(self, mask: Series) -> Series:
        """Filter by boolean mask."""
    
    def take(self, idx: Series) -> Series:
        """Take values by indices."""
    
    def slice(self, start: int, end: int) -> Series:
        """Slice series."""

File

File metadata and operations.

class File:
    """File handling and metadata operations."""
    
    @property
    def path(self) -> str:
        """Get file path."""
    
    @property
    def size(self) -> int:
        """Get file size in bytes."""
    
    def read(self) -> bytes:
        """Read file contents."""

Schema

Schema definitions for DataFrames.

class Schema:
    """Schema definition for DataFrame structure."""
    
    def column_names(self) -> List[str]:
        """Get column names."""
    
    def to_pydict(self) -> Dict[str, DataType]:
        """Convert to Python dictionary."""

Data Types

class DataType:
    @staticmethod
    def int8() -> DataType: ...
    @staticmethod  
    def int16() -> DataType: ...
    @staticmethod
    def int32() -> DataType: ...
    @staticmethod
    def int64() -> DataType: ...
    @staticmethod
    def uint8() -> DataType: ...
    @staticmethod
    def uint16() -> DataType: ...
    @staticmethod
    def uint32() -> DataType: ...
    @staticmethod
    def uint64() -> DataType: ...
    @staticmethod
    def float32() -> DataType: ...
    @staticmethod
    def float64() -> DataType: ...
    @staticmethod
    def bool() -> DataType: ...
    @staticmethod
    def string() -> DataType: ...
    @staticmethod
    def binary() -> DataType: ...
    @staticmethod
    def date() -> DataType: ...
    @staticmethod
    def timestamp(unit: TimeUnit) -> DataType: ...
    @staticmethod
    def list(inner: DataType) -> DataType: ...
    @staticmethod
    def struct(fields: Dict[str, DataType]) -> DataType: ...
    @staticmethod
    def image(mode: Optional[ImageMode] = None) -> DataType: ...
    @staticmethod
    def tensor(dtype: DataType) -> DataType: ...
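
These constructors compose, so nested and multimodal types are built from the inside out. A small sketch using only the factories above:

from daft import DataType

record = DataType.struct({
    "id": DataType.int64(),
    "tags": DataType.list(DataType.string()),
})
image_col = DataType.image()   # image column with unconstrained mode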

enum TimeUnit:
    Nanoseconds
    Microseconds  
    Milliseconds
    Seconds

enum ImageMode:
    L     # 8-bit grayscale
    LA    # 8-bit grayscale with alpha
    RGB   # 8-bit RGB
    RGBA  # 8-bit RGB with alpha

enum ImageFormat:
    PNG
    JPEG
    TIFF
    GIF
    BMP

Resource Management

class ResourceRequest:
    """Resource allocation specification for distributed tasks."""
    
    def __init__(
        self,
        num_cpus: Optional[float] = None,
        num_gpus: Optional[float] = None,
        memory_bytes: Optional[int] = None
    ): ...
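
A construction sketch per the signature above; how a request is attached to a UDF or task is version-dependent, so only the object itself is shown:

from daft import ResourceRequest

req = ResourceRequest(num_cpus=1.0, num_gpus=0.0, memory_bytes=512 * 1024 * 1024)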

def refresh_logger() -> None:
    """Refresh Daft's internal rust logging to current python log level."""

Visualization

def register_viz_hook(hook_fn: Callable) -> None:
    """Register custom visualization hook for DataFrame display."""