Python APIs for using Delta Lake with Apache Spark
Utilities for configuring Spark sessions, managing Delta Lake dependencies, handling table features, and managing protocol versions. Together these cover the setup and configuration needed to integrate Delta Lake into a Spark application.
Configure Spark sessions with Delta Lake dependencies and settings.
def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder:
    """
    Configure SparkSession builder to automatically download Delta Lake JARs.
    Required when using pip-installed delta-spark without spark-submit packages.
    Parameters:
    - spark_session_builder: SparkSession.Builder to configure
    - extra_packages: Optional additional Maven packages to include
    Returns:
    Configured SparkSession.Builder with Delta dependencies
    Example:
        builder = SparkSession.builder.appName("DeltaApp").master("local[*]")
        spark = configure_spark_with_delta_pip(builder).getOrCreate()
    """

# Alternative manual configuration for Spark sessions
spark = SparkSession.builder \
    .appName("DeltaLakeApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0") \
    .getOrCreate()
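A quick round-trip write and read confirms the session is wired up correctly. A minimal sketch; the /tmp path below is an arbitrary example location:

# Smoke test: write and read back a small Delta table
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta-smoke-test")
spark.read.format("delta").load("/tmp/delta-smoke-test").show()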
Manage table protocol versions and feature support.

class DeltaTable:
    def upgradeTableProtocol(
        self,
        readerVersion: int,
        writerVersion: int
    ) -> None:
        """
        Upgrade table protocol to support new features.
        Parameters:
        - readerVersion: Minimum reader version required
        - writerVersion: Minimum writer version required
        Note: Cannot downgrade protocol versions
        """

    def addFeatureSupport(self, featureName: str) -> None:
        """
        Add support for a specific table feature.
        Parameters:
        - featureName: Name of feature to enable
        Automatically upgrades protocol if needed:
        - Writer-only features: upgrades to writer version 7
        - Reader-writer features: upgrades to (3, 7)
        """

    def dropFeatureSupport(
        self,
        featureName: str,
        truncateHistory: Optional[bool] = None
    ) -> None:
        """
        Drop support for a table feature and normalize the protocol.
        Parameters:
        - featureName: Name of feature to drop
        - truncateHistory: Whether to truncate history before downgrade
        Normalizes protocol to weakest possible form after dropping the feature.
"""class DeltaTable {
The equivalent Scala API:

class DeltaTable {
  def upgradeTableProtocol(readerVersion: Int, writerVersion: Int): Unit
  def addFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String, truncateHistory: Boolean): Unit
}

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
# Method 1: Using configure_spark_with_delta_pip (recommended for pip installs)
builder = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Method 2: Manual configuration (for cluster deployments)
spark = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Include additional packages with Delta
extra_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0",
    "org.apache.spark:spark-avro_2.13:3.5.0"
]
builder = SparkSession.builder.appName("DeltaWithKafka").master("local[*]")
spark = configure_spark_with_delta_pip(builder, extra_packages).getOrCreate()
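With the Kafka connector included, the session can stream Kafka records into a Delta table. A minimal sketch, assuming a hypothetical local broker, an `events` topic, and example /tmp paths:

# Hypothetical Kafka-to-Delta streaming pipeline (broker, topic, and paths are examples)
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
)
query = (
    stream.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/events")
    .start("/tmp/delta/events")
)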
# Check current protocol version
from delta.tables import DeltaTable

# Assumes an existing Delta table; the path is an example
table_path = "/tmp/delta/events"
delta_table = DeltaTable.forPath(spark, table_path)
table_detail = delta_table.detail()
current_protocol = table_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Current protocol: reader={current_protocol[0]}, writer={current_protocol[1]}")
# Upgrade protocol to support new features
delta_table.upgradeTableProtocol(readerVersion=3, writerVersion=7)
# Add specific feature support
delta_table.addFeatureSupport("columnMapping")
delta_table.addFeatureSupport("changeDataFeed")
delta_table.addFeatureSupport("generatedColumns")
# Verify protocol upgrade
updated_detail = delta_table.detail()
new_protocol = updated_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Updated protocol: reader={new_protocol[0]}, writer={new_protocol[1]}")# Enable change data feed
delta_table.addFeatureSupport("changeDataFeed")
# Enable column mapping for schema evolution
delta_table.addFeatureSupport("columnMapping")
# Check supported features (requires protocol v7+)
table_features = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`") \
    .select("tableFeatures").collect()[0][0]
print(f"Enabled features: {table_features}")
# Drop feature support (with history truncation)
delta_table.dropFeatureSupport("changeDataFeed", truncate_history=True)# Production Spark session with Delta optimizations
# Production Spark session with Delta optimizations
spark = SparkSession.builder \
    .appName("ProductionDeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
    .getOrCreate()

import os
# Development environment
if os.getenv("ENV") == "development":
    builder = SparkSession.builder \
        .appName("DeltaDev") \
        .master("local[*]") \
        .config("spark.ui.port", "4041") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
# Production environment
elif os.getenv("ENV") == "production":
    builder = SparkSession.builder \
        .appName("DeltaProd") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")
# Fallback so `builder` is always defined
else:
    builder = SparkSession.builder.appName("DeltaApp").master("local[*]")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Set default table properties for new tables
spark.conf.set("spark.databricks.delta.properties.defaults.logRetentionDuration", "interval 30 days")
spark.conf.set("spark.databricks.delta.properties.defaults.deletedFileRetentionDuration", "interval 7 days")
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
# Configure specific table properties
delta_table_builder = DeltaTable.create(spark) \
    .tableName("events") \
    .addColumn("id", "BIGINT") \
    .addColumn("timestamp", "TIMESTAMP") \
    .addColumn("event_type", "STRING") \
    .property("delta.logRetentionDuration", "interval 90 days") \
    .property("delta.enableChangeDataFeed", "true") \
    .property("delta.autoOptimize.optimizeWrite", "true") \
    .property("delta.autoOptimize.autoCompact", "true")
delta_table = delta_table_builder.execute()

Key configuration options:
- spark.sql.extensions: Delta SQL extensions
- spark.sql.catalog.spark_catalog: Delta catalog implementation
- spark.databricks.delta.optimizeWrite.enabled: Write optimization
- spark.databricks.delta.autoCompact.enabled: Auto-compaction
- delta.logRetentionDuration: Transaction log retention
- delta.deletedFileRetentionDuration: Deleted file retention
- delta.enableChangeDataFeed: Change data capture
- delta.autoOptimize.optimizeWrite: Write optimization per table
- delta.autoOptimize.autoCompact: Auto-compaction per table
- delta.columnMapping.mode: Schema evolution mode

Table features and their protocol requirements follow the rules described under addFeatureSupport above: writer-only features require writer version 7, and reader-writer features require reader version 3 with writer version 7.
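Properties can also be set on an existing table with standard SQL; a brief sketch against the `events` table created above:

# Set per-table properties on an existing Delta table
spark.sql("""
    ALTER TABLE events SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 90 days',
        'delta.enableChangeDataFeed' = 'true'
    )
""")
# Inspect the effective table properties
spark.sql("SHOW TBLPROPERTIES events").show(truncate=False)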
Install with Tessl CLI
npx tessl i tessl/pypi-delta-spark