Python APIs for using Delta Lake with Apache Spark

Programmatic table creation, schema management, and configuration for Delta Lake tables. Provides fluent builder APIs for creating tables with custom schemas, partitioning, clustering, and properties.

Create table builders for different creation patterns.
class DeltaTable:
    """Entry points for creating Delta Lake tables programmatically.

    Each classmethod mirrors one SQL table-creation statement and returns a
    DeltaTableBuilder for fluent configuration of the new table.
    """

    @classmethod
    def create(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create a new table (equivalent to CREATE TABLE).
        Parameters:
        - spark: Optional SparkSession (uses active session if None)
        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def createIfNotExists(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create table if it doesn't exist (CREATE TABLE IF NOT EXISTS).
        Parameters:
        - spark: Optional SparkSession
        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def replace(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Replace existing table (REPLACE TABLE).
        Parameters:
        - spark: Optional SparkSession
        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def createOrReplace(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create or replace table (CREATE OR REPLACE TABLE).
        Parameters:
        - spark: Optional SparkSession
        Returns:
        DeltaTableBuilder for table configuration
        """
object DeltaTable {
  // Scala API: each entry point has a no-arg overload (per the Python docs,
  // the active SparkSession is used) and one taking an explicit SparkSession.
  def create(): DeltaTableBuilder
  def create(spark: SparkSession): DeltaTableBuilder
  def createIfNotExists(): DeltaTableBuilder
  def createIfNotExists(spark: SparkSession): DeltaTableBuilder
  def replace(): DeltaTableBuilder
  def replace(spark: SparkSession): DeltaTableBuilder
  def createOrReplace(): DeltaTableBuilder
  def createOrReplace(spark: SparkSession): DeltaTableBuilder
}
Configure table name, location, and metadata.
class DeltaTableBuilder:
    """Fluent builder methods for table identity and metadata."""

    def tableName(self, identifier: str) -> DeltaTableBuilder:
        """
        Set table name, optionally qualified with database.
        Parameters:
        - identifier: Table name (e.g., "my_table" or "db.my_table")
        Returns:
        DeltaTableBuilder for method chaining
        """

    def location(self, location: str) -> DeltaTableBuilder:
        """
        Set table data location path.
        Parameters:
        - location: Path where table data will be stored
        Returns:
        DeltaTableBuilder for method chaining
        """

    def comment(self, comment: str) -> DeltaTableBuilder:
        """
        Add table comment/description.
        Parameters:
        - comment: Table description
        Returns:
        DeltaTableBuilder for method chaining
        """
class DeltaTableBuilder {
  // Scala API for table name, storage location, and table comment.
  def tableName(identifier: String): DeltaTableBuilder
  def location(location: String): DeltaTableBuilder
  def comment(comment: String): DeltaTableBuilder
}
Define table schema with columns, data types, and constraints.
class DeltaTableBuilder:
    """Schema-definition methods of the table builder."""

    def addColumn(
        self,
        col_name: str,
        data_type: Union[str, DataType],
        nullable: bool = True,
        generated_always_as: Optional[Union[str, IdentityGenerator]] = None,
        generated_by_default_as: Optional[IdentityGenerator] = None,
        comment: Optional[str] = None
    ) -> DeltaTableBuilder:
        """
        Add column to table schema.
        Parameters:
        - col_name: Column name
        - data_type: Data type as string or DataType object
        - nullable: Whether column accepts null values
        - generated_always_as: SQL expression or IdentityGenerator for computed column
        - generated_by_default_as: IdentityGenerator for identity column with defaults
        - comment: Column description
        Returns:
        DeltaTableBuilder for method chaining
        """

    def addColumns(
        self,
        cols: Union[StructType, List[StructField]]
    ) -> DeltaTableBuilder:
        """
        Add multiple columns from existing schema.
        Parameters:
        - cols: StructType schema or list of StructField objects
        Returns:
        DeltaTableBuilder for method chaining
        """
class DeltaTableBuilder {
  // Scala overloads: a minimal (name + type) form and a full column form.
  def addColumn(colName: String, dataType: DataType): DeltaTableBuilder
  def addColumn(
    colName: String,
    dataType: DataType,
    nullable: Boolean,
    generatedAlwaysAs: String,
    comment: String
  ): DeltaTableBuilder
  def addColumns(cols: StructType): DeltaTableBuilder
}
Configure table partitioning and clustering for performance optimization.
class DeltaTableBuilder:
    """Data-layout methods of the table builder (partitioning and clustering)."""

    def partitionedBy(self, *cols: str) -> DeltaTableBuilder:
        """
        Specify partitioning columns.
        Parameters:
        - cols: Column names for partitioning
        Returns:
        DeltaTableBuilder for method chaining
        """

    def clusterBy(self, *cols: str) -> DeltaTableBuilder:
        """
        Specify clustering columns for data layout optimization.
        Parameters:
        - cols: Column names for clustering
        Returns:
        DeltaTableBuilder for method chaining
        """
class DeltaTableBuilder {
  // Scala varargs forms for partitioning and clustering columns.
  def partitionedBy(cols: String*): DeltaTableBuilder
  def clusterBy(cols: String*): DeltaTableBuilder
}
Set custom table properties and configuration.
class DeltaTableBuilder:
    """Table-property configuration on the builder."""

    def property(self, key: str, value: str) -> DeltaTableBuilder:
        """
        Set table property.
        Parameters:
        - key: Property name
        - value: Property value
        Returns:
        DeltaTableBuilder for method chaining
        """
class DeltaTableBuilder {
  // Scala form: set one table property as a key/value pair.
  def property(key: String, value: String): DeltaTableBuilder
}
Execute table creation with configured settings.
class DeltaTableBuilder:
    """Terminal operation of the builder."""

    def execute(self) -> DeltaTable:
        """
        Execute table creation.
        Returns:
        DeltaTable instance for the created table
        """
class DeltaTableBuilder {
  // Terminal operation: creates the table and returns a DeltaTable handle.
  def execute(): DeltaTable
}
Configure identity columns for auto-incrementing values.
@dataclass
class IdentityGenerator:
    """Identity column configuration for auto-incrementing values."""
    start: int = 1  # Starting value for identity sequence
    step: int = 1  # Increment step for identity sequence
Create detailed column specifications.
class DeltaTable:
    """Factory entry point for building individual column specifications."""

    @classmethod
    def columnBuilder(cls, col_name: str, spark: Optional[SparkSession] = None) -> DeltaColumnBuilder:
        """
        Create column builder for detailed column configuration.
        Parameters:
        - col_name: Column name
        - spark: Optional SparkSession
        Returns:
        DeltaColumnBuilder for column configuration
        """
class DeltaColumnBuilder:
    """Fluent builder that produces a StructField for a single column."""

    def dataType(self, data_type: Union[str, DataType]) -> DeltaColumnBuilder:
        """
        Set column data type.
        Parameters:
        - data_type: Data type as string or DataType object
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def nullable(self, nullable: bool) -> DeltaColumnBuilder:
        """
        Set column nullability.
        Parameters:
        - nullable: Whether column accepts null values
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def generatedAlwaysAs(self, expression: str) -> DeltaColumnBuilder:
        """
        Set column as generated/computed column.
        Parameters:
        - expression: SQL expression for computed column
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def generatedAlwaysAsIdentity(self, start: int, step: int) -> DeltaColumnBuilder:
        """
        Set column as identity column with GENERATED ALWAYS.
        Parameters:
        - start: Starting value for identity sequence
        - step: Increment step for identity sequence
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def generatedByDefaultAsIdentity(self, start: int, step: int) -> DeltaColumnBuilder:
        """
        Set column as identity column with GENERATED BY DEFAULT.
        Parameters:
        - start: Starting value for identity sequence
        - step: Increment step for identity sequence
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def comment(self, comment: str) -> DeltaColumnBuilder:
        """
        Add column comment/description.
        Parameters:
        - comment: Column description
        Returns:
        DeltaColumnBuilder for method chaining
        """

    def build(self) -> StructField:
        """
        Build and return the StructField for this column.
        Returns:
        StructField representing the configured column
        """
object DeltaTable {
  // Scala API: column-builder entry points (note the Scala overload takes
  // SparkSession first, unlike the Python keyword argument).
  def columnBuilder(colName: String): DeltaColumnBuilder
  def columnBuilder(spark: SparkSession, colName: String): DeltaColumnBuilder
}
// DeltaColumnBuilder for detailed column specification
class DeltaColumnBuilder {
  def dataType(dataType: String): DeltaColumnBuilder
  def dataType(dataType: DataType): DeltaColumnBuilder
  def nullable(nullable: Boolean): DeltaColumnBuilder
  def generatedAlwaysAs(expression: String): DeltaColumnBuilder
  def generatedAlwaysAsIdentity(start: Long, step: Long): DeltaColumnBuilder
  def generatedByDefaultAsIdentity(start: Long, step: Long): DeltaColumnBuilder
  def comment(comment: String): DeltaColumnBuilder
  def build(): StructField
}
# Create simple table with schema
# Build the employee table column-by-column, then create it.
# Each builder method returns the builder, so stepwise reassignment is
# equivalent to a single chained expression.
employees = DeltaTable.create(spark).tableName("employees")
employees = employees.addColumn("id", "INT", nullable=False)
employees = employees.addColumn("name", "STRING")
employees = employees.addColumn("department", "STRING")
employees = employees.addColumn("salary", "DOUBLE")
employees = employees.addColumn("created_at", "TIMESTAMP")
delta_table = employees.execute()
from pyspark.sql.types import *
# Create partitioned table with properties
fields = [
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", LongType(), False),
    StructField("amount", DoubleType(), False),
    StructField("transaction_date", DateType(), False),
    StructField("region", StringType(), False),
]
schema = StructType(fields)
# Apply each configuration step to the builder in turn.
txn = DeltaTable.create(spark)
txn = txn.tableName("transactions")
txn = txn.location("/path/to/transactions")
txn = txn.addColumns(schema)
txn = txn.partitionedBy("transaction_date", "region")
txn = txn.property("delta.logRetentionDuration", "interval 30 days")
txn = txn.property("delta.deletedFileRetentionDuration", "interval 7 days")
txn = txn.comment("Customer transaction data partitioned by date and region")
delta_table = txn.execute()
from delta.tables import IdentityGenerator
# Create table with identity and computed columns
audit = DeltaTable.create(spark).tableName("audit_log")
# Auto-incrementing surrogate key (GENERATED ALWAYS identity).
audit = audit.addColumn(
    "id", "BIGINT", nullable=False,
    generated_always_as=IdentityGenerator(start=1, step=1))
audit = audit.addColumn("event_type", "STRING", nullable=False)
audit = audit.addColumn("event_data", "STRING")
audit = audit.addColumn("created_at", "TIMESTAMP", nullable=False)
# Partition column computed from the event timestamp.
audit = audit.addColumn(
    "date_partition", "DATE", nullable=False,
    generated_always_as="CAST(created_at AS DATE)")
audit = audit.partitionedBy("date_partition")
delta_table = audit.execute()
# Create table only if it doesn't exist
# Idempotent creation: no-op when user_preferences already exists.
prefs = DeltaTable.createIfNotExists(spark)
prefs = prefs.tableName("user_preferences")
prefs = prefs.addColumn("user_id", "BIGINT", nullable=False)
prefs = prefs.addColumn("preferences", "MAP<STRING, STRING>")
prefs = prefs.addColumn("updated_at", "TIMESTAMP")
delta_table = prefs.execute()
# Replace existing table
delta_table = (DeltaTable.replace(spark)
.tableName("temp_results")
.addColumn("result_id", "STRING")
.addColumn("value", "DOUBLE")
.execute())Common Delta table properties:
- delta.logRetentionDuration: How long to keep transaction logs
- delta.deletedFileRetentionDuration: Retention for deleted files (vacuum)
- delta.autoOptimize.optimizeWrite: Enable write optimization
- delta.autoOptimize.autoCompact: Enable auto-compaction
- delta.enableChangeDataFeed: Enable change data capture
- delta.columnMapping.mode: Column mapping mode for schema evolution

Install with Tessl CLI:
npx tessl i tessl/pypi-delta-spark