Spark SQL Thrift JDBC/ODBC Server - HiveServer2 compatible interface for Spark SQL
Install with: `npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive-thriftserver@1.6.0`

Spark Hive Thrift Server provides a Thrift-based JDBC/ODBC interface for Spark SQL, making it compatible with HiveServer2 clients. It enables remote access to Spark SQL through standard database connectivity protocols, allowing users to connect using JDBC drivers and execute SQL queries against Spark datasets and tables.
The server implements the HiveServer2 thrift interface but uses Spark SQL as the execution engine instead of Hive, providing better performance and broader data source support. It includes support for concurrent sessions, query execution management, and a web UI for monitoring active connections and queries.
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIService
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
import org.apache.spark.sql.hive.HiveContext// Start thrift server standalone
/**
 * Standalone launcher for the Thrift JDBC/ODBC server; forwards the
 * command-line arguments to `HiveThriftServer2.main`.
 *
 * Defines an explicit `main` instead of extending `scala.App`: `App` relies on
 * `DelayedInit`, and Spark's documentation warns that subclasses of
 * `scala.App` may not work correctly as Spark application entry points.
 */
object MyThriftServer {
  def main(args: Array[String]): Unit =
    HiveThriftServer2.main(args)
}
// Alternatively, start the server programmatically on top of an existing context.
// `sparkContext` is assumed to be an already-created SparkContext in scope.
import org.apache.spark.sql.hive.HiveContext
val hiveContext: HiveContext = new HiveContext(sparkContext)
HiveThriftServer2.startWithContext(hiveContext)
// Start interactive SQL CLI
/**
 * Launcher for the interactive Spark SQL command-line shell; forwards the
 * command-line arguments to `SparkSQLCLIDriver.main`.
 *
 * Uses an explicit `main` rather than `extends App`: `App` relies on
 * `DelayedInit`, and Spark's documentation warns that `scala.App` subclasses
 * may not work correctly as Spark application entry points.
 */
object MySQLCLI {
  def main(args: Array[String]): Unit =
    SparkSQLCLIDriver.main(args)
}
// Initialize Spark SQL environment
SparkSQLEnv.init()
try {
  // Access the shared contexts exposed by SparkSQLEnv after init()
  val sparkContext = SparkSQLEnv.sparkContext
  val hiveContext = SparkSQLEnv.hiveContext
  // ... run work with sparkContext / hiveContext here ...
} finally {
  // Clean shutdown: the finally block guarantees stop() is reached even when
  // the work above throws.
  SparkSQLEnv.stop()
}
The Spark Hive Thrift Server is built around several key components:
- HiveThriftServer2 and SparkSQLCLIDriver provide the main application entry points for server and CLI modes.
- SparkSQLEnv manages the shared Spark and Hive contexts with optimized configurations.
- SparkSQLSessionManager handles client session lifecycle and isolation.
- SparkExecuteStatementOperation and SparkSQLDriver process SQL statements and manage results.
- SparkSQLCLIService implements the Thrift service interface compatible with HiveServer2.
- ReflectionUtils provides a compatibility layer for Hive integration.

Core server lifecycle management, including startup, configuration, and shutdown operations.
/**
 * Entry point and lifecycle holder for the Thrift JDBC/ODBC server.
 */
object HiveThriftServer2 {
  /** Command-line entry point that starts the server in standalone mode. */
  def main(args: Array[String]): Unit
  /**
   * Starts the Thrift server on top of an existing [[HiveContext]]
   * (the programmatic alternative to `main`).
   */
  @DeveloperApi
  def startWithContext(sqlContext: HiveContext): Unit
  // Log handle used by the server object.
  var LOG: Log
  // Web UI tab for the server. Being an Option it may be absent; presumably None
  // when the Spark UI is disabled (spark.ui.enabled) — TODO confirm.
  var uiTab: Option[ThriftServerTab]
  // Listener that tracks sessions and statement executions for monitoring.
  var listener: HiveThriftServer2Listener
}
Interactive command-line interface for executing SQL queries with Hive CLI compatibility.
/** Launcher for the interactive Spark SQL command-line shell (CLI mode). */
object SparkSQLCLIDriver {
  /** Entry point for CLI mode. */
  def main(args: Array[String]): Unit
  /**
   * Installs a signal handler for the CLI.
   * NOTE(review): presumably lets an interrupt (e.g. Ctrl-C) cancel the running
   * query instead of killing the shell — TODO confirm.
   */
  def installSignalHandler(): Unit
}
/** CLI driver built on Hive's `CliDriver`, overriding per-command processing. */
private[hive] class SparkSQLCLIDriver extends CliDriver {
  /**
   * Processes a single CLI command.
   * @return an Int status code (presumably 0 on success — TODO confirm)
   */
  override def processCmd(cmd: String): Int
}
Centralized management of Spark and Hive execution contexts with optimized configurations.
/** Process-wide holder for the shared SparkContext and HiveContext. */
object SparkSQLEnv {
  // Shared HiveContext; read it after init() (as the usage example does).
  var hiveContext: HiveContext
  // Shared SparkContext; read it after init().
  var sparkContext: SparkContext
  /** Initializes the shared environment; call before reading the contexts. */
  def init(): Unit
  /** Shuts down the shared environment (clean shutdown). */
  def stop(): Unit
}
Client session lifecycle management with isolation and resource cleanup.
/**
 * Session manager for the Thrift server: opens and closes client sessions.
 *
 * @param hiveServer  the owning HiveServer2 instance
 * @param hiveContext HiveContext backing the sessions
 */
