Native Delta Lake Python binding based on delta-rs with Pandas integration.

Functions for writing data to Delta tables and modifying existing records through update, delete, and merge operations. Provides ACID transaction guarantees and supports various data sources.
def write_deltalake(
    table_or_uri: str | Path | DeltaTable,
    data: ArrowStreamExportable | ArrowArrayExportable | Sequence[ArrowArrayExportable],
    *,
    partition_by: list[str] | str | None = None,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    schema_mode: Literal["merge", "overwrite"] | None = None,
    storage_options: dict[str, str] | None = None,
    target_file_size: int | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> None:
    """Write data to a Delta table with comprehensive configuration options.

    Parameters:
        table_or_uri: Target table path or existing DeltaTable instance.
        data: Data to write (pandas DataFrame, PyArrow Table, RecordBatch, etc.).
        partition_by: Column names for partitioning.
        mode: Write mode behavior.
        schema_mode: How to handle schema differences.
        storage_options: Backend-specific configuration.
        writer_properties: Parquet writer configuration.
        commit_properties: Transaction commit settings.
        post_commithook_properties: Post-commit operations.
    """
    ...

def convert_to_deltalake(
    uri: str | Path,
    mode: Literal["error", "ignore"] = "error",
    partition_by: Schema | None = None,
    partition_strategy: Literal["hive"] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
    commit_properties: CommitProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
) -> None:
    """Convert an existing Parquet table to Delta Lake format in place."""
    ...
def update(
    self,
    updates: dict[str, str] | None = None,
    new_values: dict[str, int | float | str | datetime | bool | list[Any]] | None = None,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    error_on_type_mismatch: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]:
    """Update records in the Delta table matching an optional predicate.

    Parameters:
        updates: Column name to SQL expression mapping.
        new_values: Column name to Python value mapping.
        predicate: SQL WHERE clause for filtering rows to update.
        writer_properties: Parquet writer configuration.
        error_on_type_mismatch: Raise error on type conflicts.

    Returns:
        Operation metrics (e.g. the examples below read ``num_updated_rows``).
    """
    ...

def delete(
    self,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]:
    """Delete records matching the specified predicate.

    Without a predicate, all rows are deleted (truncate). Returns operation
    metrics (e.g. the examples below read ``num_deleted_rows``).
    """
    ...
def merge(
    self,
    source: pyarrow.Table | pyarrow.RecordBatch | pyarrow.dataset.Dataset,
    predicate: str,
    source_alias: str | None = None,
    target_alias: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> TableMerger:
    """Start a merge operation (UPSERT) with the specified source data.

    Parameters:
        source: Source data to merge into the table.
        predicate: SQL join condition between target and source rows.
        source_alias: Alias for the source in the predicate/expressions.
        target_alias: Alias for the target in the predicate/expressions.

    Returns:
        A TableMerger builder; chain ``when_*`` clauses and call ``execute``.
    """
    ...
class TableMerger:
def when_matched_update(
self,
updates: dict[str, str] | None = None,
predicate: str | None = None,
) -> TableMerger: ...
def when_matched_update_all(self, predicate: str | None = None) -> TableMerger: ...
def when_matched_delete(self, predicate: str | None = None) -> TableMerger: ...
def when_not_matched_insert(
self,
updates: dict[str, str] | None = None,
predicate: str | None = None,
) -> TableMerger: ...
def when_not_matched_insert_all(self, predicate: str | None = None) -> TableMerger: ...
def execute(self) -> dict[str, Any]: ...Builder pattern for constructing complex merge operations.
from deltalake import write_deltalake, DeltaTable
import pandas as pd

# Create sample data
data = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'department': ['Engineering', 'Sales', 'Engineering', 'Sales', 'Marketing']
})

# Write to new table (mode="error" raises if the table already exists)
write_deltalake("path/to/new-table", data, mode="error")

# Append more data
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'age': [29, 27],
    'department': ['Engineering', 'Marketing']
})
write_deltalake("path/to/new-table", new_data, mode="append")

# Overwrite entire table
write_deltalake("path/to/new-table", data, mode="overwrite")

# Write with partitioning
write_deltalake(
    "path/to/partitioned-table",
    data,
    partition_by=["department"],
    mode="error"
)

# Write with multiple partition columns
import numpy as np  # required by the random sample data below

sales_data = pd.DataFrame({
    'id': range(100),
    'sale_date': pd.date_range('2023-01-01', periods=100),
    'amount': np.random.randint(100, 1000, 100),
    'region': np.random.choice(['US', 'EU', 'ASIA'], 100)
})
write_deltalake(
    "path/to/sales-table",
    sales_data,
    partition_by=["region", "sale_date"],
    mode="error"
)

