# Session Management

Client session lifecycle management with authentication, authorization, and per-session SQL context handling.

## Capabilities

### SparkSQLSessionManager

Session manager for the Spark SQL Thrift server, handling client session lifecycle and context management.
```scala { .api }
/**
 * Session manager for Spark SQL thrift server
 * @param hiveServer The HiveServer2 instance
 * @param sqlContext The base SQL context
 */
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer) with ReflectedCompositeService {

  /**
   * Initialize the session manager with Hive configuration
   * @param hiveConf Hive configuration object
   */
  def init(hiveConf: HiveConf): Unit

  /**
   * Open a new client session
   * @param protocol Thrift protocol version
   * @param username Client username
   * @param passwd Client password
   * @param ipAddress Client IP address
   * @param sessionConf Session-specific configuration
   * @param withImpersonation Whether to enable user impersonation
   * @param delegationToken Delegation token for authentication
   * @return Session handle for the new session
   */
  def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle

  /**
   * Close an existing session
   * @param sessionHandle Handle of the session to close
   */
  def closeSession(sessionHandle: SessionHandle): Unit
}
```
**Usage Example:**

```scala
import org.apache.spark.sql.hive.thriftserver.{SparkSQLSessionManager, SparkSQLEnv}
import org.apache.hive.service.cli.thrift.TProtocolVersion
import java.util.HashMap

// Initialize environment and create session manager
SparkSQLEnv.init()
val sessionManager = new SparkSQLSessionManager(hiveServer, SparkSQLEnv.sqlContext)
sessionManager.init(hiveConf)

// Open a new session
val sessionConf = new HashMap[String, String]()
sessionConf.put("use:database", "my_database")

val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8,
  "user123",
  "password",
  "192.168.1.100",
  sessionConf,
  false, // withImpersonation
  null   // delegationToken
)

// The session is now active and can be used for operations
// ...

// Close the session when done
sessionManager.closeSession(sessionHandle)
```
### Session Lifecycle

**Session Creation Process:**
1. **Authentication**: Validates user credentials and delegation tokens
2. **Context Creation**: Creates an isolated or shared SQL context
3. **Configuration**: Applies session-specific configuration
4. **Database Context**: Sets the initial database if specified
5. **Listener Notification**: Notifies monitoring listeners
6. **Handle Generation**: Returns a unique session handle
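The creation steps can be sketched as a minimal, self-contained model. Everything here is illustrative: `SessionHandle`, `SqlContextStub`, and `authenticate` are hypothetical stand-ins for the real Hive and Spark types, not the Thrift server's actual implementation.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for the real Hive/Spark types, to keep the sketch self-contained
final case class SessionHandle(id: UUID)
final class SqlContextStub

object SessionCreationSketch {
  // State produced by steps 2-3: one context per session handle
  val contexts = TrieMap.empty[SessionHandle, SqlContextStub]

  // Step 1: credential check (placeholder logic)
  private def authenticate(user: String, passwd: String): Boolean = user.nonEmpty

  def openSession(user: String, passwd: String, initialDb: Option[String]): SessionHandle = {
    require(authenticate(user, passwd), "authentication failed") // 1. Authentication
    val ctx = new SqlContextStub                                 // 2. Context creation
    // 3./4. Session configuration and the initial database would be applied to ctx here
    // 5. Listener notification would happen here
    val handle = SessionHandle(UUID.randomUUID())                // 6. Handle generation
    contexts.put(handle, ctx)
    handle
  }
}
```

Each open session ends up with its own entry in `contexts`, which is exactly the mapping the operation manager consults later.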
**Session Context Management:**
```scala
// Determine whether to use a shared or isolated context
val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
  sqlContext // Shared context across all sessions
} else {
  sqlContext.newSession() // Isolated context per session
}

// Set Hive version compatibility
ctx.setConf(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion)

// Set the initial database if specified
if (sessionConf != null && sessionConf.containsKey("use:database")) {
  ctx.sql(s"use ${sessionConf.get("use:database")}")
}
```
**Session Cleanup Process:**
1. **Listener Notification**: Notifies listeners of the session closure
2. **Operation Cleanup**: Closes pending operations
3. **Context Cleanup**: Removes the session-to-context mapping
4. **Resource Cleanup**: Releases session resources
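The cleanup ordering can be sketched with plain maps. The registry shape and names here (`contexts`, `operations`, `SessionHandle`) are assumptions for illustration, not the server's actual data structures.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

// Hypothetical session registry; `SessionHandle` stands in for the real Hive type
final case class SessionHandle(id: UUID)

object SessionCleanupSketch {
  val contexts = TrieMap.empty[SessionHandle, String]          // session -> context
  val operations = TrieMap.empty[SessionHandle, List[String]]  // session -> pending operations

  def closeSession(handle: SessionHandle): Unit = {
    // 1. Listener notification would happen here
    operations.remove(handle)  // 2. Close pending operations
    contexts.remove(handle)    // 3. Remove the session-to-context mapping
    // 4. Any remaining per-session resources would be released here
  }
}
```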
### Authentication and Security

**Authentication Methods:**
- **NONE**: No authentication required
- **KERBEROS**: Kerberos-based authentication
- **CUSTOM**: Custom authentication providers
- **DELEGATION_TOKEN**: Token-based authentication
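Each method is selected through the standard `hive.server2.authentication` property. The snippet below is a sketch; `com.example.MyAuthProvider` is a hypothetical class name, and the `hiveConf` instance is assumed to come from the surrounding setup.

```scala
// Selecting an authentication method via HiveConf (standard HiveServer2 properties)
hiveConf.set("hive.server2.authentication", "NONE")     // no authentication

hiveConf.set("hive.server2.authentication", "KERBEROS") // Kerberos; needs principal and keytab

// CUSTOM requires a PasswdAuthenticationProvider implementation on the classpath
hiveConf.set("hive.server2.authentication", "CUSTOM")
hiveConf.set("hive.server2.custom.authentication.class", "com.example.MyAuthProvider")
```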
**Security Configuration:**
```scala
// Kerberos configuration
hiveConf.set("hive.server2.authentication", "KERBEROS")
hiveConf.set("hive.server2.authentication.kerberos.principal", "spark/_HOST@REALM")
hiveConf.set("hive.server2.authentication.kerberos.keytab", "/path/to/keytab")

// User impersonation
hiveConf.set("hive.server2.enable.doAs", "true")
```
**User Impersonation:**
```scala
// When impersonation is enabled, operations run as the authenticated user
// rather than as the server process user
val sessionHandle = sessionManager.openSession(
  protocol, username, passwd, ipAddress, sessionConf,
  true, // withImpersonation = true
  delegationToken
)
```
### Session Configuration

**Global Configuration:**
- Applied to all sessions
- Set through Spark configuration
- Includes resource limits and timeouts

**Session-Specific Configuration:**
```scala
// Configuration passed during session creation
val sessionConf = new HashMap[String, String]()
sessionConf.put("use:database", "analytics")
sessionConf.put("spark.sql.adaptive.enabled", "true")
sessionConf.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```
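Session configuration mixes two kinds of entries: the `use:database` directive and ordinary configuration keys. A sketch of how such a map might be split before being applied to the session's context (the helper and its behavior are illustrative, not the server's code):

```scala
// Split session configuration into the "use:" database directive and ordinary conf keys,
// mirroring the "use:database" convention shown above
def splitSessionConf(sessionConf: Map[String, String]): (Option[String], Map[String, String]) = {
  val initialDb = sessionConf.get("use:database")                              // database directive
  val confEntries = sessionConf.filterNot { case (k, _) => k.startsWith("use:") } // plain conf keys
  (initialDb, confEntries)
}
```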
**Runtime Configuration Updates:**
- Sessions can modify configuration during execution
- Changes are isolated per session (unless single-session mode is enabled)
- Configuration is inherited from the parent context
### Context Isolation

**Single Session Mode:**
```scala
// All sessions share the same SQL context. This is a static configuration,
// so set it at server startup rather than at runtime, e.g.:
//   ./sbin/start-thriftserver.sh --conf spark.sql.hive.thriftServer.singleSession=true

// Benefits: resource sharing, faster session creation
// Drawbacks: no isolation between sessions
```

**Multi-Session Mode (Default):**
```scala
// Each session gets its own SQL context (the default behavior):
//   ./sbin/start-thriftserver.sh --conf spark.sql.hive.thriftServer.singleSession=false

// Benefits: session isolation, independent configuration
// Drawbacks: higher resource usage per session
```
### Session Monitoring

**Session Information Tracking:**
```scala
// Sessions are tracked by the HiveThriftServer2Listener
HiveThriftServer2.listener.onSessionCreated(
  session.getIpAddress,
  sessionHandle.getSessionId.toString,
  session.getUsername
)

// Session metrics are available through the listener
val onlineCount = HiveThriftServer2.listener.getOnlineSessionNum
val sessionList = HiveThriftServer2.listener.getSessionList
```
**Session Metrics:**
- Active session count
- Session duration tracking
- Operation counts per session
- Resource usage per session
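Duration tracking for both open and closed sessions can be modeled with a small record type. The field names below are illustrative assumptions, not the listener's actual API:

```scala
// Minimal model of per-session metric tracking: a session still open is measured
// against "now", a closed one against its finish timestamp
final case class SessionRecord(
    startTimestamp: Long,
    var finishTimestamp: Long = 0L,
    var operationCount: Int = 0) {

  def totalTime(now: Long): Long =
    if (finishTimestamp == 0L) now - startTimestamp
    else finishTimestamp - startTimestamp
}
```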
### Integration with Operation Management

**Operation Context Mapping:**
```scala
// Sessions are mapped to SQL contexts for operation execution
sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)

// Active thread pool assignments
sparkSqlOperationManager.sessionToActivePool.put(sessionHandle, poolName)
```
**Session-Operation Lifecycle:**
1. The session creates operations through the operation manager
2. Operations execute in the session's SQL context
3. Results are returned to the session
4. Session cleanup removes all associated operations
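Step 4 hinges on being able to find every operation owned by a session. One way to sketch that relationship (the map shape and names are illustrative, not the real operation manager's structure):

```scala
// Operations keyed by their own handle, each tagged with the owning session
val operationToSession = scala.collection.mutable.Map[String, String](
  "op-1" -> "session-A", "op-2" -> "session-A", "op-3" -> "session-B")

// Session cleanup drops every operation belonging to that session
def removeSessionOperations(session: String): Unit =
  operationToSession.filterInPlace { case (_, owner) => owner != session }
```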
### Error Handling

**Session Creation Errors:**
- Authentication failures
- Resource allocation failures
- Configuration validation errors
- Database access errors

**Session Runtime Errors:**
- Connection timeouts
- Resource exhaustion
- Authentication token expiration
- Permission violations
**Error Recovery:**
```scala
try {
  sessionManager.openSession(...)
} catch {
  case authError: HiveSQLException =>
    // Handle authentication errors
  case configError: IllegalArgumentException =>
    // Handle configuration errors
  case resourceError: SQLException =>
    // Handle resource allocation errors
}
```
### Resource Management

**Session Resource Limits:**
- Maximum concurrent sessions
- Memory limits per session
- Operation timeouts
- Connection idle timeouts
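Enforcing a maximum concurrent session count can be sketched with an atomic counter checked at open time. This is an illustrative pattern, not how HiveServer2 implements its limits:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Admission control for session creation: reject opens beyond maxSessions
final class SessionLimiter(maxSessions: Int) {
  private val active = new AtomicInteger(0)

  def tryOpen(): Boolean = {
    val n = active.incrementAndGet()
    // Roll back the increment if the limit was exceeded
    if (n > maxSessions) { active.decrementAndGet(); false } else true
  }

  def close(): Unit = active.decrementAndGet()
}
```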
**Cleanup Policies:**
```scala
// Session timeout configuration
hiveConf.set("hive.server2.idle.session.timeout", "7200000") // 2 hours
hiveConf.set("hive.server2.idle.operation.timeout", "300000") // 5 minutes

// Session cleanup on server shutdown
ShutdownHookManager.addShutdownHook { () =>
  // Close all active sessions
  sessionManager.closeAllSessions()
}
```
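The idle timeouts above imply a periodic sweep over last-access times. A sketch of the selection step (names and map shape are illustrative assumptions):

```scala
// Sessions whose last access is older than the idle timeout are due for closing
def expiredSessions(lastAccess: Map[String, Long], now: Long, idleTimeoutMs: Long): Set[String] =
  lastAccess.collect { case (session, t) if now - t > idleTimeoutMs => session }.toSet
```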