Session management handles client connections, SQL context lifecycle, and session-specific configuration for the Spark Hive Thrift Server.
`SparkSQLSessionManager` is the core session manager; it extends Hive's session management with Spark SQL context integration:

```scala
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer) with ReflectedCompositeService {

  def init(hiveConf: HiveConf): Unit

  def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle

  def closeSession(sessionHandle: SessionHandle): Unit
}
```

The init method sets up the operation manager for handling SQL queries within sessions:
```scala
override def init(hiveConf: HiveConf) {
  setSuperField(this, "operationManager", sparkSqlOperationManager)
  super.init(hiveConf)
}
```

This installs a `SparkSQLOperationManager` instance, which manages query-execution operations for all sessions, in place of Hive's default operation manager.
The openSession method creates new client sessions and associates them with Spark SQL contexts.

Usage Example:

```scala
import org.apache.hive.service.cli.thrift.TProtocolVersion
import java.util.{Map => JMap}

val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("use:database", "my_database")

val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "user1",
  passwd = "password",
  ipAddress = "192.168.1.100",
  sessionConf = sessionConf,
  withImpersonation = false,
  delegationToken = null
)
```

Session Creation Process:
Sessions can use either shared or isolated SQL contexts:
```scala
val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
  sqlContext              // Shared context across all sessions
} else {
  sqlContext.newSession() // Isolated context per session
}
```

Shared Context Mode: with `spark.sql.hive.thriftServer.singleSession` enabled, every session reuses the same SQLContext, so temporary views, registered functions, and SQL configuration are visible across all connections.

Isolated Context Mode: otherwise each session receives its own context from `newSession()`, which shares the underlying SparkContext and cached data but keeps temporary views and SQL configuration private to that session.
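The difference between the two modes can be illustrated with a toy model (not the Spark API; `Ctx`, `SessionState`, and `ctxFor` are invented here):

```scala
import scala.collection.mutable

// In single-session mode every client shares one state object;
// otherwise newSession() forks fresh, isolated state.
final class SessionState { val tempViews = mutable.Set.empty[String] }
final class Ctx(val state: SessionState = new SessionState) {
  def newSession(): Ctx = new Ctx(new SessionState) // isolated per-session state
}

def ctxFor(root: Ctx, singleSession: Boolean): Ctx =
  if (singleSession) root else root.newSession()

val root = new Ctx
// A temp view created in shared mode is visible to every other shared session,
// but never to an isolated one.
ctxFor(root, singleSession = true).state.tempViews += "v"
```

This is why single-session mode is useful for sharing state (e.g. temp views) between JDBC clients, at the cost of isolation.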
Sessions can specify an initial database to use:

```scala
if (sessionConf != null && sessionConf.containsKey("use:database")) {
  ctx.sql(s"use ${sessionConf.get("use:database")}")
}
```

This automatically executes a `USE <database>` statement when the session is created.
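The same logic, condensed into a small self-contained sketch (the helper `initialUseStatement` is hypothetical, introduced only to make the mapping testable):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Derive the USE statement, if any, that would run when the session opens.
def initialUseStatement(sessionConf: JMap[String, String]): Option[String] =
  Option(sessionConf)
    .filter(_.containsKey("use:database"))
    .map(conf => s"use ${conf.get("use:database")}")

val conf = new JHashMap[String, String]()
conf.put("use:database", "analytics")
initialUseStatement(conf) // Some(use analytics)
initialUseStatement(null) // None: a missing map means no initial database
```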
The closeSession method properly cleans up session resources:

```scala
override def closeSession(sessionHandle: SessionHandle) {
  HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
  super.closeSession(sessionHandle)
  sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
  sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
```

Cleanup Process: the listener is notified so the UI can mark the session as finished, Hive's own session teardown runs via `super.closeSession`, and the session's entries are removed from the operation manager's `sessionToActivePool` and `sessionToContexts` maps so they cannot leak.
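The importance of those final `remove` calls can be shown with a minimal stand-in for the operation manager's per-session bookkeeping (class and field names mimic the real ones but are illustrative):

```scala
import java.util.concurrent.ConcurrentHashMap

final case class Handle(id: String)

// Stand-in for SparkSQLOperationManager's per-session maps.
class OperationManagerModel {
  val sessionToActivePool = new ConcurrentHashMap[Handle, String]()
  val sessionToContexts   = new ConcurrentHashMap[Handle, String]()

  def closeSession(h: Handle): Unit = {
    // Without these removals, every closed session would pin its
    // pool assignment and context entry in memory forever.
    sessionToActivePool.remove(h)
    sessionToContexts.remove(h)
  }
}

val om = new OperationManagerModel
val h  = Handle("session-1")
om.sessionToActivePool.put(h, "pool-a")
om.sessionToContexts.put(h, "ctx-1")
om.closeSession(h)
```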
`SessionInfo` is the data structure that tracks per-session metadata and statistics:

```scala
private[thriftserver] class SessionInfo(
    sessionId: String,
    startTimestamp: Long,
    ip: String,
    userName: String) {
  var finishTimestamp: Long = 0L
  var totalExecution: Int = 0
  def totalTime: Long
}
```

Session Creation:
```scala
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
  synchronized {
    val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
    sessionList.put(sessionId, info)
    onlineSessionNum += 1
    trimSessionIfNecessary()
  }
}
```

Session Closure:
```scala
def onSessionClosed(sessionId: String): Unit = synchronized {
  sessionList(sessionId).finishTimestamp = System.currentTimeMillis
  onlineSessionNum -= 1
  trimSessionIfNecessary()
}
```

The totalTime method calculates the session's duration:
```scala
def totalTime: Long = {
  if (finishTimestamp == 0L) {
    System.currentTimeMillis - startTimestamp // Active session: duration so far
  } else {
    finishTimestamp - startTimestamp // Completed session: total lifetime
  }
}
```

Sessions support several authentication mechanisms:
Parameter Authentication: the `username` and `passwd` arguments to `openSession` carry the client's credentials and are validated by the authentication provider configured in HiveServer2.

Token-based Authentication: in secure (Kerberos) deployments, a `delegationToken` can be supplied so the session authenticates with a previously obtained delegation token instead of a password.

Impersonation Support: when `withImpersonation` is true, the session runs queries as the connecting user (Hive's doAs behavior) rather than as the server principal.
Sessions accept configuration parameters that override server defaults:

```scala
val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("hive.exec.dynamic.partition", "true")
sessionConf.put("spark.sql.adaptive.enabled", "false")
sessionConf.put("use:database", "analytics")
```

Configuration Hierarchy: a setting supplied at session open takes precedence over the server's defaults for that session only; keys the session does not set fall back to the server-wide values.
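That precedence can be sketched with plain maps standing in for the real configuration stores (`effective` and the sample values are illustrative, not Spark APIs):

```scala
// Server-wide defaults; a session-level entry overrides the default for the same key.
val serverDefaults = Map(
  "hive.exec.dynamic.partition" -> "false",
  "spark.sql.adaptive.enabled"  -> "true")

def effective(sessionConf: Map[String, String], key: String): Option[String] =
  sessionConf.get(key).orElse(serverDefaults.get(key))

val session = Map("spark.sql.adaptive.enabled" -> "false")

effective(session, "spark.sql.adaptive.enabled")  // session wins: Some(false)
effective(session, "hive.exec.dynamic.partition") // falls back:   Some(false)
```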
The system maintains configurable limits for resource management:

```scala
private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)

private def trimSessionIfNecessary() = {
  if (sessionList.size > retainedSessions) {
    val toRemove = math.max(retainedSessions / 10, 1)
    sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
      sessionList.remove(s._1)
    }
  }
}
```

This bounds memory use by automatically evicting the oldest finished session records once the list exceeds the retention limit.
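The eviction behavior can be reproduced in a self-contained model (the `Info` class and sample limit are stand-ins; the trim logic itself mirrors the code above):

```scala
import scala.collection.mutable

// Minimal stand-in for SessionInfo: only the finish timestamp matters here.
final class Info(var finishTimestamp: Long)

val retainedSessions = 10
val sessionList = new mutable.LinkedHashMap[String, Info] // preserves insertion order

def trimSessionIfNecessary(): Unit =
  if (sessionList.size > retainedSessions) {
    // Drop ~10% (at least one) of the oldest *finished* entries;
    // active sessions (finishTimestamp == 0) are never evicted.
    val toRemove = math.max(retainedSessions / 10, 1)
    sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach {
      case (id, _) => sessionList.remove(id)
    }
  }

// Sessions s1..s5 are finished, s6..s11 still active: over the limit by one.
(1 to 11).foreach { i => sessionList.put(s"s$i", new Info(if (i <= 5) 100L else 0L)) }
trimSessionIfNecessary()
```

Because `LinkedHashMap` preserves insertion order, the filtered `take(toRemove)` always selects the oldest finished sessions first.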
Finally, each session can be assigned to a Spark fair-scheduler pool (tracked in `sessionToActivePool`), so that concurrent sessions share cluster resources according to the configured scheduling policy.