# Session Management

Client session management for the Spark Hive Thrift Server: each client session is associated with a SQL context and its own configuration, and is tracked from open to close.

## Capabilities

### SparkSQLSessionManager

Manages client sessions and their associated SQL contexts, providing session isolation and per-session configuration.

```scala { .api }
/**
 * Session manager for Spark SQL with HiveServer2 compatibility
 * Manages client sessions and their SQL contexts
 */
class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) {
  /**
   * Open a new client session with optional impersonation
   * @param protocol Thrift protocol version
   * @param username Client username
   * @param passwd Client password
   * @param ipAddress Client IP address
   * @param sessionConf Session-specific configuration
   * @param withImpersonation Whether to enable user impersonation
   * @param delegationToken Delegation token for authentication
   * @return SessionHandle identifying the new session
   */
  def openSession(
    protocol: TProtocolVersion,
    username: String,
    passwd: String,
    ipAddress: String,
    sessionConf: java.util.Map[String, String],
    withImpersonation: Boolean,
    delegationToken: String
  ): SessionHandle

  /**
   * Close an existing session and clean up resources
   * @param sessionHandle Handle identifying the session to close
   */
  def closeSession(sessionHandle: SessionHandle): Unit

  /**
   * Set configuration parameters for a SQL context
   * @param conf The SQL context to configure
   * @param confMap Configuration key-value pairs
   */
  def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.hive.thriftserver.{SparkSQLEnv, SparkSQLSessionManager}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import scala.collection.JavaConverters._

// Create session manager
// `hiveServer` is assumed to be an existing HiveServer2 instance
val sessionManager = new SparkSQLSessionManager(hiveServer, SparkSQLEnv.sqlContext)

// Open a new session
val sessionConf = Map(
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.adaptive.coalescePartitions.enabled" -> "true"
).asJava

val sessionHandle = sessionManager.openSession(
  protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "spark_user",
  passwd = "",
  ipAddress = "192.168.1.100",
  sessionConf = sessionConf,
  withImpersonation = false,
  delegationToken = null
)

// Use the session...
// Close the session when done
sessionManager.closeSession(sessionHandle)
```
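
To make the role of `setConfMap` concrete, here is a minimal, dependency-free sketch of the idea: every entry of a Java map is copied into a per-session configuration object. `MiniConf` and `applyConfMap` are hypothetical stand-ins introduced for illustration, not Spark classes; the real method writes into a `SQLContext`.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.mutable

object ConfMapSketch {
  // Hypothetical stand-in for a per-session SQL configuration.
  final class MiniConf {
    private val settings = mutable.Map.empty[String, String]
    def setConf(key: String, value: String): Unit = settings(key) = value
    def getConf(key: String): Option[String] = settings.get(key)
  }

  // Copy every key-value pair from a Java map into the given configuration.
  def applyConfMap(conf: MiniConf, confMap: JMap[String, String]): Unit = {
    val it = confMap.entrySet().iterator()
    while (it.hasNext) {
      val kv = it.next()
      conf.setConf(kv.getKey, kv.getValue)
    }
  }

  // Two sessions hold separate MiniConf instances, so a setting applied to
  // one is not visible in the other.
  def demo(): (Option[String], Option[String]) = {
    val sessionA = new MiniConf
    val sessionB = new MiniConf
    val overrides = new JHashMap[String, String]()
    overrides.put("spark.sql.adaptive.enabled", "true")
    applyConfMap(sessionA, overrides)
    (sessionA.getConf("spark.sql.adaptive.enabled"),
      sessionB.getConf("spark.sql.adaptive.enabled"))
  }
}
```

Keeping one configuration object per session is what gives the isolation described above in this sketch.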

### Session Handles

Session handles uniquely identify client sessions throughout their lifecycle.

```java { .api }
/**
 * Handle identifying a client session
 */
class SessionHandle extends Handle {
    /**
     * Create a new session handle
     * @param handleId Unique handle identifier
     * @param protocol Thrift protocol version
     */
    public SessionHandle(HandleIdentifier handleId, TProtocolVersion protocol)

    /**
     * Get the protocol version for this session
     * @return TProtocolVersion used by the session
     */
    public TProtocolVersion getProtocolVersion()

    /**
     * Check if this session handle equals another
     * @param other Handle to compare with
     * @return true if handles are equal
     */
    public boolean equals(Object other)

    /**
     * Get string representation of the handle
     * @return String representation for logging/debugging
     */
    public String toString()
}

/**
 * Base handle class providing unique identification
 */
abstract class Handle {
    /**
     * Get the unique identifier for this handle
     * @return HandleIdentifier containing UUID and secret
     */
    public HandleIdentifier getHandleIdentifier()
}

/**
 * Unique identifier containing UUID and secret
 */
class HandleIdentifier {
    /**
     * Get the public UUID part of the identifier
     * @return UUID for this identifier
     */
    public UUID getPublicId()

    /**
     * Get the secret part of the identifier
     * @return UUID secret for security
     */
    public UUID getSecretId()
}
```
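
The public/secret split in `HandleIdentifier` can be illustrated with a small, dependency-free sketch. `MiniHandleIdentifier`, `MiniSessionHandle`, `newHandle`, and `matches` are hypothetical names used only here; the sketch assumes a handle is accepted only when both UUID parts match.

```scala
import java.util.UUID

object HandleSketch {
  // Hypothetical simplified identifier: a public ID plus a secret ID.
  final case class MiniHandleIdentifier(publicId: UUID, secretId: UUID)

  // Hypothetical simplified session handle; case classes compare structurally.
  final case class MiniSessionHandle(id: MiniHandleIdentifier, protocolVersion: Int)

  // Create a handle with freshly generated random UUIDs.
  def newHandle(protocolVersion: Int): MiniSessionHandle =
    MiniSessionHandle(
      MiniHandleIdentifier(UUID.randomUUID(), UUID.randomUUID()),
      protocolVersion)

  // A presented handle is accepted only if both the public and secret parts match.
  def matches(stored: MiniSessionHandle, presented: MiniSessionHandle): Boolean =
    stored.id.publicId == presented.id.publicId &&
      stored.id.secretId == presented.id.secretId
}
```

Under this assumption, knowing only the public ID of a session is not enough to address it.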

### Protocol Version Management

The session manager supports multiple Thrift protocol versions for backward compatibility.

```java { .api }
/**
 * Supported Thrift protocol versions
 */
enum TProtocolVersion {
    HIVE_CLI_SERVICE_PROTOCOL_V1,  // Original HiveServer2 protocol
    HIVE_CLI_SERVICE_PROTOCOL_V2,  // Added asynchronous execution
    HIVE_CLI_SERVICE_PROTOCOL_V3,  // Added varchar type and primitive type qualifiers
    HIVE_CLI_SERVICE_PROTOCOL_V4,  // Added decimal precision/scale and char type
    HIVE_CLI_SERVICE_PROTOCOL_V5,  // Added error details to GetOperationStatus
    HIVE_CLI_SERVICE_PROTOCOL_V6,  // Binary payloads and columnar result sets
    HIVE_CLI_SERVICE_PROTOCOL_V7,  // Added delegation token based connections
    HIVE_CLI_SERVICE_PROTOCOL_V8,  // Added interval types
    HIVE_CLI_SERVICE_PROTOCOL_V9,  // Added SerDe-serialized result sets
    HIVE_CLI_SERVICE_PROTOCOL_V10  // Added in-place progress updates via GetOperationStatus
}
```

**Usage Examples:**

```scala
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import scala.collection.JavaConverters._

// Check protocol version capabilities
val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, // Use latest version
  // Explicit type parameters are required: java.util.Map is invariant
  "username", "", "127.0.0.1", Map.empty[String, String].asJava, false, null
)

val protocolVersion = sessionHandle.getProtocolVersion()
println(s"Session using protocol: $protocolVersion")

// Handle different protocol versions
protocolVersion match {
  case TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 =>
    // All features available
    println("Using latest protocol with full feature support")
  case v if v.getValue >= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue =>
    // Features introduced in V6 and earlier are available
    println("Protocol V6 or newer")
  case _ =>
    // Limited functionality
    println("Using older protocol with limited features")
}
```
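
Backward compatibility usually means the client and server settle on a version both understand. The sketch below illustrates that idea under the assumption that the lower of the two versions is chosen; `Version`, `negotiate`, and `supports` are hypothetical helpers, not part of the Thrift API.

```scala
object ProtocolSketch {
  // Hypothetical stand-in for TProtocolVersion: a higher level means a newer protocol.
  final case class Version(level: Int)

  val V1: Version = Version(1)
  val V6: Version = Version(6)
  val V10: Version = Version(10)

  // Pick the newest version both sides understand, which is the lower of the two.
  def negotiate(client: Version, server: Version): Version =
    if (client.level <= server.level) client else server

  // Gate a feature on the minimum protocol version that introduced it.
  def supports(negotiated: Version, minimum: Version): Boolean =
    negotiated.level >= minimum.level
}
```

A caller would typically negotiate once when the session opens and then consult `supports` before using a version-dependent feature.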

### Session Configuration

Sessions accept Spark SQL and Hive configuration parameters, supplied as key-value pairs when the session is opened.

```scala { .api }
import scala.collection.JavaConverters._

// Common session configuration parameters
val sessionConfig = Map(
  // Spark SQL configuration
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
  "spark.sql.adaptive.skewJoin.enabled" -> "true",
  "spark.sql.execution.arrow.pyspark.enabled" -> "true",

  // Hive compatibility settings
  "hive.exec.dynamic.partition" -> "true",
  "hive.exec.dynamic.partition.mode" -> "nonstrict",
  "hive.support.concurrency" -> "false",

  // Session-specific settings
  "spark.sql.session.timeZone" -> "UTC"
).asJava

// Static settings: fixed when the server starts, so set them at startup
// rather than per session
//   "spark.sql.warehouse.dir" -> "/spark-warehouse"
//   "spark.sql.catalogImplementation" -> "hive"
```
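
Not every key can be changed per session: Spark treats some settings, such as `spark.sql.warehouse.dir` and `spark.sql.catalogImplementation`, as static configuration fixed at startup. A minimal sketch of separating the two kinds before opening a session follows; `staticKeys` and `splitConf` are hypothetical, and the key set is only an illustrative subset.

```scala
object SessionConfSketch {
  // Illustrative subset of keys that are fixed at server startup.
  val staticKeys: Set[String] = Set(
    "spark.sql.warehouse.dir",
    "spark.sql.catalogImplementation"
  )

  // Split requested settings into (applicable per session, rejected as static).
  def splitConf(
      requested: Map[String, String]): (Map[String, String], Map[String, String]) =
    requested.partition { case (key, _) => !staticKeys.contains(key) }
}
```

Reporting the rejected keys back to the caller tends to be friendlier than failing the whole session open.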

### Session Lifecycle Management

The inherited `SessionManager` service methods cover the full lifecycle: initialize, start, look up sessions, and stop with resource cleanup.

```scala { .api }
// Session states and lifecycle
abstract class SessionManager {
  /**
   * Initialize the session manager
   * @param hiveConf Hive configuration
   */
  def init(hiveConf: HiveConf): Unit

  /**
   * Start the session manager service
   */
  def start(): Unit

  /**
   * Stop the session manager and close all sessions
   */
  def stop(): Unit

  /**
   * Get session by handle
   * @param sessionHandle Handle identifying the session
   * @return HiveSession instance
   * @throws HiveSQLException if the handle does not identify an open session
   */
  def getSession(sessionHandle: SessionHandle): HiveSession
}
```

**Usage Examples:**

```scala
import org.apache.hive.service.cli.session.HiveSession
import org.apache.hive.service.rpc.thrift.TProtocolVersion

// Session lifecycle management
// `hiveServer`, `sqlContext`, `hiveConf`, and `sessionConf` are assumed to be defined elsewhere
val sessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)

try {
  // Initialize and start
  sessionManager.init(hiveConf)
  sessionManager.start()

  // Open session
  val sessionHandle = sessionManager.openSession(
    TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
    "user", "pass", "127.0.0.1", sessionConf, false, null
  )

  // Get session object for advanced operations
  // (getSession throws HiveSQLException for an unknown handle)
  val session: HiveSession = sessionManager.getSession(sessionHandle)
  println(s"Session user: ${session.getUserName}")
  println(s"Session database: ${session.getSessionState.getCurrentDatabase}")

  // Close session
  sessionManager.closeSession(sessionHandle)

} finally {
  // Clean shutdown
  sessionManager.stop()
}
```
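
The open/use/close sequence above lends itself to a loan pattern, so that a session is closed even when the work in between fails. The following is a dependency-free sketch of that pattern; `MiniSessionManager`, `withSession`, and `CountingManager` are hypothetical and stand in for the real session manager.

```scala
object SessionLoanSketch {
  // Hypothetical minimal view of a session manager: open returns a handle,
  // close releases it.
  trait MiniSessionManager[H] {
    def open(): H
    def close(handle: H): Unit
  }

  // Run `body` with a freshly opened session and always close it afterwards.
  def withSession[H, A](manager: MiniSessionManager[H])(body: H => A): A = {
    val handle = manager.open()
    try body(handle)
    finally manager.close(handle)
  }

  // In-memory manager used only to demonstrate the pattern.
  final class CountingManager extends MiniSessionManager[Int] {
    private var nextId = 0
    private var openIds = Set.empty[Int]
    def open(): Int = { nextId += 1; openIds += nextId; nextId }
    def close(handle: Int): Unit = openIds -= handle
    def openCount: Int = openIds.size
  }
}
```

With this shape, callers never hold a handle outside the scope in which it is valid.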

### User Impersonation

In secure environments, a session can be opened with impersonation so that work runs as the connecting end user rather than as the service user.

```scala { .api }
// User impersonation configuration
// These settings normally live in the server-side hive-site.xml / core-site.xml;
// "spark" in the proxyuser keys is the service user the server runs as
val impersonationConfig = Map(
  "hive.server2.enable.doAs" -> "true",
  "hadoop.proxyuser.spark.hosts" -> "*",
  "hadoop.proxyuser.spark.groups" -> "*"
).asJava

// Open session with impersonation
val sessionHandle = sessionManager.openSession(
  protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "end_user",                 // User to impersonate (placeholder name)
  passwd = "",
  ipAddress = "127.0.0.1",
  sessionConf = impersonationConfig,
  withImpersonation = true,              // Enable impersonation
  delegationToken = "actual_user_token"  // Placeholder delegation token for that user
)
```

### Error Handling

Session operations report failures as `HiveSQLException`, a `java.sql.SQLException` subclass that carries a SQLSTATE code.

```java { .api }
/**
 * Exception thrown during session operations
 */
class HiveSQLException extends SQLException {
    public HiveSQLException(String reason, String sqlState, int vendorCode)
    public HiveSQLException(String reason, String sqlState, Throwable cause)
}

/**
 * Common error scenarios in session management.
 * The values are standard SQLSTATE codes; the constant names are illustrative.
 */
// Session creation errors
public static final String SESSION_CREATION_ERROR = "08006";
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String SESSION_ALREADY_CLOSED = "HY010";
public static final String AUTHENTICATION_FAILED = "28000";
```

**Error Handling Examples:**

```scala
import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.rpc.thrift.TProtocolVersion

// `username`, `password`, `ipAddress`, and `sessionConf` are assumed to be defined elsewhere
try {
  val sessionHandle = sessionManager.openSession(
    TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
    username, password, ipAddress, sessionConf, false, null
  )

  // Session operations...

} catch {
  // getSQLState is inherited from java.sql.SQLException
  case e: HiveSQLException if e.getSQLState == "28000" =>
    println("Authentication failed")
  case e: HiveSQLException if e.getSQLState == "08006" =>
    println("Session creation failed")
  case e: HiveSQLException =>
    println(s"Session error: ${e.getMessage} (${e.getSQLState})")
}
```
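
Because `HiveSQLException` extends `java.sql.SQLException`, SQLSTATE handling can be factored into a helper that works on the base type and is testable without a running server. The sketch below is one possible shape; `describe` and its messages are hypothetical, and the mapping relies only on the standard SQLSTATE classes `08` (connection exception) and `28` (invalid authorization).

```scala
import java.sql.SQLException

object SqlStateSketch {
  // Translate a SQLSTATE code into a short description; the wording is illustrative.
  def describe(e: SQLException): String = Option(e.getSQLState) match {
    case Some("28000")                 => "Authentication failed"
    case Some(s) if s.startsWith("08") => "Connection or session setup failed"
    case Some(s)                       => s"Session error (SQLSTATE $s): ${e.getMessage}"
    case None                          => s"Session error: ${e.getMessage}"
  }
}
```

Wrapping `getSQLState` in `Option` covers exceptions created without a SQLSTATE, for which the getter returns `null`.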

### Session Monitoring

The session manager and its sessions expose basic information for operational visibility, such as the number of open sessions and each session's last access time.

```scala { .api }
// Session monitoring through the session manager
val activeSessionCount = sessionManager.getOpenSessionCount()
val session = sessionManager.getSession(sessionHandle)

// Access session details
// Accessor availability varies with the bundled Hive version; verify against
// the HiveSession interface you build against
session.getCreationTime()                      // When session was created
session.getLastAccessTime()                    // Last activity timestamp
session.getUserName()                          // Session username
session.getSessionState.getCurrentDatabase()   // Current database context
session.getHiveConf()                          // Session configuration
```
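
A common use of the last access time is spotting idle sessions. The following dependency-free sketch shows the comparison involved; `SessionSnapshot` and `idleSessions` are hypothetical and only mirror the kind of fields listed above.

```scala
object IdleSessionSketch {
  // Hypothetical snapshot of the fields a monitor might read from each session.
  final case class SessionSnapshot(user: String, lastAccessMillis: Long)

  // Return sessions whose last activity is older than the allowed idle time.
  def idleSessions(
      sessions: Seq[SessionSnapshot],
      nowMillis: Long,
      maxIdleMillis: Long): Seq[SessionSnapshot] =
    sessions.filter(s => nowMillis - s.lastAccessMillis > maxIdleMillis)
}
```

Passing the current time in as a parameter keeps the check deterministic and easy to test.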