Native Delta Lake Python bindings based on delta-rs, with Pandas integration.

This module covers transaction properties, commit configuration, and ACID
transaction control — the tools for ensuring data consistency and managing
concurrent access to Delta Lake tables.
@dataclass
class CommitProperties:
    """Configuration for commit behavior and retry logic.

    Values are supplied at construction time and exposed through
    read-only properties.
    """

    def __init__(
        self,
        max_retry_commit_attempts: int | None = None,
        app_metadata: dict[str, Any] | None = None,
        app_id: str | None = None,
    ) -> None: ...

    @property
    def max_retry_commit_attempts(self) -> int | None:
        """Maximum number of commit attempts before giving up."""
        ...

    @property
    def app_metadata(self) -> dict[str, Any] | None:
        """Arbitrary application metadata recorded with the commit."""
        ...

    @property
    def app_id(self) -> str | None:
        """Application identifier recorded with the commit."""
        ...
@dataclass
class PostCommitHookProperties:
    """Configuration for operations that run after successful commits."""

    def __init__(
        self,
        create_checkpoint: bool = True,
        cleanup_expired_logs: bool | None = None,
    ) -> None: ...

    @property
    def create_checkpoint(self) -> bool:
        """Whether a checkpoint is written after the commit."""
        ...

    @property
    def cleanup_expired_logs(self) -> bool | None:
        """Whether expired log files are removed after the commit."""
        ...
class Transaction:
    """Low-level transaction interface for advanced operations."""

    def commit(
        self,
        actions: list[AddAction],
        commit_properties: CommitProperties | None = None,
        post_commit_hook_properties: PostCommitHookProperties | None = None,
    ) -> int:
        """Commit *actions* to the transaction log and return the new table version."""
        ...
@dataclass
class AddAction:
    """Represents a file addition in the transaction log."""

    # Path of the data file (the example below uses a path relative to the table root)
    path: str
    # File size in bytes
    size: int
    # Partition column name -> value; None represents a null partition value
    partition_values: Mapping[str, str | None]
    # File modification time (Unix epoch milliseconds per the example usage)
    modification_time: int
    # True when the file changes table data — TODO confirm exact protocol semantics
    data_change: bool
    # JSON-encoded file statistics, e.g. '{"numRecords": 1000}'
    stats: str
def create_table_with_add_actions(
    table_uri: str,
    schema: Schema,
    add_actions: list[AddAction],
    partition_by: list[str] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: dict[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
) -> DeltaTable:
    """Create a table directly from add actions (advanced use case).

    Args:
        table_uri: URI where the table is created.
        schema: Table schema.
        add_actions: File-addition actions forming the table's initial state.
        partition_by: Optional partition column names.
        name: Optional table name.
        description: Optional table description.
        configuration: Optional table configuration key/value pairs.
        storage_options: Optional backend storage options.

    Returns:
        The newly created ``DeltaTable``.
    """
    ...
from deltalake import DeltaTable, write_deltalake
from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Configure commit properties
commit_props = CommitProperties(
    max_retry_commit_attempts=5,
    app_metadata={"application": "data_pipeline", "version": "1.0.0"},
    app_id="my_application",
)

# Configure post-commit hooks
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,       # Create checkpoints automatically
    cleanup_expired_logs=True,    # Clean up old log files
)

# Use in write operation
write_deltalake(
    "path/to/table",
    data,  # NOTE(review): `data` is a placeholder — a DataFrame/Arrow table in real usage
    mode="append",
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props,
)

dt = DeltaTable("path/to/table")

# Update with transaction properties
result = dt.update(
    predicate="status = 'pending'",
    new_values={"status": "processed", "processed_at": "current_timestamp()"},
    commit_properties=CommitProperties(
        max_retry_commit_attempts=3,
        app_metadata={"operation": "batch_update", "batch_id": "12345"},
    ),
    post_commithook_properties=PostCommitHookProperties(
        create_checkpoint=False,  # Skip checkpoint for this update
        cleanup_expired_logs=False,
    ),
)
print(f"Updated {result['num_updated_rows']} rows")

import pyarrow as pa
source_data = pa.table({
    'id': [1, 2, 3],
    'status': ['active', 'inactive', 'pending'],
    'last_modified': ['2023-01-01', '2023-01-02', '2023-01-03'],
})

