Native Delta Lake Python binding based on delta-rs with Pandas integration
—
Convert Delta tables to various formats — including pandas DataFrames, PyArrow tables, and streaming readers — for efficient data processing and integration with the Python data ecosystem.
from typing import Any, Callable, Iterator

import pandas as pd
import pyarrow
import pyarrow.fs as pa_fs
from arro3.core import RecordBatch, RecordBatchReader
from deltalake import DeltaTable
from pyarrow.dataset import Expression
# Type aliases for filtering
FilterLiteralType = tuple[str, str, Any]
FilterConjunctionType = list[FilterLiteralType]
FilterDNFType = list[FilterConjunctionType]
FilterType = FilterConjunctionType | FilterDNFTypedef to_pandas(
self,
partitions: list[tuple[str, str, Any]] | None = None,
columns: list[str] | None = None,
filesystem: str | pa_fs.FileSystem | None = None,
filters: FilterType | Expression | None = None,
types_mapper: Callable[[pyarrow.DataType], Any] | None = None
) -> pd.DataFrame: ...Convert Delta table to pandas DataFrame with optional column selection and filtering.
# to_pandas / to_pyarrow_table shared parameters:
#   partitions: partition-level filters for efficient data access
#   columns: specific columns to include in the result
#   filesystem: custom filesystem for reading files (string path or PyArrow FileSystem)
#   filters: row-level filters in DNF (Disjunctive Normal Form) or a PyArrow Expression
#   types_mapper (to_pandas only): optional function mapping PyArrow data types to pandas types
def to_pyarrow_table(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None,
) -> pyarrow.Table:
    """Convert the Delta table to a PyArrow Table with optional column
    selection and filtering (same filter semantics as ``to_pandas``)."""
    ...
def to_pyarrow_dataset(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    filesystem: Any | None = None,
) -> pyarrow.dataset.Dataset:
    """Expose the Delta table as a PyArrow Dataset for advanced,
    engine-level operations (e.g. ``dataset.to_table(filter=...)``)."""
    ...
def to_pyarrow_scan(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    limit: int | None = None,
    batch_size: int | None = None,
) -> Iterator[RecordBatch]:
    """Scan the table as an iterator of PyArrow RecordBatches.

    ``batch_size`` bounds rows per batch so large tables can be processed
    incrementally; ``limit`` caps the total rows returned.
    """
    ...


def to_arro3_reader(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    batch_size: int | None = None,
) -> RecordBatchReader:
    """Create a streaming reader for processing large datasets without
    loading everything into memory."""
    ...
def files(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None,
) -> list[str]:
    """List the data files backing the table, optionally pruned by
    partition filters."""
    ...
def file_uris(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None,
) -> list[str]:
    """List full URIs of the table's data files, optionally pruned by
    partition filters."""
    ...
def get_add_actions(self, flatten: bool = False) -> RecordBatch:
    """Access underlying file information and transaction log add actions.

    NOTE(review): ``flatten`` presumably flattens nested metadata columns
    in the returned batch — confirm against the delta-rs documentation.
    """
    ...
def load_cdf(
    self,
    starting_version: int = 0,
    ending_version: int | None = None,
    starting_timestamp: str | None = None,
    ending_timestamp: str | None = None,
    columns: list[str] | None = None,
    predicate: str | None = None,
    allow_out_of_range: bool = False,
) -> RecordBatchReader:
    """Load the Change Data Feed (CDF) to track row-level changes between
    table versions.

    The change window may be bounded by version (``starting_version`` /
    ``ending_version``) or by timestamp strings; ``predicate`` filters rows
    and ``columns`` projects the output. ``allow_out_of_range`` controls
    behavior when the requested range exceeds the table history.
    """
    ...
def __datafusion_table_provider__(self) -> Any:
    """Internal hook for DataFusion SQL engine integration (used by QueryBuilder)."""
    ...
# --- Example: basic conversion and filtering ---
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Convert the whole table to pandas.
df = dt.to_pandas()
print(f"DataFrame shape: {df.shape}")

# Select specific columns.
df_subset = dt.to_pandas(columns=["id", "name"])

# Convert to a PyArrow Table.
arrow_table = dt.to_pyarrow_table()
print(f"Arrow table schema: {arrow_table.schema}")

# Row-level filters (DNF format).
# Single filter: age > 25
filters = [("age", ">", 25)]
df_filtered = dt.to_pandas(filters=filters)

# Multiple filters (AND): age > 25 AND name != "Alice"
filters = [("age", ">", 25), ("name", "!=", "Alice")]
df_filtered = dt.to_pandas(filters=filters)

# OR filters: (age > 25) OR (name = "Bob")
filters = [[("age", ">", 25)], [("name", "=", "Bob")]]
df_filtered = dt.to_pandas(filters=filters)

# Partition filters prune whole files for performance.
partition_filters = [("year", "=", "2023")]
df_2023 = dt.to_pandas(partitions=partition_filters)
# --- Example: streaming large tables in batches ---
from deltalake import DeltaTable

dt = DeltaTable("path/to/large-table")

# Process data in batches to bound memory usage.
total_rows = 0
for batch in dt.to_pyarrow_scan(batch_size=10000):
    total_rows += batch.num_rows
    # Process each batch as a pandas DataFrame.
    df_batch = batch.to_pandas()
    # Your processing logic here
print(f"Processed {total_rows} total rows")

# Using the arro3 streaming reader.
reader = dt.to_arro3_reader(batch_size=5000)
for batch in reader:
    # Process each RecordBatch
    print(f"Batch has {batch.num_rows} rows")

import pyarrow.compute as pc
# --- Example: PyArrow dataset with compute expressions ---
# Get the table as a PyArrow dataset for advanced operations.
dataset = dt.to_pyarrow_dataset()

# Use PyArrow compute functions for filtering and projection
# (filter: age > 25 AND name != "Alice").
result = dataset.to_table(
    filter=pc.and_(
        pc.greater(pc.field("age"), 25),
        pc.not_equal(pc.field("name"), "Alice"),
    ),
    columns=["id", "name", "age"],
)

# Convert to pandas if needed.
df = result.to_pandas()
# --- Example: inspecting data files and the transaction log ---
# Get the list of data files.
files = dt.files()
print(f"Table has {len(files)} data files")

# Get full URIs.
file_uris = dt.file_uris()
for uri in file_uris[:3]:  # Show first 3
    print(f"File: {uri}")

# Get detailed add actions from the transaction log.
add_actions = dt.get_add_actions()
add_df = add_actions.to_pandas()
print("Add actions:")
print(add_df[["path", "size", "modification_time"]].head())
# For very large tables, use a streaming approach.
def process_large_table(table_path):
    """Process a Delta table in fixed-size chunks to bound peak memory.

    Returns the total number of rows processed.
    """
    dt = DeltaTable(table_path)
    # Process in chunks to manage memory.
    chunk_size = 50000
    processed_count = 0
    for batch in dt.to_pyarrow_scan(batch_size=chunk_size):
        # Convert each batch to pandas for processing.
        chunk_df = batch.to_pandas()
        # Your data processing logic (example: per-category sums).
        processed_data = chunk_df.groupby("category").sum()
        processed_count += len(chunk_df)
        print(f"Processed {processed_count} rows so far...")
        # Save intermediate results or accumulate, e.g.:
        # processed_data.to_csv(f"output_chunk_{processed_count}.csv")
    return processed_count

# Use the function.
total_processed = process_large_table("path/to/huge-table")

# Install with the Tessl CLI:
npx tessl i tessl/pypi-deltalake