or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cli-driver.md cli-services.md index.md metadata-operations.md operation-management.md server-management.md session-management.md sql-execution.md web-ui.md
tile.json

operation-management.mddocs/

Operation Management

SQL operations and metadata operations management with session context mapping for the Spark Hive Thrift Server.

Capabilities

SparkSQLOperationManager

Manages the lifecycle of SQL operations and metadata operations, maintaining session-to-context mappings.

/**
 * Executes queries using Spark SQL and maintains a list of handles to active queries.
 *
 * Every factory method below takes the owning session as its first argument and
 * returns a newly created operation object of the declared type.
 */
class SparkSQLOperationManager extends OperationManager {
  /**
   * Map from session handles to their associated SQL contexts
   */
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
  
  /**
   * Create a new SQL execution operation
   * @param parentSession The session that will own this operation
   * @param statement SQL statement to execute
   * @param confOverlay Configuration overrides for this operation
   * @param async Whether the client requests asynchronous execution
   * @param queryTimeout Timeout for query execution, in seconds (JDBC/Hive
   *                     query-timeout convention, not milliseconds).
   *                     NOTE(review): confirm the unit against the server version in use.
   * @return the new ExecuteStatementOperation
   */
  override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: java.util.Map[String, String],
    async: Boolean,
    queryTimeout: Long
  ): ExecuteStatementOperation
  
  /**
   * Create a new get tables metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern (null for all catalogs)
   * @param schemaName Schema name pattern (null for all schemas)
   * @param tableName Table name pattern (null for all tables)
   * @param tableTypes List of table types to include
   * @return the new MetadataOperation
   */
  override def newGetTablesOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    tableName: String,
    tableTypes: java.util.List[String]
  ): MetadataOperation
  
  /**
   * Create a new get columns metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @param tableName Table name pattern
   * @param columnName Column name pattern
   * @return the new GetColumnsOperation
   */
  override def newGetColumnsOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String
  ): GetColumnsOperation
  
  /**
   * Create a new get schemas metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @return the new GetSchemasOperation
   */
  override def newGetSchemasOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String
  ): GetSchemasOperation
  
  /**
   * Create a new get functions metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @param functionName Function name pattern
   * @return the new GetFunctionsOperation
   */
  override def newGetFunctionsOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    functionName: String
  ): GetFunctionsOperation
  
  /**
   * Create a new get type info metadata operation
   * @param parentSession The session that will own this operation
   * @return the new GetTypeInfoOperation
   */
  override def newGetTypeInfoOperation(parentSession: HiveSession): GetTypeInfoOperation
  
  /**
   * Create a new get catalogs metadata operation
   * @param parentSession The session that will own this operation
   * @return the new GetCatalogsOperation
   */
  override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation
  
  /**
   * Create a new get table types metadata operation
   * @param parentSession The session that will own this operation
   * @return the new GetTableTypesOperation
   */
  override def newGetTableTypesOperation(parentSession: HiveSession): GetTableTypesOperation
}

Usage Examples:

import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.hive.service.cli.session.HiveSession
import scala.collection.JavaConverters._

// Create operation manager
val operationManager = new SparkSQLOperationManager()

// Assuming we have a session and session handle
val session: HiveSession = // ... obtained from session manager
val sessionHandle = session.getSessionHandle()

// Associate a SQL context with the session
operationManager.sessionToContexts.put(sessionHandle, SparkSQLEnv.sqlContext)

// Create a SQL execution operation.
// queryTimeout follows the JDBC/Hive convention and is expressed in seconds,
// so 30L means "give up after 30 seconds" (30000L would be more than 8 hours).
val statement = "SELECT * FROM my_table LIMIT 10"
val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava
val executeOp = operationManager.newExecuteStatementOperation(
  session, statement, confOverlay, async = true, queryTimeout = 30L
)

// Create metadata operations.
// A null catalog/column pattern means "do not filter on this field";
// "%" is the match-everything pattern for table names.
val tablesOp = operationManager.newGetTablesOperation(
  session, null, "default", "%", List("TABLE", "VIEW").asJava
)

val columnsOp = operationManager.newGetColumnsOperation(
  session, null, "default", "my_table", null
)

Operation Lifecycle

Operations follow a standard lifecycle managed by the operation manager.

// Operation states and lifecycle
abstract class Operation {
  /**
   * Get the handle identifying this operation
   * @return OperationHandle for this operation
   */
  def getHandle(): OperationHandle
  