# Merge with custom transaction properties
# NOTE(review): the predicate uses `source`/`target` aliases — recent deltalake
# versions require passing source_alias/target_alias to merge(); confirm version.
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        commit_properties=CommitProperties(
            max_retry_commit_attempts=10,  # High retry for critical operation
            app_metadata={
                "operation": "daily_sync",
                "source": "external_system",
                "sync_timestamp": "2023-01-15T10:00:00Z",
            },
        ),
        post_commithook_properties=PostCommitHookProperties(
            create_checkpoint=True,
            cleanup_expired_logs=True,
        ),
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

# Delete with tracking metadata
delete_result = dt.delete(
    predicate="created_at < '2022-01-01'",
    commit_properties=CommitProperties(
        app_metadata={
            "operation": "data_retention_cleanup",
            "retention_policy": "delete_older_than_1_year",
            "executed_by": "automated_cleanup_job",
        },
    ),
    post_commithook_properties=PostCommitHookProperties(
        create_checkpoint=True,    # Create checkpoint after cleanup
        cleanup_expired_logs=True, # Clean logs after delete
    ),
)
print(f"Deleted {delete_result['num_deleted_rows']} old records")

from deltalake.transaction import Transaction, AddAction
# Low-level transaction for advanced scenarios
def custom_transaction_example():
    """Manually add a data file to a table via the low-level transaction API."""
    dt = DeltaTable("path/to/table")

    # Create add actions manually (advanced use case)
    add_actions = [
        AddAction(
            path="data/part-00001.parquet",
            size=1024000,
            partition_values={"year": "2023", "month": "01"},
            modification_time=1672531200000,  # Unix timestamp in milliseconds
            data_change=True,
            stats='{"numRecords": 1000}',
        )
    ]

    # Get transaction and commit
    transaction = dt._table.create_transaction()  # Internal API — may change between versions
    commit_props = CommitProperties(
        max_retry_commit_attempts=3,
        app_metadata={"custom_operation": "manual_file_addition"},
    )

    # Commit the transaction
    version = transaction.commit(
        add_actions,
        commit_properties=commit_props,
    )
    print(f"Committed transaction at version {version}")

import time
from deltalake.exceptions import CommitFailedError


def robust_update_with_retry():
    """Update rows with application-level retries and exponential backoff."""
    dt = DeltaTable("path/to/table")
    max_retries = 5
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            result = dt.update(
                predicate="status = 'processing'",
                new_values={"status": "completed"},
                commit_properties=CommitProperties(
                    max_retry_commit_attempts=1,  # Let us handle retries
                    app_metadata={
                        "attempt": attempt + 1,
                        "operation": "status_update",
                    },
                ),
            )
            print(f"Update succeeded on attempt {attempt + 1}")
            return result
        except CommitFailedError:
            if attempt == max_retries - 1:
                print(f"Update failed after {max_retries} attempts")
                raise
            # Exponential backoff
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)
            # Reload table to get latest state
            dt = DeltaTable(dt.table_uri)


# Use the robust update function
try:
    result = robust_update_with_retry()
    print(f"Successfully updated {result['num_updated_rows']} rows")
except Exception as e:
    print(f"All retry attempts failed: {e}")


def analyze_transaction_history():
    """Print recent commits with their app metadata and operation metrics."""
    dt = DeltaTable("path/to/table")

    # Get commit history
    history = dt.history(limit=10)
    print("Recent transaction history:")
    for commit in history:
        version = commit.get("version")
        operation = commit.get("operation", "unknown")
        timestamp = commit.get("timestamp")

        # Extract app metadata if present
        # NOTE(review): history entries may expose app metadata at the top level
        # rather than under operationParameters — verify against the actual output.
        operation_parameters = commit.get("operationParameters", {})
        app_metadata = operation_parameters.get("app_metadata")

        print(f"Version {version}: {operation} at {timestamp}")
        if app_metadata:
            print(f" App metadata: {app_metadata}")

        # Show operation metrics
        if "operationMetrics" in commit:
            metrics = commit["operationMetrics"]
            for key, value in metrics.items():
                print(f" {key}: {value}")
        print()


# Run analysis
analyze_transaction_history()

# Manual checkpoint creation
dt.create_checkpoint()
print("Checkpoint created successfully")

# Cleanup metadata (removes old log files)
dt.cleanup_metadata()
print("Old metadata cleaned up")

# Configure automatic cleanup
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True,
)

# All subsequent operations will use these settings
write_deltalake(
    dt,
    new_data,  # NOTE(review): `new_data` is a placeholder — a DataFrame/Arrow table in real usage
    mode="append",
    post_commithook_properties=post_commit_props,
)

# Install with Tessl CLI
#   npx tessl i tessl/pypi-deltalake