private[hive] class SparkSQLSessionManager(
  hiveServer: HiveServer2,
  hiveContext: HiveContext
) extends SessionManager {
  // NOTE: the parameter list is elided ("...") in this listing; see Hive's
  // SessionManager.openSession for the full signature.
  override def openSession(...): SessionHandle
  /** Closes the session identified by `sessionHandle`. */
  override def closeSession(sessionHandle: SessionHandle): Unit
}
SQL statement execution with result management and schema introspection.
/**
 * Operation that executes one SQL `statement` for a client session and serves
 * its results.
 *
 * @param parentSession   session that issued the statement
 * @param statement       SQL text to execute
 * @param confOverlay     per-statement configuration overrides (presumably
 *                        key -> value settings — TODO confirm)
 * @param runInBackground whether the statement runs asynchronously (presumably
 *                        on the hive.server2.async.exec.threads pool — TODO confirm)
 */
private[hive] class SparkExecuteStatementOperation(
  parentSession: HiveSession,
  statement: String,
  confOverlay: JMap[String, String],
  runInBackground: Boolean
) extends ExecuteStatementOperation {
  /** Closes the operation. */
  def close(): Unit
  /**
   * Returns the next batch of result rows for the given fetch `order`.
   * `maxRowsL` presumably caps the batch size — TODO confirm.
   */
  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet
  /** Schema of the statement's result set. */
  def getResultSetSchema: TableSchema
  /** Cancels the statement. */
  def cancel(): Unit
}
/**
 * Hive `Driver` subclass that processes SQL statements and manages their results.
 *
 * @param context HiveContext used to run commands; defaults to the shared
 *                `SparkSQLEnv.hiveContext`
 */
private[hive] class SparkSQLDriver(
  context: HiveContext = SparkSQLEnv.hiveContext
) extends Driver {
  /** Initializes the driver. */
  def init(): Unit
  /** Runs one command; the outcome is reported as a Hive CommandProcessorResponse. */
  def run(command: String): CommandProcessorResponse
  /** Closes the driver; returns an Int status code. */
  def close(): Int
  /**
   * Copies fetched results into `res`.
   * NOTE(review): the Boolean presumably signals whether rows were produced — TODO confirm.
   */
  def getResults(res: JList[_]): Boolean
  /** Schema of the command output (presumably of the last `run` — TODO confirm). */
  def getSchema: Schema
  /** Releases the driver. */
  def destroy(): Unit
}
Web-based monitoring interface with session tracking and query statistics.
/**
 * SparkListener that tracks Thrift server sessions and statement executions
 * for monitoring.
 *
 * @param server the HiveServer2 instance being monitored
 * @param conf   SQL configuration (presumably the source of the
 *               THRIFTSERVER_UI_*_LIMIT retention settings — TODO confirm)
 */
private[thriftserver] class HiveThriftServer2Listener(
  server: HiveServer2,
  conf: SQLConf
) extends SparkListener {
  /** Count of online (currently open) sessions. */
  def getOnlineSessionNum: Int
  /** Count of currently running statements. */
  def getTotalRunning: Int
  /** Sessions tracked by the listener. */
  def getSessionList: Seq[SessionInfo]
  /** Looks up a tracked session by id; None when it is not tracked. */
  def getSession(sessionId: String): Option[SessionInfo]
  /** Statement executions tracked by the listener. */
  def getExecutionList: Seq[ExecutionInfo]
}
/**
 * Spark web UI tab for the Thrift server.
 *
 * @param sparkContext context whose UI hosts the tab (presumably — TODO confirm)
 */
private[thriftserver] class ThriftServerTab(
  sparkContext: SparkContext
) extends SparkUITab {
  /** Removes this tab from the UI. */
  def detach(): Unit
}
Core Thrift service implementation providing HiveServer2 compatibility layer.
/**
 * Hive `CLIService` implementation backing the HiveServer2-compatible Thrift
 * service interface.
 *
 * @param hiveServer  the owning HiveServer2 instance (also passed to CLIService)
 * @param hiveContext HiveContext used by the service
 */
