Python APIs for using Delta Lake with Apache Spark
npx @tessl/cli install tessl/pypi-delta-spark@4.0.0
# Delta Lake Python API

Delta Lake Python API provides comprehensive functionality for using Delta Lake with Apache Spark. It enables ACID transactions, scalable metadata handling, unified streaming and batch data processing, time travel capabilities, schema evolution, and concurrent read/write operations on data lakes.

## Package Information

- **Package Name**: delta-spark
- **Language**: Python
- **Installation**: `pip install delta-spark`

## Core Imports

```python
from delta import DeltaTable, configure_spark_with_delta_pip
from delta.tables import IdentityGenerator
```

For type annotations:

```python
from typing import Dict, List, Optional, Union
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField
```

## Basic Usage

Python:

```python
from pyspark.sql import SparkSession
from delta import DeltaTable, configure_spark_with_delta_pip

# Configure Spark with Delta Lake
builder = SparkSession.builder.appName("DeltaExample").master("local[*]")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a Delta table from a DataFrame
df = spark.range(10)
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")

# Load the Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")

# Query the table
delta_table.toDF().show()

# Update rows
delta_table.update(
    condition="id > 5",
    set={"id": "id + 100"}
)

# View history
delta_table.history().show()
```

## Architecture

Delta Lake Python API provides several key layers:

- **Table Operations Layer**: High-level APIs for table management, CRUD operations, and advanced features like merge, optimize, and time travel
- **Configuration Layer**: Utilities for configuring Spark sessions with Delta Lake dependencies and settings
- **Type System**: Complete type annotations and aliases for better IDE support and type safety

Key components include:

- **DeltaTable**: Main class for programmatic table operations
- **Builder Classes**: Fluent APIs for complex operations (merge, optimize, table creation, column specification)
- **Exception Handling**: Comprehensive error types for concurrent operations and conflicts
- **Utility Functions**: Configuration helpers and type definitions

## Capabilities

### Table Operations

Core table management including creation, reading, updating, deleting, and advanced operations like merge and time travel. Provides both path-based and catalog-based table access patterns.

```python { .api }
class DeltaTable:
    # Table Access
    @classmethod
    def forPath(cls, spark: SparkSession, path: str, hadoop_conf: Dict[str, str] = None) -> DeltaTable: ...
    @classmethod
    def forName(cls, spark: SparkSession, table_name: str) -> DeltaTable: ...
    @classmethod
    def isDeltaTable(cls, spark: SparkSession, identifier: str) -> bool: ...

    # Basic Operations
    def toDF(self) -> DataFrame: ...
    def alias(self, alias_name: str) -> DeltaTable: ...
    def delete(self, condition: Optional[Union[str, Column]] = None) -> None: ...
    def update(self, condition: Optional[Union[str, Column]] = None, set: Optional[Dict[str, Union[str, Column]]] = None) -> None: ...

    # Table Conversion
    @classmethod
    def convertToDelta(cls, spark: SparkSession, identifier: str, partition_schema: Optional[Union[str, StructType]] = None) -> DeltaTable: ...

    # Table Details
    def detail(self) -> DataFrame: ...
```

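A brief usage sketch of these entry points, assuming a table already exists at `/path/to/delta-table` and `spark` is a session configured as in Basic Usage:

```python
from delta import DeltaTable
from pyspark.sql.functions import col

# Load by storage path; DeltaTable.forName addresses catalog tables instead
dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Conditions may be SQL strings or Column expressions
dt.delete("id % 2 == 0")
dt.update(condition=col("id") > 100, set={"id": col("id") + 1})

# Inspect table metadata such as format, location, and size
dt.detail().show(truncate=False)
```
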
[Table Operations](./table-operations.md)

### Merge Operations

Advanced merge functionality supporting complex upsert patterns with whenMatched, whenNotMatched, and whenNotMatchedBySource clauses. Enables schema evolution and handles concurrent modifications.

```python { .api }
class DeltaMergeBuilder:
    def whenMatchedUpdate(self, condition: str = None, set: dict = None) -> DeltaMergeBuilder: ...
    def whenMatchedDelete(self, condition: str = None) -> DeltaMergeBuilder: ...
    def whenNotMatchedInsert(self, condition: str = None, values: dict = None) -> DeltaMergeBuilder: ...
    def execute(self) -> DataFrame: ...
```

```scala { .api }
class DeltaMergeBuilder {
  def whenMatchedUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedDelete(condition: Column): DeltaMergeBuilder
  def whenNotMatchedInsert(values: Map[String, Column]): DeltaMergeBuilder
  def execute(): DataFrame
}
```

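A typical upsert with this builder, assuming a target table with columns `id` and `value`; the builder is obtained from the standard `merge(source, condition)` entry point on `DeltaTable` (not declared in the block above):

```python
target = DeltaTable.forPath(spark, "/path/to/delta-table")
updates = spark.createDataFrame([(3, "updated"), (42, "new")], ["id", "value"])

(
    target.alias("t")
    .merge(updates.alias("s"), "t.id = s.id")      # join condition for matching
    .whenMatchedUpdate(set={"value": "s.value"})   # update rows present in both
    .whenNotMatchedInsert(values={"id": "s.id", "value": "s.value"})  # insert new rows
    .execute()
)
```
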
[Merge Operations](./merge-operations.md)

### Table Management

Table creation, schema management, and configuration including table builders for programmatic table creation with columns, partitioning, clustering, and properties.

```python { .api }
class DeltaTableBuilder:
    def tableName(self, identifier: str) -> DeltaTableBuilder: ...
    def location(self, location: str) -> DeltaTableBuilder: ...
    def addColumn(self, col_name: str, data_type: str, nullable: bool = True) -> DeltaTableBuilder: ...
    def partitionedBy(self, *cols: str) -> DeltaTableBuilder: ...
    def execute(self) -> DeltaTable: ...
```

```scala { .api }
class DeltaTableBuilder {
  def tableName(identifier: String): DeltaTableBuilder
  def location(location: String): DeltaTableBuilder
  def addColumn(colName: String, dataType: DataType): DeltaTableBuilder
  def partitionedBy(cols: String*): DeltaTableBuilder
  def execute(): DeltaTable
}
```

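A sketch of programmatic creation with this builder, assuming the standard `DeltaTable.createIfNotExists` entry point (alongside `create`, `createOrReplace`, and `replace`):

```python
from delta import DeltaTable

(
    DeltaTable.createIfNotExists(spark)
    .tableName("events")
    .addColumn("event_id", "BIGINT", nullable=False)
    .addColumn("event_date", "DATE")
    .addColumn("payload", "STRING")
    .partitionedBy("event_date")   # partition layout is fixed at creation time
    .execute()
)
```
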
[Table Management](./table-management.md)

### Time Travel and History

Version control capabilities including time travel queries, table restoration, and history exploration. Supports both version-based and timestamp-based operations.

```python { .api }
class DeltaTable:
    def history(self, limit: int = None) -> DataFrame: ...
    def restoreToVersion(self, version: int) -> DataFrame: ...
    def restoreToTimestamp(self, timestamp: str) -> DataFrame: ...
```

```scala { .api }
class DeltaTable {
  def history(limit: Int): DataFrame
  def restoreToVersion(version: Long): DataFrame
  def restoreToTimestamp(timestamp: String): DataFrame
}
```

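Historical snapshots can also be read without modifying the table, via the standard Delta reader options `versionAsOf` and `timestampAsOf`; a brief sketch:

```python
# Query an older snapshot without changing the table
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/path/to/delta-table")

# Inspect recent commits, then roll the table back to version 0
dt = DeltaTable.forPath(spark, "/path/to/delta-table")
dt.history(5).show()
dt.restoreToVersion(0)
```
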
[Time Travel and History](./time-travel.md)

### Optimization and Maintenance

Performance optimization including file compaction, Z-ordering, vacuum operations, and table maintenance. Provides fine-grained control over data layout and storage optimization.

```python { .api }
class DeltaTable:
    def optimize(self) -> DeltaOptimizeBuilder: ...
    def vacuum(self, retention_hours: float = None) -> DataFrame: ...

class DeltaOptimizeBuilder:
    def where(self, partition_filter: str) -> DeltaOptimizeBuilder: ...
    def executeCompaction(self) -> DataFrame: ...
    def executeZOrderBy(self, *cols: str) -> DataFrame: ...
```

```scala { .api }
class DeltaTable {
  def optimize(): DeltaOptimizeBuilder
  def vacuum(retentionHours: Double): DataFrame
}

class DeltaOptimizeBuilder {
  def where(partitionFilter: String): DeltaOptimizeBuilder
  def executeCompaction(): DataFrame
  def executeZOrderBy(cols: String*): DataFrame
}
```

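A maintenance sketch, assuming the table is partitioned by a `date` column (the filter passed to `where` may only reference partition columns):

```python
dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Compact small files within one partition
dt.optimize().where("date = '2024-01-01'").executeCompaction()

# Co-locate related rows for faster selective reads
dt.optimize().executeZOrderBy("id")

# Drop files no longer referenced by versions inside the retention window (hours)
dt.vacuum(168)
```
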
[Optimization and Maintenance](./optimization.md)

### Clone Operations

Table cloning functionality for creating copies at specific versions or timestamps. Supports both shallow and deep clones.

```python { .api }
class DeltaTable:
    def clone(self, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...
    def cloneAtVersion(self, version: int, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...
    def cloneAtTimestamp(self, timestamp: str, target: str, is_shallow: bool, replace: bool = False, properties: Dict[str, str] = None) -> DeltaTable: ...
```

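A brief sketch against the signatures above (paths are illustrative): a shallow clone copies only metadata and references the source data files, while a deep clone copies the files as well.

```python
dt = DeltaTable.forPath(spark, "/path/to/delta-table")

# Shallow clone of the current version (second argument is is_shallow)
dt.clone("/path/to/clone", True)

# Deep clone of version 0, replacing the target if it already exists
dt.cloneAtVersion(0, "/path/to/clone-v0", False, replace=True)
```
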
[Time Travel and History](./time-travel.md)

### Configuration and Setup

Utilities for configuring Spark sessions, managing Delta Lake dependencies, and handling table features and protocol versions.

```python { .api }
def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder: ...

class DeltaTable:
    def upgradeTableProtocol(self, reader_version: int, writer_version: int) -> None: ...
    def addFeatureSupport(self, feature_name: str) -> None: ...
    def dropFeatureSupport(self, feature_name: str, truncate_history: Optional[bool] = None) -> None: ...
```

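A setup sketch: the two `config` keys are the standard Delta Lake session settings, and the Maven coordinate passed via `extra_packages` is illustrative only:

```python
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("DeltaConfig")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Adds the delta-spark JARs (plus any extra packages) to spark.jars.packages
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()
```
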
[Configuration and Setup](./configuration.md)

## Exception Handling

```python { .api }
# Import from pyspark.errors
from pyspark.errors import PySparkException

# Base exception for concurrent modification errors
class DeltaConcurrentModificationException(PySparkException): ...

# Specific concurrent operation exceptions
class ConcurrentWriteException(PySparkException): ...
class MetadataChangedException(PySparkException): ...
class ProtocolChangedException(PySparkException): ...
class ConcurrentAppendException(PySparkException): ...
class ConcurrentDeleteReadException(PySparkException): ...
class ConcurrentDeleteDeleteException(PySparkException): ...
class ConcurrentTransactionException(PySparkException): ...
```

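These exceptions signal that optimistic concurrency control detected a conflicting commit. A retry sketch, assuming the classes are importable from `delta.exceptions` as in recent delta-spark releases:

```python
from delta.exceptions import ConcurrentAppendException

try:
    delta_table.update(condition="id > 5", set={"id": "id + 100"})
except ConcurrentAppendException:
    # A concurrent transaction added files this one read; the update is
    # typically safe to retry once the conflicting commit has landed
    delta_table.update(condition="id > 5", set={"id": "id + 100"})
```
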
## Type Definitions

Python:

```python { .api }
from typing import Dict, Optional, Union, List
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql.types import DataType, StructType, StructField
from dataclasses import dataclass

# Expression and Column types
ExpressionOrColumn = Union[str, Column]
OptionalExpressionOrColumn = Optional[ExpressionOrColumn]
ColumnMapping = Dict[str, ExpressionOrColumn]
OptionalColumnMapping = Optional[ColumnMapping]

# Identity column configuration
@dataclass
class IdentityGenerator:
    """Identity column specification for auto-incrementing values."""
    start: int = 1  # Starting value for identity sequence
    step: int = 1   # Increment step for identity sequence
```
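
`IdentityGenerator` is used when declaring identity columns through the table builder; a sketch, assuming `addColumn` accepts a `generatedAlwaysAs` identity specification as in recent delta-spark releases:

```python
from pyspark.sql.types import LongType
from delta import DeltaTable
from delta.tables import IdentityGenerator

(
    DeltaTable.create(spark)
    .tableName("orders")
    # order_id values are generated automatically, starting at 1, stepping by 1
    .addColumn("order_id", dataType=LongType(), generatedAlwaysAs=IdentityGenerator(start=1, step=1))
    .addColumn("amount", "DOUBLE")
    .execute()
)
```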