dt = DeltaTable("path/to/table")
# Update with new values
result = dt.update(
    predicate="age < 30",
    new_values={"department": "Junior Engineering"}
)
print(f"Updated {result['num_updated_rows']} rows")

# Update with SQL expressions
dt.update(
    predicate="department = 'Sales'",
    updates={"age": "age + 1"}  # Increment age by 1
)

# Conditional updates
dt.update(
    predicate="name = 'Alice'",
    new_values={
        "age": 26,
        "department": "Senior Engineering"
    }
)

# Delete specific records
result = dt.delete(predicate="age > 35")
print(f"Deleted {result['num_deleted_rows']} rows")

# Delete all records from a department
dt.delete(predicate="department = 'Marketing'")

# Delete without predicate (truncate)
dt.delete()  # Deletes all rows

import pyarrow as pa
# Source data for merge
source_data = pa.table({
    'id': [2, 3, 6],
    'name': ['Bob Updated', 'Charlie Updated', 'New Person'],
    'age': [31, 36, 40],
    'department': ['Sales', 'Engineering', 'HR']
})

# Perform merge operation
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target"
    )
    .when_matched_update_all()  # Update all columns when matched
    .when_not_matched_insert_all()  # Insert when not matched
    .execute()
)
print("Merge completed:")
# The original printed num_source_rows as the updated count; the merge
# metrics expose num_target_rows_updated for rows actually updated.
print(f"- Rows updated: {merge_result['num_target_rows_updated']}")
print(f"- Rows inserted: {merge_result['num_target_rows_inserted']}")

# Complex merge with conditions
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id"
    )
    .when_matched_update(
        updates={
            "name": "source.name",
            "age": "source.age"
        },
        predicate="target.age < source.age"  # Only update if source age is higher
    )
    .when_matched_delete(
        predicate="source.department = 'DELETED'"  # Delete if marked for deletion
    )
    .when_not_matched_insert(
        updates={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
            "department": "source.department"
        },
        predicate="source.age >= 18"  # Only insert adults
    )
    .execute()
)

# Convert Parquet table to Delta
convert_to_deltalake(
    "path/to/parquet-table",
    # NOTE(review): the stub above types partition_by as Schema — confirm a
    # plain list of column names is accepted here.
    partition_by=["year", "month"],
    name="My Converted Table",
    description="Converted from Parquet format"
)

# Convert with configuration
convert_to_deltalake(
    "s3://bucket/parquet-data",
    storage_options={
        "AWS_REGION": "us-west-2",
        "AWS_ACCESS_KEY_ID": "key",
        "AWS_SECRET_ACCESS_KEY": "secret"
    },
    target_file_size=128 * 1024 * 1024  # 128MB target file size
    # NOTE(review): target_file_size is not in convert_to_deltalake's
    # signature above (only write_deltalake lists it) — verify against the API.
)

from deltalake.writer import WriterProperties, Compression
# Configure writer properties
writer_props = WriterProperties(
compression=Compression.SNAPPY,
max_row_group_size=10000,
write_batch_size=1000
)
# Write with custom properties
write_deltalake(
"path/to/table",
data,
writer_properties=writer_props,
target_file_size=64 * 1024 * 1024, # 64MB files
configuration={
"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true"
}
)@dataclass
class BloomFilterProperties:
def __init__(
self,
set_bloom_filter_enabled: bool | None,
fpp: float | None = None,
ndv: int | None = None,
) -> None: ...
set_bloom_filter_enabled: bool | None
fpp: float | None
ndv: int | NoneConfigure bloom filter properties for parquet writer optimization. Bloom filters help skip data files during reads by providing probabilistic membership testing.
Parameters:
- set_bloom_filter_enabled: Enable bloom filter with default values if no fpp/ndv provided
- fpp: False positive probability (between 0 and 1 exclusive)
- ndv: Number of distinct values for the bloom filter

@dataclass
class ColumnProperties:
def __init__(
self,
dictionary_enabled: bool | None = None,
statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None = None,
bloom_filter_properties: BloomFilterProperties | None = None,
) -> None: ...
dictionary_enabled: bool | None
statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None
bloom_filter_properties: BloomFilterProperties | NoneConfigure per-column properties for parquet writing, including dictionary encoding, statistics collection levels, and bloom filter settings.
Parameters:
- dictionary_enabled: Enable dictionary encoding for the column
- statistics_enabled: Statistics level for the column
- bloom_filter_properties: Bloom filter configuration for the column

Install with Tessl CLI:
npx tessl i tessl/pypi-deltalake