CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

writing-modification.mddocs/

Writing and Data Modification

Functions for writing data to Delta tables and modifying existing records through update, delete, and merge operations. Provides ACID transaction guarantees and supports various data sources.

Capabilities

Writing Data

def write_deltalake(
    table_or_uri: str | Path | DeltaTable,
    data: ArrowStreamExportable | ArrowArrayExportable | Sequence[ArrowArrayExportable],
    *,
    partition_by: list[str] | str | None = None,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    schema_mode: Literal["merge", "overwrite"] | None = None,
    storage_options: dict[str, str] | None = None,
    target_file_size: int | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> None: ...

Write data to a Delta table with comprehensive configuration options.

Parameters:

  • table_or_uri: Target table path or existing DeltaTable instance
  • data: Data to write (pandas DataFrame, PyArrow Table, RecordBatch, etc.)
  • partition_by: Column names for partitioning
  • mode: Write mode behavior
  • schema_mode: How to handle schema differences
  • storage_options: Backend-specific configuration
  • writer_properties: Parquet writer configuration
  • commit_properties: Transaction commit settings
  • post_commithook_properties: Post-commit operations

Converting Existing Tables

def convert_to_deltalake(
    uri: str | Path,
    mode: Literal["error", "ignore"] = "error",
    partition_by: Schema | None = None,
    partition_strategy: Literal["hive"] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
    commit_properties: CommitProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
) -> None: ...

Convert an existing Parquet table to Delta Lake format.

Update Operations

def update(
    self,
    updates: dict[str, str] | None = None,
    new_values: dict[str, int | float | str | datetime | bool | list[Any]] | None = None,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    error_on_type_mismatch: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...

Update records in the Delta table matching an optional predicate.

Parameters:

  • updates: Column name to SQL expression mapping
  • new_values: Column name to Python value mapping
  • predicate: SQL WHERE clause for filtering rows to update
  • writer_properties: Parquet writer configuration
  • error_on_type_mismatch: Raise error on type conflicts

Delete Operations

def delete(
    self,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...

Delete records matching the specified predicate.

Merge Operations

def merge(
    self,
    source: pyarrow.Table | pyarrow.RecordBatch | pyarrow.dataset.Dataset,
    predicate: str,
    source_alias: str | None = None,
    target_alias: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> TableMerger: ...

Start a merge operation (UPSERT) with the specified source data.

TableMerger Class

class TableMerger:
    def when_matched_update(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...
    
    def when_matched_update_all(self, predicate: str | None = None) -> TableMerger: ...
    
    def when_matched_delete(self, predicate: str | None = None) -> TableMerger: ...
    
    def when_not_matched_insert(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...
    
    def when_not_matched_insert_all(self, predicate: str | None = None) -> TableMerger: ...
    
    def execute(self) -> dict[str, Any]: ...

Builder pattern for constructing complex merge operations.

Usage Examples

Basic Writing Operations

from deltalake import write_deltalake, DeltaTable
import pandas as pd

# Create sample data
data = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'department': ['Engineering', 'Sales', 'Engineering', 'Sales', 'Marketing']
})

# Write to new table
write_deltalake("path/to/new-table", data, mode="error")

# Append more data
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'age': [29, 27],
    'department': ['Engineering', 'Marketing']
})

write_deltalake("path/to/new-table", new_data, mode="append")

# Overwrite entire table
write_deltalake("path/to/new-table", data, mode="overwrite")

Partitioned Writing

# Write with partitioning
write_deltalake(
    "path/to/partitioned-table",
    data,
    partition_by=["department"],
    mode="error"
)

# Write with multiple partition columns
sales_data = pd.DataFrame({
    'id': range(100),
    'sale_date': pd.date_range('2023-01-01', periods=100),
    'amount': np.random.randint(100, 1000, 100),
    'region': np.random.choice(['US', 'EU', 'ASIA'], 100)
})

write_deltalake(
    "path/to/sales-table",
    sales_data,
    partition_by=["region", "sale_date"],
    mode="error"
)

Update Operations

dt = DeltaTable("path/to/table")

# Update with new values
result = dt.update(
    predicate="age < 30",
    new_values={"department": "Junior Engineering"}
)
print(f"Updated {result['num_updated_rows']} rows")

# Update with SQL expressions
dt.update(
    predicate="department = 'Sales'",
    updates={"age": "age + 1"}  # Increment age by 1
)

# Conditional updates
dt.update(
    predicate="name = 'Alice'",
    new_values={
        "age": 26,
        "department": "Senior Engineering"
    }
)

Delete Operations

# Delete specific records
result = dt.delete(predicate="age > 35")
print(f"Deleted {result['num_deleted_rows']} rows")

# Delete all records from a department
dt.delete(predicate="department = 'Marketing'")

# Delete without predicate (truncate)
dt.delete()  # Deletes all rows

Merge Operations (UPSERT)

import pyarrow as pa

# Source data for merge
source_data = pa.table({
    'id': [2, 3, 6],
    'name': ['Bob Updated', 'Charlie Updated', 'New Person'],
    'age': [31, 36, 40],
    'department': ['Sales', 'Engineering', 'HR']
})

# Perform merge operation
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target"
    )
    .when_matched_update_all()  # Update all columns when matched
    .when_not_matched_insert_all()  # Insert when not matched
    .execute()
)

