tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark


Configuration and Setup

Utilities for configuring Spark sessions, managing Delta Lake dependencies, enabling table features, and managing protocol versions. Covers both automatic dependency setup for pip installs and manual configuration for cluster deployments.

Capabilities

Spark Session Configuration

Configure Spark sessions with Delta Lake dependencies and settings.

def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder:
    """
    Configure SparkSession builder to automatically download Delta Lake JARs.
    
    Required when using pip-installed delta-spark without spark-submit packages.
    
    Parameters:
    - spark_session_builder: SparkSession.Builder to configure
    - extra_packages: Optional additional Maven packages to include
    
    Returns:
    Configured SparkSession.Builder with Delta dependencies
    
    Example:
        builder = SparkSession.builder.appName("DeltaApp").master("local[*]")
        spark = configure_spark_with_delta_pip(builder).getOrCreate()
    """

Manual Spark Configuration

# Alternative manual configuration for Spark sessions
spark = SparkSession.builder \
    .appName("DeltaLakeApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0") \
    .getOrCreate()

Protocol Management

Manage table protocol versions and feature support.

class DeltaTable:
    def upgradeTableProtocol(
        self,
        reader_version: int,
        writer_version: int
    ) -> None:
        """
        Upgrade table protocol to support new features.
        
        Parameters:
        - reader_version: Minimum reader version required
        - writer_version: Minimum writer version required
        
        Note: Cannot downgrade protocol versions
        """
    
    def addFeatureSupport(self, feature_name: str) -> None:
        """
        Add support for specific table feature.
        
        Parameters:
        - feature_name: Name of feature to enable
        
        Automatically upgrades protocol if needed:
        - Writer-only features: upgrades to writer version 7
        - Reader-writer features: upgrades to (3, 7)
        """
    
    def dropFeatureSupport(
        self,
        feature_name: str,
        truncate_history: Optional[bool] = None
    ) -> None:
        """
        Drop support for table feature and normalize protocol.
        
        Parameters:
        - feature_name: Name of feature to drop
        - truncate_history: Whether to truncate history before downgrade
        
        Normalizes protocol to weakest possible form after dropping feature.
        """

Equivalent Scala API signatures:

class DeltaTable {
  def upgradeTableProtocol(readerVersion: Int, writerVersion: Int): Unit
  def addFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String, truncateHistory: Boolean): Unit
}
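
Because protocol versions can only move upward (downgrades are rejected), it can help to compute the target protocol before calling upgradeTableProtocol. A minimal pure-Python sketch; the helper name is illustrative and not part of the delta-spark API:

```python
def resolve_upgrade(current, target):
    """Return the (reader, writer) protocol to upgrade to, never downgrading.

    current, target: (reader_version, writer_version) tuples.
    """
    reader = max(current[0], target[0])
    writer = max(current[1], target[1])
    return (reader, writer)

# A table on protocol (1, 2) asked to support table features (3, 7):
print(resolve_upgrade((1, 2), (3, 7)))  # -> (3, 7)

# Requesting a lower protocol is a no-op rather than a downgrade:
print(resolve_upgrade((3, 7), (1, 4)))  # -> (3, 7)
```

Passing the result to upgradeTableProtocol avoids failed attempts against tables that already run a newer protocol.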

Usage Examples

Basic Spark Session Setup

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Method 1: Using configure_spark_with_delta_pip (recommended for pip installs)
builder = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Method 2: Manual configuration (for cluster deployments)
spark = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

Adding Extra Packages

# Include additional packages with Delta
extra_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0",
    "org.apache.spark:spark-avro_2.13:3.5.0"
]

builder = SparkSession.builder.appName("DeltaWithKafka").master("local[*]")
spark = configure_spark_with_delta_pip(builder, extra_packages).getOrCreate()

Protocol Version Management

# Check current protocol version
table_detail = delta_table.detail()
current_protocol = table_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Current protocol: reader={current_protocol['minReaderVersion']}, writer={current_protocol['minWriterVersion']}")

# Upgrade protocol to support new features
delta_table.upgradeTableProtocol(reader_version=3, writer_version=7)

# Add specific feature support
delta_table.addFeatureSupport("columnMapping")
delta_table.addFeatureSupport("changeDataFeed") 
delta_table.addFeatureSupport("generatedColumns")

# Verify protocol upgrade
updated_detail = delta_table.detail()
new_protocol = updated_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Updated protocol: reader={new_protocol['minReaderVersion']}, writer={new_protocol['minWriterVersion']}")

Feature Management

# Enable change data feed
delta_table.addFeatureSupport("changeDataFeed")

# Enable column mapping for schema evolution
delta_table.addFeatureSupport("columnMapping")

# Check supported features (requires protocol v7+)
table_features = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`") \
    .select("tableFeatures").collect()[0][0]
print(f"Enabled features: {table_features}")

# Drop feature support (with history truncation)
delta_table.dropFeatureSupport("changeDataFeed", truncate_history=True)

Advanced Configuration Options

# Production Spark session with Delta optimizations
spark = SparkSession.builder \
    .appName("ProductionDeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
    .getOrCreate()

Environment-Specific Configuration

import os

# Development environment
if os.getenv("ENV") == "development":
    builder = SparkSession.builder \
        .appName("DeltaDev") \
        .master("local[*]") \
        .config("spark.ui.port", "4041") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")

# Production environment
elif os.getenv("ENV") == "production":
    builder = SparkSession.builder \
        .appName("DeltaProd") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")

# Fallback so builder is always defined
else:
    builder = SparkSession.builder.appName("DeltaApp").master("local[*]")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Table Property Configuration

# Set default table properties for new tables
spark.conf.set("spark.databricks.delta.properties.defaults.logRetentionDuration", "interval 30 days")
spark.conf.set("spark.databricks.delta.properties.defaults.deletedFileRetentionDuration", "interval 7 days")
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

# Configure specific table properties
delta_table_builder = DeltaTable.create(spark) \
    .tableName("events") \
    .addColumn("id", "BIGINT") \
    .addColumn("timestamp", "TIMESTAMP") \
    .addColumn("event_type", "STRING") \
    .property("delta.logRetentionDuration", "interval 90 days") \
    .property("delta.enableChangeDataFeed", "true") \
    .property("delta.autoOptimize.optimizeWrite", "true") \
    .property("delta.autoOptimize.autoCompact", "true")

delta_table = delta_table_builder.execute()

Common Configuration Properties

Spark Configuration

  • spark.sql.extensions: Delta SQL extensions
  • spark.sql.catalog.spark_catalog: Delta catalog implementation
  • spark.databricks.delta.optimizeWrite.enabled: Write optimization
  • spark.databricks.delta.autoCompact.enabled: Auto-compaction

Table Properties

  • delta.logRetentionDuration: Transaction log retention
  • delta.deletedFileRetentionDuration: Deleted file retention
  • delta.enableChangeDataFeed: Change data capture
  • delta.autoOptimize.optimizeWrite: Write optimization per table
  • delta.autoOptimize.autoCompact: Auto-compaction per table
  • delta.columnMapping.mode: Schema evolution mode
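
These properties can also be applied to an existing table with ALTER TABLE ... SET TBLPROPERTIES. A small sketch that builds the statement from a dict (the helper name is illustrative; the generated SQL would be run through spark.sql on a Delta-enabled session):

```python
def set_tblproperties_sql(table_name, props):
    """Build an ALTER TABLE ... SET TBLPROPERTIES statement from a property dict."""
    pairs = ", ".join(f"'{k}' = '{v}'" for k, v in props.items())
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ({pairs})"

sql = set_tblproperties_sql("events", {
    "delta.logRetentionDuration": "interval 30 days",
    "delta.enableChangeDataFeed": "true",
})
print(sql)
# spark.sql(sql)  # execute against an active Delta-enabled SparkSession
```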

Protocol Versions

  • Reader Version 1: Basic Delta functionality
  • Reader Version 2: Column mapping support
  • Reader Version 3: Table features support
  • Writer Version 2: Append-only tables, column invariants
  • Writer Version 4: Change data feed, generated columns
  • Writer Version 5: Column mapping
  • Writer Version 6: Identity columns
  • Writer Version 7: Table features framework

Feature Dependencies

Table features and their protocol requirements:

  • appendOnly: (1, 2) - Append-only tables
  • invariants: (1, 2) - Column invariants
  • checkConstraints: (1, 3) - Named CHECK constraints
  • changeDataFeed: (1, 4) - Change data capture
  • generatedColumns: (1, 4) - Generated/computed columns
  • columnMapping: (2, 5) - Column name mapping
  • identityColumns: (1, 6) - Identity/auto-increment columns
  • deletionVectors: (3, 7) - Efficient row-level deletes
  • rowTracking: (1, 7) - Row-level change tracking

Install with Tessl CLI

npx tessl i tessl/pypi-delta-spark
