SQL operation and metadata operation management for the Spark Hive Thrift Server, with per-session SQL context mapping.
SparkSQLOperationManager manages the lifecycle of SQL execution operations and metadata operations, and maintains the mapping from each session handle to its SQLContext.
/**
* Executes queries using Spark SQL and maintains a list of handles to active queries
*/
class SparkSQLOperationManager extends OperationManager {
/**
* Map from session handles to their associated SQL contexts
*/
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
/**
* Create a new SQL execution operation
* @param parentSession The session that will own this operation
* @param statement SQL statement to execute
* @param confOverlay Configuration overrides for this operation
* @param async Whether to run asynchronously
* @param queryTimeout Timeout for query execution in seconds
* @return ExecuteStatementOperation handle
*/
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: java.util.Map[String, String],
async: Boolean,
queryTimeout: Long
): ExecuteStatementOperation
/**
* Create a new get tables metadata operation
* @param parentSession The session that will own this operation
* @param catalogName Catalog name pattern (null for all catalogs)
* @param schemaName Schema name pattern (null for all schemas)
* @param tableName Table name pattern (null for all tables)
* @param tableTypes List of table types to include
* @return MetadataOperation handle
*/
override def newGetTablesOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]
): MetadataOperation
/**
* Create a new get columns metadata operation
* @param parentSession The session that will own this operation
* @param catalogName Catalog name pattern
* @param schemaName Schema name pattern
* @param tableName Table name pattern
* @param columnName Column name pattern
* @return GetColumnsOperation handle
*/
override def newGetColumnsOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String
): GetColumnsOperation
/**
* Create a new get schemas metadata operation
* @param parentSession The session that will own this operation
* @param catalogName Catalog name pattern
* @param schemaName Schema name pattern
* @return GetSchemasOperation handle
*/
override def newGetSchemasOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String
): GetSchemasOperation
/**
* Create a new get functions metadata operation
* @param parentSession The session that will own this operation
* @param catalogName Catalog name pattern
* @param schemaName Schema name pattern
* @param functionName Function name pattern
* @return GetFunctionsOperation handle
*/
override def newGetFunctionsOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
functionName: String
): GetFunctionsOperation
/**
* Create a new get type info metadata operation
* @param parentSession The session that will own this operation
* @return GetTypeInfoOperation handle
*/
override def newGetTypeInfoOperation(parentSession: HiveSession): GetTypeInfoOperation
/**
* Create a new get catalogs metadata operation
* @param parentSession The session that will own this operation
* @return GetCatalogsOperation handle
*/
override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation
/**
* Create a new get table types metadata operation
* @param parentSession The session that will own this operation
* @return GetTableTypesOperation handle
*/
override def newGetTableTypesOperation(parentSession: HiveSession): GetTableTypesOperation
}
Usage Examples:
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.hive.service.cli.session.HiveSession
import scala.collection.JavaConverters._
// Create operation manager
val operationManager = new SparkSQLOperationManager()
// Assuming we have a session and session handle
val session: HiveSession = // ... obtained from session manager
val sessionHandle = session.getSessionHandle()
// Associate a SQL context with the session
operationManager.sessionToContexts.put(sessionHandle, SparkSQLEnv.sqlContext)
// Create a SQL execution operation
val statement = "SELECT * FROM my_table LIMIT 10"
val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava
val executeOp = operationManager.newExecuteStatementOperation(
session, statement, confOverlay, async = true, queryTimeout = 30L // timeout in seconds
)
// Create metadata operations
val tablesOp = operationManager.newGetTablesOperation(
session, null, "default", "%", List("TABLE", "VIEW").asJava
)
val columnsOp = operationManager.newGetColumnsOperation(
session, null, "default", "my_table", null
)
Operations follow a standard lifecycle managed by the operation manager.
// Operation states and lifecycle
abstract class Operation {
/**
* Get the handle identifying this operation
* @return OperationHandle for this operation
*/
def getHandle(): OperationHandle
/**
* Get the current state of this operation
* @return OperationState indicating current status
*/
def getStatus(): OperationStatus
/**
* Cancel this operation if it's running
*/
def cancel(): Unit
/**
* Close this operation and clean up resources
*/
def close(): Unit
}
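These lifecycle methods are what a caller uses to track and tear down the operations created above. A minimal polling sketch, reusing the executeOp handle from the earlier example and assuming Hive's OperationState enum (PENDING, RUNNING, FINISHED, ERROR, CANCELED):
import org.apache.hive.service.cli.OperationState
// Poll until the operation leaves the PENDING/RUNNING states
var state = executeOp.getStatus().getState()
while (state == OperationState.PENDING || state == OperationState.RUNNING) {
  Thread.sleep(100) // simple polling; a real client would back off or enforce a deadline
  state = executeOp.getStatus().getState()
}
state match {
  case OperationState.FINISHED => println("Statement completed")
  case OperationState.ERROR => println("Statement failed")
  case OperationState.CANCELED => println("Statement was canceled")
  case other => println(s"Statement ended in state $other")
}
// Close the handle to release resources once results have been consumed
executeOp.close()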
/**
* Base class for metadata operations
*/
abstract class MetadataOperation extends Operation {
/**
* Get the result set for this metadata operation
* @return RowSet containing the metadata results
*/
def getResultSet(): RowSet
/**
* Get the schema for the result set
* @return TableSchema describing the result columns
*/
def getResultSetSchema(): TableSchema
}
The operation manager maintains the critical mapping between sessions and their SQL contexts.
// Session to context mapping
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
// Context lifecycle management
def associateContext(sessionHandle: SessionHandle, sqlContext: SQLContext): Unit = {
sessionToContexts.put(sessionHandle, sqlContext)
}
def getContextForSession(sessionHandle: SessionHandle): SQLContext = {
val context = sessionToContexts.get(sessionHandle)
require(context != null, s"Session handle: $sessionHandle has not been initialized or had already closed.")
context
}
def removeContextForSession(sessionHandle: SessionHandle): SQLContext = {
sessionToContexts.remove(sessionHandle)
}
Usage Examples:
// Session context lifecycle
val sessionHandle = // ... from session manager
val sqlContext = SparkSQLEnv.sqlContext.newSession()
// Associate context when session opens
operationManager.sessionToContexts.put(sessionHandle, sqlContext)
// Use context for operations
val context = operationManager.sessionToContexts.get(sessionHandle)
require(context != null, "Session not found")
// Clean up when session closes
operationManager.sessionToContexts.remove(sessionHandle)
Operations can be executed asynchronously when configured appropriately.
// Asynchronous execution configuration
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
// Background execution is determined by:
// 1. Client request (async parameter)
// 2. Server configuration (spark.sql.hive.thriftServer.async)
// 3. Operation type (some operations are always synchronous)
Configuration Examples:
// Enable async execution globally
spark.conf.set("spark.sql.hive.thriftServer.async", "true")
// Client requests async execution
val executeOp = operationManager.newExecuteStatementOperation(
session, statement, confOverlay,
async = true, // Request async execution
queryTimeout = 30L // timeout in seconds
)
// Check if operation is running in background
val status = executeOp.getStatus()
if (status.getState() == OperationState.RUNNING) {
println("Operation is running asynchronously")
}
Errors in operation lifecycle management are reported through HiveSQLException.
/**
* Exception thrown during operation management
*/
public class HiveSQLException extends SQLException {
public HiveSQLException(String reason, String sqlState, int vendorCode)
public HiveSQLException(String reason, String sqlState, Throwable cause)
}
// Common error scenarios
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String OPERATION_NOT_FOUND = "HY000";
public static final String OPERATION_ALREADY_CLOSED = "HY010";
public static final String QUERY_TIMEOUT = "HYT00";
Error Handling Examples:
import org.apache.hive.service.cli.HiveSQLException
try {
val executeOp = operationManager.newExecuteStatementOperation(
session, statement, confOverlay, async = false, queryTimeout = 30000L
)
// Operation execution...
} catch {
case e: HiveSQLException if e.getSqlState == "HY000" =>
println("Invalid session or operation handle")
case e: HiveSQLException if e.getSqlState == "HYT00" =>
println("Query timeout exceeded")
case e: HiveSQLException =>
println(s"Operation error: ${e.getMessage} (${e.getSqlState})")
case e: IllegalArgumentException =>
println(s"Session not initialized: ${e.getMessage}")
}
The operation manager maintains handles to all active operations.
// Handle to operation mapping (inherited from OperationManager)
val handleToOperation: java.util.Map[OperationHandle, Operation]
// Operation handle lifecycle
def addOperation(operation: Operation): Unit = {
handleToOperation.put(operation.getHandle(), operation)
}
def getOperation(operationHandle: OperationHandle): Operation = {
handleToOperation.get(operationHandle)
}
def removeOperation(operationHandle: OperationHandle): Operation = {
handleToOperation.remove(operationHandle)
}
Operation Tracking Examples:
// Track all operations for a session
def getOperationsForSession(sessionHandle: SessionHandle): List[Operation] = {
handleToOperation.values().asScala.filter { op =>
op.getParentSession().getSessionHandle() == sessionHandle
}.toList
}
// Clean up operations when session closes
def closeAllOperationsForSession(sessionHandle: SessionHandle): Unit = {
getOperationsForSession(sessionHandle).foreach { operation =>
try {
operation.cancel()
operation.close()
} catch {
case e: Exception =>
logWarning(s"Error closing operation ${operation.getHandle()}", e)
}
}
}
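A sketch of how this cleanup could be wired together when a session closes, combining the tracking helper above with the session-to-context map; onSessionClosed is a hypothetical helper, not part of the OperationManager API:
// Hypothetical teardown: close the session's operations, then drop its SQLContext mapping
def onSessionClosed(sessionHandle: SessionHandle): Unit = {
  closeAllOperationsForSession(sessionHandle)
  operationManager.sessionToContexts.remove(sessionHandle)
}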