Python APIs for using Delta Lake with Apache Spark
npx @tessl/cli install tessl/pypi-delta-spark@4.0.0

Delta Lake Python API provides comprehensive functionality for using Delta Lake with Apache Spark. It enables ACID transactions, scalable metadata handling, unified streaming and batch data processing, time travel capabilities, schema evolution, and concurrent read/write operations on data lakes.
pip install delta-spark

from delta import DeltaTable, configure_spark_with_delta_pip
from delta.tables import IdentityGenerator

For type annotations:
from typing import Dict, List, Optional, Union
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField

Python:
from pyspark.sql import SparkSession
from delta import DeltaTable, configure_spark_with_delta_pip
# Configure Spark with Delta Lake
builder = (
    SparkSession.builder.appName("DeltaExample")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Create a Delta table from DataFrame
df = spark.range(10)
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")
# Load Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")
# Query the table
delta_table.toDF().show()
# Update rows
delta_table.update(
    condition="id > 5",
    set={"id": "id + 100"}
)
# View history
delta_table.history().show()

The Delta Lake Python API provides several key layers. Key components include:
Core table management including creation, reading, updating, deleting, and advanced operations like merge and time travel. Provides both path-based and catalog-based table access patterns.
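A brief sketch of path-based and catalog-based access plus basic row-level operations; the table paths and names used here are illustrative placeholders, not part of the library.

from delta import DeltaTable
from pyspark.sql.functions import col

# Catalog-based access (assumes a table named "events" exists in the metastore)
events = DeltaTable.forName(spark, "events")

# Path-based access (illustrative path)
orders = DeltaTable.forPath(spark, "/tmp/delta/orders")

# Check whether a location holds a Delta table
print(DeltaTable.isDeltaTable(spark, "/tmp/delta/orders"))

# Row-level delete and update using Column expressions
orders.delete(col("status") == "cancelled")
orders.update(
    condition=col("amount") < 0,
    set={"amount": col("amount") * -1}
)

# Table metadata (format, location, properties, etc.)
orders.detail().show(truncate=False)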
class DeltaTable:
    # Table Access
    @classmethod
    def forPath(cls, spark: SparkSession, path: str, hadoop_conf: Dict[str, str] = None) -> DeltaTable: ...
    @classmethod
    def forName(cls, spark: SparkSession, table_name: str) -> DeltaTable: ...
    @classmethod
    def isDeltaTable(cls, spark: SparkSession, identifier: str) -> bool: ...

    # Basic Operations
    def toDF(self) -> DataFrame: ...
    def alias(self, alias_name: str) -> DeltaTable: ...
    def delete(self, condition: Optional[Union[str, Column]] = None) -> None: ...
    def update(self, condition: Optional[Union[str, Column]] = None, set: Optional[Dict[str, Union[str, Column]]] = None) -> None: ...

    # Table Conversion
    @classmethod
    def convertToDelta(cls, spark: SparkSession, identifier: str, partition_schema: Optional[Union[str, StructType]] = None) -> DeltaTable: ...

    # Table Details
    def detail(self) -> DataFrame: ...

Advanced merge functionality supporting complex upsert patterns with whenMatched, whenNotMatched, and whenNotMatchedBySource clauses. Enables schema evolution and handles concurrent modifications.
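A minimal upsert sketch using DeltaTable.merge; the source DataFrame, join key, and table path are illustrative assumptions.

from delta import DeltaTable

target = DeltaTable.forPath(spark, "/tmp/delta/customers")  # illustrative path
updates = spark.createDataFrame(
    [(1, "alice@new.example"), (42, "bob@example.com")],
    ["id", "email"],
)

# Upsert: update matched rows, insert unmatched ones
(
    target.alias("t")
    .merge(updates.alias("s"), "t.id = s.id")
    .whenMatchedUpdate(set={"email": "s.email"})
    .whenNotMatchedInsert(values={"id": "s.id", "email": "s.email"})
    .execute()
)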
class DeltaMergeBuilder:
    def whenMatchedUpdate(self, condition: str = None, set: dict = None) -> DeltaMergeBuilder: ...
    def whenMatchedDelete(self, condition: str = None) -> DeltaMergeBuilder: ...
    def whenNotMatchedInsert(self, condition: str = None, values: dict = None) -> DeltaMergeBuilder: ...
    def execute(self) -> DataFrame: ...

Scala:

class DeltaMergeBuilder {
  def whenMatchedUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedDelete(condition: Column): DeltaMergeBuilder
  def whenNotMatchedInsert(values: Map[String, Column]): DeltaMergeBuilder
  def execute(): DataFrame
}

Table creation, schema management, and configuration including table builders for programmatic table creation with columns, partitioning, clustering, and properties.
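A short sketch of programmatic table creation with the builder API; the table name, columns, partitioning, and property are illustrative choices.

from delta import DeltaTable

(
    DeltaTable.createIfNotExists(spark)      # also available: create / createOrReplace
    .tableName("events")                     # illustrative catalog table name
    .addColumn("event_id", "BIGINT", nullable=False)
    .addColumn("event_time", "TIMESTAMP")
    .addColumn("event_date", "DATE")
    .partitionedBy("event_date")
    .property("delta.appendOnly", "false")
    .execute()
)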
class DeltaTableBuilder:
    def tableName(self, identifier: str) -> DeltaTableBuilder: ...
    def location(self, location: str) -> DeltaTableBuilder: ...
    def addColumn(self, col_name: str, data_type: str, nullable: bool = True) -> DeltaTableBuilder: ...
    def partitionedBy(self, *cols: str) -> DeltaTableBuilder: ...
    def execute(self) -> DeltaTable: ...

Scala:

class DeltaTableBuilder {
  def tableName(identifier: String): DeltaTableBuilder
  def location(location: String): DeltaTableBuilder
  def addColumn(colName: String, dataType: DataType): DeltaTableBuilder
  def partitionedBy(cols: String*): DeltaTableBuilder
  def execute(): DeltaTable
}

Version control capabilities including time travel queries, table restoration, and history exploration. Supports both version-based and timestamp-based operations.
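A sketch of history inspection, version restore, and time-travel reads; the path, version numbers, and timestamp are placeholders.

from delta import DeltaTable

table_path = "/tmp/delta/orders"  # illustrative path
orders = DeltaTable.forPath(spark, table_path)

# Inspect the last 5 commits (operation, timestamp, version, ...)
orders.history(5).show(truncate=False)

# Read an older snapshot via DataFrame time travel
v0 = spark.read.format("delta").option("versionAsOf", 0).load(table_path)

# Roll the table back to an earlier state
orders.restoreToVersion(0)
orders.restoreToTimestamp("2024-01-01")  # date or timestamp string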
class DeltaTable:
    def history(self, limit: int = None) -> DataFrame: ...
    def restoreToVersion(self, version: int) -> DataFrame: ...
    def restoreToTimestamp(self, timestamp: str) -> DataFrame: ...

Scala:

class DeltaTable {
  def history(limit: Int): DataFrame
  def restoreToVersion(version: Long): DataFrame
  def restoreToTimestamp(timestamp: String): DataFrame
}

Performance optimization including file compaction, Z-ordering, vacuum operations, and table maintenance. Provides fine-grained control over data layout and storage optimization.
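A sketch of compaction, Z-ordering, and vacuum; the partition filter, Z-order column, and retention window are illustrative.

from delta import DeltaTable

orders = DeltaTable.forPath(spark, "/tmp/delta/orders")  # illustrative path

# Compact small files within a single partition
orders.optimize().where("event_date = '2024-01-01'").executeCompaction()

# Co-locate data on a frequently filtered column
orders.optimize().executeZOrderBy("customer_id")

# Remove files no longer referenced by the table (retention given in hours)
orders.vacuum(168)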
class DeltaTable:
    def optimize(self) -> DeltaOptimizeBuilder: ...
    def vacuum(self, retention_hours: float = None) -> DataFrame: ...

class DeltaOptimizeBuilder:
    def where(self, partition_filter: str) -> DeltaOptimizeBuilder: ...
    def executeCompaction(self) -> DataFrame: ...
    def executeZOrderBy(self, *cols: str) -> DataFrame: ...

Scala:

class DeltaTable {
  def optimize(): DeltaOptimizeBuilder
  def vacuum(retentionHours: Double): DataFrame
}

class DeltaOptimizeBuilder {
  def where(partitionFilter: String): DeltaOptimizeBuilder
  def executeCompaction(): DataFrame
  def executeZOrderBy(cols: String*): DataFrame
}

Table cloning functionality for creating copies at specific versions or timestamps. Supports both shallow and deep clones.
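A hedged sketch based on the clone signatures listed below; the source path, target paths, and version number are assumptions, and availability of the Python clone API depends on the installed delta-spark version.

from delta import DeltaTable

source = DeltaTable.forPath(spark, "/tmp/delta/orders")  # illustrative path

# Shallow clone of the current snapshot (copies metadata, references source data files)
source.clone("/tmp/delta/orders_shallow", True)

# Deep clone of a specific version (copies data files as well), replacing any existing target
source.cloneAtVersion(3, "/tmp/delta/orders_v3_copy", False, replace=True)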
class DeltaTable:
    def clone(self, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...
    def cloneAtVersion(self, version: int, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...
    def cloneAtTimestamp(self, timestamp: str, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...

Utilities for configuring Spark sessions, managing Delta Lake dependencies, and handling table features and protocol versions.
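A sketch of session configuration with extra packages and of protocol/feature management; the extra Maven coordinate, protocol versions, and feature name are illustrative assumptions.

from pyspark.sql import SparkSession
from delta import DeltaTable, configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("DeltaUtilities")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Pull in Delta plus any extra Maven packages (coordinate shown is illustrative)
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()

table = DeltaTable.forPath(spark, "/tmp/delta/orders")  # illustrative path

# Raise the table's reader/writer protocol versions
table.upgradeTableProtocol(1, 4)

# Enable a table feature by name (feature name chosen for illustration)
table.addFeatureSupport("deletionVectors")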
def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder: ...

class DeltaTable:
    def upgradeTableProtocol(self, reader_version: int, writer_version: int) -> None: ...
    def addFeatureSupport(self, feature_name: str) -> None: ...
    def dropFeatureSupport(self, feature_name: str, truncate_history: Optional[bool] = None) -> None: ...

Exceptions raised when concurrent operations conflict:

# Import from pyspark.errors
from pyspark.errors import PySparkException
# Base exception for concurrent modification errors
class DeltaConcurrentModificationException(PySparkException): ...
# Specific concurrent operation exceptions
class ConcurrentWriteException(PySparkException): ...
class MetadataChangedException(PySparkException): ...
class ProtocolChangedException(PySparkException): ...
class ConcurrentAppendException(PySparkException): ...
class ConcurrentDeleteReadException(PySparkException): ...
class ConcurrentDeleteDeleteException(PySparkException): ...
class ConcurrentTransactionException(PySparkException): ...
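A hedged sketch of retrying a write when another transaction commits concurrently; it assumes these exception classes are importable from delta.exceptions, and the retry policy and upsert helper are illustrative.

import time
from delta.exceptions import ConcurrentAppendException, DeltaConcurrentModificationException

def merge_with_retry(run_merge, attempts=3, backoff_seconds=2):
    """Retry a merge a few times when a concurrent writer causes a conflict."""
    for attempt in range(attempts):
        try:
            run_merge()
            return
        except (ConcurrentAppendException, DeltaConcurrentModificationException):
            if attempt == attempts - 1:
                raise
            time.sleep(backoff_seconds * (attempt + 1))

# Usage: run_merge is any callable performing the DeltaTable merge/update
# merge_with_retry(lambda: upsert_orders(spark))  # upsert_orders is a hypothetical helper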
Python:

from typing import Dict, Optional, Union, List
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField
from dataclasses import dataclass
# Expression and Column types
ExpressionOrColumn = Union[str, Column]
OptionalExpressionOrColumn = Optional[ExpressionOrColumn]
ColumnMapping = Dict[str, ExpressionOrColumn]
OptionalColumnMapping = Optional[ColumnMapping]
# Identity column configuration
@dataclass
class IdentityGenerator:
"""Identity column specification for auto-incrementing values."""
start: int = 1 # Starting value for identity sequence
step: int = 1 # Increment step for identity sequence
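A hedged sketch of using IdentityGenerator with the table builder, assuming delta-spark 4.0's addColumn accepts an identity spec via a generatedAlwaysAs parameter; the table name and column layout are illustrative.

from delta import DeltaTable
from delta.tables import IdentityGenerator
from pyspark.sql.types import LongType, StringType

(
    DeltaTable.createOrReplace(spark)
    .tableName("customers")  # illustrative catalog table name
    .addColumn("id", dataType=LongType(),
               generatedAlwaysAs=IdentityGenerator(start=1, step=1))  # assumed parameter usage
    .addColumn("name", dataType=StringType())
    .execute()
)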