Native Delta Lake Python binding based on delta-rs, with Pandas integration.

This module covers operations for table optimization, vacuum cleanup, and checkpoint management, used to maintain table performance and storage efficiency over time.
from deltalake import DeltaTable, WriterProperties, PostCommitHookProperties, CommitProperties
from datetime import timedelta
from typing import Any, Iterable


def vacuum(
    self,
    retention_hours: int | None = None,
    dry_run: bool = True,
    enforce_retention_duration: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
    full: bool = False,
    keep_versions: list[int] | None = None,
) -> list[str]:
    """Clean up files no longer referenced by the Delta table and older than the retention threshold.

    Parameters:
        retention_hours: Retention threshold in hours (uses table config if None).
        dry_run: List files without deleting when True.
        enforce_retention_duration: Enforce the minimum retention duration when True.
        post_commithook_properties: Post-commit hook properties for the vacuum commit.
        commit_properties: Custom commit properties recorded with the vacuum commit.
        full: Remove all unreferenced files when True, regardless of age.
        keep_versions: Specific table versions whose files must be preserved.

    Returns:
        Paths of the files deleted (or that would be deleted when dry_run=True).
    """
    ...


def create_checkpoint(self) -> None:
    """Create a checkpoint of the current table state in the transaction log."""
    ...


def cleanup_metadata(self) -> None:
    """Clean up expired transaction log metadata files."""
    ...
# Return a TableOptimizer bound to this table, exposing compact() and z_order().
def optimize(self) -> TableOptimizer: ...
class TableOptimizer:
    """File compaction and Z-ordering for query performance optimization."""

    def compact(
        self,
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]:
        """Merge small files into larger ones to reduce per-file overhead.

        Parameters:
            partition_filters: Restrict compaction to matching partitions.
            target_size: Desired output file size in bytes (table default if None).
            max_concurrent_tasks: Parallelism limit for the rewrite tasks.
            min_commit_interval: Minimum interval between incremental commits
                (seconds or timedelta).
            writer_properties: Parquet writer configuration.
            post_commithook_properties: Post-commit hook properties.
            commit_properties: Custom commit properties recorded with the commit.

        Returns:
            Metrics of the optimize operation (e.g. files added/removed).
        """
        ...

    def z_order(
        self,
        columns: Iterable[str],
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        max_spill_size: int = 20 * 1024 * 1024 * 1024,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]:
        """Rewrite files so data is clustered by Z-order along the given columns.

        Parameters:
            columns: Columns to cluster on for better data skipping.
            partition_filters: Restrict the rewrite to matching partitions.
            target_size: Desired output file size in bytes (table default if None).
            max_concurrent_tasks: Parallelism limit for the rewrite tasks.
            max_spill_size: Maximum spill size in bytes (default 20 GiB).
            min_commit_interval: Minimum interval between incremental commits
                (seconds or timedelta).
            writer_properties: Parquet writer configuration.
            post_commithook_properties: Post-commit hook properties.
            commit_properties: Custom commit properties recorded with the commit.

        Returns:
            Metrics of the optimize operation (e.g. files added/removed).
        """
        ...
def repair_table(
    self,
    dry_run: bool = True,
) -> dict[str, Any]:
    """Repair the table by checking for inconsistencies and fixing them.

    Parameters:
        dry_run: Report the issues found without fixing them when True.

    Returns:
        Metrics describing the issues found and/or repaired.
    """
    ...
# Example: basic vacuum usage.
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Dry run to see what would be deleted
files_to_delete = dt.vacuum(dry_run=True)
print(f"Would delete {len(files_to_delete)} files:")
for file in files_to_delete[:5]:  # Show first 5
    print(f" {file}")

# Actually delete old files (7 days retention)
deleted_files = dt.vacuum(
    retention_hours=7 * 24,  # 7 days
    dry_run=False
)
print(f"Deleted {len(deleted_files)} files")

# Vacuum with custom retention (1 day, requires disabling enforcement)
deleted_files = dt.vacuum(
    retention_hours=24,  # 1 day
    dry_run=False,
    enforce_retention_duration=False  # Allow shorter retention
)

from deltalake.transaction import CommitProperties, PostCommitHookProperties
# Vacuum with transaction properties attached to the commit.
commit_props = CommitProperties(
    app_metadata={
        "operation": "scheduled_vacuum",
        "retention_policy": "30_days"
    }
)
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True
)
deleted_files = dt.vacuum(
    retention_hours=30 * 24,  # 30 days
    dry_run=False,
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props
)
print(f"Vacuum completed, deleted {len(deleted_files)} files")

