or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cli-driver.md · cli-services.md · index.md · metadata-operations.md · operation-management.md · server-management.md · session-management.md · sql-execution.md · web-ui.md
tile.json

docs/session-management.md

Session Management

Client session management with SQL context association, configuration handling, and comprehensive session lifecycle support for the Spark Hive Thrift Server.

Capabilities

SparkSQLSessionManager

Manages client sessions and their associated SQL contexts, providing session isolation and configuration management.

/**
 * Session manager for Spark SQL with HiveServer2 compatibility
 * Manages client sessions and their SQL contexts
 */
class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) {
  /**
   * Open a new client session with optional impersonation
   * @param protocol Thrift protocol version
   * @param username Client username
   * @param passwd Client password
   * @param ipAddress Client IP address
   * @param sessionConf Session-specific configuration
   * @param withImpersonation Whether to enable user impersonation
   * @param delegationToken Delegation token for authentication
   * @return SessionHandle identifying the new session
   */
  def openSession(
    protocol: TProtocolVersion,
    username: String,
    passwd: String,
    ipAddress: String,
    sessionConf: java.util.Map[String, String],
    withImpersonation: Boolean,
    delegationToken: String
  ): SessionHandle
  
  /**
   * Close an existing session and clean up resources
   * @param sessionHandle Handle identifying the session to close
   */
  def closeSession(sessionHandle: SessionHandle): Unit
  
  /**
   * Set configuration parameters for a SQL context
   * @param conf The SQL context to configure
   * @param confMap Configuration key-value pairs
   */
  def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit
}

Usage Examples:

import org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import scala.collection.JavaConverters._

// Create session manager
val sessionManager = new SparkSQLSessionManager(hiveServer, SparkSQLEnv.sqlContext)

// Open a new session
val sessionConf = Map(
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.adaptive.coalescePartitions.enabled" -> "true"
).asJava

val sessionHandle = sessionManager.openSession(
  protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "spark_user",
  passwd = "",
  ipAddress = "192.168.1.100",
  sessionConf = sessionConf,
  withImpersonation = false,
  delegationToken = null
)

// Use the session...
// Close the session when done
sessionManager.closeSession(sessionHandle)

Session Handles

Session handles uniquely identify client sessions throughout their lifecycle.

/**
 * Handle identifying a client session
 */
class SessionHandle extends Handle {
  /**
   * Create a new session handle
   * @param handleId Unique handle identifier
   * @param protocol Thrift protocol version
   */
  public SessionHandle(HandleIdentifier handleId, TProtocolVersion protocol)
  
  /**
   * Get the protocol version for this session
   * @return TProtocolVersion used by the session
   */
  public TProtocolVersion getProtocolVersion()
  
  /**
   * Check if this session handle equals another
   * @param other Handle to compare with
   * @return true if handles are equal
   */
  public boolean equals(Object other)
  
  /**
   * Get string representation of the handle
   * @return String representation for logging/debugging
   */
  public String toString()
}

/**
 * Base handle class providing unique identification
 */
abstract class Handle {
  /**
   * Get the unique identifier for this handle
   * @return HandleIdentifier containing UUID and secret
   */
  public HandleIdentifier getHandleIdentifier()
}

/**
 * Unique identifier containing UUID and secret
 */
class HandleIdentifier {
  /**
   * Get the public UUID part of the identifier
   * @return UUID for this identifier
   */
  public UUID getPublicId()
  
  /**
   * Get the secret part of the identifier
   * @return UUID secret for security
   */
  public UUID getSecretId()
}

Protocol Version Management

The session manager supports multiple Thrift protocol versions for backward compatibility.

/**
 * Supported Thrift protocol versions
 */
enum TProtocolVersion {
  HIVE_CLI_SERVICE_PROTOCOL_V1,   // Original HiveServer2 protocol
  HIVE_CLI_SERVICE_PROTOCOL_V2,   // Added async operations
  HIVE_CLI_SERVICE_PROTOCOL_V3,   // Added get/set operations
  HIVE_CLI_SERVICE_PROTOCOL_V4,   // Added delegation tokens
  HIVE_CLI_SERVICE_PROTOCOL_V5,   // Added primary keys support
  HIVE_CLI_SERVICE_PROTOCOL_V6,   // Added cross reference support
  HIVE_CLI_SERVICE_PROTOCOL_V7,   // Enhanced metadata operations
  HIVE_CLI_SERVICE_PROTOCOL_V8,   // Added result set schema info
  HIVE_CLI_SERVICE_PROTOCOL_V9,   // Added query ID support
  HIVE_CLI_SERVICE_PROTOCOL_V10   // Latest version with all features
}

Usage Examples:

// Check protocol version capabilities
val sessionHandle = sessionManager.openSession(
  TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, // Use latest version
  "username", "", "127.0.0.1", Map.empty.asJava, false, null
)

