Query Operations

Query operations manage SQL statement execution, result handling, and asynchronous processing within the Spark Hive Thrift Server.

Operation Manager

SparkSQLOperationManager

Central manager for all SQL query execution operations with session context management.

private[thriftserver] class SparkSQLOperationManager extends OperationManager with Logging {
  val handleToOperation: JMap[OperationHandle, Operation]
  val sessionToActivePool: ConcurrentHashMap[SessionHandle, String]
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
  
  def newExecuteStatementOperation(parentSession: HiveSession, statement: String,
                                  confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation
  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit
}

Operation Creation

The newExecuteStatementOperation method creates new query execution operations:

Usage Example:

import java.util.{Map => JMap}
import org.apache.hive.service.cli.session.HiveSession

val confOverlay: JMap[String, String] = new java.util.HashMap()
confOverlay.put("spark.sql.adaptive.enabled", "true")

val operation = operationManager.newExecuteStatementOperation(
  parentSession = hiveSession,
  statement = "SELECT * FROM users WHERE age > 21", 
  confOverlay = confOverlay,
  async = true
)
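
After creation, the operation is driven through the standard HiveServer2 operation lifecycle. A hedged follow-up sketch using the Hive CLI service API (run(), getResultSetSchema, and getNextRowSet come from that API; batch size and fetch orientation are illustrative):

import org.apache.hive.service.cli.FetchOrientation

// Execute the statement (synchronously, or by scheduling background work when async)
operation.run()

// Inspect the result schema and fetch the first batch of rows
val resultSchema = operation.getResultSetSchema
val firstRows = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 100)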

Operation Creation Process:

  1. Session Validation: Ensures the session has a valid SQL context
  2. Configuration Merging: Applies session-level and query-specific configuration
  3. Background Execution: Determines whether the query should run asynchronously
  4. Operation Creation: Creates a SparkExecuteStatementOperation instance
  5. Registration: Registers the operation handle for tracking
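
Taken together, these steps look roughly like the following sketch, modeled on the Spark implementation (exact field and method names vary across Spark versions):

override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    async: Boolean): ExecuteStatementOperation = synchronized {
  // 1. Session validation: the session must have a registered SQLContext
  val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
  require(sqlContext != null,
    s"Session handle ${parentSession.getSessionHandle} has not been initialized or was already closed")

  // 2. Configuration merging: session overrides and Hive variables flow into the SQLConf
  val conf = sqlContext.sessionState.conf
  val hiveSessionState = parentSession.getSessionState
  setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
  setConfMap(conf, hiveSessionState.getHiveVariables)

  // 3. Background execution: honor both the client's async flag and the server-side setting
  val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

  // 4. Operation creation
  val operation = new SparkExecuteStatementOperation(
    parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool)

  // 5. Registration: track the handle so the operation can be looked up and cancelled
  handleToOperation.put(operation.getHandle, operation)
  operation
}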

Configuration Management

The setConfMap method applies configuration overrides to SQL contexts:

def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
  // Copy every key/value pair from the override map into the session's SQLConf
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf.setConfString(kv.getKey, kv.getValue)
  }
}
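
A minimal usage sketch, assuming conf is the SQLConf of the session's SQLContext and confOverlay is the overlay built in the earlier example:

val conf = sqlContext.sessionState.conf

// Push the per-statement overlay into the session's configuration
setConfMap(conf, confOverlay)

// The planner now sees the overridden value for this session
assert(conf.getConfString("spark.sql.adaptive.enabled") == "true")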

Configuration Sources:

  • Session State: Hive session override configurations
  • Session Variables: User-set Hive variables
  • Query Overlay: Statement-specific configuration parameters

Statement Execution

SparkExecuteStatementOperation

Individual query execution operation with result management and data type handling.

private[hive] class SparkExecuteStatementOperation(
  parentSession: HiveSession,
  statement: String, 
  confOverlay: JMap[String, String],
  runInBackground: Boolean = true
)(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation with Logging {

  def close(): Unit
  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit
}

Query Execution Process

Synchronous Execution:

// Execute immediately and wait for results
val operation = new SparkExecuteStatementOperation(
  session, "SELECT COUNT(*) FROM large_table", confOverlay, runInBackground = false
)(sqlContext, sessionToActivePool)

Asynchronous Execution:

// Execute in background thread
val operation = new SparkExecuteStatementOperation(
  session, "SELECT * FROM large_table", confOverlay, runInBackground = true
)(sqlContext, sessionToActivePool)

Result Processing

The operation handles different data types in query results:

def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
  dataTypes(ordinal) match {
    case StringType => to += from.getString(ordinal)
    case IntegerType => to += from.getInt(ordinal) 
    case BooleanType => to += from.getBoolean(ordinal)
    case DoubleType => to += from.getDouble(ordinal)
    case FloatType => to += from.getFloat(ordinal)
    case DecimalType() => to += from.getDecimal(ordinal)
    case LongType => to += from.getLong(ordinal)
    case ByteType => to += from.getByte(ordinal)
    case ShortType => to += from.getShort(ordinal)
    case DateType => to += from.getAs[Date](ordinal)
    case TimestampType => to += from.getAs[Timestamp](ordinal)
    // Additional type handling...
  }
}

Supported Data Types:

  • Primitive Types: String, Integer, Boolean, Double, Float, Long, Byte, Short
  • Decimal Types: High-precision decimal numbers
  • Date/Time Types: Date and Timestamp values
  • Complex Types: Arrays, Maps, Structs (via nested processing)
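
Complex values (arrays, maps, structs) and any type without an explicit case fall through to a string conversion. A hedged sketch of that fallback branch; the helper name differs by Spark version (e.g. HiveUtils.toHiveString in older releases, HiveResult.toHiveString later):

dataTypes(ordinal) match {
  // ... primitive cases as above ...
  case _ =>
    // Render arrays, maps, structs, etc. as their Hive string representation
    to += HiveUtils.toHiveString((from.get(ordinal), dataTypes(ordinal)))
}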

Resource Management

Operations include comprehensive resource cleanup:

def close(): Unit = {
  // RDDs will be cleaned automatically upon garbage collection
  logDebug(s"CLOSING $statementId")
  cleanup(OperationState.CLOSED)
  sqlContext.sparkContext.clearJobGroup()
}

Cleanup Process:

  1. State Update: Marks operation as closed
  2. Job Cancellation: Cancels any running Spark jobs
  3. Memory Cleanup: Releases cached result data
  4. Resource Release: Frees system resources
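
A plausible shape for the cleanup helper used by close(), sketched after the Spark implementation; the field and helper names (setState, getBackgroundHandle, statementId) come from the Hive/Spark operation classes but should be treated as illustrative:

private def cleanup(state: OperationState): Unit = {
  // 1. State update
  setState(state)

  // 2. Job cancellation: stop the background task, then any Spark jobs
  //    tagged with this statement's job group
  if (runInBackground) {
    val backgroundHandle = getBackgroundHandle()
    if (backgroundHandle != null) {
      backgroundHandle.cancel(true)
    }
  }
  if (statementId != null) {
    sqlContext.sparkContext.cancelJobGroup(statementId)
  }
  // 3./4. Cached result rows become unreachable once the operation is
  //       dereferenced, so memory is reclaimed by garbage collection
}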

Execution Tracking

ExecutionInfo

Detailed tracking of query execution lifecycle and performance metrics.

private[thriftserver] class ExecutionInfo(
  statement: String,
  sessionId: String, 
  startTimestamp: Long,
  userName: String
) {
  var finishTimestamp: Long = 0L
  var executePlan: String = ""
  var detail: String = ""
  var state: ExecutionState.Value = ExecutionState.STARTED
  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
  var groupId: String = ""
  def totalTime: Long
}
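
totalTime is derived from the two timestamps; a minimal sketch of the likely computation (a still-running query is measured against the current time):

def totalTime: Long = {
  if (finishTimestamp == 0L) {
    // Query still running: report elapsed time so far
    System.currentTimeMillis - startTimestamp
  } else {
    finishTimestamp - startTimestamp
  }
}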

Execution States

private[thriftserver] object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
  type ExecutionState = Value
}

State Transitions:

  1. STARTED: Query execution begins
  2. COMPILED: SQL parsed and execution plan generated
  3. FAILED: Query execution encountered an error
  4. FINISHED: Query completed successfully

Event Tracking

Query Start:

def onStatementStart(id: String, sessionId: String, statement: String, 
                    groupId: String, userName: String = "UNKNOWN"): Unit = synchronized {
  val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
  info.state = ExecutionState.STARTED
  executionList.put(id, info)
  trimExecutionIfNecessary()
  sessionList(sessionId).totalExecution += 1
  executionList(id).groupId = groupId
  totalRunning += 1
}
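
Query Compilation:

A hedged sketch of the corresponding COMPILED transition, assuming a listener callback (onStatementParsed in the Spark listener) that receives the generated plan:

def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
  executionList(id).executePlan = executionPlan
  executionList(id).state = ExecutionState.COMPILED
}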

Query Completion:

def onStatementFinish(id: String): Unit = synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.FINISHED
  totalRunning -= 1
  trimExecutionIfNecessary()
}

Error Handling:

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
  synchronized {
    executionList(id).finishTimestamp = System.currentTimeMillis
    executionList(id).detail = errorMessage
    executionList(id).state = ExecutionState.FAILED
    totalRunning -= 1
    trimExecutionIfNecessary()
  }
}

Performance Optimization

Result Caching

Operations support different result collection strategies:

Incremental Collection:

// Enable incremental result fetching
spark.sql.thriftServer.incrementalCollect=true

  • Results streamed as they become available
  • Lower memory usage for large result sets
  • Better responsiveness for interactive queries

Full Collection:

// Cache complete results (default)
spark.sql.thriftServer.incrementalCollect=false

  • All results cached in memory
  • Supports FETCH_FIRST operations
  • Higher memory usage but better performance for small results
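
Inside the operation, this flag plausibly selects between streaming the result iterator and collecting all rows up front; a hedged sketch, assuming the executed DataFrame is available as result:

import scala.collection.JavaConverters._

val useIncrementalCollect =
  sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean

val iter: Iterator[SparkRow] =
  if (useIncrementalCollect) {
    // Stream one partition at a time; memory stays bounded for large results
    result.toLocalIterator.asScala
  } else {
    // Materialize everything; cached rows support FETCH_FIRST re-reads
    result.collect().iterator
  }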

Asynchronous Processing

Background execution prevents client blocking:

val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

Benefits:

  • Client remains responsive during long queries
  • Concurrent execution of multiple queries
  • Better resource utilization
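
When runInBackground is true, execution is wrapped in a Runnable and handed to HiveServer2's background operation pool instead of running on the Thrift handler thread. A simplified, hedged sketch of that hand-off (execute() stands in for the operation's internal statement-execution method; the real code also handles impersonation, state transitions, and error propagation):

if (!runInBackground) {
  // Caller's thread: run to completion before returning to the client
  execute()
} else {
  // Background pool: return immediately; the client polls the operation's status
  val backgroundOperation = new Runnable {
    override def run(): Unit = execute()
  }
  val backgroundHandle =
    parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
  setBackgroundHandle(backgroundHandle)
}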

Job Group Management

Operations use Spark job groups for resource management:

sqlContext.sparkContext.setJobGroup(statementId, statement, interruptOnCancel = true)

Features:

  • Query Identification: Link Spark jobs to SQL statements
  • Cancellation Support: Enable query interruption
  • Resource Tracking: Monitor resource usage per query
  • Performance Monitoring: Collect execution metrics
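
Because every statement runs under its own job group, cancellation reduces to cancelling that group; a hedged sketch of a cancel handler built on that idea:

override def cancel(): Unit = {
  logInfo(s"Cancel '$statement' with $statementId")
  if (statementId != null) {
    // Interrupt all Spark jobs launched on behalf of this statement
    sqlContext.sparkContext.cancelJobGroup(statementId)
  }
  cleanup(OperationState.CANCELED)
}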