# Session Management

Session management handles client connections, SQL context lifecycle, and session-specific configuration for the Spark Hive Thrift Server.

## Session Manager

### SparkSQLSessionManager

Core session manager that extends Hive's session management with Spark SQL context integration.

```scala { .api }
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer) with ReflectedCompositeService {

  def init(hiveConf: HiveConf): Unit
  def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle
  def closeSession(sessionHandle: SessionHandle): Unit
}
```

#### Initialization

The `init` method installs the operation manager that handles SQL queries within sessions:

```scala
override def init(hiveConf: HiveConf) {
  setSuperField(this, "operationManager", sparkSqlOperationManager)
  super.init(hiveConf)
}
```

`setSuperField` uses reflection to replace the parent class's private `operationManager` field with a `SparkSQLOperationManager` instance, which then manages query execution operations for all sessions.
34
35
#### Session Creation

The `openSession` method creates new client sessions and associates them with Spark SQL contexts:

**Usage Example:**

```scala
import org.apache.hive.service.cli.thrift.TProtocolVersion
import java.util.{Map => JMap}

val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("use:database", "my_database")

val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "user1",
  passwd = "password",
  ipAddress = "192.168.1.100",
  sessionConf = sessionConf,
  withImpersonation = false,
  delegationToken = null
)
```

**Session Creation Process:**

1. **Base Session**: Creates Hive session handle using parent implementation
2. **Event Notification**: Triggers session creation event for monitoring
3. **Context Assignment**: Associates session with SQL context (shared or isolated)
4. **Configuration**: Applies session-specific configuration
5. **Database Selection**: Sets initial database if specified
6. **Registration**: Registers session for operation management
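Assembled from the steps above, the override looks roughly like the following sketch. It follows the shape of the Spark 2.x sources, but treat it as an illustration rather than a verbatim copy (error handling and version-specific details are omitted):

```scala
override def openSession(
    protocol: TProtocolVersion,
    username: String,
    passwd: String,
    ipAddress: String,
    sessionConf: java.util.Map[String, String],
    withImpersonation: Boolean,
    delegationToken: String): SessionHandle = {
  // 1. Base session: delegate to Hive's SessionManager
  val sessionHandle = super.openSession(protocol, username, passwd, ipAddress,
    sessionConf, withImpersonation, delegationToken)
  // 2. Event notification for the monitoring listener
  HiveThriftServer2.listener.onSessionCreated(
    ipAddress, sessionHandle.getSessionId.toString, username)
  // 3. Context assignment: shared or isolated
  val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
    sqlContext
  } else {
    sqlContext.newSession()
  }
  // 4./5. Session-specific configuration, including the initial database
  if (sessionConf != null && sessionConf.containsKey("use:database")) {
    ctx.sql(s"use ${sessionConf.get("use:database")}")
  }
  // 6. Register the session's context for operation management
  sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
  sessionHandle
}
```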
#### Context Management

Sessions can use either shared or isolated SQL contexts:

```scala
val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
  sqlContext              // Shared context across all sessions
} else {
  sqlContext.newSession() // Isolated context per session
}
```

**Shared Context Mode:**

- All sessions share the same SQL context
- Better resource utilization
- Potential for cross-session interference

**Isolated Context Mode:**

- Each session gets its own SQL context
- Session isolation and independence
- Higher memory overhead
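The difference in identity between the two modes can be illustrated with a small standalone sketch (`FakeSQLContext` and `contextFor` are hypothetical stand-ins for illustration, not Spark classes):

```scala
// Hypothetical stand-in for SQLContext: newSession() mirrors the real method,
// returning a fresh, isolated context object.
class FakeSQLContext(val singleSession: Boolean) {
  def newSession(): FakeSQLContext = new FakeSQLContext(singleSession)
}

// Mirrors the selection logic above: reuse the root context in
// single-session mode, otherwise hand each session its own.
def contextFor(root: FakeSQLContext): FakeSQLContext =
  if (root.singleSession) root else root.newSession()
```

In shared mode `contextFor` returns the same object for every session, so temporary views and session configuration are visible across clients; in isolated mode each call yields a distinct context.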
#### Database Context Setup

Sessions can specify an initial database to use:

```scala
if (sessionConf != null && sessionConf.containsKey("use:database")) {
  ctx.sql(s"use ${sessionConf.get("use:database")}")
}
```

This automatically executes a `USE database` statement when the session is created.

#### Session Closure

The `closeSession` method properly cleans up session resources:

```scala
override def closeSession(sessionHandle: SessionHandle) {
  HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
  super.closeSession(sessionHandle)
  sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
  sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
```

**Cleanup Process:**

1. **Event Notification**: Triggers session closure event for monitoring
2. **Base Cleanup**: Calls parent session cleanup logic
3. **Pool Removal**: Removes session from active execution pools
4. **Context Cleanup**: Removes SQL context association

## Session Information Tracking

### SessionInfo

Data structure tracking session metadata and statistics:

```scala { .api }
private[thriftserver] class SessionInfo(
    sessionId: String,
    startTimestamp: Long,
    ip: String,
    userName: String) {

  var finishTimestamp: Long = 0L
  var totalExecution: Int = 0
  def totalTime: Long
}
```

#### Session Lifecycle Tracking

**Session Creation:**

```scala
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
  synchronized {
    val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
    sessionList.put(sessionId, info)
    onlineSessionNum += 1
    trimSessionIfNecessary()
  }
}
```

**Session Closure:**

```scala
def onSessionClosed(sessionId: String): Unit = synchronized {
  sessionList(sessionId).finishTimestamp = System.currentTimeMillis
  onlineSessionNum -= 1
  trimSessionIfNecessary()
}
```

#### Session Statistics

The `totalTime` method calculates session duration:

```scala
def totalTime: Long = {
  if (finishTimestamp == 0L) {
    System.currentTimeMillis - startTimestamp // Active session
  } else {
    finishTimestamp - startTimestamp          // Completed session
  }
}
```
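The same timing logic can be exercised standalone. Here `SessionTimer` is a hypothetical stand-in for `SessionInfo`, with the clock injected as a parameter instead of calling `System.currentTimeMillis`, so both branches are easy to verify:

```scala
// Hypothetical model of SessionInfo's timing behaviour; `now` replaces
// System.currentTimeMillis so results are deterministic.
class SessionTimer(val startTimestamp: Long) {
  var finishTimestamp: Long = 0L

  def totalTime(now: Long): Long =
    if (finishTimestamp == 0L) now - startTimestamp  // session still active
    else finishTimestamp - startTimestamp            // session completed
}
```

While a session is open, `totalTime` keeps growing with the clock; once `finishTimestamp` is set on closure, the duration is frozen regardless of the current time.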

## Authentication and Security

### Credential Management

Sessions support various authentication mechanisms:

**Parameter Authentication:**
- Username/password pairs
- IP address validation
- Session configuration parameters

**Token-based Authentication:**
- Delegation tokens for secure cluster access
- Token renewal and validation
- Integration with Kerberos infrastructure

**Impersonation Support:**
- User impersonation for proxy scenarios
- Security context switching
- Authorization delegation
### Session Configuration

Sessions accept configuration parameters that override server defaults:

```scala
import java.util.{Map => JMap}

val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("hive.exec.dynamic.partition", "true")
sessionConf.put("spark.sql.adaptive.enabled", "false")
sessionConf.put("use:database", "analytics")
```

**Configuration Hierarchy:**

1. **Server Defaults**: Base Hive and Spark configuration
2. **Session Overrides**: Client-specified parameters
3. **Operation Overrides**: Query-specific configuration
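The precedence order can be sketched as a simple layered lookup (all keys and values below are invented for illustration; this is not Spark's actual resolution code):

```scala
// Three layers, from lowest to highest precedence.
val serverDefaults = Map(
  "hive.exec.dynamic.partition"  -> "false",
  "spark.sql.shuffle.partitions" -> "200")
val sessionOverrides = Map(
  "hive.exec.dynamic.partition"  -> "true")
val operationOverrides = Map(
  "spark.sql.shuffle.partitions" -> "50")

// Resolve a key by checking the most specific layer first.
def effectiveConf(key: String): Option[String] =
  operationOverrides.get(key)
    .orElse(sessionOverrides.get(key))
    .orElse(serverDefaults.get(key))
```

A key set at operation scope wins over a session override, which in turn wins over the server default.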

## Performance and Resource Management

### Session Limits

The system maintains configurable limits for resource management:

```scala
private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)

private def trimSessionIfNecessary() = {
  if (sessionList.size > retainedSessions) {
    val toRemove = math.max(retainedSessions / 10, 1)
    sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
      sessionList.remove(s._1)
    }
  }
}
```

This prevents memory leaks by automatically removing old session records.
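The trimming policy can be modeled standalone. The map below stands in for `sessionList`, storing only a finish timestamp per session id (0 meaning still active); the eviction logic mirrors the snippet above:

```scala
import scala.collection.mutable

// Keep at most `retainedSessions` records; when over the limit, evict the
// oldest *finished* sessions (finishTimestamp != 0) in batches of
// max(retained / 10, 1). LinkedHashMap preserves insertion order, so the
// earliest-created finished sessions are removed first.
val retainedSessions = 5
val sessionList = mutable.LinkedHashMap.empty[String, Long] // id -> finishTimestamp

def trimSessionIfNecessary(): Unit =
  if (sessionList.size > retainedSessions) {
    val toRemove = math.max(retainedSessions / 10, 1)
    sessionList.filter(_._2 != 0).take(toRemove).foreach { case (id, _) =>
      sessionList.remove(id)
    }
  }
```

Note that active sessions (finish timestamp 0) are never evicted; only completed records are trimmed, which is why the map can temporarily exceed the limit if many sessions are open at once.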

### Connection Pooling

Sessions are mapped to execution pools for efficient resource utilization:

- **Pool Assignment**: Sessions can be assigned to Spark fair-scheduler execution pools based on workload (tracked in `sessionToActivePool`)
- **Load Balancing**: Distribution of sessions across available resources
- **Pool Monitoring**: Tracking of pool utilization and performance

### Memory Management

- **Context Isolation**: Prevents configuration and state leakage between sessions
- **Resource Cleanup**: Automatic cleanup of session resources on closure
- **Garbage Collection**: Efficient cleanup of temporary objects