val protocolVersion = sessionHandle.getProtocolVersion()
println(s"Session using protocol: $protocolVersion")

// Handle different protocol versions
protocolVersion match {
  case TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 =>
    // All features available
    println("Using latest protocol with full feature support")
  case v if v.getValue >= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue =>
    // Cross reference support available
    println("Cross reference operations supported")
  case _ =>
    // Limited functionality
    println("Using older protocol with limited features")
}

Session Configuration

Sessions support extensive configuration through Spark SQL and Hive parameters.

// Common session configuration parameters
val sessionConfig = Map(
  // Spark SQL configuration
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.adaptive.coalescePartitions.enabled" -> "true", 
  "spark.sql.adaptive.skewJoin.enabled" -> "true",
  "spark.sql.execution.arrow.pyspark.enabled" -> "true",
  
  // Hive compatibility settings
  "hive.exec.dynamic.partition" -> "true",
  "hive.exec.dynamic.partition.mode" -> "nonstrict",
  "hive.support.concurrency" -> "false",
  
  // Session-specific settings
  "spark.sql.session.timeZone" -> "UTC",
  "spark.sql.warehouse.dir" -> "/spark-warehouse",
  "spark.sql.catalogImplementation" -> "hive"
).asJava

Session Lifecycle Management

Comprehensive session lifecycle with proper resource cleanup and error handling.

// Session states and lifecycle
abstract class SessionManager {
  /**
   * Initialize the session manager
   * @param hiveConf Hive configuration
   */
  def init(hiveConf: HiveConf): Unit
  
  /**
   * Start the session manager service
   */
  def start(): Unit
  
  /**
   * Stop the session manager and close all sessions
   */
  def stop(): Unit
  
  /**
   * Get session by handle
   * @param sessionHandle Handle identifying the session
   * @return HiveSession instance or null if not found
   */
  def getSession(sessionHandle: SessionHandle): HiveSession
}

Usage Examples:

import org.apache.hive.service.cli.session.HiveSession

// Session lifecycle management
val sessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)

try {
  // Initialize and start
  sessionManager.init(hiveConf)
  sessionManager.start()
  
  // Open session
  val sessionHandle = sessionManager.openSession(
    TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
    "user", "pass", "127.0.0.1", sessionConf, false, null
  )
  
  // Get session object for advanced operations
  val session = sessionManager.getSession(sessionHandle)
  if (session != null) {
    println(s"Session user: ${session.getUserName}")
    println(s"Session database: ${session.getCurrentDatabase}")
  }
  
  // Close session
  sessionManager.closeSession(sessionHandle)
  
} finally {
  // Clean shutdown
  sessionManager.stop()
}

User Impersonation

Support for user impersonation in secure environments.

// User impersonation configuration
val impersonationConfig = Map(
  "hive.server2.enable.doAs" -> "true",
  "hadoop.proxyuser.spark.hosts" -> "*",
  "hadoop.proxyuser.spark.groups" -> "*"
).asJava

// Open session with impersonation
val sessionHandle = sessionManager.openSession(
  protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
  username = "spark",          // Service user
  passwd = "",
  ipAddress = "127.0.0.1",
  sessionConf = impersonationConfig,
  withImpersonation = true,    // Enable impersonation
  delegationToken = "actual_user_token"  // Token for actual user
)

Error Handling

Comprehensive error handling for session management operations.

/**
 * Exception thrown during session operations
 */
class HiveSQLException extends SQLException {
  public HiveSQLException(String reason, String sqlState, int vendorCode)
  public HiveSQLException(String reason, String sqlState, Throwable cause)
}

/**
 * Common error scenarios in session management
 */
// Session creation errors
public static final String SESSION_CREATION_ERROR = "08006";
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String SESSION_ALREADY_CLOSED = "HY010";
public static final String AUTHENTICATION_FAILED = "28000";

Error Handling Examples:

import org.apache.hive.service.cli.HiveSQLException

try {
  val sessionHandle = sessionManager.openSession(
    TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
    username, password, ipAddress, sessionConf, false, null
  )
  
  // Session operations...
  
} catch {
  case e: HiveSQLException if e.getSqlState == "28000" =>
    println("Authentication failed")
  case e: HiveSQLException if e.getSqlState == "08006" =>
    println("Session creation failed")
  case e: HiveSQLException =>
    println(s"Session error: ${e.getMessage} (${e.getSqlState})")
}

Session Monitoring

Session monitoring and metrics collection for operational visibility.

// Session monitoring through Spark metrics
val activeSessionCount = sessionManager.getOpenSessionCount()
val sessionInfo = sessionManager.getSessionInfo(sessionHandle)

// Access session details
session.getCreationTime()     // When session was created
session.getLastAccessTime()   // Last activity timestamp  
session.getUserName()         // Session username
session.getCurrentDatabase()  // Current database context
session.getSessionConf()      // Session configuration