CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark

Pending
Overview
Eval results
Files

docs/table-management.md

Table Management

Programmatic table creation, schema management, and configuration for Delta Lake tables. Provides fluent builder APIs for creating tables with custom schemas, partitioning, clustering, and properties.

Capabilities

Table Builders

Create table builders for different creation patterns.

class DeltaTable:
    """
    Factory methods that start a table definition.

    Each method mirrors one SQL creation mode and hands back a
    DeltaTableBuilder on which the table is configured and finally executed.

    NOTE(review): upstream delta-spark names the session parameter
    `sparkSession`; passing the session positionally works either way.
    Confirm against the installed version before passing it by keyword.
    """

    @classmethod
    def create(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create a new table (equivalent to CREATE TABLE).

        Parameters:
        - spark: Optional SparkSession (uses active session if None)

        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def createIfNotExists(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create table if it doesn't exist (CREATE TABLE IF NOT EXISTS).

        Parameters:
        - spark: Optional SparkSession (uses active session if None)

        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def replace(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Replace existing table (REPLACE TABLE).

        Parameters:
        - spark: Optional SparkSession (uses active session if None)

        Returns:
        DeltaTableBuilder for table configuration
        """

    @classmethod
    def createOrReplace(cls, spark: Optional[SparkSession] = None) -> DeltaTableBuilder:
        """
        Create or replace table (CREATE OR REPLACE TABLE).

        Parameters:
        - spark: Optional SparkSession (uses active session if None)

        Returns:
        DeltaTableBuilder for table configuration
        """
// Scala entry points for the same four creation modes as the Python API.
// Every mode has a no-argument overload and one taking an explicit SparkSession;
// presumably the no-argument form falls back to the active session, as the
// Python `spark=None` default does (verify against the Scala API docs).
object DeltaTable {
  // CREATE TABLE
  def create(): DeltaTableBuilder
  def create(spark: SparkSession): DeltaTableBuilder
  // CREATE TABLE IF NOT EXISTS
  def createIfNotExists(): DeltaTableBuilder
  def createIfNotExists(spark: SparkSession): DeltaTableBuilder
  // REPLACE TABLE
  def replace(): DeltaTableBuilder  
  def replace(spark: SparkSession): DeltaTableBuilder
  // CREATE OR REPLACE TABLE
  def createOrReplace(): DeltaTableBuilder
  def createOrReplace(spark: SparkSession): DeltaTableBuilder
}

Table Configuration

Configure table name, location, and metadata.

class DeltaTableBuilder:
    """Table-level settings of the builder: name, storage location and comment."""

    def tableName(self, identifier: str) -> DeltaTableBuilder:
        """
        Name the table being built; a database qualifier is allowed.

        Parameters:
        - identifier: Bare or qualified name, e.g. "my_table" or "db.my_table"

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """

    def location(self, location: str) -> DeltaTableBuilder:
        """
        Choose the path under which the table's data is stored.

        Parameters:
        - location: Storage path for the table data

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """

    def comment(self, comment: str) -> DeltaTableBuilder:
        """
        Attach a free-text description to the table.

        Parameters:
        - comment: Description of the table

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """
// Scala counterparts of the table-level setters (name, data location, comment).
// Each returns a DeltaTableBuilder so the calls can be chained.
class DeltaTableBuilder {
  def tableName(identifier: String): DeltaTableBuilder
  def location(location: String): DeltaTableBuilder
  def comment(comment: String): DeltaTableBuilder
}

Column Definitions

Define table schema with columns, data types, and constraints.

class DeltaTableBuilder:
    """
    Schema-definition methods of the builder.

    Parameter names follow the delta-spark Python API, which uses camelCase
    keywords (colName, dataType, generatedAlwaysAs, generatedByDefaultAs).
    Snake_case spellings are not accepted as keyword arguments.
    """

    def addColumn(
        self,
        colName: str,
        dataType: Union[str, DataType],
        nullable: bool = True,
        generatedAlwaysAs: Optional[Union[str, IdentityGenerator]] = None,
        generatedByDefaultAs: Optional[IdentityGenerator] = None,
        comment: Optional[str] = None
    ) -> DeltaTableBuilder:
        """
        Add column to table schema.

        Parameters:
        - colName: Column name
        - dataType: Data type as string or DataType object
        - nullable: Whether column accepts null values
        - generatedAlwaysAs: SQL expression or IdentityGenerator for computed column
        - generatedByDefaultAs: IdentityGenerator for identity column with defaults
        - comment: Column description

        Returns:
        DeltaTableBuilder for method chaining
        """

    def addColumns(
        self,
        cols: Union[StructType, List[StructField]]
    ) -> DeltaTableBuilder:
        """
        Add multiple columns from existing schema.

        Parameters:
        - cols: StructType schema or list of StructField objects

        Returns:
        DeltaTableBuilder for method chaining
        """
// Scala schema-definition methods; each returns the builder for chaining.
class DeltaTableBuilder {
  // Minimal form: column name plus Spark DataType.
  def addColumn(colName: String, dataType: DataType): DeltaTableBuilder
  // NOTE(review): this five-argument overload could not be confirmed against the
  // Scala API; generated columns and comments are normally configured through
  // DeltaTable.columnBuilder(...). Verify before relying on it.
  def addColumn(
    colName: String, 
    dataType: DataType, 
    nullable: Boolean,
    generatedAlwaysAs: String,
    comment: String
  ): DeltaTableBuilder
  // Adds every field of an existing StructType schema.
  def addColumns(cols: StructType): DeltaTableBuilder
}

Partitioning and Clustering

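Configure table partitioning or clustering for performance optimization. A table uses one or the other: clustering (clusterBy) cannot be combined with partitionedBy on the same table.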

class DeltaTableBuilder:
    """Data-layout settings of the builder: partition columns and clustering columns."""

    def partitionedBy(self, *cols: str) -> DeltaTableBuilder:
        """
        Choose the columns the table is partitioned by.

        Parameters:
        - cols: One or more partition column names

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """

    def clusterBy(self, *cols: str) -> DeltaTableBuilder:
        """
        Choose the columns used to cluster the table's data layout.

        Parameters:
        - cols: One or more clustering column names

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """
// Scala layout setters; both take a variable number of column names (String*)
// and return the builder for chaining.
class DeltaTableBuilder {
  def partitionedBy(cols: String*): DeltaTableBuilder
  def clusterBy(cols: String*): DeltaTableBuilder
}

Table Properties

Set custom table properties and configuration.

class DeltaTableBuilder:
    """Table-property setter of the builder."""

    def property(self, key: str, value: str) -> DeltaTableBuilder:
        """
        Record one key/value table property.

        Parameters:
        - key: Name of the property
        - value: Value to store for that property

        Returns:
        The DeltaTableBuilder, so further calls can be chained
        """
// Scala property setter: takes the property name and value as strings and
// returns the builder for chaining.
class DeltaTableBuilder {
  def property(key: String, value: String): DeltaTableBuilder
}

Table Creation

Execute table creation with configured settings.

class DeltaTableBuilder:
    """Terminal step of the builder."""

    def execute(self) -> DeltaTable:
        """
        Run the creation with everything configured on this builder.

        Returns:
        The DeltaTable for the table that was created
        """
// Scala terminal step: runs the configured creation and returns the DeltaTable.
class DeltaTableBuilder {
  def execute(): DeltaTable
}

Identity Columns

Configure identity columns for auto-incrementing values.

@dataclass
class IdentityGenerator:
    """Settings for an identity column's auto-incrementing sequence.

    Both fields default to 1.
    """

    # First value of the identity sequence.
    start: int = 1
    # Increment applied between consecutive identity values.
    step: int = 1

Column Builders

Create detailed column specifications.

class DeltaTable:
    """
    Entry point for building a single column definition.

    NOTE(review): confirm that the installed delta-spark Python package
    exposes `columnBuilder`; it is documented here alongside the Scala API.
    """

    @classmethod
    def columnBuilder(cls, col_name: str, spark: Optional[SparkSession] = None) -> DeltaColumnBuilder:
        """
        Start a DeltaColumnBuilder for one named column.

        Parameters:
        - col_name: Name of the column to define
        - spark: SparkSession to use; may be omitted

        Returns:
        A DeltaColumnBuilder on which the column is configured
        """

class DeltaColumnBuilder:
    """
    Fluent builder for a single column: type, nullability, generation rule
    and comment, finished with build().

    NOTE(review): confirm that the installed delta-spark Python package
    exposes this class; it is documented here alongside the Scala API.
    """

    def dataType(self, data_type: Union[str, DataType]) -> DeltaColumnBuilder:
        """
        Declare the column's data type.

        Parameters:
        - data_type: Either a type string or a DataType object

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def nullable(self, nullable: bool) -> DeltaColumnBuilder:
        """
        Declare whether the column may hold nulls.

        Parameters:
        - nullable: True if null values are accepted

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def generatedAlwaysAs(self, expression: str) -> DeltaColumnBuilder:
        """
        Make the column a generated (computed) column.

        Parameters:
        - expression: SQL expression that computes the column

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def generatedAlwaysAsIdentity(self, start: int, step: int) -> DeltaColumnBuilder:
        """
        Make the column a GENERATED ALWAYS identity column.

        Parameters:
        - start: First value of the identity sequence
        - step: Increment between identity values

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def generatedByDefaultAsIdentity(self, start: int, step: int) -> DeltaColumnBuilder:
        """
        Make the column a GENERATED BY DEFAULT identity column.

        Parameters:
        - start: First value of the identity sequence
        - step: Increment between identity values

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def comment(self, comment: str) -> DeltaColumnBuilder:
        """
        Attach a free-text description to the column.

        Parameters:
        - comment: Description of the column

        Returns:
        The DeltaColumnBuilder, so further calls can be chained
        """

    def build(self) -> StructField:
        """
        Finish the definition.

        Returns:
        The StructField that represents the configured column
        """
// Scala entry points for column builders. Note the argument order of the
// two-argument overload: the SparkSession comes first, then the column name.
object DeltaTable {
  def columnBuilder(colName: String): DeltaColumnBuilder
  def columnBuilder(spark: SparkSession, colName: String): DeltaColumnBuilder
}

// DeltaColumnBuilder for detailed column specification.
// Every setter returns the builder for chaining; build() ends the chain and
// yields the StructField describing the column.
class DeltaColumnBuilder {
  // The data type may be given as a type string or as a Spark DataType.
  def dataType(dataType: String): DeltaColumnBuilder
  def dataType(dataType: DataType): DeltaColumnBuilder
  def nullable(nullable: Boolean): DeltaColumnBuilder
  // Generated (computed) column defined by a SQL expression.
  def generatedAlwaysAs(expression: String): DeltaColumnBuilder
  // Identity columns: start and step are Long values.
  def generatedAlwaysAsIdentity(start: Long, step: Long): DeltaColumnBuilder
  def generatedByDefaultAsIdentity(start: Long, step: Long): DeltaColumnBuilder
  def comment(comment: String): DeltaColumnBuilder
  def build(): StructField
}

Usage Examples

Basic Table Creation

# Create a simple table named "employees" with an explicit five-column schema.
# Assumes `spark` is an existing SparkSession and that `DeltaTable` has been
# imported from `delta.tables`.
# Only "id" is declared non-nullable; the other columns keep the default
# (nullable=True). execute() ends the chain and returns the DeltaTable.
delta_table = (DeltaTable.create(spark)
    .tableName("employees")
    .addColumn("id", "INT", nullable=False)
    .addColumn("name", "STRING")
    .addColumn("department", "STRING")
    .addColumn("salary", "DOUBLE")
    .addColumn("created_at", "TIMESTAMP")
    .execute())

Table with Partitioning and Properties

from delta.tables import DeltaTable
from pyspark.sql.types import (
    DateType,
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Create partitioned table with properties.
# Assumes `spark` is an existing SparkSession.
# The third StructField argument is `nullable`; every column here is required.
schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", LongType(), False),
    StructField("amount", DoubleType(), False),
    StructField("transaction_date", DateType(), False),
    StructField("region", StringType(), False)
])

# addColumns() takes the whole schema at once; partition columns must be
# columns of that schema.
delta_table = (DeltaTable.create(spark)
    .tableName("transactions")
    .location("/path/to/transactions")
    .addColumns(schema)
    .partitionedBy("transaction_date", "region")
    .property("delta.logRetentionDuration", "interval 30 days")
    .property("delta.deletedFileRetentionDuration", "interval 7 days")
    .comment("Customer transaction data partitioned by date and region")
    .execute())

Table with Generated Columns

from delta.tables import DeltaTable, IdentityGenerator

# Create table with identity and computed columns.
# Assumes `spark` is an existing SparkSession.
# addColumn's keyword arguments are camelCase in delta-spark
# (generatedAlwaysAs / generatedByDefaultAs); snake_case spellings raise
# TypeError.
delta_table = (DeltaTable.create(spark)
    .tableName("audit_log")
    .addColumn("id", "BIGINT", nullable=False,
               generatedAlwaysAs=IdentityGenerator(start=1, step=1))
    .addColumn("event_type", "STRING", nullable=False)
    .addColumn("event_data", "STRING")
    .addColumn("created_at", "TIMESTAMP", nullable=False)
    # Computed column derived from created_at; it doubles as the partition key.
    .addColumn("date_partition", "DATE", nullable=False,
               generatedAlwaysAs="CAST(created_at AS DATE)")
    .partitionedBy("date_partition")
    .execute())

Conditional Table Creation

# Create table only if it doesn't exist (CREATE TABLE IF NOT EXISTS).
# Assumes `spark` is an existing SparkSession and that `DeltaTable` has been
# imported from `delta.tables`.
delta_table = (DeltaTable.createIfNotExists(spark)
    .tableName("user_preferences")
    .addColumn("user_id", "BIGINT", nullable=False)
    .addColumn("preferences", "MAP<STRING, STRING>")
    .addColumn("updated_at", "TIMESTAMP")
    .execute())

# Replace existing table (REPLACE TABLE).
# NOTE(review): presumably this requires the table to exist already; use
# DeltaTable.createOrReplace(spark) when it may not. Confirm against the
# delta-spark docs.
delta_table = (DeltaTable.replace(spark)
    .tableName("temp_results") 
    .addColumn("result_id", "STRING")
    .addColumn("value", "DOUBLE")
    .execute())

Table Properties

Common Delta table properties:

  • delta.logRetentionDuration: How long to keep transaction logs
  • delta.deletedFileRetentionDuration: Retention for deleted files (vacuum)
  • delta.autoOptimize.optimizeWrite: Enable write optimization
  • delta.autoOptimize.autoCompact: Enable auto-compaction
  • delta.enableChangeDataFeed: Enable change data capture
  • delta.columnMapping.mode: Column mapping mode for schema evolution

Install with Tessl CLI

npx tessl i tessl/pypi-delta-spark

docs

configuration.md

index.md

merge-operations.md

optimization.md

table-management.md

table-operations.md

time-travel.md

tile.json