CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

docs/data-reading.md

Data Reading and Conversion

Converting Delta tables to various formats including pandas DataFrames, PyArrow tables, and streaming readers for efficient data processing and integration with the Python data ecosystem.

Imports

from typing import Any, Callable, Iterator

import pandas as pd
import pyarrow
import pyarrow.fs as pa_fs
from arro3.core import RecordBatch, RecordBatchReader
from deltalake import DeltaTable
from pyarrow.dataset import Expression

# Type aliases for filtering
# One predicate tuple; the usage examples on this page write it as
# (column, operator, value), e.g. ("age", ">", 25).
FilterLiteralType = tuple[str, str, Any]
# A flat list of predicates; the examples describe this form as AND.
FilterConjunctionType = list[FilterLiteralType]
# A list of conjunctions; the examples describe the inner lists as ORed
# together (Disjunctive Normal Form).
FilterDNFType = list[FilterConjunctionType]
# Either shape is accepted wherever a `filters` argument is typed FilterType.
FilterType = FilterConjunctionType | FilterDNFType

Capabilities

Pandas Integration

# Convert the Delta table to a pandas DataFrame, with optional column
# selection and filtering.
#
# Parameters:
#   partitions:   partition-level filters for efficient data access
#   columns:      specific columns to include in the result
#   filesystem:   custom filesystem for reading files (string path or
#                 PyArrow FileSystem)
#   filters:      row-level filters in DNF or as a PyArrow Expression
#   types_mapper: optional function mapping PyArrow data types to pandas types
#
# Returns: pd.DataFrame
def to_pandas(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None,
    types_mapper: Callable[[pyarrow.DataType], Any] | None = None
) -> pd.DataFrame: ...

Convert the Delta table to a pandas DataFrame, with optional column selection and filtering.

Parameters:

  • partitions: Partition-level filters for efficient data access
  • columns: Specific columns to include in the result
  • filesystem: Custom filesystem for reading files (string path or PyArrow FileSystem)
  • filters: Row-level filters, given either in DNF (Disjunctive Normal Form) or as a PyArrow Expression
  • types_mapper: Optional function to map PyArrow data types to pandas types

PyArrow Integration

# Read the Delta table into a pyarrow.Table.
#
# Accepts the same partitions / columns / filesystem / filters arguments as
# to_pandas; there is no types_mapper parameter here.
# NOTE(review): argument semantics are presumably identical to to_pandas —
# confirm against the deltalake API reference.
def to_pyarrow_table(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None
) -> pyarrow.Table: ...

# Expose the Delta table as a pyarrow.dataset.Dataset. The "Working with
# PyArrow Datasets" example on this page calls
# .to_table(filter=..., columns=...) on the returned object.
#
#   partitions: partition filter tuples — presumably the same meaning as in
#               to_pandas; confirm
#   filesystem: typed as Any here, unlike the str | pa_fs.FileSystem
#               annotation used by to_pandas / to_pyarrow_table
def to_pyarrow_dataset(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    filesystem: Any | None = None
) -> pyarrow.dataset.Dataset: ...

# Yield the table's data as a sequence of RecordBatch objects; the streaming
# examples on this page loop over the result and read batch.num_rows.
#
#   columns:    column names to read
#   filters:    tuple-based filters only (flat list or list of lists); no
#               PyArrow Expression alternative in this signature
#   partitions: partition filter tuples
#   limit:      NOTE(review): presumably caps the number of rows — confirm
#   batch_size: NOTE(review): presumably the maximum rows per batch — confirm
def to_pyarrow_scan(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    limit: int | None = None,
    batch_size: int | None = None
) -> Iterator[RecordBatch]: ...

Streaming Data Access

# Create a streaming RecordBatchReader over the table so large datasets can be
# processed without loading everything into memory.
#
#   columns:    column names to read
#   filters:    tuple-based filters (flat list or list of lists)
#   partitions: partition filter tuples
#   batch_size: the streaming example passes batch_size=5000;
#               NOTE(review): presumably rows per batch — confirm
def to_arro3_reader(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    batch_size: int | None = None
) -> RecordBatchReader: ...

Create a streaming reader for processing large datasets without loading everything into memory.

File and Action Information

