Client session management for the Spark Hive Thrift Server: associates each client session with a SQL context, applies session-scoped configuration, and supports the full session lifecycle with session isolation between clients.
/**
 * Session manager for Spark SQL with HiveServer2 compatibility.
 * Manages client sessions and the SQLContext associated with each one.
 *
 * NOTE(review): only public signatures are listed here; method bodies are
 * omitted from this reference, so this listing is not compilable as-is.
 */
class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) {
/**
 * Open a new client session with optional impersonation.
 * @param protocol Thrift protocol version negotiated with the client
 * @param username Client username
 * @param passwd Client password (may be empty, e.g. with NONE auth)
 * @param ipAddress Client IP address
 * @param sessionConf Session-specific configuration key-value pairs
 * @param withImpersonation Whether to enable user impersonation (doAs)
 * @param delegationToken Delegation token for authentication; may be null
 * @return SessionHandle identifying the new session
 */
def openSession(
protocol: TProtocolVersion,
username: String,
passwd: String,
ipAddress: String,
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String
): SessionHandle
/**
 * Close an existing session and clean up its resources.
 * @param sessionHandle Handle identifying the session to close
 */
def closeSession(sessionHandle: SessionHandle): Unit
/**
 * Apply configuration parameters to a SQL context.
 * @param conf The SQL context to configure
 * @param confMap Configuration key-value pairs to set on it
 */
def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit
}

Usage Examples:
import org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager
import org.apache.hive.service.rpc.thrift.TProtocolVersion
// NOTE(review): scala.collection.JavaConverters is deprecated since
// Scala 2.13; prefer scala.jdk.CollectionConverters there.
import scala.collection.JavaConverters._
// Create session manager bound to the shared SQLContext
val sessionManager = new SparkSQLSessionManager(hiveServer, SparkSQLEnv.sqlContext)
// Open a new session with session-scoped SQL configuration
val sessionConf = Map(
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true"
).asJava
val sessionHandle = sessionManager.openSession(
protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
username = "spark_user",
passwd = "",
ipAddress = "192.168.1.100",
sessionConf = sessionConf,
withImpersonation = false,
delegationToken = null
)
// Use the session...
// Close the session when done to release its resources
sessionManager.closeSession(sessionHandle)

Session handles uniquely identify client sessions throughout their lifecycle.
/**
 * Handle identifying a client session.
 *
 * NOTE(review): this class (and the Handle hierarchy below) is shown in
 * Java syntax from the Hive service layer; signatures only, bodies omitted.
 */
class SessionHandle extends Handle {
/**
 * Create a new session handle.
 * @param handleId Unique handle identifier
 * @param protocol Thrift protocol version negotiated for the session
 */
public SessionHandle(HandleIdentifier handleId, TProtocolVersion protocol)
/**
 * Get the protocol version for this session.
 * @return TProtocolVersion used by the session
 */
public TProtocolVersion getProtocolVersion()
/**
 * Check if this session handle equals another.
 * @param other Handle to compare with
 * @return true if handles are equal
 */
public boolean equals(Object other)
/**
 * Get string representation of the handle.
 * @return String representation for logging/debugging
 */
public String toString()
}
/**
 * Base handle class providing unique identification.
 */
abstract class Handle {
/**
 * Get the unique identifier for this handle.
 * @return HandleIdentifier containing the public UUID and the secret UUID
 */
public HandleIdentifier getHandleIdentifier()
}
/**
 * Unique identifier containing a public UUID and a secret UUID.
 */
class HandleIdentifier {
/**
 * Get the public UUID part of the identifier.
 * @return UUID for this identifier
 */
public UUID getPublicId()
/**
 * Get the secret part of the identifier.
 * @return UUID secret used to validate that the caller owns the handle
 */
public UUID getSecretId()
}

The session manager supports multiple Thrift protocol versions for backward compatibility.
/**
 * Supported Thrift protocol versions.
 *
 * NOTE(review): the per-version feature notes below are descriptive only;
 * confirm them against the Hive TCLIService thrift definition before
 * relying on them.
 */
enum TProtocolVersion {
HIVE_CLI_SERVICE_PROTOCOL_V1, // Original HiveServer2 protocol
HIVE_CLI_SERVICE_PROTOCOL_V2, // Added async operations
HIVE_CLI_SERVICE_PROTOCOL_V3, // Added get/set operations
HIVE_CLI_SERVICE_PROTOCOL_V4, // Added delegation tokens
HIVE_CLI_SERVICE_PROTOCOL_V5, // Added primary keys support
HIVE_CLI_SERVICE_PROTOCOL_V6, // Added cross reference support
HIVE_CLI_SERVICE_PROTOCOL_V7, // Enhanced metadata operations
HIVE_CLI_SERVICE_PROTOCOL_V8, // Added result set schema info
HIVE_CLI_SERVICE_PROTOCOL_V9, // Added query ID support
HIVE_CLI_SERVICE_PROTOCOL_V10 // Latest version with all features
}

Usage Examples:
// Check protocol version capabilities
// (positional arguments follow the openSession signature above)
val sessionHandle = sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, // Use latest version
"username", "", "127.0.0.1", Map.empty.asJava, false, null
)
val protocolVersion = sessionHandle.getProtocolVersion()
println(s"Session using protocol: $protocolVersion")
// Handle different protocol versions; getValue gives the thrift-enum
// ordinal, so numeric comparison orders the versions
protocolVersion match {
case TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 =>
// All features available
println("Using latest protocol with full feature support")
case v if v.getValue >= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue =>
// Cross reference support available
println("Cross reference operations supported")
case _ =>
// Limited functionality
println("Using older protocol with limited features")
}