  /**
   * Get the current status of this operation
   * @return OperationStatus describing the operation; its state is read
   *         via `getState()`
   */
  def getStatus(): OperationStatus
  
  /**
   * Cancel this operation if it's running
   */
  def cancel(): Unit
  
  /**
   * Close this operation and clean up resources
   */
  def close(): Unit
}

/**
 * Base class for metadata operations.
 *
 * Extends Operation, so the handle, status, cancel and close members apply
 * to every metadata operation as well; this class adds access to the
 * operation's results and their schema.
 */
abstract class MetadataOperation extends Operation {
  /**
   * Get the result set for this metadata operation
   * @return RowSet containing the metadata results
   */
  def getResultSet(): RowSet
  
  /**
   * Get the schema for the result set
   * @return TableSchema describing the result columns
   */
  def getResultSetSchema(): TableSchema
}

Session Context Management

The operation manager maintains the critical mapping between sessions and their SQL contexts.

// Session to context mapping: one SQLContext per SessionHandle, held in a
// ConcurrentHashMap (which accepts neither null keys nor null values).
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

// Context lifecycle management
/**
 * Register (or replace) the SQL context used by a session.
 *
 * @param sessionHandle handle of the session being registered
 * @param sqlContext    context that operations of this session should use
 */
def associateContext(sessionHandle: SessionHandle, sqlContext: SQLContext): Unit =
  // The previous mapping returned by put, if any, is deliberately discarded.
  sessionToContexts.put(sessionHandle, sqlContext)

/**
 * Look up the SQL context registered for a session.
 *
 * @param sessionHandle handle of the session to look up
 * @return the SQLContext associated with the handle
 * @throws IllegalArgumentException if no context is registered for the handle
 */
def getContextForSession(sessionHandle: SessionHandle): SQLContext = {
  val registered = sessionToContexts.get(sessionHandle)
  // A null lookup result means the session was never opened or is already closed.
  require(
    registered != null,
    s"Session handle: $sessionHandle has not been initialized or had already closed.")
  registered
}

/**
 * Drop the session-to-context mapping for a session.
 *
 * @param sessionHandle handle of the session being removed
 * @return the context that was mapped to the handle, or null if there was none
 */
def removeContextForSession(sessionHandle: SessionHandle): SQLContext =
  sessionToContexts.remove(sessionHandle)

Usage Examples:

// Session context lifecycle: register on open, look up per operation,
// remove on close.
val sessionHandle = // ... from session manager
val sqlContext = SparkSQLEnv.sqlContext.newSession()

// Associate context when session opens
operationManager.sessionToContexts.put(sessionHandle, sqlContext)

// Use context for operations.
// get returns null for a handle that was never registered (or was already
// removed), so the result must be checked before use.
val context = operationManager.sessionToContexts.get(sessionHandle)
require(context != null, "Session not found")

// Clean up when session closes
operationManager.sessionToContexts.remove(sessionHandle)

Background Execution

An operation runs in the background only when the client requests asynchronous execution and the server-side setting spark.sql.hive.thriftServer.async is enabled.

// Asynchronous execution configuration:
// the operation runs in the background only when BOTH the client asked for
// async execution (`async`) AND the server-side flag
// HiveUtils.HIVE_THRIFT_SERVER_ASYNC is enabled.
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

// Background execution is determined by:
// 1. Client request (async parameter)
// 2. Server configuration (spark.sql.hive.thriftServer.async)
// 3. Operation type (some operations are always synchronous)

Configuration Examples:

// Enable async execution globally
spark.conf.set("spark.sql.hive.thriftServer.async", "true")

// Client requests async execution.
// queryTimeout is expressed in seconds (JDBC/Hive convention), so 30L is a
// 30-second limit; 30000L would allow the query to run for over 8 hours.
val executeOp = operationManager.newExecuteStatementOperation(
  session, statement, confOverlay, 
  async = true,  // Request async execution
  queryTimeout = 30L
)

// Check if operation is running in background
val status = executeOp.getStatus()
if (status.getState() == OperationState.RUNNING) {
  println("Operation is running asynchronously")
}

Error Handling

Failures during operation management surface as HiveSQLException, which carries a SQLSTATE code identifying the kind of error.