private[hive] class SparkSQLCLIService(
  hiveServer: HiveServer2,
  hiveContext: HiveContext
) extends CLIService(hiveServer) {
  /** Initializes the service from `hiveConf`. */
  override def init(hiveConf: HiveConf): Unit
  /** Starts the service. */
  override def start(): Unit
  /** Stops the service. */
  override def stop(): Unit
}
Utility methods for accessing private fields and methods in Hive classes for compatibility.
/**
 * Reflection helpers for reading and writing fields, and invoking methods, on
 * Hive classes that do not expose them publicly.
 */
private[hive] object ReflectionUtils {
  /** Sets `fieldName` on `obj`, presumably the field declared by its superclass — TODO confirm. */
  def setSuperField(obj: Object, fieldName: String, fieldValue: Object): Unit
  /** Sets `fieldName` on `obj`; `level` presumably selects how many ancestor classes up — TODO confirm. */
  def setAncestorField(obj: AnyRef, level: Int, fieldName: String, fieldValue: AnyRef): Unit
  /** Reads `fieldName` from `obj` (superclass field, presumably) and returns it as `T`. */
  def getSuperField[T](obj: AnyRef, fieldName: String): T
  /**
   * Reads an ancestor-class field and returns it as `T`.
   * NOTE(review): `clazz` is typed `Object`, so it is presumably the target
   * instance rather than a `Class` — TODO confirm.
   */
  def getAncestorField[T](clazz: Object, level: Int, fieldName: String): T
  /** Invokes static method `methodName` on `clazz`; each arg is a (parameter class, value) pair. */
  def invokeStatic(clazz: Class[_], methodName: String, args: (Class[_], AnyRef)*): AnyRef
  /** Invokes method `methodName` of `clazz` on `obj`; each arg is a (parameter class, value) pair. */
  def invoke(clazz: Class[_], obj: AnyRef, methodName: String, args: (Class[_], AnyRef)*): AnyRef
}

Configuration options:

- spark.app.name - Application name (default: "SparkSQL::{hostname}")
- spark.serializer - Serializer class (default: KryoSerializer)
- spark.kryo.referenceTracking - Kryo reference tracking (default: false)
- spark.ui.enabled - Enable Spark Web UI (default: true)
- hive.server2.transport.mode - Transport mode ("binary" or "http")
- hive.server2.async.exec.threads - Background execution thread pool size
- hive.server2.logging.operation.enabled - Enable operation logging
- SQLConf.THRIFTSERVER_POOL.key - Scheduler pool for query execution
- SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT - Maximum statements retained in UI
- SQLConf.THRIFTSERVER_UI_SESSION_LIMIT - Maximum sessions retained in UI

/**
 * Per-session record tracked by HiveThriftServer2Listener.
 *
 * @param sessionId      id of the client session
 * @param startTimestamp when the session started (unit presumably epoch millis — TODO confirm)
 * @param ip             client IP address
 * @param userName       user who opened the session
 */
private[thriftserver] class SessionInfo(
  val sessionId: String,
  val startTimestamp: Long,
  val ip: String,
  val userName: String
) {
  // When the session finished (same unit as startTimestamp, presumably).
  var finishTimestamp: Long
  // Number of statements executed in this session (presumably — TODO confirm).
  var totalExecution: Int
  /** Session duration (presumably derived from start/finish timestamps — TODO confirm). */
  def totalTime: Long
}
/**
 * Per-statement record tracked by HiveThriftServer2Listener.
 *
 * @param statement      SQL text of the statement
 * @param sessionId      id of the session that issued it
 * @param startTimestamp when execution started (unit presumably epoch millis — TODO confirm)
 * @param userName       user who issued the statement
 */
private[thriftserver] class ExecutionInfo(
  val statement: String,
  val sessionId: String,
  val startTimestamp: Long,
  val userName: String
) {
  // When execution finished (same unit as startTimestamp, presumably).
  var finishTimestamp: Long
  // Plan text for the statement (presumably the query plan string — TODO confirm).
  var executePlan: String
  // Extra detail (presumably e.g. the error message on failure — TODO confirm).
  var detail: String
  // Current lifecycle state; one of the ExecutionState values.
  var state: ExecutionState.Value
  // Ids of the Spark jobs associated with this statement (mutable buffer; presumably appended as jobs start).
  val jobId: ArrayBuffer[String]
  // Spark job group id associated with the statement (presumably — TODO confirm).
  var groupId: String
  /** Execution duration (presumably derived from start/finish timestamps — TODO confirm). */
  def totalTime: Long
}
/** Lifecycle states of a statement, as stored in `ExecutionInfo.state`. */
private[thriftserver] object ExecutionState extends Enumeration {
  // Presumably progresses STARTED -> COMPILED -> FINISHED, or ends in FAILED on error — TODO confirm.
  val STARTED, COMPILED, FAILED, FINISHED = Value
  // Alias so callers can name the value type as ExecutionState.ExecutionState.
  type ExecutionState = Value
}