tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark

Optimization and Maintenance

Performance optimization and table maintenance operations for Delta Lake, including file compaction, Z-ordering, vacuum operations, and data layout optimization. Provides fine-grained control over storage efficiency and query performance.

Capabilities

Optimize Operations

Improve query performance through data layout optimization.

class DeltaTable:
    def optimize(self) -> DeltaOptimizeBuilder:
        """
        Create optimize builder for data layout optimization.
        
        Returns:
        DeltaOptimizeBuilder for configuring optimization operations
        """
class DeltaTable {
  def optimize(): DeltaOptimizeBuilder
}

Optimize Configuration

Configure optimization scope and execution.

class DeltaOptimizeBuilder:
    def where(self, partitionFilter: str) -> DeltaOptimizeBuilder:
        """
        Apply partition filter to limit optimization scope.
        
        Parameters:
        - partitionFilter: SQL condition to filter partitions for optimization
        
        Returns:
        DeltaOptimizeBuilder for method chaining
        """
    
    def executeCompaction(self) -> DataFrame:
        """
        Execute file compaction to reduce small file overhead.
        
        Returns:
        DataFrame with optimization metrics, including file counts and size changes
        """
    
    def executeZOrderBy(self, *cols: str) -> DataFrame:
        """
        Execute Z-order optimization for improved query performance.
        
        Parameters:
        - cols: Column names to use for Z-order clustering
        
        Returns:
        DataFrame with Z-order optimization metrics
        """
class DeltaOptimizeBuilder {
  def where(partitionFilter: String): DeltaOptimizeBuilder
  def executeCompaction(): DataFrame
  def executeZOrderBy(cols: String*): DataFrame
}

Vacuum Operations

Clean up unused files and optimize storage usage.

class DeltaTable:
    def vacuum(self, retentionHours: Optional[float] = None) -> DataFrame:
        """
        Remove files no longer referenced by the table.
        
        Parameters:
        - retentionHours: Hours to retain files (default: 168 hours / 7 days)
        
        Returns:
        DataFrame with vacuum operation results
        """
class DeltaTable {
  def vacuum(): DataFrame
  def vacuum(retentionHours: Double): DataFrame
}

Manifest Generation

Generate metadata manifests for external system integration.

class DeltaTable:
    def generate(self, mode: str) -> None:
        """
        Generate manifests for external system compatibility.
        
        Parameters:
        - mode: Manifest type ("symlink_format_manifest" for Presto/Athena)
        """
class DeltaTable {
  def generate(mode: String): Unit
}

Usage Examples
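
The examples below assume an existing DeltaTable handle named delta_table. A minimal sketch of obtaining one with DeltaTable.forPath or DeltaTable.forName (the path and table name are placeholders):

from delta.tables import DeltaTable

# Path-based table (placeholder path)
delta_table = DeltaTable.forPath(spark, "/path/to/table")

# Or a table registered in the metastore (placeholder name)
# delta_table = DeltaTable.forName(spark, "sales.transactions")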

File Compaction

# Compact all partitions
optimize_result = delta_table.optimize().executeCompaction()
optimize_result.show()

# Compact specific partitions
optimize_result = (delta_table.optimize()
    .where("date >= '2023-01-01' AND region = 'us-west'")
    .executeCompaction())

# View optimization metrics
optimize_result.select(
    "path",
    "metrics.numFilesAdded",
    "metrics.numFilesRemoved", 
    "metrics.filesAdded.size",
    "metrics.filesRemoved.size"
).show()

Z-Order Optimization

# Z-order by commonly filtered columns
zorder_result = (delta_table.optimize()
    .executeZOrderBy("customer_id", "transaction_date"))

# Z-order specific partitions
zorder_result = (delta_table.optimize()
    .where("year = 2023")
    .executeZOrderBy("customer_id", "product_category"))

zorder_result.show()

Vacuum Operations

# Vacuum with default retention (7 days)
vacuum_result = delta_table.vacuum()
vacuum_result.show()

# Vacuum with custom retention period
vacuum_result = delta_table.vacuum(retentionHours=24)  # 1 day retention

# Dry run to see what would be deleted (use SQL)
spark.sql("VACUUM delta.`/path/to/table` RETAIN 168 HOURS DRY RUN").show()

Manifest Generation

# Generate symlink manifest for Presto/Athena
delta_table.generate("symlink_format_manifest")

# Check generated manifest files
manifest_path = "/path/to/table/_symlink_format_manifest"
dbutils.fs.ls(manifest_path)  # In Databricks
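
Manifests are not regenerated automatically by default. A hedged sketch of enabling automatic regeneration after each write via the delta.compatibility.symlinkFormatManifest.enabled table property (table path is a placeholder):

# Regenerate the symlink manifest automatically on every table update
spark.sql("""
    ALTER TABLE delta.`/path/to/table`
    SET TBLPROPERTIES ('delta.compatibility.symlinkFormatManifest.enabled' = 'true')
""")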

Comprehensive Maintenance Workflow

from pyspark.sql.functions import col

# 1. Analyze table statistics before optimization
pre_stats = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`")
file_count_before = pre_stats.select("numFiles").collect()[0][0]

print(f"Files before optimization: {file_count_before}")

# 2. Optimize frequently queried partitions
recent_partitions = (delta_table.optimize()
    .where("date >= current_date() - interval 30 days")
    .executeZOrderBy("customer_id", "product_id"))

# 3. Compact older partitions  
older_partitions = (delta_table.optimize()
    .where("date < current_date() - interval 30 days")
    .executeCompaction())

# 4. Update table statistics
spark.sql(f"ANALYZE TABLE delta.`{table_path}` COMPUTE STATISTICS")

# 5. Clean up old files
vacuum_result = delta_table.vacuum(retentionHours=168)  # 7 days

# 6. Generate manifests for external access
delta_table.generate("symlink_format_manifest")

# 7. Check final statistics
post_stats = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`")
file_count_after = post_stats.select("numFiles").collect()[0][0]

print(f"Files after optimization: {file_count_after}")
print(f"File reduction: {file_count_before - file_count_after}")

Performance Monitoring

# Monitor optimize operations over time
optimize_history = (delta_table.history()
    .filter(col("operation") == "OPTIMIZE")
    .select(
        "timestamp",
        "operationParameters.predicate",
        col("operationMetrics.numFilesAdded").alias("files_added"),
        col("operationMetrics.numFilesRemoved").alias("files_removed"),
        col("operationMetrics.totalFilesSize").alias("total_size"),
        col("operationMetrics.numBatches").alias("batches")
    ))

optimize_history.show(truncate=False)

# Check vacuum history (vacuum records VACUUM START and VACUUM END entries)
vacuum_history = (delta_table.history()
    .filter(col("operation") == "VACUUM END")
    .select(
        "timestamp",
        col("operationMetrics.numDeletedFiles").alias("deleted_files"),
        col("operationMetrics.numVacuumedDirectories").alias("vacuumed_dirs")
    ))

vacuum_history.show()

Optimization Best Practices

File Compaction

  • Run regularly to prevent small file accumulation (see the compaction sketch below)
  • Target 128MB-1GB file sizes for optimal performance
  • Focus on frequently accessed partitions
  • Monitor file count vs query performance
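
A minimal sketch of a regular compaction routine, assuming a table partitioned by a date column; the helper name compact_recent_partitions is illustrative, and the maxFileSize setting is assumed to be the session configuration that caps the size of files produced by OPTIMIZE:

# Hypothetical helper: compact only recently written partitions on a schedule
def compact_recent_partitions(table, days=7):
    # Assumed config name: target size for OPTIMIZE output files (~1 GB here)
    spark.conf.set("spark.databricks.delta.optimize.maxFileSize", str(1024 * 1024 * 1024))
    return (table.optimize()
        .where(f"date >= current_date() - interval {days} days")
        .executeCompaction())

compact_recent_partitions(delta_table).show(truncate=False)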

Z-Order Optimization

  • Choose columns commonly used in WHERE clauses
  • Limit to 3-4 columns for best effectiveness
  • Reorder periodically as data patterns change
  • Consider column cardinality and query patterns

Vacuum Operations

  • Balance retention with storage costs
  • Coordinate with time travel requirements (see the vacuum sketch below)
  • Run during low-activity periods
  • Monitor storage reclaimed vs files deleted
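
A sketch of coordinating retention with time travel, assuming standard Delta Lake behavior: the delta.deletedFileRetentionDuration table property and the spark.databricks.delta.retentionDurationCheck.enabled safety check; the 14-day window is an illustrative choice:

# Inspect the table's configured retention before vacuuming
properties = (spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`")
    .select("properties").collect()[0][0])
print(properties.get("delta.deletedFileRetentionDuration"))

# Leave the safety check on so retention below the 7-day default is rejected
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

# Retain two weeks of files so 14 days of time travel remain possible
delta_table.vacuum(retentionHours=14 * 24)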

Partition Strategy

  • Partition by commonly filtered columns
  • Avoid over-partitioning (aim for 1GB+ per partition)
  • Consider date-based partitioning for time-series data (see the partitioning sketch below)
  • Use OPTIMIZE with partition filters for efficiency
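
A brief sketch of a date-partitioned layout that keeps OPTIMIZE selective; the path, DataFrame, and partition value are placeholders:

from delta.tables import DeltaTable

# Write a time-series dataset partitioned by date (placeholder DataFrame events_df)
(events_df.write.format("delta")
    .partitionBy("date")
    .mode("append")
    .save("/path/to/events"))

# Optimize only the most recent partition instead of the whole table
(DeltaTable.forPath(spark, "/path/to/events")
    .optimize()
    .where("date = '2023-06-01'")
    .executeCompaction())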

Optimization Metrics

Common metrics returned by optimization operations (a sketch for inspecting them follows the list):

  • numFilesAdded / numFilesRemoved: File count changes
  • filesAdded.totalSize / filesRemoved.totalSize: Size changes in bytes
  • numBatches: Number of optimization batches executed
  • totalFilesSize: Total size after optimization
  • zOrderStats: Z-order clustering effectiveness metrics
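
A minimal sketch for inspecting these metrics without depending on every nested field name: expand the metrics struct returned by executeCompaction or executeZOrderBy and read the first row:

result = delta_table.optimize().executeCompaction()

# Expand the nested metrics struct into top-level columns for a quick look
result.select("metrics.*").show(truncate=False)

# Pull the metrics row into Python for programmatic checks
metrics = result.select("metrics").collect()[0]["metrics"]
print(metrics["numFilesAdded"], metrics["numFilesRemoved"])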

Install with Tessl CLI

npx tessl i tessl/pypi-delta-spark