# Full vacuum removes ALL unreferenced files
deleted_files = dt.vacuum(
    dry_run=False,
    full=True  # Remove all unreferenced files regardless of time
)

# Vacuum while preserving specific versions
important_versions = [10, 15, 20]  # Versions to keep
deleted_files = dt.vacuum(
    retention_hours=7 * 24,
    dry_run=False,
    keep_versions=important_versions
)
print(f"Deleted {len(deleted_files)} files while preserving versions {important_versions}")

# Create checkpoint manually
dt.create_checkpoint()
print("Checkpoint created")

# Clean up old metadata files
dt.cleanup_metadata()
print("Metadata cleaned up")

# Check current version and checkpoint status
current_version = dt.version()
print(f"Current version: {current_version}")

# Get history to see checkpoint information
history = dt.history(limit=5)
for commit in history:
    version = commit.get("version")
    operation = commit.get("operation")
    print(f"Version {version}: {operation}")

# Basic file compaction
optimizer = dt.optimize()
compact_result = optimizer.compact()
print("Compaction results:")
print(f" Files added: {compact_result.get('num_files_added', 0)}")
print(f" Files removed: {compact_result.get('num_files_removed', 0)}")
print(f" Partitions optimized: {compact_result.get('partitions_optimized', 0)}")

# Compact specific partitions
compact_result = optimizer.compact(
    partition_filters=[("year", "=", "2023"), ("month", "=", "01")],
    target_size=128 * 1024 * 1024  # 128MB target file size
)

# Z-order optimization for better query performance
zorder_result = optimizer.z_order(
    columns=["customer_id", "order_date"],  # Columns to optimize for
    target_size=256 * 1024 * 1024  # 256MB target files
)
print("\nZ-order optimization results:")
print(f" Files added: {zorder_result.get('num_files_added', 0)}")
print(f" Files removed: {zorder_result.get('num_files_removed', 0)}")

# Optimize only recent partitions
recent_partitions = [
    ("year", "=", "2023"),
    ("month", ">=", "10")
]
compact_result = optimizer.compact(
    partition_filters=recent_partitions,
    target_size=64 * 1024 * 1024,  # 64MB files
    max_concurrent_tasks=4  # Parallel optimization
)

# Z-order specific partitions with different columns
high_traffic_partitions = [("region", "=", "US")]
zorder_result = optimizer.z_order(
    columns=["user_id", "timestamp", "event_type"],
    partition_filters=high_traffic_partitions,
    max_concurrent_tasks=8
)


def comprehensive_maintenance(table_path: str, retention_days: int = 7):
    """Perform comprehensive table maintenance.

    Runs checkpoint creation, compaction, optional Z-ordering, vacuum, and
    metadata cleanup against the table at ``table_path``.

    Parameters:
        table_path: Path or URI of the Delta table.
        retention_days: Vacuum retention threshold in days.

    Returns:
        Dict with the compaction metrics and the number of vacuumed files.
    """
    dt = DeltaTable(table_path)
    print(f"Starting maintenance for {table_path}")
    print(f"Current version: {dt.version()}")

    # 1. Create checkpoint
    print("\n1. Creating checkpoint...")
    dt.create_checkpoint()

    # 2. Optimize table
    print("\n2. Optimizing table...")
    optimizer = dt.optimize()

    # Compact small files
    compact_result = optimizer.compact(target_size=128 * 1024 * 1024)
    print(f" Compaction: {compact_result.get('num_files_removed', 0)} files removed, "
          f"{compact_result.get('num_files_added', 0)} files added")

    # Z-order on common query columns (example); best-effort because the
    # columns may not exist in every table.
    try:
        zorder_result = optimizer.z_order(columns=["id", "created_date"])
        print(f" Z-order: {zorder_result.get('num_files_removed', 0)} files removed, "
              f"{zorder_result.get('num_files_added', 0)} files added")
    except Exception as e:
        print(f" Z-order failed (columns may not exist): {e}")

    # 3. Vacuum old files
    print(f"\n3. Vacuuming files older than {retention_days} days...")
    retention_hours = retention_days * 24

    # Dry run first
    files_to_delete = dt.vacuum(retention_hours=retention_hours, dry_run=True)
    print(f" Would delete {len(files_to_delete)} files")

    # Actually delete. Track the count explicitly so the return value never
    # references a conditionally-bound variable.
    vacuum_deleted = 0
    if files_to_delete:
        deleted_files = dt.vacuum(retention_hours=retention_hours, dry_run=False)
        vacuum_deleted = len(deleted_files)
        print(f" Deleted {vacuum_deleted} files")

    # 4. Clean up metadata
    print("\n4. Cleaning up metadata...")
    dt.cleanup_metadata()

    # 5. Final status
    print(f"\nMaintenance completed. Final version: {dt.version()}")
    return {
        "compaction": compact_result,
        "vacuum_deleted": vacuum_deleted
    }


