
# Configuration and Setup

Utilities for configuring Spark sessions, managing Delta Lake dependencies, handling table features, and managing protocol versions. Provides comprehensive setup and configuration options for Delta Lake integration.

## Capabilities

### Spark Session Configuration

Configure Spark sessions with Delta Lake dependencies and settings.

```python { .api }
def configure_spark_with_delta_pip(
    spark_session_builder: SparkSession.Builder,
    extra_packages: Optional[List[str]] = None
) -> SparkSession.Builder:
    """
    Configure SparkSession builder to automatically download Delta Lake JARs.

    Required when using pip-installed delta-spark without spark-submit packages.

    Parameters:
    - spark_session_builder: SparkSession.Builder to configure
    - extra_packages: Optional additional Maven packages to include

    Returns:
    Configured SparkSession.Builder with Delta dependencies

    Example:
    builder = SparkSession.builder.appName("DeltaApp").master("local[*]")
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    """
```

### Manual Spark Configuration

```python
# Alternative manual configuration for Spark sessions
spark = SparkSession.builder \
    .appName("DeltaLakeApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0") \
    .getOrCreate()
```
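
A quick way to confirm the session is Delta-enabled is a small write/read round trip. This is a minimal sketch, assuming a writable local path `/tmp/delta-smoke-test` and a `spark` session built with one of the configurations above:

```python
# Smoke test: write and read back a tiny Delta table
# (path and table contents are illustrative only)
path = "/tmp/delta-smoke-test"

spark.range(5).write.format("delta").mode("overwrite").save(path)
assert spark.read.format("delta").load(path).count() == 5
```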

### Protocol Management

Manage table protocol versions and feature support.

```python { .api }
class DeltaTable:
    def upgradeTableProtocol(
        self,
        reader_version: int,
        writer_version: int
    ) -> None:
        """
        Upgrade table protocol to support new features.

        Parameters:
        - reader_version: Minimum reader version required
        - writer_version: Minimum writer version required

        Note: Cannot downgrade protocol versions
        """

    def addFeatureSupport(self, feature_name: str) -> None:
        """
        Add support for specific table feature.

        Parameters:
        - feature_name: Name of feature to enable

        Automatically upgrades protocol if needed:
        - Writer-only features: upgrades to writer version 7
        - Reader-writer features: upgrades to (3, 7)
        """

    def dropFeatureSupport(
        self,
        feature_name: str,
        truncate_history: Optional[bool] = None
    ) -> None:
        """
        Drop support for table feature and normalize protocol.

        Parameters:
        - feature_name: Name of feature to drop
        - truncate_history: Whether to truncate history before downgrade

        Normalizes protocol to weakest possible form after dropping feature.
        """
```

```scala { .api }
class DeltaTable {
  def upgradeTableProtocol(readerVersion: Int, writerVersion: Int): Unit
  def addFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String): Unit
  def dropFeatureSupport(featureName: String, truncateHistory: Boolean): Unit
}
```
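
As the docstrings note, `addFeatureSupport` upgrades the protocol on demand. The sketch below is illustrative only; it assumes an existing Delta table at the hypothetical path `/tmp/events` that still uses default protocol versions:

```python
from delta.tables import DeltaTable

# Hypothetical table path; assumes the table already exists as a Delta table
delta_table = DeltaTable.forPath(spark, "/tmp/events")

# Enabling a reader-writer feature such as deletion vectors should raise
# the protocol to at least (3, 7) automatically
delta_table.addFeatureSupport("deletionVectors")

protocol = delta_table.detail().select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Protocol after enabling deletionVectors: ({protocol[0]}, {protocol[1]})")
```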

## Usage Examples

### Basic Spark Session Setup

```python
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Method 1: Using configure_spark_with_delta_pip (recommended for pip installs)
builder = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Method 2: Manual configuration (for cluster deployments)
spark = SparkSession.builder \
    .appName("DeltaLakeApplication") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
```

### Adding Extra Packages

```python
# Include additional packages with Delta
extra_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0",
    "org.apache.spark:spark-avro_2.13:3.5.0"
]

builder = SparkSession.builder.appName("DeltaWithKafka").master("local[*]")
spark = configure_spark_with_delta_pip(builder, extra_packages).getOrCreate()
```

### Protocol Version Management

```python
# Check current protocol version
table_detail = delta_table.detail()
current_protocol = table_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Current protocol: reader={current_protocol[0]}, writer={current_protocol[1]}")

# Upgrade protocol to support new features (reader 3, writer 7)
delta_table.upgradeTableProtocol(3, 7)

# Add specific feature support
delta_table.addFeatureSupport("columnMapping")
delta_table.addFeatureSupport("changeDataFeed")
delta_table.addFeatureSupport("generatedColumns")

# Verify protocol upgrade
updated_detail = delta_table.detail()
new_protocol = updated_detail.select("minReaderVersion", "minWriterVersion").collect()[0]
print(f"Updated protocol: reader={new_protocol[0]}, writer={new_protocol[1]}")
```

### Feature Management

```python
# Enable change data feed
delta_table.addFeatureSupport("changeDataFeed")

# Enable column mapping for schema evolution
delta_table.addFeatureSupport("columnMapping")

# Check supported features (requires protocol v7+)
table_features = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`") \
    .select("tableFeatures").collect()[0][0]
print(f"Enabled features: {table_features}")

# Drop feature support (with history truncation)
delta_table.dropFeatureSupport("changeDataFeed", True)
```

### Advanced Configuration Options

```python
# Production Spark session with Delta optimizations
spark = SparkSession.builder \
    .appName("ProductionDeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
    .getOrCreate()
```

### Environment-Specific Configuration

```python
import os

# Development environment
if os.getenv("ENV") == "development":
    builder = SparkSession.builder \
        .appName("DeltaDev") \
        .master("local[*]") \
        .config("spark.ui.port", "4041") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")

# Production environment
elif os.getenv("ENV") == "production":
    builder = SparkSession.builder \
        .appName("DeltaProd") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")

# Fallback for any other environment: a plain local session
else:
    builder = SparkSession.builder.appName("DeltaLocal").master("local[*]")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
```

### Table Property Configuration

```python
from delta.tables import DeltaTable

# Set default table properties for new tables
spark.conf.set("spark.databricks.delta.properties.defaults.logRetentionDuration", "interval 30 days")
spark.conf.set("spark.databricks.delta.properties.defaults.deletedFileRetentionDuration", "interval 7 days")
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

# Configure specific table properties
delta_table_builder = DeltaTable.create(spark) \
    .tableName("events") \
    .addColumn("id", "BIGINT") \
    .addColumn("timestamp", "TIMESTAMP") \
    .addColumn("event_type", "STRING") \
    .property("delta.logRetentionDuration", "interval 90 days") \
    .property("delta.enableChangeDataFeed", "true") \
    .property("delta.autoOptimize.optimizeWrite", "true") \
    .property("delta.autoOptimize.autoCompact", "true")

delta_table = delta_table_builder.execute()
```

## Common Configuration Properties

### Spark Configuration

- `spark.sql.extensions`: Delta SQL extensions
- `spark.sql.catalog.spark_catalog`: Delta catalog implementation
- `spark.databricks.delta.optimizeWrite.enabled`: Write optimization
- `spark.databricks.delta.autoCompact.enabled`: Auto-compaction
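
The first two settings must be in place before the session starts, so they belong on the builder (or in `spark-defaults.conf`). The write-optimization flags are regular SQL configs and can also be toggled at runtime; a minimal sketch, with illustrative values:

```python
# Runtime toggles for Delta write optimization on an existing session
# (property names are those listed above; values are illustrative)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
```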

### Table Properties

- `delta.logRetentionDuration`: Transaction log retention
- `delta.deletedFileRetentionDuration`: Deleted file retention
- `delta.enableChangeDataFeed`: Change data capture
- `delta.autoOptimize.optimizeWrite`: Write optimization per table
- `delta.autoOptimize.autoCompact`: Auto-compaction per table
- `delta.columnMapping.mode`: Column mapping mode used for schema evolution (column rename/drop)
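
These properties can be set when creating a table (as shown earlier with `DeltaTable.create(...).property(...)`) or applied to an existing table with SQL. A brief sketch, assuming a table named `events` already exists:

```python
# Update table properties on an existing table via SQL
spark.sql("""
    ALTER TABLE events SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 30 days',
        'delta.enableChangeDataFeed' = 'true'
    )
""")

# Inspect the effective properties
spark.sql("SHOW TBLPROPERTIES events").show(truncate=False)
```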

### Protocol Versions

- Reader Version 1: Basic Delta functionality
- Reader Version 2: Column mapping support
- Reader Version 3: Table features support
- Writer Version 2: Basic write operations
- Writer Version 4: Change data feed, generated columns
- Writer Version 7: Table features framework

## Feature Dependencies

Table features and their minimum protocol requirements, listed as (reader version, writer version):

- **appendOnly**: (1, 2) - Append-only tables
- **invariants**: (1, 2) - Column invariants
- **checkConstraints**: (1, 3) - Named CHECK constraints
- **changeDataFeed**: (1, 4) - Change data capture
- **generatedColumns**: (1, 4) - Generated/computed columns
- **columnMapping**: (2, 5) - Column name mapping
- **identityColumns**: (1, 6) - Identity/auto-increment columns
- **deletionVectors**: (3, 7) - Efficient row-level deletes
- **rowTracking**: (1, 7) - Row-level change tracking
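
Beyond `addFeatureSupport`, individual features can be enabled by setting a table property of the form `delta.feature.<featureName>` to `supported`, which upgrades the protocol to the versions listed above. A brief sketch, assuming an existing table named `events`:

```python
# Enable a single table feature via its delta.feature.* property
spark.sql(
    "ALTER TABLE events SET TBLPROPERTIES ('delta.feature.deletionVectors' = 'supported')"
)

# Confirm the protocol and feature list after the upgrade
spark.sql("DESCRIBE DETAIL events") \
    .select("minReaderVersion", "minWriterVersion", "tableFeatures") \
    .show(truncate=False)
```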