Python APIs for using Delta Lake with Apache Spark
—
Performance optimization and table maintenance operations for Delta Lake, including file compaction, Z-ordering, vacuum, and data layout optimization. These APIs provide fine-grained control over storage efficiency and query performance.
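All of the examples below operate on a DeltaTable handle. A minimal setup sketch, assuming an existing SparkSession named spark and a table stored at the placeholder path /path/to/table (use DeltaTable.forName for catalog-registered tables):

from delta.tables import DeltaTable

# Path-based table handle; the path is a placeholder
delta_table = DeltaTable.forPath(spark, "/path/to/table")

# Catalog table alternative
# delta_table = DeltaTable.forName(spark, "my_db.my_table")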
Improve query performance through data layout optimization.
class DeltaTable:
    def optimize(self) -> DeltaOptimizeBuilder:
        """
        Create optimize builder for data layout optimization.
        Returns:
            DeltaOptimizeBuilder for configuring optimization operations
        """

class DeltaTable {
    def optimize(): DeltaOptimizeBuilder
}

Configure optimization scope and execution.
class DeltaOptimizeBuilder:
    def where(self, partitionFilter: str) -> DeltaOptimizeBuilder:
        """
        Apply a partition filter to limit the optimization scope.
        Parameters:
            - partitionFilter: SQL condition on partition columns used to select partitions for optimization
        Returns:
            DeltaOptimizeBuilder for method chaining
        """
    def executeCompaction(self) -> DataFrame:
        """
        Execute file compaction to reduce small-file overhead.
        Returns:
            DataFrame with optimization metrics including files compacted and size changes
        """
    def executeZOrderBy(self, *cols: str) -> DataFrame:
        """
        Execute Z-order optimization for improved query performance.
        Parameters:
            - cols: Column names to use for Z-order clustering
        Returns:
            DataFrame with Z-order optimization metrics
        """

class DeltaOptimizeBuilder {
    def where(partitionFilter: String): DeltaOptimizeBuilder
    def executeCompaction(): DataFrame
    def executeZOrderBy(cols: String*): DataFrame
}

Clean up unused files and optimize storage usage.
class DeltaTable:
    def vacuum(self, retentionHours: Optional[float] = None) -> DataFrame:
        """
        Remove files no longer referenced by the table.
        Parameters:
            - retentionHours: Hours to retain files (default: 168 hours / 7 days)
        Returns:
            DataFrame with vacuum operation results
        """

class DeltaTable {
    def vacuum(): DataFrame
    def vacuum(retentionHours: Double): DataFrame
}

Generate metadata manifests for external system integration.
class DeltaTable:
    def generate(self, mode: str) -> None:
        """
        Generate manifests for external system compatibility.
        Parameters:
            - mode: Manifest type ("symlink_format_manifest" for Presto/Athena)
        """

class DeltaTable {
    def generate(mode: String): Unit
}
# Compact all partitions
optimize_result = delta_table.optimize().executeCompaction()
optimize_result.show()
# Compact specific partitions
optimize_result = (delta_table.optimize()
    .where("date >= '2023-01-01' AND region = 'us-west'")
    .executeCompaction())
# View optimization metrics
optimize_result.select(
    "path",
    "metrics.numFilesAdded",
    "metrics.numFilesRemoved",
    "metrics.filesAdded.totalSize",
    "metrics.filesRemoved.totalSize"
).show()
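To gauge whether compaction actually helped, table-level file counts and sizes can be compared before and after. A minimal sketch, assuming delta-spark 2.1+ where DeltaTable.detail() is available (older releases can run DESCRIBE DETAIL via SQL instead):

# Snapshot file-level stats before and after compaction
before = delta_table.detail().select("numFiles", "sizeInBytes").collect()[0]
delta_table.optimize().executeCompaction()
after = delta_table.detail().select("numFiles", "sizeInBytes").collect()[0]
print(f"numFiles: {before['numFiles']} -> {after['numFiles']}, "
      f"sizeInBytes: {before['sizeInBytes']} -> {after['sizeInBytes']}")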
# Z-order by commonly filtered columns
zorder_result = (delta_table.optimize()
    .executeZOrderBy("customer_id", "transaction_date"))
# Z-order specific partitions
zorder_result = (delta_table.optimize()
    .where("year = 2023")
    .executeZOrderBy("customer_id", "product_category"))
zorder_result.show()
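Background compaction can reduce how often manual OPTIMIZE runs are needed. A sketch assuming a Delta release that supports the delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact table properties (Databricks and recent open-source Delta versions); the table path is a placeholder:

# Enable optimized writes and auto compaction as table properties
spark.sql("""
    ALTER TABLE delta.`/path/to/table` SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")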
# Vacuum with default retention (7 days)
vacuum_result = delta_table.vacuum()
vacuum_result.show()
# Vacuum with custom retention period
vacuum_result = delta_table.vacuum(retentionHours=24)  # 1 day retention
# Dry run to see what would be deleted (use SQL)
spark.sql("VACUUM delta.`/path/to/table` RETAIN 168 HOURS DRY RUN").show()
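Delta refuses retention periods shorter than the default 168 hours unless the safety check is relaxed, so the 24-hour vacuum above would normally raise an error. A sketch of disabling the check, assuming you have verified that no concurrent readers, writers, or time-travel queries still need the older files:

# Allow retention periods below the 168-hour default (use with care:
# files still needed by long-running queries or time travel may be deleted)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
vacuum_result = delta_table.vacuum(24)  # now permitted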
# Generate symlink manifest for Presto/Athena
delta_table.generate("symlink_format_manifest")
# Check generated manifest files
manifest_path = "/path/to/table/_symlink_format_manifest"
dbutils.fs.ls(manifest_path)  # In Databricks
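Manifests generated this way go stale as the table changes; Delta can regenerate them automatically on each write via a table property. A sketch, assuming the same placeholder table path:

# Keep the symlink manifest up to date automatically after every write
spark.sql("""
    ALTER TABLE delta.`/path/to/table` SET TBLPROPERTIES (
        'delta.compatibility.symlinkFormatManifest.enabled' = 'true'
    )
""")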
from pyspark.sql.functions import col, current_timestamp

# 1. Analyze table statistics before optimization
pre_stats = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`")
file_count_before = pre_stats.select("numFiles").collect()[0][0]
print(f"Files before optimization: {file_count_before}")
# 2. Optimize frequently queried partitions
recent_partitions = (delta_table.optimize()
    .where("date >= current_date() - interval 30 days")
    .executeZOrderBy("customer_id", "product_id"))
# 3. Compact older partitions
older_partitions = (delta_table.optimize()
    .where("date < current_date() - interval 30 days")
    .executeCompaction())
# 4. Update table statistics
spark.sql(f"ANALYZE TABLE delta.`{table_path}` COMPUTE STATISTICS")
# 5. Clean up old files
vacuum_result = delta_table.vacuum(retentionHours=168)  # 7 days
# 6. Generate manifests for external access
delta_table.generate("symlink_format_manifest")
# 7. Check final statistics
post_stats = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`")
file_count_after = post_stats.select("numFiles").collect()[0][0]
print(f"Files after optimization: {file_count_after}")
print(f"File reduction: {file_count_before - file_count_after}")
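The numbered steps above can be collected into a reusable routine for scheduled maintenance. A minimal sketch, assuming an existing SparkSession named spark, a partition column named date, and the retention and Z-order columns shown earlier; adjust to your own table layout:

from delta.tables import DeltaTable

def run_table_maintenance(spark, table_path, zorder_cols, retention_hours=168):
    """Compact, Z-order recent partitions, vacuum, and refresh manifests."""
    table = DeltaTable.forPath(spark, table_path)
    # Z-order the hot, recently written partitions
    (table.optimize()
        .where("date >= current_date() - interval 30 days")
        .executeZOrderBy(*zorder_cols))
    # Plain compaction for colder partitions
    (table.optimize()
        .where("date < current_date() - interval 30 days")
        .executeCompaction())
    # Remove files outside the retention window
    table.vacuum(retention_hours)
    # Refresh manifests for Presto/Athena readers
    table.generate("symlink_format_manifest")

run_table_maintenance(spark, table_path, ["customer_id", "product_id"])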
# Monitor optimize operations over time
optimize_history = (delta_table.history()
    .filter(col("operation") == "OPTIMIZE")
    .select(
        "timestamp",
        "operationParameters.predicate",
        col("operationMetrics.numAddedFiles").alias("files_added"),
        col("operationMetrics.numRemovedFiles").alias("files_removed"),
        col("operationMetrics.numAddedBytes").alias("bytes_added"),
        col("operationMetrics.numRemovedBytes").alias("bytes_removed")
    ))
optimize_history.show(truncate=False)
# Check vacuum history (vacuum is recorded as VACUUM START / VACUUM END operations)
vacuum_history = (delta_table.history()
    .filter(col("operation").startswith("VACUUM"))
    .select(
        "timestamp",
        "operation",
        col("operationMetrics.numDeletedFiles").alias("deleted_files"),
        col("operationMetrics.numVacuumedDirectories").alias("vacuumed_directories")
    ))
vacuum_history.show()

Common metrics returned by optimization operations:
- numFilesAdded / numFilesRemoved: File count changes
- filesAdded.totalSize / filesRemoved.totalSize: Size changes in bytes
- numBatches: Number of optimization batches executed
- totalFilesSize: Total size after optimization
- zOrderStats: Z-order clustering effectiveness metrics
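To inspect every metric field without hard-coding names, the metrics struct returned by executeCompaction() / executeZOrderBy() can be expanded directly. A small sketch, assuming the optimize_result DataFrame from the compaction example above:

# Expand the full metrics struct into individual columns
optimize_result.select("path", "metrics.*").show(truncate=False)

# Or pull it into Python for programmatic checks
metrics = optimize_result.select("metrics").collect()[0]["metrics"]
print(metrics.asDict())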
Install with Tessl CLI
npx tessl i tessl/pypi-delta-spark