Session Management

Session management handles client connections, SQL context lifecycle, and session-specific configuration for the Spark Hive Thrift Server.

Session Manager

SparkSQLSessionManager

Core session manager that extends Hive's session management with Spark SQL context integration.

private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) 
  extends SessionManager(hiveServer) with ReflectedCompositeService {
  
  def init(hiveConf: HiveConf): Unit
  def openSession(protocol: TProtocolVersion, username: String, passwd: String, 
                 ipAddress: String, sessionConf: java.util.Map[String, String], 
                 withImpersonation: Boolean, delegationToken: String): SessionHandle
  def closeSession(sessionHandle: SessionHandle): Unit
}

Initialization

The init method sets up the operation manager for handling SQL queries within sessions:

override def init(hiveConf: HiveConf) {
  setSuperField(this, "operationManager", sparkSqlOperationManager)
  super.init(hiveConf)
}

This installs a SparkSQLOperationManager instance into the parent class's private operationManager field (via the reflective setSuperField helper), replacing Hive's default operation manager for all sessions.

Session Creation

The openSession method creates new client sessions and associates them with Spark SQL contexts:

Usage Example:

import org.apache.hive.service.cli.thrift.TProtocolVersion
import java.util.{Map => JMap}

val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("use:database", "my_database")

val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "user1", 
  passwd = "password",
  ipAddress = "192.168.1.100",
  sessionConf = sessionConf,
  withImpersonation = false,
  delegationToken = null
)

Session Creation Process:

  1. Base Session: Creates Hive session handle using parent implementation
  2. Event Notification: Triggers session creation event for monitoring
  3. Context Assignment: Associates session with SQL context (shared or isolated)
  4. Configuration: Applies session-specific configuration
  5. Database Selection: Sets initial database if specified
  6. Registration: Registers session for operation management

Context Management

Sessions can use either shared or isolated SQL contexts:

val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
  sqlContext  // Shared context across all sessions
} else {
  sqlContext.newSession()  // Isolated context per session
}

Shared Context Mode:

  • All sessions share the same SQL context
  • Better resource utilization
  • Potential for cross-session interference

Isolated Context Mode:

  • Each session gets its own SQL context
  • Session isolation and independence
  • Higher memory overhead
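The tradeoff can be illustrated with a toy model (ToyContext is illustrative, not Spark's SQLContext): temporary state lives in the context, so a shared context exposes it to every session, while newSession() starts from a clean slate.

```scala
import scala.collection.mutable

// Toy model: temp views live in the context, so sharing a context
// means sharing temp views across sessions.
class ToyContext {
  val tempViews = mutable.Set[String]()
  def newSession(): ToyContext = new ToyContext  // fresh, isolated state
}

val root = new ToyContext
def contextFor(singleSession: Boolean): ToyContext =
  if (singleSession) root else root.newSession()

val sharedA = contextFor(singleSession = true)
val sharedB = contextFor(singleSession = true)
sharedA.tempViews += "t"        // visible through sharedB as well

val isolated = contextFor(singleSession = false)
// isolated.tempViews does not contain "t"
```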

Database Context Setup

Sessions can specify an initial database to use:

if (sessionConf != null && sessionConf.containsKey("use:database")) {
  ctx.sql(s"use ${sessionConf.get("use:database")}")
}

This automatically executes a USE database statement when the session is created.

Session Closure

The closeSession method properly cleans up session resources:

override def closeSession(sessionHandle: SessionHandle) {
  HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
  super.closeSession(sessionHandle)
  sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
  sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}

Cleanup Process:

  1. Event Notification: Triggers session closure event for monitoring
  2. Base Cleanup: Calls parent session cleanup logic
  3. Pool Removal: Removes session from active execution pools
  4. Context Cleanup: Removes SQL context association

Session Information Tracking

SessionInfo

Data structure tracking session metadata and statistics:

private[thriftserver] class SessionInfo(
  sessionId: String,
  startTimestamp: Long, 
  ip: String,
  userName: String
) {
  var finishTimestamp: Long = 0L
  var totalExecution: Int = 0
  def totalTime: Long
}

Session Lifecycle Tracking

Session Creation:

def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
  synchronized {
    val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
    sessionList.put(sessionId, info)
    onlineSessionNum += 1
    trimSessionIfNecessary()
  }
}

Session Closure:

def onSessionClosed(sessionId: String): Unit = synchronized {
  sessionList(sessionId).finishTimestamp = System.currentTimeMillis
  onlineSessionNum -= 1
  trimSessionIfNecessary()
}

Session Statistics

The totalTime method calculates session duration:

def totalTime: Long = {
  if (finishTimestamp == 0L) {
    System.currentTimeMillis - startTimestamp  // Active session
  } else {
    finishTimestamp - startTimestamp           // Completed session
  }
}
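The tracking fragments above depend on server-internal state (sessionList, onlineSessionNum live inside HiveThriftServer2's listener). A minimal self-contained model that exercises the same lifecycle and timing logic:

```scala
import scala.collection.mutable

// Simplified, self-contained model of the session tracking shown above.
class SessionInfo(val sessionId: String, val startTimestamp: Long,
                  val ip: String, val userName: String) {
  var finishTimestamp: Long = 0L
  def totalTime: Long =
    if (finishTimestamp == 0L) System.currentTimeMillis - startTimestamp
    else finishTimestamp - startTimestamp
}

object Tracker {
  val sessionList = mutable.LinkedHashMap[String, SessionInfo]()
  var onlineSessionNum = 0

  def onSessionCreated(ip: String, sessionId: String,
                       userName: String = "UNKNOWN"): Unit = synchronized {
    sessionList.put(sessionId,
      new SessionInfo(sessionId, System.currentTimeMillis, ip, userName))
    onlineSessionNum += 1
  }

  def onSessionClosed(sessionId: String): Unit = synchronized {
    sessionList(sessionId).finishTimestamp = System.currentTimeMillis
    onlineSessionNum -= 1
  }
}
```

After a create/close pair, onlineSessionNum returns to zero and totalTime switches from the live calculation to the fixed finish-minus-start duration.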

Authentication and Security

Credential Management

Sessions support various authentication mechanisms:

Parameter Authentication:

  • Username/password pairs
  • IP address validation
  • Session configuration parameters

Token-based Authentication:

  • Delegation tokens for secure cluster access
  • Token renewal and validation
  • Integration with Kerberos infrastructure

Impersonation Support:

  • User impersonation for proxy scenarios
  • Security context switching
  • Authorization delegation

Session Configuration

Sessions accept configuration parameters that override server defaults:

val sessionConf: JMap[String, String] = new java.util.HashMap()
sessionConf.put("hive.exec.dynamic.partition", "true")
sessionConf.put("spark.sql.adaptive.enabled", "false")
sessionConf.put("use:database", "analytics")

Configuration Hierarchy:

  1. Server Defaults: Base Hive and Spark configuration
  2. Session Overrides: Client-specified parameters
  3. Operation Overrides: Query-specific configuration
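The hierarchy can be sketched as successive map overrides, where later layers win. This is a simplification: the real server applies these layers to HiveConf and SQLConf objects rather than plain maps.

```scala
// Sketch only: later layers override earlier ones.
val serverDefaults = Map(
  "hive.exec.dynamic.partition" -> "false",
  "spark.sql.adaptive.enabled"  -> "true")
val sessionOverrides = Map(
  "spark.sql.adaptive.enabled" -> "false")
val operationOverrides = Map(
  "spark.sql.shuffle.partitions" -> "64")

val effective = serverDefaults ++ sessionOverrides ++ operationOverrides
// the session's "false" wins over the server default "true"
```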

Performance and Resource Management

Session Limits

The system maintains configurable limits for resource management:

private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)

private def trimSessionIfNecessary() = {
  if (sessionList.size > retainedSessions) {
    val toRemove = math.max(retainedSessions / 10, 1)
    sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
      sessionList.remove(s._1)
    }
  }
}

This bounds the size of the retained session list: once it exceeds the configured limit, the oldest finished session records are evicted, preventing unbounded memory growth in the UI's history.
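The eviction logic can be run standalone by stubbing out the configuration. Here retainedSessions is hard-coded, whereas the real value comes from SQLConf:

```scala
import scala.collection.mutable

// Runnable version of trimSessionIfNecessary with a stubbed limit.
object TrimSketch {
  val retainedSessions = 10
  class Info { var finishTimestamp: Long = 0L }
  val sessionList = mutable.LinkedHashMap[String, Info]()

  def trimSessionIfNecessary(): Unit = {
    if (sessionList.size > retainedSessions) {
      val toRemove = math.max(retainedSessions / 10, 1)
      // only finished sessions are eligible; insertion order makes
      // the oldest finished entries the first to go
      sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
        sessionList.remove(s._1)
      }
    }
  }
}
```

With 12 finished sessions and a limit of 10, one call evicts max(10/10, 1) = 1 record, starting with the earliest-inserted finished session.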

Connection Pooling

Sessions are managed through connection pools for efficient resource utilization:

  • Pool Assignment: Sessions assigned to execution pools based on workload
  • Load Balancing: Distribution of sessions across available resources
  • Pool Monitoring: Tracking of pool utilization and performance
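In the server itself, pool assignment is tracked in sparkSqlOperationManager.sessionToActivePool, mapping each session handle to a scheduler pool name. The toy model below (all names beyond sessionToActivePool are hypothetical) shows the bookkeeping, with unassigned sessions falling back to a default pool:

```scala
import scala.collection.mutable

// Simplified model: session handle -> fair-scheduler pool name.
object PoolTracking {
  val sessionToActivePool = mutable.Map[String, String]()

  def assignPool(sessionId: String, pool: String): Unit =
    sessionToActivePool(sessionId) = pool

  def poolFor(sessionId: String): String =
    sessionToActivePool.getOrElse(sessionId, "default")

  // mirrors the cleanup in closeSession
  def onSessionClosed(sessionId: String): Unit =
    sessionToActivePool.remove(sessionId)
}
```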

Memory Management

  • Context Isolation: Prevents memory leaks between sessions
  • Resource Cleanup: Automatic cleanup of session resources
  • Garbage Collection: Efficient cleanup of temporary objects