# Run comprehensive maintenance
maintenance_result = comprehensive_maintenance("path/to/table", retention_days=14)

import schedule
import time
from datetime import datetime


def scheduled_maintenance():
    """Maintenance job for scheduled execution across several tables."""
    tables_to_maintain = [
        "path/to/table1",
        "path/to/table2",
        "path/to/table3"
    ]
    for table_path in tables_to_maintain:
        # One failing table must not abort maintenance of the others.
        try:
            print(f"\n{datetime.now()}: Maintaining {table_path}")
            result = comprehensive_maintenance(table_path, retention_days=30)
            print(f"Maintenance completed for {table_path}")
        except Exception as e:
            print(f"Maintenance failed for {table_path}: {e}")


def vacuum_only_maintenance():
    """Daily vacuum-only maintenance for high-traffic tables."""
    tables = ["path/to/high_traffic_table1", "path/to/high_traffic_table2"]
    for table_path in tables:
        try:
            dt = DeltaTable(table_path)
            deleted = dt.vacuum(retention_hours=7 * 24, dry_run=False)
            print(f"Daily vacuum for {table_path}: deleted {len(deleted)} files")
        except Exception as e:
            print(f"Daily vacuum failed for {table_path}: {e}")


# Schedule maintenance jobs. Both functions are defined above, so they can be
# registered directly without a deferring lambda.
schedule.every().sunday.at("02:00").do(scheduled_maintenance)  # Weekly
schedule.every().day.at("01:00").do(vacuum_only_maintenance)  # Daily vacuum

# Run scheduler (in production, this would be in a separate service)
# while True:
#     schedule.run_pending()
#     time.sleep(3600)  # Check every hour


def table_health_check(table_path: str):
    """Comprehensive table health check.

    Reads basic table properties, runs a repair dry-run (repairing if issues
    are reported), and analyses the file-size distribution.

    Parameters:
        table_path: Path or URI of the Delta table.

    Returns:
        True when the table's properties could be read, False otherwise.
    """
    dt = DeltaTable(table_path)
    print(f"Health check for {table_path}")
    print(f"Current version: {dt.version()}")

    # Check basic table properties
    try:
        schema = dt.schema()
        metadata = dt.metadata()
        files = dt.files()
        print(f"Schema fields: {len(schema.fields)}")
        print(f"Table files: {len(files)}")
        print(f"Partition columns: {metadata.partition_columns}")
    except Exception as e:
        print(f"Error reading table properties: {e}")
        return False

    # Check for repair needs
    try:
        repair_result = dt.repair_table(dry_run=True)
        issues_found = repair_result.get("issues_found", 0)
        if issues_found > 0:
            print(f"Found {issues_found} issues that need repair")
            # Actually repair if needed
            repair_result = dt.repair_table(dry_run=False)
            print(f"Repair completed: {repair_result}")
        else:
            print("No repair issues found")
    except Exception as e:
        print(f"Repair check failed: {e}")

    # Check file size distribution
    try:
        add_actions = dt.get_add_actions()
        sizes_df = add_actions.to_pandas()
        if not sizes_df.empty:
            avg_size = sizes_df['size'].mean()
            min_size = sizes_df['size'].min()
            max_size = sizes_df['size'].max()
            print(f"File sizes - Avg: {avg_size/1024/1024:.1f}MB, "
                  f"Min: {min_size/1024/1024:.1f}MB, Max: {max_size/1024/1024:.1f}MB")
            # Recommend optimization for heavy size skew or many tiny files;
            # a zero min_size counts as extreme skew (and would otherwise
            # raise ZeroDivisionError).
            if min_size == 0 or max_size / min_size > 100 or avg_size < 10*1024*1024:  # 10MB
                print("Recommendation: Consider running optimize.compact()")
    except Exception as e:
        print(f"File analysis failed: {e}")
    return True
# Run health checks over each configured table.
tables_to_check = ["path/to/table1", "path/to/table2"]
for table_path in tables_to_check:
    table_health_check(table_path)
    print("-" * 50)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-deltalake