Sessions support extensive configuration through Spark SQL and Hive parameters.
// Common session configuration parameters
// (converted to java.util.Map with .asJava for the openSession API)
val sessionConfig = Map(
// Spark SQL configuration
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.skewJoin.enabled" -> "true",
"spark.sql.execution.arrow.pyspark.enabled" -> "true",
// Hive compatibility settings
"hive.exec.dynamic.partition" -> "true",
"hive.exec.dynamic.partition.mode" -> "nonstrict",
"hive.support.concurrency" -> "false",
// Session-specific settings
"spark.sql.session.timeZone" -> "UTC",
"spark.sql.warehouse.dir" -> "/spark-warehouse",
"spark.sql.catalogImplementation" -> "hive"
).asJava

Comprehensive session lifecycle with proper resource cleanup and error handling.
// Session states and lifecycle.
// NOTE(review): SparkSQLSessionManager above is constructed as
// SessionManager(hiveServer), but no constructor parameter appears in this
// signature listing — confirm against the Hive service source.
abstract class SessionManager {
/**
 * Initialize the session manager.
 * @param hiveConf Hive configuration
 */
def init(hiveConf: HiveConf): Unit
/**
 * Start the session manager service.
 */
def start(): Unit
/**
 * Stop the session manager and close all sessions.
 */
def stop(): Unit
/**
 * Get session by handle.
 * @param sessionHandle Handle identifying the session
 * @return HiveSession instance or null if not found (Java-style API)
 */
def getSession(sessionHandle: SessionHandle): HiveSession
}

Usage Examples:
import org.apache.hive.service.cli.session.HiveSession
// Session lifecycle management
val sessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
try {
// Initialize and start
sessionManager.init(hiveConf)
sessionManager.start()
// Open session
val sessionHandle = sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
"user", "pass", "127.0.0.1", sessionConf, false, null
)
// Get session object for advanced operations.
// getSession can return null (Java API), hence the explicit null check;
// Option(sessionManager.getSession(...)) would be more idiomatic Scala.
val session = sessionManager.getSession(sessionHandle)
if (session != null) {
println(s"Session user: ${session.getUserName}")
println(s"Session database: ${session.getCurrentDatabase}")
}
// Close session
sessionManager.closeSession(sessionHandle)
} finally {
// Clean shutdown — stop() closes any sessions still open
sessionManager.stop()
}

Support for user impersonation in secure environments.
// User impersonation configuration: enables doAs plus the Hadoop
// proxy-user rules that allow the service user to act for end users
val impersonationConfig = Map(
"hive.server2.enable.doAs" -> "true",
"hadoop.proxyuser.spark.hosts" -> "*",
"hadoop.proxyuser.spark.groups" -> "*"
).asJava
// Open session with impersonation
val sessionHandle = sessionManager.openSession(
protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
username = "spark", // Service user
passwd = "",
ipAddress = "127.0.0.1",
sessionConf = impersonationConfig,
withImpersonation = true, // Enable impersonation
delegationToken = "actual_user_token" // Token for actual user
)

Comprehensive error handling for session management operations.
/**
 * Exception thrown during session operations.
 * Extends java.sql.SQLException, so getSqlState()/getErrorCode() apply.
 */
class HiveSQLException extends SQLException {
public HiveSQLException(String reason, String sqlState, int vendorCode)
public HiveSQLException(String reason, String sqlState, Throwable cause)
}
/**
 * Common error scenarios in session management.
 * Values are standard SQLSTATE codes (class 08 = connection exception,
 * class 28 = invalid authorization, HY = general/CLI-defined).
 */
// Session creation errors
public static final String SESSION_CREATION_ERROR = "08006";
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String SESSION_ALREADY_CLOSED = "HY010";
public static final String AUTHENTICATION_FAILED = "28000";

Error Handling Examples:
import org.apache.hive.service.cli.HiveSQLException
try {
val sessionHandle = sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
username, password, ipAddress, sessionConf, false, null
)
// Session operations...
} catch {
// Match on SQLSTATE via pattern guards; most specific cases first
case e: HiveSQLException if e.getSqlState == "28000" =>
println("Authentication failed")
case e: HiveSQLException if e.getSqlState == "08006" =>
println("Session creation failed")
case e: HiveSQLException =>
println(s"Session error: ${e.getMessage} (${e.getSqlState})")
}

Session monitoring and metrics collection for operational visibility.
// Session monitoring through Spark metrics.
// NOTE(review): getOpenSessionCount()/getSessionInfo() are not declared in
// the SessionManager listing above — verify they exist on the deployed
// Hive service version before using.
val activeSessionCount = sessionManager.getOpenSessionCount()
val sessionInfo = sessionManager.getSessionInfo(sessionHandle)
// Access session details (accessors on the HiveSession object)
session.getCreationTime() // When session was created
session.getLastAccessTime() // Last activity timestamp
session.getUserName() // Session username
session.getCurrentDatabase() // Current database context
session.getSessionConf() // Session configuration