# Configuration and Setup

Utilities for configuring Spark sessions, managing Delta Lake dependencies, handling table features, and managing protocol versions. Provides comprehensive setup and configuration options for Delta Lake integration.

## Capabilities

### Spark Session Configuration

Configure Spark sessions with Delta Lake dependencies and settings.

```python { .api }
def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder:
    """
    Configure SparkSession builder to automatically download Delta Lake JARs.

    Required when using pip-installed delta-spark without spark-submit packages.

    Parameters:
    - spark_session_builder: SparkSession.Builder to configure
    - extra_packages: Optional additional Maven packages to include

    Returns:
    Configured SparkSession.Builder with Delta dependencies

    Example:
    builder = SparkSession.builder.appName("DeltaApp").master("local[*]")
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    """
```

### Manual Spark Configuration

```python
# Alternative manual configuration for Spark sessions
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0") \
    .getOrCreate()
```

### Protocol Management

Manage table protocol versions and feature support.

```python { .api }
class DeltaTable:
    def upgradeTableProtocol(
        self,
        readerVersion: int,
        writerVersion: int
    ) -> None:
        """
        Upgrade table protocol to support new features.

        Parameters:
        - readerVersion: Minimum reader version required
        - writerVersion: Minimum writer version required

        Note: Cannot downgrade protocol versions
        """

    def addFeatureSupport(self, featureName: str) -> None:
        """
        Add support for a specific table feature.

        Parameters:
        - featureName: Name of the feature to enable

        Automatically upgrades the protocol if needed:
        - Writer-only features: upgrades to writer version 7
        - Reader-writer features: upgrades to (3, 7)
        """

    def dropFeatureSupport(
        self,
        featureName: str,
        truncateHistory: Optional[bool] = None
    ) -> None:
        """
        Drop support for a table feature and normalize the protocol.

        Parameters:
        - featureName: Name of the feature to drop
        - truncateHistory: Whether to truncate history before the downgrade

        Normalizes the protocol to the weakest possible form after dropping the feature.
        """
```

```scala { .api }
class DeltaTable {
  def upgradeTableProtocol(readerVersion: Int, writerVersion: Int): Unit
  def addFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String, truncateHistory: Boolean): Unit
}
```

## Usage Examples

### Basic Spark Session Setup

```python
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Method 1: Using configure_spark_with_delta_pip (recommended for pip installs)
# Note: the Delta SQL extension and catalog must still be set on the builder;
# configure_spark_with_delta_pip only adds the Delta Maven packages.
builder = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Method 2: Manual configuration (for cluster deployments)
spark = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
```

### Adding Extra Packages

```python
# Include additional packages with Delta
extra_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0",
    "org.apache.spark:spark-avro_2.13:3.5.0"
]

builder = SparkSession.builder.appName("DeltaWithKafka").master("local[*]")
spark = configure_spark_with_delta_pip(builder, extra_packages).getOrCreate()
```

### Protocol Version Management

```python
# Check current protocol version
table_detail = delta_table.detail()
current_protocol = table_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Current protocol: reader={current_protocol[0]}, writer={current_protocol[1]}")

# Upgrade protocol to support new features
delta_table.upgradeTableProtocol(readerVersion=3, writerVersion=7)

# Add specific feature support
delta_table.addFeatureSupport("columnMapping")
delta_table.addFeatureSupport("changeDataFeed")
delta_table.addFeatureSupport("generatedColumns")

# Verify protocol upgrade
updated_detail = delta_table.detail()
new_protocol = updated_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Updated protocol: reader={new_protocol[0]}, writer={new_protocol[1]}")
```

### Feature Management

```python
# Enable change data feed
delta_table.addFeatureSupport("changeDataFeed")

# Enable column mapping for schema evolution
delta_table.addFeatureSupport("columnMapping")

# Check supported features (requires protocol v7+)
table_features = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`") \
    .select("tableFeatures").collect()[0][0]
print(f"Enabled features: {table_features}")

# Drop feature support (with history truncation)
delta_table.dropFeatureSupport("changeDataFeed", truncateHistory=True)
```

### Advanced Configuration Options

```python
# Production Spark session with Delta optimizations
spark = SparkSession.builder \
    .appName("ProductionDeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
    .getOrCreate()
```

### Environment-Specific Configuration

```python
import os

# Development environment
if os.getenv("ENV") == "development":
    builder = SparkSession.builder \
        .appName("DeltaDev") \
        .master("local[*]") \
        .config("spark.ui.port", "4041") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")

# Production environment
elif os.getenv("ENV") == "production":
    builder = SparkSession.builder \
        .appName("DeltaProd") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")

# Fallback so `builder` is always defined
else:
    builder = SparkSession.builder.appName("DeltaApp").master("local[*]")

# Delta configs required in every environment
builder = builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
```

### Table Property Configuration

```python
# Set default table properties for new tables
spark.conf.set("spark.databricks.delta.properties.defaults.logRetentionDuration", "interval 30 days")
spark.conf.set("spark.databricks.delta.properties.defaults.deletedFileRetentionDuration", "interval 7 days")
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

# Configure specific table properties
delta_table_builder = DeltaTable.create(spark) \
    .tableName("events") \
    .addColumn("id", "BIGINT") \
    .addColumn("timestamp", "TIMESTAMP") \
    .addColumn("event_type", "STRING") \
    .property("delta.logRetentionDuration", "interval 90 days") \
    .property("delta.enableChangeDataFeed", "true") \
    .property("delta.autoOptimize.optimizeWrite", "true") \
    .property("delta.autoOptimize.autoCompact", "true")

delta_table = delta_table_builder.execute()
```

## Common Configuration Properties

### Spark Configuration

- `spark.sql.extensions`: Delta SQL extensions
- `spark.sql.catalog.spark_catalog`: Delta catalog implementation
- `spark.databricks.delta.optimizeWrite.enabled`: Write optimization
- `spark.databricks.delta.autoCompact.enabled`: Auto-compaction
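
For a quick runtime check of the settings listed above, the following is a minimal sketch, assuming a `spark` session built as in the earlier examples:

```python
# Confirm the session was created with the Delta extension and catalog
assert "io.delta.sql.DeltaSparkSessionExtension" in spark.conf.get("spark.sql.extensions")
assert spark.conf.get("spark.sql.catalog.spark_catalog") == \
    "org.apache.spark.sql.delta.catalog.DeltaCatalog"

# The write-optimization flags are ordinary SQL confs and can be toggled at runtime
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
print(spark.conf.get("spark.databricks.delta.optimizeWrite.enabled"))
```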

### Table Properties

- `delta.logRetentionDuration`: Transaction log retention
- `delta.deletedFileRetentionDuration`: Deleted file retention
- `delta.enableChangeDataFeed`: Change data capture
- `delta.autoOptimize.optimizeWrite`: Write optimization per table
- `delta.autoOptimize.autoCompact`: Auto-compaction per table
- `delta.columnMapping.mode`: Column mapping mode (`none`, `name`, or `id`) used for schema evolution
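
These properties can also be set on an existing table with standard Spark SQL. A minimal sketch, assuming the `events` table created earlier (the property values are illustrative):

```python
# Adjust retention and enable change data feed on an existing Delta table
spark.sql("""
    ALTER TABLE events SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 60 days',
        'delta.enableChangeDataFeed' = 'true'
    )
""")

# Inspect the properties currently set on the table
spark.sql("SHOW TBLPROPERTIES events").show(truncate=False)
```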

### Protocol Versions

- Reader Version 1: Basic Delta functionality
- Reader Version 2: Column mapping support
- Reader Version 3: Table features support
- Writer Version 2: Append-only tables and column invariants
- Writer Version 4: Change data feed, generated columns
- Writer Version 7: Table features framework
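
As an alternative to `upgradeTableProtocol`, the protocol of an existing table can be raised by setting the `delta.minReaderVersion` and `delta.minWriterVersion` table properties. A minimal sketch, again assuming the `events` table (the target versions are illustrative):

```python
# Raise the protocol to the table-features versions via table properties
spark.sql("""
    ALTER TABLE events SET TBLPROPERTIES (
        'delta.minReaderVersion' = '3',
        'delta.minWriterVersion' = '7'
    )
""")

# Confirm the new protocol versions
spark.sql("DESCRIBE DETAIL events") \
    .select("minReaderVersion", "minWriterVersion") \
    .show()
```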

## Feature Dependencies

Table features and their protocol requirements, as (reader, writer) versions:

- **appendOnly**: (1, 2) - Append-only tables
- **invariants**: (1, 2) - Column-level invariants
- **checkConstraints**: (1, 3) - Named CHECK constraints
- **changeDataFeed**: (1, 4) - Change data capture
- **generatedColumns**: (1, 4) - Generated/computed columns
- **columnMapping**: (2, 5) - Column name mapping
- **identityColumns**: (1, 6) - Identity/auto-increment columns
- **deletionVectors**: (3, 7) - Efficient row-level deletes
- **rowTracking**: (1, 7) - Row-level change tracking
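
To use this mapping from code, it can be written out as a plain dictionary and compared against a table's current protocol. A minimal sketch, assuming a `delta_table` handle as in the earlier examples; `FEATURE_REQUIREMENTS` and `protocol_gap` are illustrative helpers, not part of the Delta Lake API:

```python
# Illustrative mapping of feature names to the legacy (reader, writer)
# protocol versions listed above; not an official Delta Lake API.
FEATURE_REQUIREMENTS = {
    "appendOnly": (1, 2),
    "invariants": (1, 2),
    "checkConstraints": (1, 3),
    "changeDataFeed": (1, 4),
    "generatedColumns": (1, 4),
    "columnMapping": (2, 5),
    "identityColumns": (1, 6),
    "deletionVectors": (3, 7),
    "rowTracking": (1, 7),
}

def protocol_gap(delta_table, feature_name):
    """Return how far the table's current protocol is below the legacy
    (reader, writer) versions associated with the given feature."""
    req_reader, req_writer = FEATURE_REQUIREMENTS[feature_name]
    row = delta_table.detail().select("minReaderVersion", "minWriterVersion").collect()[0]
    return (max(0, req_reader - row["minReaderVersion"]),
            max(0, req_writer - row["minWriterVersion"]))

# Example: inspect the gap, then let addFeatureSupport perform any needed upgrade
print(protocol_gap(delta_table, "deletionVectors"))
delta_table.addFeatureSupport("deletionVectors")  # upgrades to (3, 7) if required
```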