Server Management

The Spark Hive Thrift Server provides two primary entry points for server lifecycle management: programmatic initialization and command-line startup.

Main Server Object

HiveThriftServer2

Central singleton object managing server lifecycle, monitoring, and event handling.

object HiveThriftServer2 extends Logging {
  var uiTab: Option[ThriftServerTab]
  var listener: HiveThriftServer2Listener
  
  @DeveloperApi
  def startWithContext(sqlContext: SQLContext): Unit
  def main(args: Array[String]): Unit
}

startWithContext

Programmatically starts a new Thrift server using the provided SQL context. The method is annotated with @DeveloperApi and is intended for embedding the server in custom applications.

Usage Example:

import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyThriftServer")
  .setMaster("local[*]")

val sc = new SparkContext(conf)  
val sqlContext = new SQLContext(sc)

// Start the server
HiveThriftServer2.startWithContext(sqlContext)

// Server is now running and accepting connections

The method performs the following initialization:

  1. Creates a new HiveThriftServer2 instance
  2. Initializes Hive execution client
  3. Starts the server services
  4. Sets up event listener for monitoring
  5. Attaches web UI tab if enabled
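The five steps above can be sketched with plain-Scala stand-ins (MiniServer and the step names are illustrative, not Spark's actual internals) to make the ordering concrete:

```scala
import scala.collection.mutable.ListBuffer

// Illustrative stand-ins for the real Spark classes; names are hypothetical.
object StartWithContextSketch {
  val log = ListBuffer.empty[String]

  class MiniServer {
    log += "create-server"                                 // 1. create the server instance
    def initHiveClient(): Unit = log += "init-hive-client" // 2. initialize the Hive execution client
    def startServices(): Unit = log += "start-services"    // 3. start the server services
  }

  def startWithContext(uiEnabled: Boolean): MiniServer = {
    val server = new MiniServer
    server.initHiveClient()
    server.startServices()
    log += "listener-attached"          // 4. register the monitoring listener
    if (uiEnabled) log += "ui-tab"      // 5. attach the web UI tab if enabled
    server
  }
}
```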

main

Command-line entry point that initializes the Spark environment and starts the server with default configuration.

Usage Example:

# Typically launched via the bundled script:
$SPARK_HOME/sbin/start-thriftserver.sh

# Direct invocation (equivalent, given the right classpath):
scala -cp $SPARK_CLASSPATH org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

The main method:

  1. Processes command-line options using Hive's ServerOptionsProcessor
  2. Initializes SparkSQLEnv
  3. Sets up shutdown hooks for cleanup
  4. Creates and starts the server
  5. Registers monitoring components
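Step 3, the shutdown hook, can be illustrated with the Scala standard library alone. This is a minimal sketch: cleanup() is a hypothetical stand-in for the real cleanup work (stopping SparkSQLEnv), not Spark's actual hook body:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Minimal sketch of the shutdown-hook pattern from step 3.
object ShutdownHookSketch {
  private val cleaned = new AtomicBoolean(false)

  // Hypothetical stand-in for SparkSQLEnv.stop() and related teardown.
  def cleanup(): Unit = cleaned.set(true)

  // Register cleanup to run at JVM exit; the returned handle can deregister it.
  val hook: sys.ShutdownHookThread = sys.addShutdownHook(cleanup())
}
```

In the real server the hook guarantees that Spark and Hive resources are released even when the process is terminated by a signal rather than a clean stop() call.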

Server Implementation

HiveThriftServer2 Class

Internal server implementation extending Hive's HiveServer2 with Spark SQL integration.

private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  def init(hiveConf: HiveConf): Unit
  def start(): Unit  
  def stop(): Unit
}

Initialization Process

The init method configures transport layer and service components:

  1. CLI Service Setup: Creates SparkSQLCLIService with SQL context integration
  2. Transport Configuration: Selects HTTP or binary transport based on configuration
  3. Service Registration: Registers CLI and transport services
  4. Composite Initialization: Initializes all registered services
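The composite pattern behind steps 3 and 4, register services first, then initialize them all in registration order, can be sketched in isolation (Service, CompositeService, and the service names here are simplified illustrations, not Hive's actual classes):

```scala
import scala.collection.mutable.ListBuffer

// Simplified composite-service pattern: register, then initialize in order.
object CompositeInitSketch {
  trait Service { def init(): Unit }

  class CompositeService {
    private val services = ListBuffer.empty[Service]
    def addService(s: Service): Unit = services += s
    // Initializes every registered service in registration order.
    def init(): Unit = services.foreach(_.init())
  }

  val initialized = ListBuffer.empty[String]

  val server = new CompositeService
  server.addService(new Service { def init(): Unit = initialized += "cli" })       // CLI service
  server.addService(new Service { def init(): Unit = initialized += "transport" }) // transport service
  server.init()
}
```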

Transport Mode Selection

The server automatically selects transport mode based on Hive configuration:

private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
  val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
  transportMode.toLowerCase(Locale.ROOT).equals("http")
}

  • Binary Transport: direct TCP connections using the Thrift binary protocol (the default)
  • HTTP Transport: HTTP-based connections for firewall-friendly access
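The check itself is a case-insensitive string comparison. A standalone version that takes the raw configuration string instead of a HiveConf behaves the same way:

```scala
import java.util.Locale

// Self-contained version of the transport-mode check: the comparison is
// case-insensitive, so "HTTP", "Http", and "http" all select HTTP transport.
object TransportModeSketch {
  def isHttpTransportMode(transportMode: String): Boolean =
    transportMode.toLowerCase(Locale.ROOT) == "http"
}
```

In a deployment, the value comes from the hive.server2.transport.mode property (e.g., in hive-site.xml); any value other than "http" falls through to binary transport.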

Event Monitoring

HiveThriftServer2Listener

Spark listener that tracks server lifecycle and provides monitoring data for the web UI.

private[thriftserver] class HiveThriftServer2Listener(server: HiveServer2, conf: SQLConf)
  extends SparkListener {
  def getOnlineSessionNum: Int
  def getTotalRunning: Int
  def getSessionList: Seq[SessionInfo]
  def getSession(sessionId: String): Option[SessionInfo]
  def getExecutionList: Seq[ExecutionInfo]
  
  // Event handlers
  def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit
  def onSessionClosed(sessionId: String): Unit
  def onStatementStart(id: String, sessionId: String, statement: String, groupId: String, userName: String = "UNKNOWN"): Unit
  def onStatementParsed(id: String, executionPlan: String): Unit
  def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit
  def onStatementFinish(id: String): Unit
}

The listener maintains runtime statistics and session/query tracking:

  • Session Management: Tracks active sessions, connection details, and session duration
  • Query Execution: Monitors SQL statement lifecycle from start to completion
  • Performance Metrics: Provides data for web UI dashboards and monitoring
  • Resource Cleanup: Automatically trims old session and execution records
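The session-tracking and trimming behaviour can be sketched as a toy listener. Everything here is illustrative: the retainedSessions cap stands in for the listener's retained-record configuration, and only the session-related handlers are modeled:

```scala
import scala.collection.mutable

// Toy listener illustrating session tracking and record trimming.
class ListenerSketch(retainedSessions: Int) {
  case class SessionInfo(sessionId: String, ip: String, userName: String,
                         var finished: Boolean = false)

  // Insertion-ordered so the oldest records are evicted first.
  private val sessions = mutable.LinkedHashMap.empty[String, SessionInfo]

  def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
    sessions(sessionId) = SessionInfo(sessionId, ip, userName)
    trim()
  }

  def onSessionClosed(sessionId: String): Unit =
    sessions.get(sessionId).foreach(_.finished = true)

  def getOnlineSessionNum: Int = sessions.values.count(!_.finished)

  // Evict the oldest *finished* sessions once the cap is exceeded;
  // live sessions are never dropped.
  private def trim(): Unit = {
    val excess = sessions.size - retainedSessions
    if (excess > 0) {
      val evictable = sessions.collect { case (id, s) if s.finished => id }.take(excess)
      evictable.foreach(sessions.remove)
    }
  }
}
```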

Server State Management

The server uses atomic state tracking to ensure clean startup and shutdown:

private val started = new AtomicBoolean(false)

override def start(): Unit = {
  super.start()
  started.set(true)
}

override def stop(): Unit = {
  if (started.getAndSet(false)) {
    super.stop()
  }
}

This prevents double-start/stop operations and ensures resources are properly released.
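The guard can be demonstrated standalone: getAndSet returns the previous value and swaps in the new one atomically, so only the first stop() call runs the release path. The stopCount field below is a hypothetical stand-in for the real super.stop() work:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Standalone demonstration of the getAndSet guard: resources are released
// at most once no matter how many times stop() is called.
class GuardedServer {
  private val started = new AtomicBoolean(false)
  var stopCount = 0 // stands in for the resource release in super.stop()

  def start(): Unit = started.set(true)

  def stop(): Unit =
    if (started.getAndSet(false)) { // true only on the first call after start()
      stopCount += 1
    }
}
```

Because the check-and-clear is a single atomic operation, the guard is also safe when stop() is invoked concurrently from a shutdown hook and an explicit caller.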

Error Handling

The server includes comprehensive error handling:

  1. Startup Failures: Catches exceptions during initialization and exits with error code
  2. Context Validation: Checks if SparkContext is stopped during startup
  3. Shutdown Hooks: Ensures cleanup even on unexpected termination
  4. Service Dependencies: Validates all required services are available

Example Error Scenarios:

try {
  val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
  server.init(executionHive.conf)
  server.start()
  // ... setup monitoring
  if (SparkSQLEnv.sparkContext.stopped.get()) {
    logError("SparkContext has stopped even if HiveServer2 has started, so exit")
    System.exit(-1)
  }
} catch {
  case e: Exception =>
    logError("Error starting HiveThriftServer2", e)
    System.exit(-1)
}