CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

docs/table-maintenance.md

Table Maintenance

Operations for table optimization, vacuum cleanup, and checkpoint management to maintain table performance and storage efficiency over time.

Imports

from datetime import timedelta
from typing import Any, Iterable

from deltalake import DeltaTable, WriterProperties, PostCommitHookProperties, CommitProperties

Capabilities

Vacuum Operations

def vacuum(
    self,
    retention_hours: int | None = None,
    dry_run: bool = True,
    enforce_retention_duration: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
    full: bool = False,
    keep_versions: list[int] | None = None,
) -> list[str]:
    """Clean up files no longer referenced by the table and older than the retention threshold.

    Args:
        retention_hours: Retention threshold in hours; the table configuration
            is used when ``None``.
        dry_run: When ``True``, list the files without deleting them.
        enforce_retention_duration: When ``True``, enforce the minimum
            retention; pass ``False`` to allow a shorter ``retention_hours``.
        post_commithook_properties: Post-commit hook settings for the commit.
        commit_properties: Properties attached to the commit.
        full: When ``True``, remove all unreferenced files.
        keep_versions: Specific table versions to preserve.

    Returns:
        The files that would be deleted (dry run) or that were deleted.
    """
    ...

Clean up files no longer referenced by the Delta table and older than the retention threshold.

Parameters:

  • retention_hours: Retention threshold (uses table config if None)
  • dry_run: List files without deleting when True
  • enforce_retention_duration: Enforce minimum retention when True
  • full: Remove all unreferenced files when True
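  • keep_versions: Specific versions to preserve
  • post_commithook_properties: Post-commit hook settings (for example, creating a checkpoint or cleaning up expired logs after the commit)
  • commit_properties: Properties attached to the commit recorded for the vacuum, such as custom metadata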

Checkpoint Management

def create_checkpoint(self) -> None:
    """Create a checkpoint for the table."""
    ...

def cleanup_metadata(self) -> None:
    """Clean up transaction log metadata."""
    ...

Create checkpoints and clean up transaction log metadata.

Table Optimization

@property
def optimize(self) -> TableOptimizer:
    """Namespace for the table optimization methods (``compact`` and ``z_order``).

    This is a property, so it is accessed without parentheses, e.g.
    ``dt.optimize.compact()``; calling ``dt.optimize()`` fails because the
    returned ``TableOptimizer`` is not callable.
    """
    ...

class TableOptimizer:
    """File compaction and Z-ordering operations for a Delta table."""

    def compact(
        self,
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]:
        """Compact the table's files.

        Args:
            partition_filters: ``(column, operator, value)`` tuples limiting
                which partitions are processed, e.g. ``("year", "=", "2023")``.
            target_size: Target size of the written files, in bytes.
            max_concurrent_tasks: Limit on the work done in parallel
                (NOTE(review): inferred from the usage examples; confirm).
            min_commit_interval: NOTE(review): presumably the minimum time
                between intermediate commits, in seconds when given as an
                int; confirm against the library reference.
            writer_properties: Writer settings for the rewritten files
                (NOTE(review): confirm).
            post_commithook_properties: Post-commit hook settings for the commit.
            commit_properties: Properties attached to the commit.

        Returns:
            A dict of metrics describing the operation.
        """
        ...

    def z_order(
        self,
        columns: Iterable[str],
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        max_spill_size: int = 20 * 1024 * 1024 * 1024,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]:
        """Rewrite the table's files Z-ordered on the given columns.

        Args:
            columns: Columns to optimize the file layout for.
            partition_filters: ``(column, operator, value)`` tuples limiting
                which partitions are processed, e.g. ``("region", "=", "US")``.
            target_size: Target size of the written files, in bytes.
            max_concurrent_tasks: Limit on the work done in parallel
                (NOTE(review): inferred from the usage examples; confirm).
            max_spill_size: Defaults to 20 * 1024**3 (NOTE(review): presumably
                a byte limit on data spilled to disk while sorting; confirm).
            min_commit_interval: NOTE(review): presumably the minimum time
                between intermediate commits, in seconds when given as an
                int; confirm against the library reference.
            writer_properties: Writer settings for the rewritten files
                (NOTE(review): confirm).
            post_commithook_properties: Post-commit hook settings for the commit.
            commit_properties: Properties attached to the commit.

        Returns:
            A dict of metrics describing the operation.
        """
        ...

File compaction and Z-ordering for query performance optimization.

Repair Operations

def repair(
    self,
    dry_run: bool = False,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]:
    """Repair the table by dropping active files that are missing from storage.

    The method on ``DeltaTable`` is named ``repair``; there is no
    ``repair_table`` method.

    Args:
        dry_run: When ``True``, only list the affected files; otherwise
            remove actions for them are added to the transaction log.
        post_commithook_properties: Post-commit hook settings for the commit.
        commit_properties: Properties attached to the commit.

    Returns:
        The repair metrics, e.g.
        ``{"dry_run": False, "files_removed": ["part-0.parquet"]}``.
    """
    ...

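Repair the table by auditing the files that the transaction log marks as active, finding those that no longer exist in storage, and removing them from the log. With dry_run=True the affected files are only listed.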

Usage Examples

Basic Vacuum Operations

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Preview only: a dry run reports the candidate files and leaves them in place
files_to_delete = dt.vacuum(dry_run=True)
print(f"Would delete {len(files_to_delete)} files:")
for file in files_to_delete[:5]:  # cap the listing at five entries
    print(f"  {file}")

# Real deletion with a seven-day retention window, expressed in hours
deleted_files = dt.vacuum(dry_run=False, retention_hours=7 * 24)
print(f"Deleted {len(deleted_files)} files")

# A one-day window is shorter than the enforced minimum, so the
# retention check has to be switched off explicitly
deleted_files = dt.vacuum(
    enforce_retention_duration=False,
    dry_run=False,
    retention_hours=24,
)

Advanced Vacuum Configuration

from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Vacuum with transaction properties.
# CommitProperties takes the user metadata as `custom_metadata`;
# an `app_metadata` keyword is rejected with a TypeError.
commit_props = CommitProperties(
    custom_metadata={
        "operation": "scheduled_vacuum",
        "retention_policy": "30_days"
    }
)

# Hooks run after the vacuum commit: write a checkpoint and drop expired logs
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True
)

deleted_files = dt.vacuum(
    retention_hours=30 * 24,  # 30 days
    dry_run=False,
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props
)

print(f"Vacuum completed, deleted {len(deleted_files)} files")

Full Vacuum and Version Preservation

# full=True removes every file the table no longer references
deleted_files = dt.vacuum(full=True, dry_run=False)

# Seven-day vacuum that leaves the listed table versions intact
important_versions = [10, 15, 20]
deleted_files = dt.vacuum(
    keep_versions=important_versions,
    dry_run=False,
    retention_hours=7 * 24,
)

print(f"Deleted {len(deleted_files)} files while preserving versions {important_versions}")

Checkpoint Management

# Write a checkpoint on demand
dt.create_checkpoint()
print("Checkpoint created")

# Remove old metadata files from the transaction log
dt.cleanup_metadata()
print("Metadata cleaned up")

# Report the version the table is currently at
current_version = dt.version()
print(f"Current version: {current_version}")

# List the five most recent commits and the operation each one recorded
history = dt.history(limit=5)
for commit in history:
    version, operation = commit.get("version"), commit.get("operation")
    print(f"Version {version}: {operation}")

Table Optimization

# Basic file compaction.
# `optimize` is a property that returns the TableOptimizer, so it is
# accessed without parentheses.
optimizer = dt.optimize
compact_result = optimizer.compact()

# The metrics dict uses camelCase keys (numFilesAdded, numFilesRemoved, ...)
print("Compaction results:")
print(f"  Files added: {compact_result.get('numFilesAdded', 0)}")
print(f"  Files removed: {compact_result.get('numFilesRemoved', 0)}")
print(f"  Partitions optimized: {compact_result.get('partitionsOptimized', 0)}")

# Compact specific partitions
compact_result = optimizer.compact(
    partition_filters=[("year", "=", "2023"), ("month", "=", "01")],
    target_size=128 * 1024 * 1024  # 128MB target file size
)

# Z-order optimization for better query performance
zorder_result = optimizer.z_order(
    columns=["customer_id", "order_date"],  # Columns to optimize for
    target_size=256 * 1024 * 1024  # 256MB target files
)

print("\nZ-order optimization results:")
print(f"  Files added: {zorder_result.get('numFilesAdded', 0)}")
print(f"  Files removed: {zorder_result.get('numFilesRemoved', 0)}")

Partition-Specific Optimization

# Limit compaction to the 2023 partitions whose month is "10" or later
recent_partitions = [("year", "=", "2023"), ("month", ">=", "10")]

compact_result = optimizer.compact(
    max_concurrent_tasks=4,  # Parallel optimization
    target_size=64 * 1024 * 1024,  # 64MB files
    partition_filters=recent_partitions,
)

# Z-order one region's partitions on their own set of columns
high_traffic_partitions = [("region", "=", "US")]

zorder_result = optimizer.z_order(
    max_concurrent_tasks=8,
    partition_filters=high_traffic_partitions,
    columns=["user_id", "timestamp", "event_type"],
)

Comprehensive Maintenance Routine

def comprehensive_maintenance(table_path: str, retention_days: int = 7) -> dict:
    """Run a checkpoint, optimize, vacuum and metadata-cleanup pass over one table.

    Args:
        table_path: Location of the Delta table to maintain.
        retention_days: Age in days beyond which unreferenced files are vacuumed.

    Returns:
        A dict with ``"compaction"`` (the metrics dict returned by ``compact``)
        and ``"vacuum_deleted"`` (the number of files removed by the vacuum,
        0 when the dry run found nothing to delete).
    """
    dt = DeltaTable(table_path)

    print(f"Starting maintenance for {table_path}")
    print(f"Current version: {dt.version()}")

    # 1. Create checkpoint
    print("\n1. Creating checkpoint...")
    dt.create_checkpoint()

    # 2. Optimize table
    print("\n2. Optimizing table...")
    # `optimize` is a property returning the TableOptimizer; do not call it.
    optimizer = dt.optimize

    # Compact small files; the returned metrics use camelCase keys
    compact_result = optimizer.compact(target_size=128 * 1024 * 1024)
    print(f"   Compaction: {compact_result.get('numFilesRemoved', 0)} files removed, "
          f"{compact_result.get('numFilesAdded', 0)} files added")

    # Z-order on common query columns (example). Best effort: the example
    # columns may be absent from the table, so a failure is only reported.
    try:
        zorder_result = optimizer.z_order(columns=["id", "created_date"])
        print(f"   Z-order: {zorder_result.get('numFilesRemoved', 0)} files removed, "
              f"{zorder_result.get('numFilesAdded', 0)} files added")
    except Exception as e:
        print(f"   Z-order failed (columns may not exist): {e}")

    # 3. Vacuum old files
    print(f"\n3. Vacuuming files older than {retention_days} days...")
    retention_hours = retention_days * 24

    # Dry run first
    files_to_delete = dt.vacuum(retention_hours=retention_hours, dry_run=True)
    print(f"   Would delete {len(files_to_delete)} files")

    # Actually delete; the count stays 0 when there was nothing to remove,
    # so the return statement never touches an unbound name
    vacuum_deleted = 0
    if files_to_delete:
        deleted_files = dt.vacuum(retention_hours=retention_hours, dry_run=False)
        vacuum_deleted = len(deleted_files)
        print(f"   Deleted {vacuum_deleted} files")

    # 4. Clean up metadata
    print("\n4. Cleaning up metadata...")
    dt.cleanup_metadata()

    # 5. Final status
    print(f"\nMaintenance completed. Final version: {dt.version()}")

    return {
        "compaction": compact_result,
        "vacuum_deleted": vacuum_deleted,
    }

# Run the full maintenance routine against one table, vacuuming with a
# 14-day retention window; the returned dict holds the compaction metrics
# and the vacuum deletion count
maintenance_result = comprehensive_maintenance("path/to/table", retention_days=14)

Scheduled Maintenance

import schedule
import time
from datetime import datetime

def scheduled_maintenance():
    """Run the comprehensive maintenance routine over a fixed list of tables.

    Each table is processed with a 30-day retention window. A failure on one
    table is reported and does not stop the remaining tables from being
    maintained.
    """
    tables_to_maintain = [
        "path/to/table1",
        "path/to/table2",
        "path/to/table3",
    ]

    for table_path in tables_to_maintain:
        try:
            print(f"\n{datetime.now()}: Maintaining {table_path}")
            # The returned summary is not needed here, so it is not bound
            comprehensive_maintenance(table_path, retention_days=30)
            print(f"Maintenance completed for {table_path}")

        except Exception as e:
            # Job boundary: report and carry on with the next table
            print(f"Maintenance failed for {table_path}: {e}")

# Schedule maintenance jobs: the full routine every Sunday at 02:00 and a
# vacuum-only pass every day at 01:00.
# The daily job is wrapped in a lambda because vacuum_only_maintenance is
# defined further down; the lambda postpones the name lookup until the job runs.
schedule.every().sunday.at("02:00").do(scheduled_maintenance)  # Weekly
schedule.every().day.at("01:00").do(lambda: vacuum_only_maintenance())  # Daily vacuum

def vacuum_only_maintenance():
    """Vacuum each high-traffic table with a seven-day retention window.

    A failure on one table is printed and the loop moves on to the next.
    """
    for table_path in ("path/to/high_traffic_table1", "path/to/high_traffic_table2"):
        try:
            removed = DeltaTable(table_path).vacuum(dry_run=False, retention_hours=7 * 24)
            print(f"Daily vacuum for {table_path}: deleted {len(removed)} files")
        except Exception as exc:
            print(f"Daily vacuum failed for {table_path}: {exc}")

# Run scheduler (in production, this would be in a separate service)
# while True:
#     schedule.run_pending()
#     time.sleep(3600)  # Check every hour

Table Repair and Health Checks

def table_health_check(table_path: str) -> bool:
    """Print a health report for one Delta table and repair it when needed.

    The report covers basic table properties, a repair dry run (followed by a
    real repair when the dry run lists files), and the file size distribution.

    Args:
        table_path: Location of the Delta table to inspect.

    Returns:
        ``False`` when the basic table properties cannot be read, ``True``
        otherwise. Repair and file-analysis failures are printed but do not
        change the result.
    """
    dt = DeltaTable(table_path)

    print(f"Health check for {table_path}")
    print(f"Current version: {dt.version()}")

    # Check basic table properties
    try:
        schema = dt.schema()
        metadata = dt.metadata()
        files = dt.files()

        print(f"Schema fields: {len(schema.fields)}")
        print(f"Table files: {len(files)}")
        print(f"Partition columns: {metadata.partition_columns}")

    except Exception as e:
        print(f"Error reading table properties: {e}")
        return False

    # Check for repair needs. The method is `DeltaTable.repair`; its metrics
    # list the affected files under "files_removed".
    try:
        repair_result = dt.repair(dry_run=True)
        issues_found = len(repair_result.get("files_removed", []))

        if issues_found > 0:
            print(f"Found {issues_found} issues that need repair")

            # Actually repair if needed
            repair_result = dt.repair(dry_run=False)
            print(f"Repair completed: {repair_result}")
        else:
            print("No repair issues found")

    except Exception as e:
        print(f"Repair check failed: {e}")

    # Check file size distribution
    try:
        add_actions = dt.get_add_actions()
        # NOTE(review): this assumes the returned batch offers to_pandas();
        # confirm against the installed deltalake version.
        sizes_df = add_actions.to_pandas()

        if not sizes_df.empty:
            # File sizes are reported in the "size_bytes" column
            sizes = sizes_df["size_bytes"]
            avg_size = sizes.mean()
            min_size = sizes.min()
            max_size = sizes.max()

            print(f"File sizes - Avg: {avg_size/1024/1024:.1f}MB, "
                  f"Min: {min_size/1024/1024:.1f}MB, Max: {max_size/1024/1024:.1f}MB")

            # Recommend optimization if needed. A zero-byte file counts as an
            # uneven distribution and keeps the ratio from dividing by zero.
            uneven = min_size == 0 or max_size / min_size > 100
            if uneven or avg_size < 10 * 1024 * 1024:  # 10MB
                print("Recommendation: Consider running optimize.compact()")

    except Exception as e:
        print(f"File analysis failed: {e}")

    return True

# Check every listed table, printing a divider line after each report
tables_to_check = ["path/to/table1", "path/to/table2"]
divider = "-" * 50
for table_path in tables_to_check:
    table_health_check(table_path)
    print(divider)

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json