# SQL Execution

SQL statement execution on the Spark SQL engine, with result-set handling and operation lifecycle management for query processing.

## Capabilities

### SparkExecuteStatementOperation

Executes SQL statements using the Spark SQL engine, with full result set management and cancellation support.

```scala { .api }
/**
 * SQL statement execution operation using the Spark SQL engine.
 * Handles query execution, result fetching, and cancellation.
 */
class SparkExecuteStatementOperation extends ExecuteStatementOperation {

  /**
   * Fetch the next set of result rows.
   * @param order    Fetch orientation (FETCH_NEXT, FETCH_PRIOR, etc.)
   * @param maxRowsL Maximum number of rows to fetch
   * @return TRowSet containing the fetched rows
   */
  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet

  /**
   * Get the schema of the result set.
   * @return TTableSchema describing the result columns and types
   */
  def getResultSetSchema: TTableSchema

  /**
   * Internal method that executes the SQL statement.
   * Called by the operation framework.
   */
  def runInternal(): Unit

  /**
   * Cancel the currently executing statement.
   */
  def cancel(): Unit

  /**
   * Cancel the statement due to timeout.
   */
  def timeoutCancel(): Unit
}
```

**Usage Examples:**

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
import org.apache.hive.service.cli.{FetchOrientation, OperationHandle}

// Execute a SQL statement (typically done through the CLI service)
val statement = "SELECT name, age, department FROM employees WHERE age > 25"
val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava

// Get an operation handle from the session manager
val operationHandle = sessionManager.executeStatement(sessionHandle, statement, confOverlay)

// Look up the operation and fetch results
val operation = operationManager.getOperation(operationHandle)
val resultSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
val schema = operation.getResultSetSchema

// Process results
val columns = schema.getColumns().asScala
println(s"Result has ${columns.size} columns:")
columns.foreach { col =>
  println(s"  ${col.getColumnName()}: ${col.getTypeDesc()}")
}
```
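
The `cancel()` contract above can be illustrated without any Thrift Server dependencies. The following is a minimal, hypothetical sketch (the `SketchOperation` name and its string states are illustrative, not part of the real API): execution checks a shared atomic state between batches of work, so a concurrent `cancel()` makes the run loop stop early.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch of the cancel() handshake: the run loop polls a
// shared state and stops early once the operation is marked CANCELED.
class SketchOperation {
  private val state = new AtomicReference[String]("INITIALIZED")

  // Only a RUNNING operation can be canceled
  def cancel(): Unit = state.compareAndSet("RUNNING", "CANCELED")

  // Runs `batches` units of work, checking for cancellation between each;
  // returns how many batches actually completed.
  def run(batches: Int): Int = {
    state.set("RUNNING")
    var done = 0
    while (done < batches && state.get() == "RUNNING") {
      done += 1 // one batch of work
    }
    if (state.get() == "RUNNING") state.set("FINISHED")
    done
  }

  def currentState: String = state.get()
}
```

The real operation additionally interrupts the underlying Spark jobs; this sketch only shows the state-flag side of the protocol.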

### SparkSQLDriver

SQL driver that provides Hive `Driver` interface compatibility while using Spark SQL for execution.

```scala { .api }
/**
 * SQL driver providing Hive Driver compatibility with Spark SQL execution.
 * @param context SQL context to use for query execution (defaults to SparkSQLEnv.sqlContext)
 */
class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext) extends Driver {

  /**
   * Execute a SQL command.
   * @param command SQL statement to execute
   * @return CommandProcessorResponse indicating success/failure and details
   */
  def run(command: String): CommandProcessorResponse

  /**
   * Get results from the last executed command.
   * @param res List to populate with result strings
   * @return true if more results are available, false otherwise
   */
  def getResults(res: JList[_]): Boolean

  /**
   * Get the schema of the last query result.
   * @return Schema object describing the result structure
   */
  def getSchema: Schema

  /**
   * Initialize the driver.
   */
  def init(): Unit

  /**
   * Close the driver and release resources.
   * @return 0 on success
   */
  def close(): Int

  /**
   * Destroy the driver and clean up all resources.
   */
  def destroy(): Unit
}
```

**Usage Examples:**

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.hive.thriftserver.SparkSQLDriver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

// Create and initialize the driver
val driver = new SparkSQLDriver()
driver.init()

try {
  // Execute a SQL command
  val response = driver.run("SELECT COUNT(*) FROM sales WHERE year = 2023")

  if (response.getResponseCode == 0) {
    // Success: drain the results in batches
    val results = new java.util.ArrayList[String]()
    while (driver.getResults(results)) {
      results.asScala.foreach(println)
      results.clear()
    }

    // Get schema information
    val schema = driver.getSchema
    if (schema != null && schema.getFieldSchemas != null) {
      schema.getFieldSchemas.asScala.foreach { field =>
        println(s"Column: ${field.getName}, Type: ${field.getType}")
      }
    }
  } else {
    // Error occurred
    println(s"Query failed: ${response.getErrorMessage}")
  }
} finally {
  driver.close()
  driver.destroy()
}
```
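
The `getResults` loop above relies on a batching contract: each call appends a bounded batch of rows to the caller's list, and the caller keeps looping until no rows remain. A dependency-free sketch of that contract, with hypothetical names (`SketchDriver` and `drainAll` are illustrative, not part of Spark or Hive):

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.jdk.CollectionConverters._

// Hypothetical sketch of the Driver.getResults batching contract: each call
// appends at most `batchSize` pending rows to `res` and returns true while
// rows were delivered, so callers drain all output with a simple loop.
class SketchDriver(rows: Seq[String], batchSize: Int = 2) {
  private var cursor = 0

  def getResults(res: JList[String]): Boolean = {
    if (cursor >= rows.length) return false
    val batch = rows.slice(cursor, cursor + batchSize)
    batch.foreach(res.add)
    cursor += batch.length
    true
  }
}

// Drain in batches, as the SparkSQLDriver example above does
def drainAll(driver: SketchDriver): Seq[String] = {
  val buffer = new JArrayList[String]()
  while (driver.getResults(buffer)) {} // keep fetching until exhausted
  buffer.asScala.toSeq
}
```

Batching keeps the caller's memory bounded regardless of result size, which is why the real loop clears the list between iterations.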

### Query Execution Process

The query execution process involves multiple stages from parsing to result delivery.

```scala { .api }
// Query execution stages
class QueryExecution {
  def analyzed: LogicalPlan      // Analyzed logical plan
  def optimizedPlan: LogicalPlan // Optimized logical plan
  def sparkPlan: SparkPlan       // Physical execution plan
  def executedPlan: SparkPlan    // Physical plan prepared for execution
}

// Execution context and job management
object SQLExecution {
  /**
   * Execute code with a new execution ID for tracking.
   * @param queryExecution Query execution context
   * @param name Optional name for the execution
   * @param body Code block to execute
   * @return Result of the code block
   */
  def withNewExecutionId[T](queryExecution: QueryExecution, name: Option[String])(body: => T): T
}
```
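
The staged derivation can be modeled with lazy evaluation: each plan is computed once, on first access, from the stage before it. A toy sketch under that assumption (plain strings stand in for plan trees; `SketchQueryExecution` is illustrative, not the real class):

```scala
// Toy model of staged plan derivation: optimizedPlan is derived lazily
// from analyzed, and sparkPlan from optimizedPlan, each memoized after
// its first access.
class SketchQueryExecution(sql: String) {
  var stagesComputed = 0
  lazy val analyzed: String      = { stagesComputed += 1; s"Analyzed($sql)" }
  lazy val optimizedPlan: String = { stagesComputed += 1; s"Optimized($analyzed)" }
  lazy val sparkPlan: String     = { stagesComputed += 1; s"Physical($optimizedPlan)" }
}
```

Lazy staging means a caller that only needs the analyzed plan (for example, to answer a schema request) never pays for optimization or physical planning.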

### Result Set Management

Result set handling with support for row-based and column-based formats and efficient data transfer.

```scala { .api }
/**
 * Base class for result sets
 */
abstract class RowSet {
  /**
   * Add a row to the result set.
   * @param row Array of objects representing the row data
   */
  def addRow(row: Array[Object]): Unit

  /**
   * Convert to the Thrift row set format.
   * @return TRowSet for network transfer
   */
  def toTRowSet(): TRowSet

  /**
   * Get the number of rows in the result set.
   * @return Row count
   */
  def numRows(): Int
}

/**
 * Row-based result set implementation
 */
class RowBasedSet extends RowSet {
  /**
   * Create a row-based result set with the given schema.
   * @param schema Table schema defining columns and types
   */
  def this(schema: TableSchema)
}

/**
 * Column-based result set implementation (more efficient for large results)
 */
class ColumnBasedSet extends RowSet {
  /**
   * Create a column-based result set with the given schema.
   * @param schema Table schema defining columns and types
   */
  def this(schema: TableSchema)
}
```

**Usage Examples:**

```scala
import org.apache.hive.service.cli.{RowSetFactory, TableSchema, Type}
import org.apache.hive.service.rpc.thrift.TProtocolVersion

// Create a schema for the results
val schema = new TableSchema()
schema.addStringColumn("name", "Employee name")
schema.addPrimitiveColumn("age", Type.INT_TYPE, "Employee age")
schema.addPrimitiveColumn("salary", Type.DECIMAL_TYPE, "Employee salary")

// Create a result set (the factory picks row- or column-based by protocol version)
val rowSet = RowSetFactory.create(schema, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, false)

// Add data rows
rowSet.addRow(Array[AnyRef]("Alice", Integer.valueOf(30), new java.math.BigDecimal("75000.00")))
rowSet.addRow(Array[AnyRef]("Bob", Integer.valueOf(25), new java.math.BigDecimal("65000.00")))

// Convert to Thrift format for network transfer
val tRowSet = rowSet.toTRowSet()
```
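
The difference between the two layouts is easiest to see side by side. In this dependency-free sketch (the `Sketch*` classes are illustrative, not the real implementations), the row-based set keeps one array per row, while the column-based set keeps one growable buffer per column, so each column can travel as a single contiguous block, which is what makes the column layout cheaper for large transfers.

```scala
import scala.collection.mutable.ArrayBuffer

// Row layout: one Array per row
class SketchRowBasedSet(numCols: Int) {
  val rows = ArrayBuffer.empty[Array[Any]]
  def addRow(row: Array[Any]): Unit = {
    require(row.length == numCols)
    rows += row
  }
  def numRows: Int = rows.length
}

// Column layout: one buffer per column, rows scattered across them
class SketchColumnBasedSet(numCols: Int) {
  val columns = Array.fill(numCols)(ArrayBuffer.empty[Any])
  def addRow(row: Array[Any]): Unit = {
    require(row.length == numCols)
    var i = 0
    while (i < numCols) { columns(i) += row(i); i += 1 }
  }
  def numRows: Int = if (numCols == 0) 0 else columns(0).length
}
```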

### FetchIterator

Iterator abstraction for efficient result fetching, with support for different fetch orientations.

```scala { .api }
/**
 * Base trait for result-fetching iterators
 */
sealed trait FetchIterator[A] extends Iterator[A] {
  /**
   * Get the fetch type for this iterator.
   * @return FetchType indicating the data source
   */
  def getFetchType(): FetchType
}

/**
 * Iterator for query output results
 */
class ArrayFetchIterator[A](iter: Iterator[A]) extends FetchIterator[A] {
  def hasNext: Boolean = iter.hasNext
  def next(): A = iter.next()
  def getFetchType(): FetchType = FetchType.QUERY_OUTPUT
}

/**
 * Iterator for operation logs
 */
class IterableFetchIterator[A](iterable: Iterable[A]) extends FetchIterator[A] {
  def getFetchType(): FetchType = FetchType.LOG
}
```
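
Orientation-aware fetching can be sketched with a cursor plus a remembered block start. This is a simplified, hypothetical model (the real iterators are richer) in which FETCH_PRIOR is taken to mean re-reading the most recently returned block:

```scala
// Sketch of orientation-aware fetching over a materialized result: the
// iterator remembers where the last block began so a FETCH_PRIOR request
// can replay that page.
class SketchFetchIterator[A](data: IndexedSeq[A]) {
  private var pos = 0       // index of the next unread element
  private var lastStart = 0 // where the previous block began

  // FETCH_NEXT: return up to `max` rows from the current position
  def fetchNext(max: Int): Seq[A] = {
    lastStart = pos
    val block = data.slice(pos, pos + max)
    pos += block.length
    block
  }

  // FETCH_PRIOR (simplified): rewind to the previous block and re-read it
  def fetchPrior(max: Int): Seq[A] = {
    pos = lastStart
    fetchNext(max)
  }
}
```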
293
294
### Operation States and Lifecycle
295
296
Operations progress through well-defined states during their lifecycle.
297
298
```java { .api }
/**
 * Possible states for operations
 */
enum OperationState {
    INITIALIZED, // Operation created but not started
    RUNNING,     // Operation currently executing
    FINISHED,    // Operation completed successfully
    CANCELED,    // Operation was canceled
    CLOSED,      // Operation closed and resources cleaned up
    ERROR,       // Operation failed with an error
    UNKNOWN      // State cannot be determined
}

/**
 * Operation status information
 */
class OperationStatus {
    /**
     * Get the current state of the operation.
     * @return OperationState indicating the current status
     */
    public OperationState getState()

    /**
     * Get error information if the operation failed.
     * @return HiveSQLException with error details, or null if no error
     */
    public HiveSQLException getOperationException()

    /**
     * Get the operation start time.
     * @return Start time in milliseconds since the epoch
     */
    public long getOperationStarted()

    /**
     * Get the operation completion time.
     * @return Completion time in milliseconds since the epoch, or 0 if not completed
     */
    public long getOperationCompleted()
}
```
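
The lifecycle can be viewed as a transition table. The table below is an assumed model derived from the states listed above (real Hive versions validate additional states, such as PENDING, not shown here): terminal outcomes admit no further transition except to CLOSED.

```scala
// Assumed transition table over the documented states: FINISHED, CANCELED,
// and ERROR are terminal outcomes that can only move to CLOSED.
val transitions: Map[String, Set[String]] = Map(
  "INITIALIZED" -> Set("RUNNING", "CANCELED", "CLOSED"),
  "RUNNING"     -> Set("FINISHED", "CANCELED", "ERROR", "CLOSED"),
  "FINISHED"    -> Set("CLOSED"),
  "CANCELED"    -> Set("CLOSED"),
  "ERROR"       -> Set("CLOSED"),
  "CLOSED"      -> Set.empty[String]
)

def canTransition(from: String, to: String): Boolean =
  transitions.getOrElse(from, Set.empty).contains(to)
```

Encoding the rules as data makes illegal moves (say, reviving a FINISHED operation) cheap to reject up front, which is the same effect the real state validation achieves.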

**Usage Examples:**

```scala
import org.apache.hive.service.cli.{OperationState, OperationStatus}

// Check operation status
val operationStatus = cliService.getOperationStatus(operationHandle)

operationStatus.getState match {
  case OperationState.RUNNING =>
    println("Query is still executing...")
  case OperationState.FINISHED =>
    println("Query completed successfully")
    val duration = operationStatus.getOperationCompleted - operationStatus.getOperationStarted
    println(s"Execution time: ${duration}ms")
  case OperationState.ERROR =>
    val exception = operationStatus.getOperationException
    println(s"Query failed: ${exception.getMessage}")
  case OperationState.CANCELED =>
    println("Query was canceled")
  case _ =>
    println(s"Query state: ${operationStatus.getState}")
}
```

### Error Handling and Exceptions

Comprehensive error handling for SQL execution with detailed error information.

```scala { .api }
/**
 * Spark-specific throwable carrying SQL state information
 */
trait SparkThrowable extends Throwable {
  def getSqlState: String
  def getErrorClass: String
}

/**
 * Command processor response containing execution results and errors
 */
class CommandProcessorResponse {
  /**
   * Create a response carrying only a status code.
   * @param responseCode 0 for success, non-zero for failure
   */
  def this(responseCode: Int)

  /**
   * Create a response with error information.
   * @param responseCode Non-zero error code
   * @param errorMessage Error message
   * @param sqlState SQL state code
   * @param exception Underlying exception
   */
  def this(responseCode: Int, errorMessage: String, sqlState: String, exception: Throwable)

  def getResponseCode(): Int
  def getErrorMessage(): String
  def getSQLState(): String
  def getException(): Throwable
}
```

**Error Handling Examples:**

```scala
import org.apache.spark.SparkThrowable
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

try {
  val response = driver.run("SELECT * FROM non_existent_table")

  if (response.getResponseCode != 0) {
    val exception = response.getException
    exception match {
      case st: SparkThrowable =>
        println(s"Spark SQL Error: ${st.getMessage}")
        println(s"Error Class: ${st.getErrorClass}")
        println(s"SQL State: ${st.getSqlState}")
      case _ =>
        println(s"General Error: ${exception.getMessage}")
        println(s"SQL State: ${response.getSQLState}")
    }
  }
} catch {
  case e: Exception =>
    println(s"Unexpected error during query execution: ${e.getMessage}")
}
```

### Query Cancellation

Support for canceling long-running queries with proper resource cleanup.

```scala
// Cancel an operation through the CLI service
cliService.cancelOperation(operationHandle)

// Or cancel through the operation object itself
val operation = operationManager.getOperation(operationHandle)
operation.cancel()

// Timeout-based cancellation
operation.timeoutCancel()

// Async operations support cancellation while running
if (operation.shouldRunAsync()) {
  operation.cancel()
}
```

**Cancellation Examples:**

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.jdk.CollectionConverters._
import org.apache.hive.service.cli.OperationState

// Start a long-running query
val operationHandle = cliService.executeStatementAsync(
  sessionHandle,
  "SELECT * FROM large_table ORDER BY column1",
  Map.empty[String, String].asJava
)

// Set up cancellation after a timeout
val executor = Executors.newSingleThreadScheduledExecutor()
executor.schedule(new Runnable {
  def run(): Unit = {
    try {
      cliService.cancelOperation(operationHandle)
      println("Query canceled due to timeout")
    } catch {
      case e: Exception => println(s"Error canceling query: ${e.getMessage}")
    }
  }
}, 30, TimeUnit.SECONDS)

// Monitor query progress (Scala has no `break` statement, so loop on a flag)
var done = false
while (!done) {
  val status = cliService.getOperationStatus(operationHandle)
  status.getState match {
    case OperationState.FINISHED =>
      println("Query completed")
      done = true
    case OperationState.CANCELED =>
      println("Query was canceled")
      done = true
    case OperationState.ERROR =>
      println("Query failed")
      done = true
    case _ =>
      Thread.sleep(1000) // Poll every second
  }
}

executor.shutdown()
```