tessl/pypi-daft

Distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Data Input/Output

Comprehensive data reading and writing capabilities supporting 10+ formats with optimized cloud storage integration. Daft provides high-performance I/O with intelligent caching, predicate pushdown, and parallel processing.

Capabilities

Parquet Files

Read and write Apache Parquet files with columnar optimization and metadata handling.

def read_parquet(
    path: Union[str, List[str]], 
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Parquet files from local filesystem or cloud storage.
    
    Parameters:
    - path: File path(s) or glob pattern
    - columns: Specific columns to read (all if None)
    - predicate: Filter predicate for pushdown optimization
    - io_config: IO configuration for cloud storage
    
    Returns:
    DataFrame: DataFrame from Parquet data
    """

CSV Files

Read comma-separated value files with flexible parsing options.

def read_csv(
    path: Union[str, List[str]],
    delimiter: str = ",",
    has_header: bool = True,
    column_names: Optional[List[str]] = None,
    dtype: Optional[Dict[str, DataType]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read CSV files with customizable parsing.
    
    Parameters:
    - path: File path(s) or glob pattern
    - delimiter: Field separator character
    - has_header: Whether first row contains column names
    - column_names: Explicit column names (overrides header)
    - dtype: Column data type specifications
    - io_config: IO configuration for cloud storage
    
    Returns:
    DataFrame: DataFrame from CSV data
    """

JSON Files

Read JSON and JSONL (newline-delimited JSON) files.

def read_json(
    path: Union[str, List[str]],
    schema: Optional[Schema] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read JSON/JSONL files with schema inference.
    
    Parameters:
    - path: File path(s) or glob pattern
    - schema: Explicit schema (inferred if None)
    - io_config: IO configuration for cloud storage
    
    Returns:
    DataFrame: DataFrame from JSON data
    """

Delta Lake

Read Apache Delta Lake tables with time travel and metadata support.

def read_deltalake(
    table_uri: str,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Delta Lake tables with time travel capability.
    
    Parameters:
    - table_uri: Delta table URI or path
    - version: Specific table version to read
    - timestamp: Read table as of timestamp
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration for cloud storage
    
    Returns:
    DataFrame: DataFrame from Delta table
    """

Apache Iceberg

Read Apache Iceberg tables with catalog integration.

def read_iceberg(
    table: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    snapshot_id: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Iceberg tables.
    
    Parameters:
    - table: Table identifier or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - snapshot_id: Specific snapshot to read
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from Iceberg table
    """

Apache Hudi

Read Apache Hudi tables with incremental processing support.

def read_hudi(
    table_uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Hudi tables.
    
    Parameters:
    - table_uri: Hudi table URI or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from Hudi table
    """

Lance Columnar Format

Read Lance columnar format optimized for ML workloads.

def read_lance(
    uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Lance columnar format.
    
    Parameters:
    - uri: Lance dataset URI
    - columns: Specific columns to read
    - predicate: Filter predicate
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from Lance data
    """

SQL Databases

Read data from SQL databases with connection management.

def read_sql(
    sql: str,
    connection_string: str,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read data from SQL databases.
    
    Parameters:
    - sql: SQL query to execute
    - connection_string: Database connection string
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from SQL query results
    """

Hugging Face Datasets

Read datasets from the Hugging Face Hub.

def read_huggingface(
    path: str,
    split: Optional[str] = None,
    streaming: bool = False,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read HuggingFace datasets.
    
    Parameters:
    - path: Dataset name or path on HuggingFace Hub
    - split: Dataset split to read (train, test, validation)
    - streaming: Enable streaming mode for large datasets
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from HuggingFace dataset
    """

Video Frames

Extract frames from video files for computer vision workloads.

def read_video_frames(
    path: Union[str, List[str]],
    sample_rate: Optional[float] = None,
    frame_count: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read video frames as DataFrame.
    
    Parameters:
    - path: Video file path(s)
    - sample_rate: Frames per second to extract
    - frame_count: Maximum number of frames
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame with video frame data
    """

Web Archives (WARC)

Read WARC files for web crawling and archival data.

def read_warc(
    path: Union[str, List[str]],
    columns: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read WARC (Web ARChive) files.
    
    Parameters:
    - path: WARC file path(s)
    - columns: Specific columns to extract
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from WARC data
    """

MCAP Robotics Format

Read MCAP files for robotics and sensor data.

def read_mcap(
    path: Union[str, List[str]],
    topics: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read MCAP robotics format files.
    
    Parameters:
    - path: MCAP file path(s)
    - topics: Specific topics to read
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame from MCAP data
    """

File Path Utilities

Create DataFrames from file system patterns.

def from_glob_path(
    path: str,
    io_config: Optional[IOConfig] = None
) -> DataFrame:
    """
    Create DataFrame from file glob pattern.
    
    Parameters:
    - path: Glob pattern for files
    - io_config: IO configuration
    
    Returns:
    DataFrame: DataFrame with file metadata
    """

def range(n: int) -> DataFrame:
    """
    Create DataFrame with range of integers.
    
    Parameters:
    - n: Number of integers (0 to n-1)
    
    Returns:
    DataFrame: DataFrame with single integer column
    """

IO Configuration

Core Configuration Classes

class IOConfig:
    """General IO configuration settings."""
    def __init__(
        self,
        s3: Optional[S3Config] = None,
        azure: Optional[AzureConfig] = None,
        gcs: Optional[GCSConfig] = None,
        http: Optional[HTTPConfig] = None
    ): ...

class S3Config:
    """AWS S3 configuration."""
    def __init__(
        self,
        region_name: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        credentials: Optional[S3Credentials] = None,
        use_ssl: bool = True
    ): ...

class S3Credentials:
    """S3 authentication credentials."""
    def __init__(
        self,
        access_key_id: str,
        secret_access_key: str,
        session_token: Optional[str] = None
    ): ...

class AzureConfig:
    """Azure Blob Storage configuration."""
    def __init__(
        self,
        storage_account: Optional[str] = None,
        access_key: Optional[str] = None,
        sas_token: Optional[str] = None
    ): ...

class GCSConfig:
    """Google Cloud Storage configuration."""
    def __init__(
        self,
        project_id: Optional[str] = None,
        service_account_key: Optional[str] = None
    ): ...

class HTTPConfig:
    """HTTP client configuration."""
    def __init__(
        self,
        timeout: Optional[int] = None,
        max_retries: Optional[int] = None,
        headers: Optional[Dict[str, str]] = None
    ): ...
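
An illustrative configuration for Azure and Google Cloud Storage using the classes above (account names, keys, and paths are placeholders):

import daft
from daft.io import AzureConfig, GCSConfig, IOConfig

# Configure Azure Blob Storage and GCS access in a single IOConfig
io_config = IOConfig(
    azure=AzureConfig(storage_account="myaccount", access_key="your-access-key"),
    gcs=GCSConfig(project_id="my-project"),
)

# Pass the config to any reader that touches cloud storage
df = daft.read_parquet("az://container/data/*.parquet", io_config=io_config)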

Usage Examples

Cloud Storage Access

import daft
from daft.io import IOConfig, S3Config, S3Credentials

# Configure S3 access
s3_config = S3Config(
    region_name="us-west-2",
    credentials=S3Credentials(
        access_key_id="your-key",
        secret_access_key="your-secret"
    )
)
io_config = IOConfig(s3=s3_config)

# Read from S3
df = daft.read_parquet(
    "s3://my-bucket/data/*.parquet",
    io_config=io_config
)

Multi-Format Pipeline

# Read from multiple sources
parquet_df = daft.read_parquet("data/raw/*.parquet")
csv_df = daft.read_csv("data/supplements.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")

# Combine sources (union_all requires compatible schemas)
combined = (parquet_df
    .union_all(csv_df)
    .union_all(delta_df)
    .collect()
)

Optimized Reading

from daft import col

# Use predicate pushdown for efficiency
filtered_df = daft.read_parquet(
    "s3://large-dataset/*.parquet",
    columns=["id", "name", "value"],
    predicate=(col("date") >= "2024-01-01") & (col("status") == "active"),
    io_config=io_config
)

Video Processing

# Extract video frames for ML
video_df = daft.read_video_frames(
    "videos/*.mp4",
    sample_rate=1.0,  # 1 frame per second
    frame_count=100   # Max 100 frames per video
)

# Process frames
processed = (video_df
    .select("filename", "frame_index", "image_data")
    .filter(col("frame_index") % 10 == 0)  # Every 10th frame
    .collect()
)

Data Sources and Sinks

class DataSource:
    """Abstract data source interface for custom readers."""
    
class DataSourceTask:
    """Represents a task for reading data from a source."""

class DataSink:
    """Abstract data sink interface for custom writers."""
