tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/delta-spark@4.0.x

To install, run

npx @tessl/cli install tessl/pypi-delta-spark@4.0.0


Delta Lake Python API

The Delta Lake Python API provides the functionality needed to use Delta Lake from Apache Spark in Python. It enables ACID transactions, scalable metadata handling, unified streaming and batch processing, time travel, schema evolution, and concurrent read/write operations on data lakes.

Package Information

  • Package Name: delta-spark
  • Language: Python
  • Installation: pip install delta-spark

Core Imports

from delta import DeltaTable, configure_spark_with_delta_pip
from delta.tables import IdentityGenerator

For type annotations:

from typing import Dict, List, Optional, Union
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField

Basic Usage

Python:

from pyspark.sql import SparkSession
from delta import DeltaTable, configure_spark_with_delta_pip

# Configure Spark with Delta Lake: the SQL extension and catalog settings enable
# Delta's SQL support, and configure_spark_with_delta_pip adds the delta-spark
# jars for a pip-based installation
builder = (
    SparkSession.builder.appName("DeltaExample")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a Delta table from DataFrame
df = spark.range(10)
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")

# Load Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")

# Query the table
delta_table.toDF().show()

# Update rows
delta_table.update(
    condition="id > 5",
    set={"id": "id + 100"}
)

# View history
delta_table.history().show()

Architecture

The Delta Lake Python API is organized into several layers:

  • Table Operations Layer: High-level APIs for table management, CRUD operations, and advanced features like merge, optimize, and time travel
  • Configuration Layer: Utilities for configuring Spark sessions with Delta Lake dependencies and settings
  • Type System: Complete type annotations and aliases for better IDE support and type safety

Key components include:

  • DeltaTable: Main class for programmatic table operations
  • Builder Classes: Fluent APIs for complex operations (merge, optimize, table creation, column specification)
  • Exception Handling: Comprehensive error types for concurrent operations and conflicts
  • Utility Functions: Configuration helpers and type definitions

Capabilities

Table Operations

Core table management including creation, reading, updating, deleting, and advanced operations like merge and time travel. Provides both path-based and catalog-based table access patterns.

class DeltaTable:
    # Table Access
    @classmethod
    def forPath(cls, spark: SparkSession, path: str, hadoop_conf: Optional[Dict[str, str]] = None) -> DeltaTable: ...
    @classmethod
    def forName(cls, spark: SparkSession, table_name: str) -> DeltaTable: ...
    @classmethod
    def isDeltaTable(cls, spark: SparkSession, identifier: str) -> bool: ...
    
    # Basic Operations
    def toDF(self) -> DataFrame: ...
    def alias(self, alias_name: str) -> DeltaTable: ...
    def delete(self, condition: Optional[Union[str, Column]] = None) -> None: ...
    def update(self, condition: Optional[Union[str, Column]] = None, set: Optional[Dict[str, Union[str, Column]]] = None) -> None: ...
    
    # Table Conversion
    @classmethod
    def convertToDelta(cls, spark: SparkSession, identifier: str, partition_schema: Optional[Union[str, StructType]] = None) -> DeltaTable: ...
    
    # Table Details
    def detail(self) -> DataFrame: ...
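
A short usage sketch of these access patterns, assuming the SparkSession from Basic Usage; the path and table name are placeholders:

from delta import DeltaTable

# Path-based access
dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Catalog-based access by table name
dt_by_name = DeltaTable.forName(spark, "default.events")

# Guard operations on locations that may not be Delta tables yet
if DeltaTable.isDeltaTable(spark, "/path/to/delta-table"):
    dt.delete("id < 3")              # delete rows matching a SQL predicate
    dt.update(set={"id": "id + 1"})  # update every row when no condition is given
    dt.detail().show()               # size, partition columns, properties, etc.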

Merge Operations

Advanced merge functionality supporting complex upsert patterns with whenMatched, whenNotMatched, and whenNotMatchedBySource clauses. Enables schema evolution and handles concurrent modifications.

class DeltaMergeBuilder:
    def whenMatchedUpdate(self, condition: Optional[Union[str, Column]] = None, set: Optional[Dict[str, Union[str, Column]]] = None) -> DeltaMergeBuilder: ...
    def whenMatchedDelete(self, condition: Optional[Union[str, Column]] = None) -> DeltaMergeBuilder: ...
    def whenNotMatchedInsert(self, condition: Optional[Union[str, Column]] = None, values: Optional[Dict[str, Union[str, Column]]] = None) -> DeltaMergeBuilder: ...
    def whenNotMatchedBySourceUpdate(self, condition: Optional[Union[str, Column]] = None, set: Optional[Dict[str, Union[str, Column]]] = None) -> DeltaMergeBuilder: ...
    def whenNotMatchedBySourceDelete(self, condition: Optional[Union[str, Column]] = None) -> DeltaMergeBuilder: ...
    def execute(self) -> DataFrame: ...
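
A DeltaMergeBuilder is obtained from DeltaTable.merge(source, condition). A minimal upsert sketch against the single-column table from Basic Usage (aliases and path are placeholders):

from delta import DeltaTable

target = DeltaTable.forPath(spark, "/path/to/delta-table")
source = spark.range(20)  # new data to upsert

(target.alias("t")
    .merge(source.alias("s"), "t.id = s.id")
    .whenMatchedUpdate(set={"id": "s.id"})        # overwrite matching rows
    .whenNotMatchedInsert(values={"id": "s.id"})  # insert rows missing from the target
    .execute())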

Table Management

Table creation, schema management, and configuration including table builders for programmatic table creation with columns, partitioning, clustering, and properties.

class DeltaTableBuilder:
    def tableName(self, identifier: str) -> DeltaTableBuilder: ...
    def location(self, location: str) -> DeltaTableBuilder: ...
    def addColumn(self, col_name: str, data_type: Union[str, DataType], nullable: bool = True) -> DeltaTableBuilder: ...
    def partitionedBy(self, *cols: str) -> DeltaTableBuilder: ...
    def property(self, key: str, value: str) -> DeltaTableBuilder: ...
    def execute(self) -> DeltaTable: ...
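
Builders are returned by DeltaTable.create, createIfNotExists, replace, and createOrReplace. A sketch of programmatic table creation; the table and column names are placeholders:

from delta import DeltaTable
from pyspark.sql.types import LongType, StringType

(DeltaTable.createIfNotExists(spark)
    .tableName("events")
    .addColumn("event_id", dataType=LongType(), nullable=False)
    .addColumn("event_date", dataType="DATE")  # string type names are also accepted
    .addColumn("payload", dataType=StringType())
    .partitionedBy("event_date")
    .execute())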

Time Travel and History

Version control capabilities including time travel queries, table restoration, and history exploration. Supports both version-based and timestamp-based operations.

class DeltaTable:
    def history(self, limit: Optional[int] = None) -> DataFrame: ...
    def restoreToVersion(self, version: int) -> DataFrame: ...
    def restoreToTimestamp(self, timestamp: str) -> DataFrame: ...
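
A sketch of inspecting, reading, and restoring historical versions; the versionAsOf and timestampAsOf options belong to the Spark DataFrame reader rather than DeltaTable itself:

dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Inspect the most recent commits
dt.history(10).show()

# Time-travel read of an earlier snapshot
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/path/to/delta-table")

# Roll the table back to an earlier state
dt.restoreToVersion(0)
dt.restoreToTimestamp("2024-01-01")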

Optimization and Maintenance

Performance optimization including file compaction, Z-ordering, vacuum operations, and table maintenance. Provides fine-grained control over data layout and storage optimization.

class DeltaTable:
    def optimize(self) -> DeltaOptimizeBuilder: ...
    def vacuum(self, retention_hours: Optional[float] = None) -> DataFrame: ...

class DeltaOptimizeBuilder:
    def where(self, partition_filter: str) -> DeltaOptimizeBuilder: ...
    def executeCompaction(self) -> DataFrame: ...
    def executeZOrderBy(self, *cols: str) -> DataFrame: ...
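
A usage sketch, assuming a table partitioned by event_date as in the Table Management example:

dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Compact small files across the whole table
dt.optimize().executeCompaction()

# Z-order one partition by a frequently filtered column
dt.optimize().where("event_date = '2024-01-01'").executeZOrderBy("event_id")

# Drop data files no longer referenced by versions inside the retention window
dt.vacuum(168)  # retention in hours; the default retention period is 7 days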

Clone Operations

Table cloning functionality for creating copies at specific versions or timestamps. Supports both shallow and deep clones.

class DeltaTable:
    def clone(self, target: str, is_shallow: bool, replace: bool = False, properties: Optional[Dict[str, str]] = None) -> DeltaTable: ...
    def cloneAtVersion(self, version: int, target: str, is_shallow: bool, replace: bool = False, properties: Optional[Dict[str, str]] = None) -> DeltaTable: ...
    def cloneAtTimestamp(self, timestamp: str, target: str, is_shallow: bool, replace: bool = False, properties: Optional[Dict[str, str]] = None) -> DeltaTable: ...
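
A sketch that follows the signatures above; the argument order is taken from that sketch and should be checked against the installed version, and the paths are placeholders:

dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Shallow clone of the current version (references the source data files)
dt.clone("/path/to/clone", True)

# Deep clone of a historical version (copies the data files)
dt.cloneAtVersion(0, "/path/to/clone-v0", False)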

Configuration and Setup

Utilities for configuring Spark sessions, managing Delta Lake dependencies, and handling table features and protocol versions.

def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder: ...

class DeltaTable:
    def upgradeTableProtocol(self, reader_version: int, writer_version: int) -> None: ...
    def addFeatureSupport(self, feature_name: str) -> None: ...
    def dropFeatureSupport(self, feature_name: str, truncate_history: Optional[bool] = None) -> None: ...
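
A configuration sketch for pip installations. The two spark.sql settings are the standard Delta Lake session configuration; the extra Maven coordinate is only an example of what extra_packages accepts:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("DeltaConfig")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Adds the delta-spark Maven artifacts (plus any extras) to spark.jars.packages
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()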

Exception Handling

# The base exception class comes from pyspark.errors; the Delta-specific
# exception classes below are importable from delta.exceptions
from pyspark.errors import PySparkException

# Base exception for concurrent modification errors  
class DeltaConcurrentModificationException(PySparkException): ...

# Specific concurrent operation exceptions
class ConcurrentWriteException(PySparkException): ...
class MetadataChangedException(PySparkException): ...
class ProtocolChangedException(PySparkException): ...
class ConcurrentAppendException(PySparkException): ...
class ConcurrentDeleteReadException(PySparkException): ...
class ConcurrentDeleteDeleteException(PySparkException): ...
class ConcurrentTransactionException(PySparkException): ...
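
A sketch of handling a write conflict, assuming the delta_table from Basic Usage:

from delta.exceptions import ConcurrentAppendException

try:
    delta_table.update(condition="id > 5", set={"id": "id + 100"})
except ConcurrentAppendException:
    # A concurrent writer added files this transaction read; retry or re-plan the update
    pass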

Type Definitions

Python:

from typing import Dict, Optional, Union, List
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField
from dataclasses import dataclass

# Expression and Column types
ExpressionOrColumn = Union[str, Column]
OptionalExpressionOrColumn = Optional[ExpressionOrColumn] 
ColumnMapping = Dict[str, ExpressionOrColumn]
OptionalColumnMapping = Optional[ColumnMapping]

# Identity column configuration
@dataclass
class IdentityGenerator:
    """Identity column specification for auto-incrementing values."""
    start: int = 1  # Starting value for identity sequence
    step: int = 1   # Increment step for identity sequence
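
The aliases mirror what operations such as update accept: SQL expression strings and Column objects are interchangeable. A short illustration, assuming the delta_table from Basic Usage:

from pyspark.sql.functions import col, lit

# ExpressionOrColumn condition and ColumnMapping set, as strings
delta_table.update(condition="id > 5", set={"id": "id + 100"})

# The same operation with Column expressions
delta_table.update(condition=col("id") > 105, set={"id": lit(0)})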