# Apache Spark SQL - Session Management

## Capabilities

### SparkSession Creation and Configuration
- Create SparkSession instances using the builder pattern with flexible configuration options
- Configure Spark applications with runtime settings, SQL configurations, and custom properties
- Access application metadata, configuration state, and runtime information
- Manage the application lifecycle, including stopping sessions gracefully

### Runtime Configuration Management
- Set and retrieve runtime configuration properties during application execution
- Check configuration property mutability and reset configurations to defaults
- Handle both Spark SQL-specific and general Spark configuration parameters
- Validate configuration values and handle configuration-related errors

### Application Context Access
- Access Spark application details, including application ID, name, and start time
- Retrieve the SparkContext for lower-level Spark functionality when needed
- Monitor application state and resource allocation information

## API Reference

### SparkSession Class
```scala { .api }
abstract class SparkSession extends Serializable with Closeable {
  // Session properties
  def sparkContext: SparkContext
  def conf: RuntimeConfig
  def version: String

  // Data access interfaces
  def read: DataFrameReader
  def readStream: DataStreamReader
  def streams: StreamingQueryManager
  def catalog: Catalog
  def udf: UDFRegistration

  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame

  // DataFrame/Dataset creation
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]

  // Session lifecycle
  def stop(): Unit
  def close(): Unit
  def newSession(): SparkSession
}
```

### SparkSession.Builder Class
```scala { .api }
class Builder {
  // Basic configuration
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder

  // Advanced configuration
  def enableHiveSupport(): Builder
  def withExtensions(f: SparkSessionExtensions => Unit): Builder

  // Session creation
  def getOrCreate(): SparkSession
}
```

### RuntimeConfig Class
```scala { .api }
abstract class RuntimeConfig {
  // Configuration getters
  def get(key: String): String
  def get(key: String, default: String): String
  def getOption(key: String): Option[String]
  def getAll: Map[String, String]

  // Configuration setters
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit

  // Configuration management
  def unset(key: String): Unit
  def isModifiable(key: String): Boolean
}
```

### Supporting Types

#### SparkConf
```scala { .api }
class SparkConf(loadDefaults: Boolean = true) extends Cloneable with Serializable {
  def set(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def remove(key: String): SparkConf
}
```

#### SparkContext
```scala { .api }
class SparkContext(config: SparkConf) extends Logging {
  def appName: String
  def applicationId: String
  def master: String
  def deployMode: String
  def version: String
  def startTime: Long
  def defaultParallelism: Int
  def getConf: SparkConf
  def isLocal: Boolean
  def isStopped: Boolean
  def stop(): Unit
}
```

## Usage Examples

### Creating a Basic SparkSession
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/path/to/warehouse")
  .getOrCreate()
```

### Creating SparkSession with Custom Configuration
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("Advanced Spark App")
  .setMaster("yarn")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()
```

### Runtime Configuration Management
```scala
// Get current configuration
val currentWarehouse = spark.conf.get("spark.sql.warehouse.dir")

// Set configuration at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", true)

// Check if configuration can be modified at runtime
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")

// Get all configurations
val allConfigs = spark.conf.getAll
allConfigs.foreach { case (key, value) =>
  println(s"$key = $value")
}

// Unset configuration (reset to default)
spark.conf.unset("spark.sql.shuffle.partitions")
```

### Session Extensions and Custom Configuration
```scala
import org.apache.spark.sql.SparkSessionExtensions

val spark = SparkSession.builder()
  .appName("Extended Spark Session")
  .withExtensions { extensions =>
    // Register custom rules, functions, etc.
    extensions.injectResolutionRule { session =>
      // Custom resolution rule implementation
      new CustomResolutionRule(session)
    }
  }
  .config("spark.sql.extensions", "com.example.MySparkExtension")
  .getOrCreate()
```

### Accessing Application Information
```scala
// Get application details
println(s"Application ID: ${spark.sparkContext.applicationId}")
println(s"Application Name: ${spark.sparkContext.appName}")
println(s"Spark Version: ${spark.version}")
println(s"Master URL: ${spark.sparkContext.master}")
println(s"Deploy Mode: ${spark.sparkContext.deployMode}")
println(s"Default Parallelism: ${spark.sparkContext.defaultParallelism}")

// Check application state
if (!spark.sparkContext.isStopped) {
  println("Application is running")
}
```

### Managing Multiple Sessions
```scala
// Create primary session
val primarySpark = SparkSession.builder()
  .appName("Primary Session")
  .getOrCreate()

// Create an isolated session with a different configuration
val isolatedSpark = primarySpark.newSession()
isolatedSpark.conf.set("spark.sql.shuffle.partitions", "100")

// Both sessions share the same SparkContext but have independent configurations
val primaryPartitions = primarySpark.conf.get("spark.sql.shuffle.partitions")
val isolatedPartitions = isolatedSpark.conf.get("spark.sql.shuffle.partitions")
println(s"Primary partitions: $primaryPartitions")
println(s"Isolated partitions: $isolatedPartitions")

// Clean shutdown
isolatedSpark.close()
primarySpark.stop()
```

### Configuration Best Practices
```scala
import org.apache.spark.sql.SparkSession

// Production-ready configuration
val spark = SparkSession.builder()
  .appName("Production Data Pipeline")
  // Resource allocation
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "2")
  .config("spark.executor.instances", "10")

  // SQL optimizations
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")

  // Serialization (note: the arrow.pyspark setting only affects PySpark workloads)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.execution.arrow.pyspark.enabled", "true")

  // Caching and checkpointing
  .config("spark.sql.cache.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.checkpoint.compress", "true")

  // Arrow batching and partition discovery tuning
  .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")

  .getOrCreate()

// Set runtime configurations for specific workloads
// (isLargeDataset is a placeholder predicate; derive it from your own input metrics)
if (isLargeDataset) {
  spark.conf.set("spark.sql.shuffle.partitions", "400")
} else {
  spark.conf.set("spark.sql.shuffle.partitions", "200")
}
```