Spark Project Hive Thrift Server - A Thrift server implementation that provides JDBC/ODBC access to Spark SQL
Query operations manage SQL statement execution, result handling, and asynchronous processing within the Spark Hive Thrift Server.
Central manager for all SQL query execution operations with session context management.
```scala
private[thriftserver] class SparkSQLOperationManager extends OperationManager with Logging {
  val handleToOperation: JMap[OperationHandle, Operation]
  val sessionToActivePool: ConcurrentHashMap[SessionHandle, String]
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

  def newExecuteStatementOperation(parentSession: HiveSession, statement: String,
      confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation

  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit
}
```

The newExecuteStatementOperation method creates new query execution operations:
Usage Example:

```scala
import java.util.{Map => JMap}
import org.apache.hive.service.cli.session.HiveSession

val confOverlay: JMap[String, String] = new java.util.HashMap()
confOverlay.put("spark.sql.adaptive.enabled", "true")

val operation = operationManager.newExecuteStatementOperation(
  parentSession = hiveSession,
  statement = "SELECT * FROM users WHERE age > 21",
  confOverlay = confOverlay,
  async = true
)
```

Operation Creation Process: a SparkExecuteStatementOperation instance is created for the submitted statement.

The setConfMap method applies configuration overrides to SQL contexts:
```scala
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf.setConfString(kv.getKey, kv.getValue)
  }
}
```

Configuration Sources:
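The setConfMap iteration pattern can be exercised without a running Spark session. The sketch below is illustration only: `ConfStub` is an invented stand-in for Spark's SQLConf (same setter name, plain map underneath), not the real class.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

// Invented stand-in for SQLConf: a mutable string-to-string store.
class ConfStub {
  val settings: mutable.Map[String, String] = mutable.Map.empty
  def setConfString(key: String, value: String): Unit = settings(key) = value
}

// Same pattern as setConfMap above: walk the Java map's entry set
// and push each key/value pair into the conf.
def applyConfMap(conf: ConfStub, confMap: java.util.Map[String, String]): Unit = {
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf.setConfString(kv.getKey, kv.getValue)
  }
}

val overrides = new JHashMap[String, String]()
overrides.put("spark.sql.shuffle.partitions", "8")
val conf = new ConfStub
applyConfMap(conf, overrides)
// conf.settings now holds the override
```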
Individual query execution operation with result management and data type handling.
```scala
private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean = true)
    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
  extends ExecuteStatementOperation with Logging {

  def close(): Unit
  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit
}
```

Synchronous Execution:
```scala
// Execute immediately and wait for results
val operation = new SparkExecuteStatementOperation(
  session, "SELECT COUNT(*) FROM large_table", confOverlay, runInBackground = false
)(sqlContext, sessionToActivePool)
```

Asynchronous Execution:
```scala
// Execute in a background thread
val operation = new SparkExecuteStatementOperation(
  session, "SELECT * FROM large_table", confOverlay, runInBackground = true
)(sqlContext, sessionToActivePool)
```

The operation handles different data types in query results:
```scala
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
  dataTypes(ordinal) match {
    case StringType => to += from.getString(ordinal)
    case IntegerType => to += from.getInt(ordinal)
    case BooleanType => to += from.getBoolean(ordinal)
    case DoubleType => to += from.getDouble(ordinal)
    case FloatType => to += from.getFloat(ordinal)
    case DecimalType() => to += from.getDecimal(ordinal)
    case LongType => to += from.getLong(ordinal)
    case ByteType => to += from.getByte(ordinal)
    case ShortType => to += from.getShort(ordinal)
    case DateType => to += from.getAs[Date](ordinal)
    case TimestampType => to += from.getAs[Timestamp](ordinal)
    // Additional type handling...
  }
}
```

Supported Data Types: the match above covers StringType, IntegerType, BooleanType, DoubleType, FloatType, DecimalType, LongType, ByteType, ShortType, DateType, and TimestampType, with further cases elided.
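The per-ordinal type dispatch above can be sketched in a self-contained form. This is illustration only: the `ColType` trait and `addValue` helper are invented stand-ins for Spark's DataType hierarchy and SparkRow accessors, showing the same pattern of selecting a typed extraction per column.

```scala
import scala.collection.mutable.ArrayBuffer

// Invented miniature of Spark's DataType hierarchy, for illustration.
sealed trait ColType
case object StrType extends ColType
case object IntType extends ColType
case object BoolType extends ColType

// Same shape as addNonNullColumnValue: dispatch on the column's declared
// type and append the value extracted with the matching accessor.
def addValue(row: Seq[Any], types: Seq[ColType], to: ArrayBuffer[Any], ordinal: Int): Unit =
  types(ordinal) match {
    case StrType  => to += row(ordinal).asInstanceOf[String]
    case IntType  => to += row(ordinal).asInstanceOf[Int]
    case BoolType => to += row(ordinal).asInstanceOf[Boolean]
  }

val row   = Seq("alice", 42, true)
val types = Seq(StrType, IntType, BoolType)
val out   = ArrayBuffer[Any]()
(0 until row.length).foreach(i => addValue(row, types, out, i))
// out == ArrayBuffer("alice", 42, true)
```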
Operations include comprehensive resource cleanup:
```scala
def close(): Unit = {
  // RDDs will be cleaned automatically upon garbage collection
  logDebug(s"CLOSING $statementId")
  cleanup(OperationState.CLOSED)
  sqlContext.sparkContext.clearJobGroup()
}
```

Cleanup Process: the operation transitions to OperationState.CLOSED and the statement's Spark job group is cleared; cached RDDs are left to normal garbage collection.
Detailed tracking of query execution lifecycle and performance metrics.
```scala
private[thriftserver] class ExecutionInfo(
    statement: String,
    sessionId: String,
    startTimestamp: Long,
    userName: String) {

  var finishTimestamp: Long = 0L
  var executePlan: String = ""
  var detail: String = ""
  var state: ExecutionState.Value = ExecutionState.STARTED
  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
  var groupId: String = ""

  def totalTime: Long
}
```

```scala
private[thriftserver] object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
  type ExecutionState = Value
}
```

State Transitions:
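A minimal, self-contained sketch of the lifecycle record and its states. The `totalTime` body shown here is an assumption (the signature above does not include it): elapsed milliseconds since start, frozen once a finish timestamp is recorded.

```scala
object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
}

// Trimmed-down ExecutionInfo, for illustration only.
class ExecutionInfo(val statement: String, val startTimestamp: Long) {
  var finishTimestamp: Long = 0L
  var state: ExecutionState.Value = ExecutionState.STARTED

  // Assumed semantics: wall-clock duration; the real body is not shown above.
  def totalTime: Long =
    if (finishTimestamp == 0L) System.currentTimeMillis - startTimestamp
    else finishTimestamp - startTimestamp
}

val info = new ExecutionInfo("SELECT 1", startTimestamp = 1000L)
info.finishTimestamp = 1500L
info.state = ExecutionState.FINISHED
// info.totalTime == 500
```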
Query Start:
```scala
def onStatementStart(id: String, sessionId: String, statement: String,
    groupId: String, userName: String = "UNKNOWN"): Unit = synchronized {
  val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
  info.state = ExecutionState.STARTED
  executionList.put(id, info)
  trimExecutionIfNecessary()
  sessionList(sessionId).totalExecution += 1
  executionList(id).groupId = groupId
  totalRunning += 1
}
```

Query Completion:
```scala
def onStatementFinish(id: String): Unit = synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.FINISHED
  totalRunning -= 1
  trimExecutionIfNecessary()
}
```

Error Handling:
```scala
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
  synchronized {
    executionList(id).finishTimestamp = System.currentTimeMillis
    executionList(id).detail = errorMessage
    executionList(id).state = ExecutionState.FAILED
    totalRunning -= 1
    trimExecutionIfNecessary()
  }
}
```

Operations support different result collection strategies:
Incremental Collection:

```
// Enable incremental result fetching
spark.sql.thriftServer.incrementalCollect=true
```

Full Collection:

```
// Cache complete results (default)
spark.sql.thriftServer.incrementalCollect=false
```

Background execution prevents client blocking:
```scala
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
```

Benefits:
Operations use Spark job groups for resource management:
```scala
sqlContext.sparkContext.setJobGroup(statementId, statement, interruptOnCancel = true)
```

Features:
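Tagging every job spawned by a statement with one group id is what lets the server cancel them together (in Spark, via SparkContext's setJobGroup and cancelJobGroup). The toy registry below is invented purely to illustrate that grouping idea without a Spark cluster; it is not Spark's implementation.

```scala
import scala.collection.mutable

// Invented registry, for illustration: jobs register under a group id,
// and cancelling the group marks every job in it as cancelled.
class JobGroupRegistry {
  private val groups = mutable.Map[String, mutable.Set[String]]()
  private val cancelled = mutable.Set[String]()

  def setJobGroup(groupId: String, jobId: String): Unit =
    groups.getOrElseUpdate(groupId, mutable.Set.empty) += jobId

  def cancelJobGroup(groupId: String): Unit =
    groups.getOrElse(groupId, mutable.Set.empty).foreach(cancelled += _)

  def isCancelled(jobId: String): Boolean = cancelled(jobId)
}

val registry = new JobGroupRegistry
registry.setJobGroup("statement-1", "job-a")
registry.setJobGroup("statement-1", "job-b")
registry.cancelJobGroup("statement-1")  // cancels every job in the group
// registry.isCancelled("job-a") == true
```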
Install with Tessl CLI:

```
npx tessl i tessl/maven-org-apache-spark--spark-hive-thriftserver-2-11
```