/**
 * Exception thrown during operation management.
 *
 * The constructor signatures below are written in Java syntax. Each takes a
 * human-readable reason and a SQLSTATE string, plus either a vendor-specific
 * error code or the underlying cause.
 */
class HiveSQLException extends SQLException {
  // reason + SQLSTATE + vendor-specific error code
  public HiveSQLException(String reason, String sqlState, int vendorCode)
  // reason + SQLSTATE + underlying cause
  public HiveSQLException(String reason, String sqlState, Throwable cause)
}

// Common error scenarios, expressed as SQLSTATE codes (Java constant syntax).
// Note: INVALID_SESSION_HANDLE and OPERATION_NOT_FOUND share the same code
// ("HY000"), so the SQLSTATE alone cannot distinguish those two cases.
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String OPERATION_NOT_FOUND = "HY000";
public static final String OPERATION_ALREADY_CLOSED = "HY010";
public static final String QUERY_TIMEOUT = "HYT00";

Error Handling Examples:

import org.apache.hive.service.cli.HiveSQLException

try {
  // queryTimeout is expressed in seconds (JDBC/Hive convention): 30L = 30 seconds.
  val executeOp = operationManager.newExecuteStatementOperation(
    session, statement, confOverlay, async = false, queryTimeout = 30L
  )
  
  // Operation execution...
  
} catch {
  // Guarded cases are tried in order; the unguarded HiveSQLException case
  // below catches every remaining SQLSTATE.
  case e: HiveSQLException if e.getSqlState == "HY000" =>
    println("Invalid session or operation handle")
  case e: HiveSQLException if e.getSqlState == "HYT00" =>
    println("Query timeout exceeded")
  case e: HiveSQLException =>
    println(s"Operation error: ${e.getMessage} (${e.getSqlState})")
  // Raised by require(...) when no SQL context is registered for the session.
  case e: IllegalArgumentException =>
    println(s"Session not initialized: ${e.getMessage}")
}

Operation Handle Management

The operation manager maintains handles to all active operations.

// Handle to operation mapping (inherited from OperationManager):
// each active Operation is stored under its own OperationHandle.
val handleToOperation: java.util.Map[OperationHandle, Operation]

// Operation handle lifecycle
/**
 * Start tracking an operation, keyed by its own handle.
 *
 * @param operation operation to register
 */
def addOperation(operation: Operation): Unit = {
  val handle = operation.getHandle()
  handleToOperation.put(handle, operation)
}

/**
 * Look up a tracked operation by handle.
 *
 * @param operationHandle handle of the operation to find
 * @return the operation stored under the handle, or null if none is stored
 */
def getOperation(operationHandle: OperationHandle): Operation =
  handleToOperation.get(operationHandle)

/**
 * Stop tracking an operation.
 *
 * @param operationHandle handle of the operation to forget
 * @return the operation that was stored under the handle, or null if none was
 */
def removeOperation(operationHandle: OperationHandle): Operation =
  handleToOperation.remove(operationHandle)

Operation Tracking Examples:

// Track all operations for a session
/**
 * Collect every tracked operation whose parent session has the given handle.
 *
 * @param sessionHandle handle of the session whose operations are wanted
 * @return a snapshot list of the matching operations (empty if there are none)
 */
def getOperationsForSession(sessionHandle: SessionHandle): List[Operation] = {
  val ownedBySession = for {
    operation <- handleToOperation.values().asScala
    if operation.getParentSession().getSessionHandle() == sessionHandle
  } yield operation
  ownedBySession.toList
}

// Clean up operations when session closes
/**
 * Cancel and close every operation owned by the given session.
 *
 * cancel() and close() are attempted independently for each operation: a
 * failure while cancelling must not prevent close() from running, otherwise
 * the operation's resources would never be released. Failures are logged and
 * do not stop the remaining operations from being cleaned up.
 *
 * @param sessionHandle handle of the session that is closing
 */
def closeAllOperationsForSession(sessionHandle: SessionHandle): Unit = {
  // Run a single cleanup step, logging (not propagating) any exception.
  def attempt(description: String, operation: Operation)(step: => Unit): Unit =
    try {
      step
    } catch {
      case e: Exception =>
        logWarning(s"Error $description operation ${operation.getHandle()}", e)
    }

  getOperationsForSession(sessionHandle).foreach { operation =>
    attempt("cancelling", operation)(operation.cancel())
    // Always reached, even when cancel() above failed.
    attempt("closing", operation)(operation.close())
  }
}