Query operations manage SQL statement execution, result handling, and asynchronous processing within the Spark Hive Thrift Server.
Central manager for all SQL query execution operations with session context management.
private[thriftserver] class SparkSQLOperationManager extends OperationManager with Logging {
val handleToOperation: JMap[OperationHandle, Operation]
val sessionToActivePool: ConcurrentHashMap[SessionHandle, String]
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
def newExecuteStatementOperation(parentSession: HiveSession, statement: String,
confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit
}

The newExecuteStatementOperation method creates new query execution operations:
Usage Example:
import java.util.{Map => JMap}
import org.apache.hive.service.cli.session.HiveSession
val confOverlay: JMap[String, String] = new java.util.HashMap()
confOverlay.put("spark.sql.adaptive.enabled", "true")
val operation = operationManager.newExecuteStatementOperation(
parentSession = hiveSession,
statement = "SELECT * FROM users WHERE age > 21",
confOverlay = confOverlay,
async = true
)
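Once created, the operation is driven through the standard Hive Operation lifecycle. A hypothetical continuation of the example above (FETCH_NEXT and the batch size of 100 are illustrative):

import org.apache.hive.service.cli.FetchOrientation

// Compiles the statement and, since async = true, submits it in the background
operation.run()
// Results are fetched later in batches through the operation
val rowSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 100)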
Operation Creation Process: the manager looks up the SQLContext registered for the parent session, decides whether the statement should run in the background, creates a SparkExecuteStatementOperation, registers it in handleToOperation, and returns the instance.

The setConfMap method copies configuration overrides into a session's SQLConf:
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
val iterator = confMap.entrySet().iterator()
while (iterator.hasNext) {
val kv = iterator.next()
conf.setConfString(kv.getKey, kv.getValue)
}
}

Configuration Sources: the entries applied here typically originate from the per-statement confOverlay supplied by the client and from session-level configuration established when the session was opened.
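A minimal usage sketch, assuming code running inside the thriftserver package (where the session's SQLConf is reachable via sqlContext.conf) and the operationManager and sqlContext shown earlier:

import java.util.{HashMap => JHashMap}

val overrides = new JHashMap[String, String]()
overrides.put("spark.sql.shuffle.partitions", "8")
// Copies each key/value pair into the session-scoped SQLConf
operationManager.setConfMap(sqlContext.conf, overrides)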
Individual query execution operation with result management and data type handling.
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true
)(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation with Logging {
def close(): Unit
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit
}

Synchronous Execution:
// Execute immediately and wait for results
val operation = new SparkExecuteStatementOperation(
session, "SELECT COUNT(*) FROM large_table", confOverlay, runInBackground = false
)(sqlContext, sessionToActivePool)

Asynchronous Execution:
// Execute in background thread
val operation = new SparkExecuteStatementOperation(
session, "SELECT * FROM large_table", confOverlay, runInBackground = true
)(sqlContext, sessionToActivePool)

The operation handles different data types in query results:
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
dataTypes(ordinal) match {
case StringType => to += from.getString(ordinal)
case IntegerType => to += from.getInt(ordinal)
case BooleanType => to += from.getBoolean(ordinal)
case DoubleType => to += from.getDouble(ordinal)
case FloatType => to += from.getFloat(ordinal)
case DecimalType() => to += from.getDecimal(ordinal)
case LongType => to += from.getLong(ordinal)
case ByteType => to += from.getByte(ordinal)
case ShortType => to += from.getShort(ordinal)
case DateType => to += from.getAs[Date](ordinal)
case TimestampType => to += from.getAs[Timestamp](ordinal)
// Additional type handling...
}
}

Supported Data Types: primitives (String, Integer, Long, Short, Byte, Boolean, Double, Float), Decimal, Date, and Timestamp are copied directly from the row as shown above; the remaining cases cover binary and complex types.
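For context, this method is invoked once per non-null column while a result row is materialized; a simplified sketch of that surrounding loop (rowToValues and numColumns are illustrative names, not part of the class):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{Row => SparkRow}

def rowToValues(row: SparkRow, numColumns: Int): ArrayBuffer[Any] = {
  val values = new ArrayBuffer[Any](numColumns)
  var i = 0
  while (i < numColumns) {
    if (row.isNullAt(i)) values += null        // nulls skip type dispatch
    else addNonNullColumnValue(row, values, i) // typed copy, as defined above
    i += 1
  }
  values
}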
Operations include comprehensive resource cleanup:
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection
logDebug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
sqlContext.sparkContext.clearJobGroup()
}

Cleanup Process: cleanup transitions the operation to the CLOSED state and cancels any background execution still in flight, clearJobGroup detaches later work from the statement's job group, and cached RDD results are left to garbage collection, per the comment above.
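Continuing the hypothetical operation from the earlier example, close() is the client-visible trigger for this cleanup:

// After all results have been fetched (or on client disconnect),
// close() runs the cleanup path shown above
operation.close()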
Detailed tracking of query execution lifecycle and performance metrics.
private[thriftserver] class ExecutionInfo(
statement: String,
sessionId: String,
startTimestamp: Long,
userName: String
) {
var finishTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
def totalTime: Long
}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED = Value
type ExecutionState = Value
}

State Transitions: an execution begins in STARTED, moves to COMPILED once its plan is available, and ends in FINISHED on success or FAILED on error. The listener callbacks below drive these transitions, as sketched after this paragraph.
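A compact sketch of one statement's trip through these callbacks (listener, sessionId, and the error branch are illustrative):

val stmtId = java.util.UUID.randomUUID().toString
listener.onStatementStart(stmtId, sessionId, "SELECT 1", groupId = stmtId, userName = "alice")
// ... the statement compiles and runs ...
listener.onStatementFinish(stmtId) // STARTED -> FINISHED on success
// On failure, the error callback records FAILED instead:
// listener.onStatementError(stmtId, e.getMessage, stackTraceString)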
Query Start:
def onStatementStart(id: String, sessionId: String, statement: String,
groupId: String, userName: String = "UNKNOWN"): Unit = synchronized {
val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
info.state = ExecutionState.STARTED
executionList.put(id, info)
trimExecutionIfNecessary()
sessionList(sessionId).totalExecution += 1
executionList(id).groupId = groupId
totalRunning += 1
}

Query Completion:
def onStatementFinish(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED
totalRunning -= 1
trimExecutionIfNecessary()
}

Error Handling:
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMessage
executionList(id).state = ExecutionState.FAILED
totalRunning -= 1
trimExecutionIfNecessary()
}
}

Operations support different result collection strategies:
Incremental Collection:
// Enable incremental result fetching
spark.sql.thriftServer.incrementalCollect=true

Full Collection:
// Cache complete results (default)
spark.sql.thriftServer.incrementalCollect=false
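Incremental collection trades latency for driver memory: results are streamed partition by partition instead of being cached whole. A hypothetical per-session toggle:

// Switch this session to incremental collection before a large scan
sqlContext.setConf("spark.sql.thriftServer.incrementalCollect", "true")
val result = sqlContext.sql("SELECT * FROM large_table")
// The server can then iterate partition-by-partition, e.g. via
// toLocalIterator, rather than issuing a single collect()
val it = result.toLocalIterator()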
Background execution prevents client blocking:

val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

Benefits: the client receives an operation handle immediately, can poll the operation's status, and can cancel a long-running statement without tying up its Thrift connection.
Operations use Spark job groups for resource management:
sqlContext.sparkContext.setJobGroup(statementId, statement, interruptOnCancel = true)

Features: every Spark job spawned by the statement carries the statement ID as its job group, so the whole statement can be cancelled as a unit, and interruptOnCancel = true interrupts tasks that are already running when the group is cancelled.
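Cancellation then reduces to one call against the job group; a minimal sketch using the same statementId:

// Cancels every job belonging to this statement; with interruptOnCancel = true,
// tasks that are already running are interrupted as well
sqlContext.sparkContext.cancelJobGroup(statementId)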