Distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.
—
Comprehensive data reading and writing capabilities supporting 10+ formats with optimized cloud storage integration. Daft provides high-performance I/O with intelligent caching, predicate pushdown, and parallel processing.
Read and write Apache Parquet files with columnar optimization and metadata handling.
def read_parquet(
path: Union[str, List[str]],
columns: Optional[List[str]] = None,
predicate: Optional[Expression] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read Parquet files from local filesystem or cloud storage.
Parameters:
- path: File path(s) or glob pattern
- columns: Specific columns to read (all if None)
- predicate: Filter predicate for pushdown optimization
- io_config: IO configuration for cloud storage
Returns:
DataFrame: DataFrame from Parquet data
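Example:
A minimal sketch following the signature above; the bucket and column names are hypothetical.
>>> import daft
>>> df = daft.read_parquet("s3://bucket/data/*.parquet", columns=["id", "value"])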
"""Read comma-separated value files with flexible parsing options.
def read_csv(
path: Union[str, List[str]],
delimiter: str = ",",
has_header: bool = True,
column_names: Optional[List[str]] = None,
dtype: Optional[Dict[str, DataType]] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read CSV files with customizable parsing.
Parameters:
- path: File path(s) or glob pattern
- delimiter: Field separator character
- has_header: Whether first row contains column names
- column_names: Explicit column names (overrides header)
- dtype: Column data type specifications
- io_config: IO configuration for cloud storage
Returns:
DataFrame: DataFrame from CSV data
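Example:
A minimal sketch following the signature above; the file name and delimiter are hypothetical.
>>> import daft
>>> df = daft.read_csv("data/events.csv", delimiter=";", has_header=True)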
"""Read JSON and JSONL (newline-delimited JSON) files.
def read_json(
path: Union[str, List[str]],
schema: Optional[Schema] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read JSON/JSONL files with schema inference.
Parameters:
- path: File path(s) or glob pattern
- schema: Explicit schema (inferred if None)
- io_config: IO configuration for cloud storage
Returns:
DataFrame: DataFrame from JSON data
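Example:
A minimal sketch following the signature above; the glob pattern is hypothetical, and the schema is inferred.
>>> import daft
>>> df = daft.read_json("logs/*.jsonl")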
"""Read Apache Delta Lake tables with time travel and metadata support.
def read_deltalake(
table_uri: str,
version: Optional[int] = None,
timestamp: Optional[str] = None,
columns: Optional[List[str]] = None,
predicate: Optional[Expression] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read Delta Lake tables with time travel capability.
Parameters:
- table_uri: Delta table URI or path
- version: Specific table version to read
- timestamp: Read table as of timestamp
- columns: Specific columns to read
- predicate: Filter predicate for optimization
- io_config: IO configuration for cloud storage
Returns:
DataFrame: DataFrame from Delta table
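Example:
A minimal sketch of reading a pinned table version, following the signature above; the table URI and version number are hypothetical.
>>> import daft
>>> df = daft.read_deltalake("s3://bucket/delta-table", version=3)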
"""Read Apache Iceberg tables with catalog integration.
def read_iceberg(
table: str,
columns: Optional[List[str]] = None,
predicate: Optional[Expression] = None,
snapshot_id: Optional[int] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read Apache Iceberg tables.
Parameters:
- table: Table identifier or path
- columns: Specific columns to read
- predicate: Filter predicate for optimization
- snapshot_id: Specific snapshot to read
- io_config: IO configuration
Returns:
DataFrame: DataFrame from Iceberg table
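Example:
A minimal sketch following the signature documented above; the table identifier and snapshot id are hypothetical.
>>> import daft
>>> df = daft.read_iceberg("catalog.db.events", snapshot_id=123456789)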
"""Read Apache Hudi tables with incremental processing support.
def read_hudi(
table_uri: str,
columns: Optional[List[str]] = None,
predicate: Optional[Expression] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read Apache Hudi tables.
Parameters:
- table_uri: Hudi table URI or path
- columns: Specific columns to read
- predicate: Filter predicate for optimization
- io_config: IO configuration
Returns:
DataFrame: DataFrame from Hudi table
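Example:
A minimal sketch following the signature above; the table URI and columns are hypothetical.
>>> import daft
>>> df = daft.read_hudi("s3://bucket/hudi-table", columns=["id", "ts"])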
"""Read Lance columnar format optimized for ML workloads.
def read_lance(
uri: str,
columns: Optional[List[str]] = None,
predicate: Optional[Expression] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read Lance columnar format.
Parameters:
- uri: Lance dataset URI
- columns: Specific columns to read
- predicate: Filter predicate
- io_config: IO configuration
Returns:
DataFrame: DataFrame from Lance data
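Example:
A minimal sketch following the signature above; the dataset URI and column names are hypothetical.
>>> import daft
>>> df = daft.read_lance("s3://bucket/embeddings.lance", columns=["id", "vector"])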
"""Read data from SQL databases with connection management.
def read_sql(
sql: str,
connection_string: str,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read data from SQL databases.
Parameters:
- sql: SQL query to execute
- connection_string: Database connection string
- io_config: IO configuration
Returns:
DataFrame: DataFrame from SQL query results
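Example:
A minimal sketch following the signature above; the query and connection string are hypothetical.
>>> import daft
>>> df = daft.read_sql("SELECT id, name FROM users", "sqlite:///app.db")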
"""Read datasets from HuggingFace Hub.
def read_huggingface(
path: str,
split: Optional[str] = None,
streaming: bool = False,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read HuggingFace datasets.
Parameters:
- path: Dataset name or path on HuggingFace Hub
- split: Dataset split to read (train, test, validation)
- streaming: Enable streaming mode for large datasets
- io_config: IO configuration
Returns:
DataFrame: DataFrame from HuggingFace dataset
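Example:
A minimal sketch following the signature above; the dataset name is hypothetical.
>>> import daft
>>> df = daft.read_huggingface("username/dataset", split="train")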
"""Extract frames from video files for computer vision workloads.
def read_video_frames(
path: Union[str, List[str]],
sample_rate: Optional[float] = None,
frame_count: Optional[int] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read video frames as DataFrame.
Parameters:
- path: Video file path(s)
- sample_rate: Frames per second to extract
- frame_count: Maximum number of frames
- io_config: IO configuration
Returns:
DataFrame: DataFrame with video frame data
"""Read WARC files for web crawling and archival data.
def read_warc(
path: Union[str, List[str]],
columns: Optional[List[str]] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read WARC (Web ARChive) files.
Parameters:
- path: WARC file path(s)
- columns: Specific columns to extract
- io_config: IO configuration
Returns:
DataFrame: DataFrame from WARC data
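Example:
A minimal sketch following the signature above; the glob pattern is hypothetical.
>>> import daft
>>> df = daft.read_warc("crawl/*.warc.gz")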
"""Read MCAP files for robotics and sensor data.
def read_mcap(
path: Union[str, List[str]],
topics: Optional[List[str]] = None,
io_config: Optional[IOConfig] = None,
**kwargs
) -> DataFrame:
"""
Read MCAP robotics format files.
Parameters:
- path: MCAP file path(s)
- topics: Specific topics to read
- io_config: IO configuration
Returns:
DataFrame: DataFrame from MCAP data
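Example:
A minimal sketch following the signature above; the file path and topic name are hypothetical.
>>> import daft
>>> df = daft.read_mcap("logs/run1.mcap", topics=["/camera/image"])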
"""Read datasets from Hugging Face Hub.
def read_huggingface(
dataset_name: str,
split: Optional[str] = None,
subset: Optional[str] = None,
**kwargs
) -> DataFrame:
"""
Read dataset from Hugging Face Hub.
Parameters:
- dataset_name: Name of the dataset on Hugging Face Hub
- split: Dataset split (train, test, validation)
- subset: Dataset subset/configuration name
Returns:
DataFrame: DataFrame from Hugging Face dataset
"""Create DataFrames from file system patterns.
def from_glob_path(
path: str,
io_config: Optional[IOConfig] = None
) -> DataFrame:
"""
Create DataFrame from file glob pattern.
Parameters:
- path: Glob pattern for files
- io_config: IO configuration
Returns:
DataFrame: DataFrame with file metadata
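Example:
A minimal sketch following the signature above; the glob pattern is hypothetical.
>>> import daft
>>> files_df = daft.from_glob_path("s3://bucket/images/**/*.jpg")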
"""
def range(n: int) -> DataFrame:
"""
Create DataFrame with range of integers.
Parameters:
- n: Number of integers (0 to n-1)
Returns:
DataFrame: DataFrame with single integer column
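Example:
Per the signature above, this yields a single-column DataFrame of integers 0 through 999.
>>> import daft
>>> df = daft.range(1000)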
"""class IOConfig:
"""General IO configuration settings."""
def __init__(
self,
s3: Optional[S3Config] = None,
azure: Optional[AzureConfig] = None,
gcs: Optional[GCSConfig] = None,
http: Optional[HTTPConfig] = None
): ...
class S3Config:
"""AWS S3 configuration."""
def __init__(
self,
region_name: Optional[str] = None,
endpoint_url: Optional[str] = None,
credentials: Optional[S3Credentials] = None,
use_ssl: bool = True
): ...
class S3Credentials:
"""S3 authentication credentials."""
def __init__(
self,
access_key_id: str,
secret_access_key: str,
session_token: Optional[str] = None
): ...
class AzureConfig:
"""Azure Blob Storage configuration."""
def __init__(
self,
storage_account: Optional[str] = None,
access_key: Optional[str] = None,
sas_token: Optional[str] = None
): ...
class GCSConfig:
"""Google Cloud Storage configuration."""
def __init__(
self,
project_id: Optional[str] = None,
service_account_key: Optional[str] = None
): ...
class HTTPConfig:
"""HTTP client configuration."""
def __init__(
self,
timeout: Optional[int] = None,
max_retries: Optional[int] = None,
headers: Optional[Dict[str, str]] = None
): ...

import daft
from daft.io import IOConfig, S3Config, S3Credentials
# Configure S3 access
s3_config = S3Config(
region_name="us-west-2",
credentials=S3Credentials(
access_key_id="your-key",
secret_access_key="your-secret"
)
)
io_config = IOConfig(s3=s3_config)
# Read from S3
df = daft.read_parquet(
"s3://my-bucket/data/*.parquet",
io_config=io_config
)

# Read from multiple sources
parquet_df = daft.read_parquet("data/raw/*.parquet")
csv_df = daft.read_csv("data/supplements.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")
# Combine data
combined = (parquet_df
.union_all(csv_df)
.union_all(delta_df)
.collect()
)

from daft import col
# Use predicate pushdown for efficiency
filtered_df = daft.read_parquet(
"s3://large-dataset/*.parquet",
columns=["id", "name", "value"],
predicate=(col("date") >= "2024-01-01") & (col("status") == "active"),
io_config=io_config
)

# Extract video frames for ML
video_df = daft.read_video_frames(
"videos/*.mp4",
sample_rate=1.0, # 1 frame per second
frame_count=100 # Max 100 frames per video
)
# Process frames
processed = (video_df
.select("filename", "frame_index", "image_data")
.filter(col("frame_index") % 10 == 0) # Every 10th frame
.collect()
)

class DataSource:
"""Abstract data source interface for custom readers."""
class DataSourceTask:
"""Represents a task for reading data from a source."""
class DataSink:
"""Abstract data sink interface for custom writers."""Install with Tessl CLI
npx tessl i tessl/pypi-daft