SQL Execution

Executes SQL statements through the Spark SQL engine, with result set handling and full operation lifecycle management for query processing.

Capabilities

SparkExecuteStatementOperation

Executes SQL statements using the Spark SQL engine with full result set management and cancellation support.

/**
 * SQL statement execution operation using Spark SQL engine
 * Handles query execution, result fetching, and cancellation
 */
class SparkExecuteStatementOperation extends ExecuteStatementOperation {
  /**
   * Fetch the next set of result rows
   * @param order Fetch orientation (FETCH_NEXT, FETCH_PRIOR, etc.)
   * @param maxRowsL Maximum number of rows to fetch
   * @return TRowSet containing the fetched rows
   */
  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet
  
  /**
   * Get the schema of the result set
   * @return TTableSchema describing the result columns and types
   */
  def getResultSetSchema: TTableSchema
  
  /**
   * Internal method that executes the SQL statement
   * Called by the operation framework
   */
  def runInternal(): Unit
  
  /**
   * Cancel the currently executing statement
   */
  def cancel(): Unit
  
  /**
   * Cancel the statement due to timeout
   */
  def timeoutCancel(): Unit
}

Usage Examples:

import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
import org.apache.hive.service.cli.FetchOrientation
import scala.collection.JavaConverters._

// Execute a SQL statement (typically done through the CLI service)
val statement = "SELECT name, age, department FROM employees WHERE age > 25"
val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava

// Submit the statement and get an operation handle back
val operationHandle = cliService.executeStatement(sessionHandle, statement, confOverlay)

// Look up the operation behind the handle via the operation manager
val operation = operationManager.getOperation(operationHandle)

// Fetch results
val resultSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
val schema = operation.getResultSetSchema

// Process results
val columns = schema.getColumns().asScala
println(s"Result has ${columns.size} columns:")
columns.foreach { col =>
  println(s"  ${col.getColumnName()}: ${col.getTypeDesc()}")
}

SparkSQLDriver

SQL driver that provides Hive Driver interface compatibility while using Spark SQL for execution.

/**
 * SQL driver providing Hive Driver compatibility with Spark SQL execution
 * @param context SQL context to use for query execution (defaults to SparkSQLEnv.sqlContext)
 */
class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext) extends Driver {
  /**
   * Execute a SQL command
   * @param command SQL statement to execute
   * @return CommandProcessorResponse indicating success/failure and details
   */
  def run(command: String): CommandProcessorResponse
  
  /**
   * Get results from the last executed command
   * @param res List to populate with result strings
   * @return true if more results are available, false otherwise
   */
  def getResults(res: JList[_]): Boolean
  
  /**
   * Get the schema of the last query result
   * @return Schema object describing result structure
   */
  def getSchema: Schema
  
  /**
   * Initialize the driver
   */
  def init(): Unit
  
  /**
   * Close the driver and release resources
   * @return 0 on success
   */
  def close(): Int
  
  /**
   * Destroy the driver and clean up all resources
   */
  def destroy(): Unit
}

Usage Examples:

import org.apache.spark.sql.hive.thriftserver.SparkSQLDriver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import scala.collection.JavaConverters._

// Create and initialize driver
val driver = new SparkSQLDriver()
driver.init()

try {
  // Execute SQL command
  val response = driver.run("SELECT COUNT(*) FROM sales WHERE year = 2023")
  
  if (response.getResponseCode == 0) {
    // Success - get results
    val results = new java.util.ArrayList[String]()
    while (driver.getResults(results)) {
      results.asScala.foreach(println)
      results.clear()
    }
    
    // Get schema information
    val schema = driver.getSchema
    if (schema != null && schema.getFieldSchemas != null) {
      schema.getFieldSchemas.asScala.foreach { field =>
        println(s"Column: ${field.getName}, Type: ${field.getType}")
      }
    }
  } else {
    // Error occurred
    println(s"Query failed: ${response.getErrorMessage}")
  }
  
} finally {
  driver.close()
  driver.destroy()
}

Query Execution Process

The query execution process involves multiple stages from parsing to result delivery.

// Query execution stages
class QueryExecution {
  lazy val analyzed: LogicalPlan      // Analyzed (resolved) logical plan
  lazy val optimizedPlan: LogicalPlan // Logical plan after optimizer rules
  lazy val sparkPlan: SparkPlan       // Physical execution plan
  lazy val executedPlan: SparkPlan    // Physical plan prepared for execution
}

// Execution context and job management
object SQLExecution {
  /**
   * Execute code with a new execution ID for tracking
   * @param queryExecution Query execution context
   * @param name Optional name for the execution
   * @param body Code block to execute
   * @return Result of the code block
   */
  def withNewExecutionId[T](queryExecution: QueryExecution, name: Option[String])(body: => T): T
}
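
The same stages are exposed on every Dataset via queryExecution. A minimal inspection sketch, assuming a live SparkSession named spark and an employees table:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("inspect-query-stages").getOrCreate()
val df = spark.sql("SELECT department, AVG(age) AS avg_age FROM employees GROUP BY department")

// Each stage is computed lazily on first access
val qe = df.queryExecution
println(qe.analyzed)       // logical plan after analysis/resolution
println(qe.optimizedPlan)  // logical plan after optimizer rules
println(qe.executedPlan)   // physical plan prepared for execution

// df.explain(true) prints all of these stages at once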

Result Set Management

Comprehensive result set handling with multiple format support and efficient data transfer.

/**
 * Base class for result sets
 */
abstract class RowSet {
  /**
   * Add a row to the result set
   * @param row Array of objects representing the row data
   */
  def addRow(row: Array[Object]): Unit
  
  /**
   * Convert to Thrift row set format
   * @return TRowSet for network transfer
   */
  def toTRowSet(): TRowSet
  
  /**
   * Get the number of rows in the result set
   * @return Row count
   */
  def numRows(): Int
}

/**
 * Row-based result set implementation
 */
class RowBasedSet extends RowSet {
  /**
   * Create row-based result set with schema
   * @param schema Table schema defining columns and types
   */
  def this(schema: TableSchema)
}

/**
 * Column-based result set implementation (more efficient for large results)
 */
class ColumnBasedSet extends RowSet {
  /**
   * Create column-based result set with schema
   * @param schema Table schema defining columns and types
   */
  def this(schema: TableSchema)
}

Usage Examples:

import org.apache.hive.service.cli.{RowSetFactory, TableSchema, Type}
import org.apache.hive.service.rpc.thrift.TProtocolVersion

// Create schema for results (addPrimitiveColumn is Hive's TableSchema API)
val schema = new TableSchema()
schema.addPrimitiveColumn("name", Type.STRING_TYPE, "Employee name")
schema.addPrimitiveColumn("age", Type.INT_TYPE, "Employee age")
schema.addPrimitiveColumn("salary", Type.DECIMAL_TYPE, "Employee salary")

// Create result set (RowSetFactory picks the implementation by protocol version)
val rowSet = RowSetFactory.create(schema, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, false)

// Add data rows
rowSet.addRow(Array[Object]("Alice", Integer.valueOf(30), new java.math.BigDecimal("75000.00")))
rowSet.addRow(Array[Object]("Bob", Integer.valueOf(25), new java.math.BigDecimal("65000.00")))

// Convert to Thrift format for network transfer
val tRowSet = rowSet.toTRowSet()
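
Which concrete RowSet comes back depends on the protocol version passed to RowSetFactory: in Hive's implementation, protocol V6 and later yield a ColumnBasedSet, earlier versions a RowBasedSet. A short sketch of both, reusing the schema above:

import org.apache.hive.service.rpc.thrift.TProtocolVersion

// Protocol V5 and earlier: row-oriented transfer (one TRow per result row)
val rowBased = RowSetFactory.create(schema, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5, false)

// Protocol V6 and later: column-oriented transfer, more compact for large results
val columnBased = RowSetFactory.create(schema, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6, false)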

FetchIterator

Iterator abstraction for efficient result fetching with support for different fetch orientations.

/**
 * Base trait for result fetching iterators
 */
sealed trait FetchIterator[A] extends Iterator[A] {
  /**
   * Get the fetch type for this iterator
   * @return FetchType indicating the data source
   */
  def getFetchType(): FetchType
}

/**
 * Iterator for query output results
 */
class ArrayFetchIterator[A](iter: Iterator[A]) extends FetchIterator[A] {
  def hasNext: Boolean = iter.hasNext
  def next(): A = iter.next()
  def getFetchType(): FetchType = FetchType.QUERY_OUTPUT
}

/**
 * Iterator for operation logs
 */
class IterableFetchIterator[A](iterable: Iterable[A]) extends FetchIterator[A] {
  private val iter = iterable.iterator
  def hasNext: Boolean = iter.hasNext
  def next(): A = iter.next()
  def getFetchType(): FetchType = FetchType.LOG
}
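
A brief usage sketch for the iterators above (following the class shapes sketched here, not necessarily Spark's exact internals), wrapping already-materialized query rows:

// Wrap query output rows in a FetchIterator
val rows = Iterator(
  Array[Object]("Alice", Integer.valueOf(30)),
  Array[Object]("Bob", Integer.valueOf(25))
)
val fetchIter = new ArrayFetchIterator(rows)

// Drain up to maxRows per fetch call, as getNextRowSet would
val maxRows = 1000
val batch = fetchIter.take(maxRows).toArray
println(s"Fetched ${batch.length} rows from ${fetchIter.getFetchType()}")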

Operation States and Lifecycle

Operations progress through well-defined states during their lifecycle.

/**
 * Possible states for operations
 */
enum OperationState {
  INITIALIZED,  // Operation created but not started
  PENDING,      // Operation queued, waiting to be scheduled
  RUNNING,      // Operation currently executing
  FINISHED,     // Operation completed successfully
  TIMEDOUT,     // Operation exceeded its configured timeout
  CANCELED,     // Operation was canceled
  CLOSED,       // Operation closed and resources cleaned up
  ERROR,        // Operation failed with error
  UNKNOWN       // State cannot be determined
}

/**
 * Operation status information
 */
class OperationStatus {
  /**
   * Get the current state of the operation
   * @return OperationState indicating current status
   */
  public OperationState getState()
  
  /**
   * Get error information if operation failed
   * @return HiveSQLException with error details, or null if no error
   */
  public HiveSQLException getOperationException()
  
  /**
   * Get the operation start time
   * @return Start time in milliseconds since epoch
   */
  public long getOperationStarted()
  
  /**
   * Get the operation completion time
   * @return Completion time in milliseconds since epoch, or 0 if not completed
   */
  public long getOperationCompleted()
}

Usage Examples:

import org.apache.hive.service.cli.{OperationState, OperationStatus}

// Check operation status
val operationStatus = cliService.getOperationStatus(operationHandle)

operationStatus.getState match {
  case OperationState.RUNNING =>
    println("Query is still executing...")
  case OperationState.FINISHED =>
    println("Query completed successfully")
    val duration = operationStatus.getOperationCompleted - operationStatus.getOperationStarted
    println(s"Execution time: ${duration}ms")
  case OperationState.ERROR =>
    val exception = operationStatus.getOperationException
    println(s"Query failed: ${exception.getMessage}")
  case OperationState.CANCELED =>
    println("Query was canceled")
  case _ =>
    println(s"Query state: ${operationStatus.getState}")
}

Error Handling and Exceptions

Comprehensive error handling for SQL execution with detailed error information.

/**
 * Spark-specific throwable with SQL state information
 */
trait SparkThrowable extends Throwable {
  def getSqlState: String
  def getErrorClass: String
}

/**
 * Command processor response containing execution results and errors
 */
class CommandProcessorResponse {
  /**
   * Create response indicating success
   * @param responseCode 0 for success, non-zero for failure
   */
  def this(responseCode: Int)
  
  /**
   * Create response with error information
   * @param responseCode Non-zero error code
   * @param errorMessage Error message
   * @param sqlState SQL state code
   * @param exception Underlying exception
   */
  def this(responseCode: Int, errorMessage: String, sqlState: String, exception: Throwable)
  
  def getResponseCode(): Int
  def getErrorMessage(): String
  def getSQLState(): String
  def getException(): Throwable
}

Error Handling Examples:

import org.apache.spark.SparkThrowable
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

try {
  val response = driver.run("SELECT * FROM non_existent_table")
  
  if (response.getResponseCode != 0) {
    val exception = response.getException
    exception match {
      case st: SparkThrowable =>
        println(s"Spark SQL Error: ${st.getMessage}")
        println(s"Error Class: ${st.getErrorClass}")
        println(s"SQL State: ${st.getSqlState}")
      case _ =>
        println(s"General Error: ${exception.getMessage}")
        println(s"SQL State: ${response.getSQLState}")
    }
  }
} catch {
  case e: Exception =>
    println(s"Unexpected error during query execution: ${e.getMessage}")
}

Query Cancellation

Support for canceling long-running queries with proper resource cleanup.

// Cancel operation through CLI service
cliService.cancelOperation(operationHandle)

// Cancel through operation object
operation.cancel()

// Timeout-based cancellation
operation.timeoutCancel()

// Cancellation is primarily relevant for statements running asynchronously
val operation = operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync()) {
  operation.cancel()
}

Cancellation Examples:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hive.service.cli.OperationState
import scala.collection.JavaConverters._

// Start a long-running query
val operationHandle = cliService.executeStatementAsync(
  sessionHandle,
  "SELECT * FROM large_table ORDER BY column1",
  Map.empty[String, String].asJava
)

// Set up cancellation after timeout
val executor = Executors.newSingleThreadScheduledExecutor()
executor.schedule(new Runnable {
  def run(): Unit = {
    try {
      cliService.cancelOperation(operationHandle)
      println("Query canceled due to timeout")
    } catch {
      case e: Exception => println(s"Error canceling query: ${e.getMessage}")
    }
  }
}, 30, TimeUnit.SECONDS)

// Monitor query progress until it reaches a terminal state
var done = false
while (!done) {
  val status = cliService.getOperationStatus(operationHandle)
  status.getState match {
    case OperationState.FINISHED =>
      println("Query completed")
      done = true
    case OperationState.CANCELED =>
      println("Query was canceled")
      done = true
    case OperationState.ERROR =>
      println("Query failed")
      done = true
    case _ =>
      Thread.sleep(1000) // Poll every second
  }
}

executor.shutdown()