# Session Management

SparkSession is the unified entry point for all Spark SQL functionality. It provides access to DataFrames, SQL execution, configuration management, and catalog operations. SparkSession replaces the legacy SQLContext and HiveContext, offering a single interface for all Spark SQL operations.

## SparkSession Creation

```scala { .api }
object SparkSession {
  def builder(): SparkSession.Builder
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def clearDefaultSession(): Unit
}

class SparkSession.Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.SparkSession

// Create SparkSession with builder
val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()

// Get the currently active session, if any (returns Option[SparkSession])
val existingSession = SparkSession.getActiveSession
```
47
48
## Core Session Operations
49
50
```scala { .api }
51
class SparkSession {
52
def sql(sqlText: String): DataFrame
53
def read: DataFrameReader
54
def readStream: DataStreamReader
55
def catalog: Catalog
56
def conf: RuntimeConfig
57
def sparkContext: SparkContext
58
def version: String
59
def sessionState: SessionState
60
def sharedState: SharedState
61
def stop(): Unit
62
def close(): Unit
63
}
64
```
65
66
**Usage Examples:**
67
68
```scala
69
// Execute SQL queries
70
val result = spark.sql("SELECT * FROM my_table WHERE age > 25")
71
result.show()
72
73
// Access configuration
74
spark.conf.set("spark.sql.shuffle.partitions", "200")
75
val partitions = spark.conf.get("spark.sql.shuffle.partitions")
76
77
// Get Spark version
78
println(s"Spark version: ${spark.version}")
79
80
// Stop session
81
spark.stop()
82
```
83
84
## Data Creation Methods
85
86
```scala { .api }
87
class SparkSession {
88
def table(tableName: String): DataFrame
89
def range(end: Long): Dataset[Long]
90
def range(start: Long, end: Long): Dataset[Long]
91
def range(start: Long, end: Long, step: Long): Dataset[Long]
92
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]
93
94
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
95
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
96
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
97
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
98
99
def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
100
def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
101
def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]
102
103
def emptyDataFrame: DataFrame
104
def emptyDataset[T : Encoder]: Dataset[T]
105
}
106
```
107
108
**Usage Examples:**
109
110
```scala
111
import org.apache.spark.sql.types._
112
import org.apache.spark.sql.Row
113
114
// Create DataFrame from sequence
115
case class Person(name: String, age: Int)
116
val people = Seq(Person("Alice", 25), Person("Bob", 30))
117
val peopleDF = spark.createDataFrame(people)
118
119
// Create DataFrame with explicit schema
120
val schema = StructType(Array(
121
StructField("name", StringType, nullable = false),
122
StructField("age", IntegerType, nullable = false)
123
))
124
val rows = Seq(Row("Charlie", 35), Row("Diana", 28))
125
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
126
127
// Create Dataset with type safety
128
val names = Seq("Alice", "Bob", "Charlie")
129
val namesDS = spark.createDataset(names)
130
131
// Create range Dataset
132
val numbers = spark.range(1, 1000000, 2, numPartitions = 100)
133
134
// Load existing table
135
val tableDF = spark.table("my_database.my_table")
136
```
137
138
## Session Configuration
139
140
```scala { .api }
141
class RuntimeConfig {
142
def set(key: String, value: String): Unit
143
def set(key: String, value: Boolean): Unit
144
def set(key: String, value: Long): Unit
145
def get(key: String): String
146
def get(key: String, default: String): String
147
def getOption(key: String): Option[String]
148
def unset(key: String): Unit
149
def getAll: Map[String, String]
150
def isModifiable(key: String): Boolean
151
}
152
```
153
154
**Common Configuration Properties:**
155
156
```scala
157
// Adaptive Query Execution
158
spark.conf.set("spark.sql.adaptive.enabled", "true")
159
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
160
161
// Shuffle partitions
162
spark.conf.set("spark.sql.shuffle.partitions", "200")
163
164
// Broadcast join threshold
165
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
166
167
// Dynamic allocation
168
spark.conf.set("spark.dynamicAllocation.enabled", "true")
169
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
170
spark.conf.set("spark.dynamicAllocation.maxExecutors", "20")
171
172
// Serialization
173
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
174
175
// Check if property is modifiable
176
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")
177
```
178
179
## Session Lifecycle Management
180
181
```scala
182
// Multiple sessions (advanced usage)
183
val session1 = SparkSession.builder()
184
.appName("Session1")
185
.master("local[2]")
186
.getOrCreate()
187
188
val session2 = SparkSession.builder()
189
.appName("Session2")
190
.master("local[2]")
191
.getOrCreate()
192
193
// Set active session
194
SparkSession.setActiveSession(session1)
195
196
// Clean shutdown
197
session1.stop()
198
session2.stop()
199
200
// Or use try-with-resources pattern
201
def withSparkSession[T](appName: String)(f: SparkSession => T): T = {
202
val spark = SparkSession.builder()
203
.appName(appName)
204
.master("local[*]")
205
.getOrCreate()
206
try {
207
f(spark)
208
} finally {
209
spark.stop()
210
}
211
}
212
213
// Usage
214
val result = withSparkSession("MyApp") { spark =>
215
spark.sql("SELECT 1 as test").collect()
216
}
217
```
218
219
## Session State and Shared State
220
221
```scala { .api }
222
// Session-specific state (per SparkSession)
223
trait SessionState {
224
def catalog: SessionCatalog
225
def analyzer: Analyzer
226
def optimizer: Optimizer
227
def planner: SparkPlanner
228
def conf: SQLConf
229
}
230
231
// Shared state (across SparkSessions in same SparkContext)
232
trait SharedState {
233
def sparkContext: SparkContext
234
def externalCatalog: ExternalCatalog
235
def globalTempViewManager: GlobalTempViewManager
236
def cacheManager: CacheManager
237
}
238
```
239
240
The session and shared state provide access to internal Spark SQL components, primarily used for advanced use cases and debugging.
241
242
## Integration with Hive
243
244
```scala
245
// Enable Hive support during session creation
246
val spark = SparkSession.builder()
247
.appName("Hive Integration")
248
.enableHiveSupport()
249
.getOrCreate()
250
251
// Access Hive tables
252
val hiveTable = spark.table("hive_database.hive_table")
253
254
// Execute HiveQL
255
val result = spark.sql("SHOW TABLES IN hive_database")
256
257
// Use Hive SerDes and file formats
258
val df = spark.read
259
.format("hive")
260
.option("inputFormat", "org.apache.hadoop.mapred.TextInputFormat")
261
.option("outputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
262
.option("serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
263
.load("/path/to/hive/table")
264
```