# Return the table's data files as a list of strings.
#
#   partition_filters: optional filter tuples whose third element is a string
#                      or a list of strings
# NOTE(review): the "File Information" example contrasts this with file_uris
# ("full URIs"), so these are presumably relative paths — confirm.
def files(
    self, 
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

# Return the full URIs of the table's data files as a list of strings.
#
#   partition_filters: optional filter tuples whose third element is a string
#                      or a list of strings
def file_uris(
    self, 
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

# Return the transaction log's add actions as a single RecordBatch.
#   flatten: NOTE(review): presumably expands nested columns into flat ones —
#            confirm against the deltalake API reference.
def get_add_actions(self, flatten: bool = False) -> RecordBatch: ...

Access underlying file information and transaction log add actions.

Change Data Feed (CDF)

# Load the Change Data Feed (CDF) to track row-level changes between table
# versions; the result is a RecordBatchReader.
#
#   starting_version / ending_version:     version bounds; start defaults to 0
#   starting_timestamp / ending_timestamp: string timestamp bounds
#       NOTE(review): expected timestamp format is not shown here — confirm
#   columns:   column names to include
#   predicate: string filter
#       NOTE(review): presumably a SQL-style expression — confirm
#   allow_out_of_range: defaults to False
#       NOTE(review): presumably tolerates bounds outside the table's
#       history instead of raising — confirm
def load_cdf(
    self,
    starting_version: int = 0,
    ending_version: int | None = None,
    starting_timestamp: str | None = None,
    ending_timestamp: str | None = None,
    columns: list[str] | None = None,
    predicate: str | None = None,
    allow_out_of_range: bool = False,
) -> RecordBatchReader: ...

Load the Change Data Feed (CDF) to track row-level changes between table versions.

DataFusion Integration

# Internal hook for DataFusion SQL engine integration (used by QueryBuilder);
# not intended to be called directly. The return type is left as Any.
def __datafusion_table_provider__(self) -> Any: ...

Internal method for DataFusion SQL engine integration (used by QueryBuilder).

Usage Examples

Basic Data Reading

from deltalake import DeltaTable

# Create the table handle; later snippets on this page reuse `dt`
dt = DeltaTable("path/to/table")

# Convert to pandas
df = dt.to_pandas()
print(f"DataFrame shape: {df.shape}")

# Select specific columns
df_subset = dt.to_pandas(columns=["id", "name"])

# Convert to PyArrow Table
arrow_table = dt.to_pyarrow_table()
print(f"Arrow table schema: {arrow_table.schema}")

Filtering Data

# `dt` is the DeltaTable created in the "Basic Data Reading" example.
# Row-level filters (DNF format): each predicate is a
# (column, operator, value) tuple.
# Single filter: age > 25
filters = [("age", ">", 25)]
df_filtered = dt.to_pandas(filters=filters)

# Multiple filters (AND): age > 25 AND name != "Alice"
# A flat list of tuples is one conjunction.
filters = [("age", ">", 25), ("name", "!=", "Alice")]
df_filtered = dt.to_pandas(filters=filters)

# OR filters: (age > 25) OR (name = "Bob")
# A list of lists: each inner list is a conjunction, and the inner lists
# are ORed together.
filters = [[("age", ">", 25)], [("name", "=", "Bob")]]
df_filtered = dt.to_pandas(filters=filters)

# Partition filters for performance
# These go through `partitions=` rather than `filters=`; the value is
# written as a string here.
partition_filters = [("year", "=", "2023")]
df_2023 = dt.to_pandas(partitions=partition_filters)

Streaming Large Datasets

from deltalake import DeltaTable

dt = DeltaTable("path/to/large-table")

# Process data in batches
total_rows = 0
for batch in dt.to_pyarrow_scan(batch_size=10000):
    # Keep a running row count across batches
    total_rows += batch.num_rows
    # Process batch
    # (df_batch is a placeholder; nothing below uses it)
    df_batch = batch.to_pandas()
    # Your processing logic here
    
print(f"Processed {total_rows} total rows")

# Using arro3 reader
# Same batch-at-a-time loop, driven by a RecordBatchReader
reader = dt.to_arro3_reader(batch_size=5000)
for batch in reader:
    # Process each RecordBatch
    print(f"Batch has {batch.num_rows} rows")

Working with PyArrow Datasets

import pyarrow.compute as pc

# `dt` is the DeltaTable created in an earlier example.
# Get as PyArrow dataset for advanced operations
dataset = dt.to_pyarrow_dataset()

# Use PyArrow compute functions
# The filter is built from field references: age > 25 AND name != "Alice";
# only the listed columns are read into the resulting table.
result = dataset.to_table(
    filter=pc.and_(
        pc.greater(pc.field("age"), 25),
        pc.not_equal(pc.field("name"), "Alice")
    ),
    columns=["id", "name", "age"]
)

# Convert to pandas if needed
df = result.to_pandas()

File Information

# `dt` is the DeltaTable created in an earlier example.
# Get list of data files
files = dt.files()
print(f"Table has {len(files)} data files")

# Get full URIs
file_uris = dt.file_uris()
for uri in file_uris[:3]:  # Show first 3
    print(f"File: {uri}")

# Get detailed add actions from transaction log
add_actions = dt.get_add_actions()
add_df = add_actions.to_pandas()
print("Add actions:")
# The add-actions table reports file size in a column named "size_bytes";
# selecting "size" would raise KeyError.
print(add_df[["path", "size_bytes", "modification_time"]].head())

Memory-Efficient Processing

# For very large tables, use streaming approach
def process_large_table(table_path: str, chunk_size: int = 50_000) -> int:
    """Stream a Delta table batch by batch and return the rows processed.

    Args:
        table_path: Location of the Delta table, passed to ``DeltaTable``.
        chunk_size: Value forwarded as ``batch_size`` to ``to_pyarrow_scan``.
            Defaults to 50,000; lower it to reduce the memory held per batch.

    Returns:
        Total number of rows seen across all batches.
    """
    dt = DeltaTable(table_path)
    processed_count = 0

    # Process in chunks to manage memory
    for batch in dt.to_pyarrow_scan(batch_size=chunk_size):
        # Convert to pandas for processing
        chunk_df = batch.to_pandas()

        # Your data processing logic (placeholder: the result is only used by
        # the commented-out save below)
        processed_data = chunk_df.groupby("category").sum()

        processed_count += len(chunk_df)
        print(f"Processed {processed_count} rows so far...")

        # Save intermediate results or accumulate
        # processed_data.to_csv(f"output_chunk_{processed_count}.csv")

    return processed_count

# Use the function
total_processed = process_large_table("path/to/huge-table")

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json