print(f"Merge completed:")
print(f"- Rows updated: {merge_result['num_source_rows']}")
print(f"- Rows inserted: {merge_result['num_target_rows_inserted']}")

Advanced Merge with Conditions

# Complex merge with conditions
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id"
    )
    .when_matched_update(
        updates={
            "name": "source.name",
            "age": "source.age"
        },
        predicate="target.age < source.age"  # Only update if source age is higher
    )
    .when_matched_delete(
        predicate="source.department = 'DELETED'"  # Delete if marked for deletion
    )
    .when_not_matched_insert(
        updates={
            "id": "source.id",
            "name": "source.name", 
            "age": "source.age",
            "department": "source.department"
        },
        predicate="source.age >= 18"  # Only insert adults
    )
    .execute()
)

Converting Existing Tables

# Convert Parquet table to Delta
convert_to_deltalake(
    "path/to/parquet-table",
    partition_by=["year", "month"],
    name="My Converted Table",
    description="Converted from Parquet format"
)

# Convert with configuration
convert_to_deltalake(
    "s3://bucket/parquet-data",
    storage_options={
        "AWS_REGION": "us-west-2",
        "AWS_ACCESS_KEY_ID": "key",
        "AWS_SECRET_ACCESS_KEY": "secret"
    },
    target_file_size=128 * 1024 * 1024  # 128MB target file size
)

Write Configuration

from deltalake.writer import WriterProperties, Compression

# Configure writer properties
writer_props = WriterProperties(
    compression=Compression.SNAPPY,
    max_row_group_size=10000,
    write_batch_size=1000
)

# Write with custom properties
write_deltalake(
    "path/to/table",
    data,
    writer_properties=writer_props,
    target_file_size=64 * 1024 * 1024,  # 64MB files
    configuration={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)

Writer Configuration Classes

BloomFilterProperties

@dataclass
class BloomFilterProperties:
    def __init__(
        self,
        set_bloom_filter_enabled: bool | None,
        fpp: float | None = None,
        ndv: int | None = None,
    ) -> None: ...
    
    set_bloom_filter_enabled: bool | None
    fpp: float | None
    ndv: int | None

Configure bloom filter properties for parquet writer optimization. Bloom filters help skip data files during reads by providing probabilistic membership testing.

Parameters:

  • set_bloom_filter_enabled: Enable bloom filter with default values if no fpp/ndv provided
  • fpp: False positive probability (between 0 and 1 exclusive)
  • ndv: Number of distinct values for the bloom filter

ColumnProperties

@dataclass
class ColumnProperties:
    def __init__(
        self,
        dictionary_enabled: bool | None = None,
        statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None = None,
        bloom_filter_properties: BloomFilterProperties | None = None,
    ) -> None: ...
    
    dictionary_enabled: bool | None
    statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None
    bloom_filter_properties: BloomFilterProperties | None

Configure per-column properties for parquet writing, including dictionary encoding, statistics collection levels, and bloom filter settings.

Parameters:

  • dictionary_enabled: Enable dictionary encoding for the column
  • statistics_enabled: Statistics level for the column
  • bloom_filter_properties: Bloom filter configuration for the column

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json