The Spark Hive Thrift Server provides two primary entry points for server lifecycle management: programmatic initialization and command-line startup.
Central singleton object managing server lifecycle, monitoring, and event handling.
object HiveThriftServer2 extends Logging {
var uiTab: Option[ThriftServerTab]
var listener: HiveThriftServer2Listener
@DeveloperApi
def startWithContext(sqlContext: SQLContext): Unit
def main(args: Array[String]): Unit
}

Programmatically starts a new Thrift server with the provided SQL context. This method is annotated with @DeveloperApi and is intended for embedding the server in custom applications.
Usage Example:
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
.setAppName("MyThriftServer")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Start the server
HiveThriftServer2.startWithContext(sqlContext)
// Server is now running and accepting connections

The method performs the following initialization: it creates a new HiveThriftServer2 instance from the provided SQLContext, calls init and start on it, and registers the HiveThriftServer2Listener (and the ThriftServerTab web UI tab, when the UI is enabled) for monitoring.

Command-line entry point that initializes the Spark environment and starts the server with default configuration.
Usage Example:
# Direct invocation (typically done via scripts)
scala -cp $SPARK_CLASSPATH org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

The main method processes command-line options through ServerOptionsProcessor and initializes the shared SparkSQLEnv before creating and starting the server.

Internal server implementation extending Hive's HiveServer2 with Spark SQL integration.
private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 with ReflectedCompositeService {
def init(hiveConf: HiveConf): Unit
def start(): Unit
def stop(): Unit
}

The init method configures the transport layer and service components, registering a SparkSQLCLIService that integrates the SQL context with HiveServer2's service stack.

The server automatically selects its transport mode based on the Hive configuration:
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
transportMode.toLowerCase(Locale.ROOT).equals("http")
}

Binary Transport: direct TCP connections using the Thrift binary protocol (the default)
HTTP Transport: HTTP-based connections for firewall-friendly access
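The same check can be sketched against a plain configuration map, as a simplified stand-in for HiveConf (TransportSelector, isHttpMode, and the map-based signature are illustrative names for this sketch, not part of Spark):

```scala
import java.util.Locale

// Simplified stand-in for the HiveConf-based check: the server runs in
// HTTP mode only when hive.server2.transport.mode is set to "http"
// (case-insensitively); any other value falls back to binary transport.
object TransportSelector {
  def isHttpMode(conf: Map[String, String]): Boolean =
    conf.getOrElse("hive.server2.transport.mode", "binary")
      .toLowerCase(Locale.ROOT) == "http"
}
```

Because the comparison is normalized with Locale.ROOT, values such as "HTTP" or "Http" also select the HTTP transport, mirroring the case-insensitive check above.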
Spark listener that tracks server lifecycle and provides monitoring data for the web UI.
private[thriftserver] class HiveThriftServer2Listener(server: HiveServer2, conf: SQLConf) extends SparkListener {
def getOnlineSessionNum: Int
def getTotalRunning: Int
def getSessionList: Seq[SessionInfo]
def getSession(sessionId: String): Option[SessionInfo]
def getExecutionList: Seq[ExecutionInfo]
// Event handlers
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit
def onSessionClosed(sessionId: String): Unit
def onStatementStart(id: String, sessionId: String, statement: String, groupId: String, userName: String = "UNKNOWN"): Unit
def onStatementParsed(id: String, executionPlan: String): Unit
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit
def onStatementFinish(id: String): Unit
}

The listener maintains runtime statistics and per-session/per-statement tracking for the web UI.
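As a rough illustration, the session-side bookkeeping can be sketched like this (SessionTracker and this pared-down SessionInfo are hypothetical stand-ins for the listener's internal state, not Spark's actual classes):

```scala
import scala.collection.mutable

// Simplified sketch: sessions are kept in a map keyed by session id, and a
// session counts as "online" until a finish timestamp has been recorded.
case class SessionInfo(sessionId: String, userName: String, var finishTimestamp: Long = 0L)

class SessionTracker {
  private val sessions = mutable.LinkedHashMap[String, SessionInfo]()

  def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit =
    sessions(sessionId) = SessionInfo(sessionId, userName)

  def onSessionClosed(sessionId: String): Unit =
    sessions.get(sessionId).foreach(_.finishTimestamp = System.currentTimeMillis())

  // Closed sessions stay in the list (for the UI's history) but no longer
  // count toward the online-session total.
  def getOnlineSessionNum: Int = sessions.values.count(_.finishTimestamp == 0L)
  def getSessionList: Seq[SessionInfo] = sessions.values.toSeq
}
```

Note that closing a session does not remove it from the list; the real listener similarly retains finished sessions and statements (up to a configured retention limit) so the web UI can show history.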
The server uses atomic state tracking to ensure clean startup and shutdown:
private val started = new AtomicBoolean(false)
override def start(): Unit = {
super.start()
started.set(true)
}
override def stop(): Unit = {
if (started.getAndSet(false)) {
super.stop()
}
}

This prevents double-start/stop operations and ensures resources are properly released.
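The guard pattern can be demonstrated in isolation (GuardedService is a hypothetical class; stopCount exists only to make the behavior observable):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Minimal sketch of the guard above: getAndSet atomically reads and clears
// the flag, so stop() runs its shutdown logic at most once per successful
// start(), and a stop() before any start() is a no-op.
class GuardedService {
  private val started = new AtomicBoolean(false)
  var stopCount = 0 // counts real shutdowns, for illustration only

  def start(): Unit = started.set(true)

  def stop(): Unit =
    if (started.getAndSet(false)) stopCount += 1
}
```

Using getAndSet rather than a separate get-then-set avoids a race in which two concurrent stop() calls both observe the flag as true and both run the shutdown logic.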
The server handles startup failures by logging the error and exiting the process, and it also detects a SparkContext that has stopped out from under a started HiveServer2:
try {
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
server.init(executionHive.conf)
server.start()
// ... setup monitoring
if (SparkSQLEnv.sparkContext.stopped.get()) {
logError("SparkContext has stopped even if HiveServer2 has started, so exit")
System.exit(-1)
}
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}