# Query Execution

SQL statement execution with result management and schema introspection. Handles the processing of SQL queries submitted by clients through the Thrift interface.

## Capabilities

### Statement Execution Operation

Core operation for executing SQL statements with result streaming and cancellation support.
```scala { .api }
private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean
  ) extends ExecuteStatementOperation {

  /**
   * Release execution resources and clean up operation state.
   * Should be called when the operation is no longer needed.
   */
  def close(): Unit

  /**
   * Fetch result rows from the executed query.
   * @param order Fetch direction (FETCH_NEXT, FETCH_PRIOR, etc.)
   * @param maxRowsL Maximum number of rows to return
   * @return RowSet containing the requested rows
   */
  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet

  /**
   * Get the schema of the query result set.
   * @return TableSchema describing column names and types
   */
  def getResultSetSchema: TableSchema

  /**
   * Cancel the running operation.
   * Attempts to stop query execution and clean up resources.
   */
  def cancel(): Unit
}
```
**Usage Example:**
```scala
import org.apache.hive.service.cli._
import scala.collection.JavaConverters._

// Create and execute a statement operation
val operation = new SparkExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT name, age FROM users WHERE age > 25",
  confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava,
  runInBackground = true
)

// Get result schema
val schema = operation.getResultSetSchema
println(s"Columns: ${schema.getColumns.asScala.map(_.getColumnName).mkString(", ")}")

// Fetch results in batches
val rowSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
val rows = rowSet.toTRowSet.getRows.asScala

// Process results
rows.foreach { row =>
  val values = row.getColVals.asScala
  println(s"Name: ${values(0)}, Age: ${values(1)}")
}

// Clean up
operation.close()
```
### SQL Driver

Low-level SQL command processor that executes queries against Spark SQL.
```scala { .api }
private[hive] class SparkSQLDriver(
    context: HiveContext = SparkSQLEnv.hiveContext
  ) extends Driver {

  /**
   * Initialize the driver instance.
   * Prepares the driver for query execution.
   */
  def init(): Unit

  /**
   * Execute a SQL command and return its response.
   * @param command SQL statement to execute
   * @return CommandProcessorResponse with execution status and metadata
   */
  def run(command: String): CommandProcessorResponse

  /**
   * Close the driver and return the final status.
   * @return Final status code (0 for success)
   */
  def close(): Int

  /**
   * Retrieve query results incrementally.
   * @param res List to populate with result rows (modified in place)
   * @return true if more results are available, false if complete
   */
  def getResults(res: JList[_]): Boolean

  /**
   * Get the schema of the current result set.
   * @return Schema object describing the result structure
   */
  def getSchema: Schema

  /**
   * Destroy the driver instance and release all resources.
   * More aggressive cleanup than close().
   */
  def destroy(): Unit
}
```
**Usage Example:**
```scala
import scala.collection.JavaConverters._

val driver = new SparkSQLDriver()
driver.init()

// Execute query
val response = driver.run("SELECT COUNT(*) FROM large_table")
if (response.getResponseCode == 0) {
  // Get schema information
  val schema = driver.getSchema
  val columns = schema.getFieldSchemas.asScala
  println(s"Result columns: ${columns.map(_.getName).mkString(", ")}")

  // Fetch results
  val results = new java.util.ArrayList[String]()
  while (driver.getResults(results)) {
    results.asScala.foreach(println)
    results.clear()
  }
} else {
  println(s"Query failed: ${response.getErrorMessage}")
}

// Cleanup
val exitCode = driver.close()
driver.destroy()
```
### Operation Management

Centralized management of all active query operations across sessions.
```scala { .api }
private[hive] class SparkSQLOperationManager extends OperationManager {

  /**
   * Map of operation handles to active operations.
   * Used for operation lifecycle tracking.
   */
  val handleToOperation: JMap[OperationHandle, Operation]

  /**
   * Map of sessions to their active scheduler pools.
   * Enables resource isolation between sessions.
   */
  val sessionToActivePool: Map[SessionHandle, String]

  /**
   * Map of sessions to their HiveContext instances.
   * Provides session-level context isolation.
   */
  val sessionToContexts: Map[SessionHandle, HiveContext]

  /**
   * Create a new statement execution operation.
   * @param parentSession Session that owns this operation
   * @param statement SQL statement to execute
   * @param confOverlay Session-specific configuration overrides
   * @param runInBackground Whether to execute asynchronously
   * @return New ExecuteStatementOperation instance
   */
  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      runInBackground: Boolean): ExecuteStatementOperation
}
```
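The `handleToOperation` map drives the operation lifecycle: an operation is registered under its handle at creation and removed when it is closed. A minimal sketch of that bookkeeping pattern, using hypothetical `Handle`/`Op` stand-ins rather than the actual Hive types:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for OperationHandle / Operation (not the Hive classes)
final case class Handle(id: Long)
final class Op(val statement: String) { var closed = false }

class OperationRegistry {
  // Concurrent map, since handles are touched from multiple client threads
  private val handleToOperation = new ConcurrentHashMap[Handle, Op]()

  // Register a new operation under its handle at creation time
  def register(handle: Handle, op: Op): Unit =
    handleToOperation.put(handle, op)

  // Look up a live operation; None once it has been closed
  def lookup(handle: Handle): Option[Op] =
    Option(handleToOperation.get(handle))

  // Close and deregister, mirroring manager-side cleanup
  def close(handle: Handle): Unit = {
    val op = handleToOperation.remove(handle)
    if (op != null) op.closed = true
  }
}

val registry = new OperationRegistry
val h = Handle(1L)
registry.register(h, new Op("SELECT 1"))
println(registry.lookup(h).isDefined) // true while live
registry.close(h)
println(registry.lookup(h).isDefined) // false after close
```

The concurrent map is the key design point: lookups by handle (for fetch, status, and cancel requests) must stay safe while other threads create and close operations.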
### Asynchronous Execution

Operations can be executed synchronously or asynchronously:
**Synchronous Execution:**
```scala
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM small_table",
  confOverlay = Map.empty[String, String].asJava,
  runInBackground = false // Execute synchronously
)
// Operation completes before returning
val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 100)
```
**Asynchronous Execution:**
```scala
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM huge_table",
  confOverlay = Map.empty[String, String].asJava,
  runInBackground = true // Execute asynchronously
)

// Poll operation status periodically
while (operation.getStatus.getState == OperationState.RUNNING) {
  Thread.sleep(1000)
  println(s"Query state: ${operation.getStatus.getState}")
}

// Fetch results when complete
if (operation.getStatus.getState == OperationState.FINISHED) {
  val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
}
```
### Result Streaming

Large result sets are streamed incrementally to avoid memory issues:
```scala
// Stream results in configurable batch sizes
var hasMore = true
var totalRows = 0

while (hasMore) {
  val rowSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
  val batchSize = rowSet.toTRowSet.getRows.size()

  if (batchSize > 0) {
    totalRows += batchSize
    // Process batch (processResultBatch is an application-defined handler)
    processResultBatch(rowSet)
  } else {
    hasMore = false
  }
}

println(s"Processed $totalRows total rows")
```
### Query Cancellation

Running queries can be cancelled gracefully:
```scala
import java.util.{Timer, TimerTask}

// Start long-running query
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM massive_table ORDER BY complex_calculation(col1)",
  confOverlay = Map.empty[String, String].asJava,
  runInBackground = true
)

// Cancel after a 30-second timeout using a java.util.Timer
val timer = new Timer()
timer.schedule(new TimerTask {
  override def run(): Unit = {
    operation.cancel()
    println("Query cancelled due to timeout")
  }
}, 30000)
```
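Since `cancel()` can only "attempt" to stop execution, cancellation is cooperative: the work checks a shared flag at safe points (for example, between result batches) and stops when it is set. A toy sketch of the pattern in plain Scala, with illustrative names rather than the server's actual code:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Toy sketch of cooperative cancellation: the worker checks a shared flag
// between batches, so a cancel request takes effect at the next safe point.
final class CancellableWork {
  private val cancelled = new AtomicBoolean(false)
  def cancel(): Unit = cancelled.set(true)

  // Process "batches" until done or cancelled; returns batches completed
  def run(totalBatches: Int): Int = {
    var done = 0
    while (done < totalBatches && !cancelled.get()) {
      done += 1               // stand-in for processing one result batch
      if (done == 3) cancel() // simulate a cancel request arriving mid-run
    }
    done
  }
}

val work = new CancellableWork
println(work.run(10)) // stops early once cancelled, printing 3
```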
### Error Handling

Comprehensive error handling for various failure scenarios:

**Query Compilation Errors:**
```scala
import org.apache.spark.sql.AnalysisException

val response = driver.run("SELECT invalid_column FROM non_existent_table")
if (response.getResponseCode != 0) {
  response.getException() match {
    case e: AnalysisException =>
      println(s"SQL Analysis Error: ${e.getMessage}")
    case _ => // other exceptions, or no exception attached
      println(s"General Error: ${response.getErrorMessage}")
  }
}
```
**Runtime Execution Errors:**
```scala
try {
  val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
  // Process results
} catch {
  case e: HiveSQLException =>
    println(s"SQL Execution Error: ${e.getMessage}")
    operation.cancel() // Clean up the failed operation
}
```
### Performance Optimization

Several optimizations are built into the query execution system:

**Incremental Collection:**
- Results are collected incrementally to reduce memory usage
- Configurable batch sizes for result streaming
- Lazy evaluation where possible

**Connection Pool Management:**
- Reuse of database connections across queries
- Connection pooling to reduce setup overhead
- Proper connection cleanup on operation completion

**Resource Isolation:**
- Per-session Spark job groups for resource tracking
- Configurable scheduler pools for workload isolation
- Memory management per operation
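The incremental-collection idea can be illustrated in plain Scala: pulling a lazy iterator in fixed-size batches means only one batch is materialized at a time. A simplified sketch of the principle, not the Spark internals:

```scala
// Simulate a large result set as a lazy iterator (nothing materialized yet)
val results: Iterator[Int] = Iterator.range(0, 10000)

// Pull fixed-size batches; only one batch lives in memory at a time
val batchSize = 1000
var totalRows = 0
results.grouped(batchSize).foreach { batch =>
  totalRows += batch.size // stand-in for per-batch processing
}
println(totalRows) // 10000
```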
### Integration with Spark SQL

The execution system integrates deeply with Spark SQL:

**Catalyst Optimizer:**
- All queries go through Spark's Catalyst optimizer
- Cost-based optimization when statistics are available
- Adaptive query execution for dynamic optimization

**DataFrame API:**
```scala
// SQL queries are converted to DataFrame operations internally
val sql = "SELECT name, COUNT(*) FROM users GROUP BY name"
val dataframe = hiveContext.sql(sql)
val results = dataframe.collect() // Execute and collect results
```

**Data Source Integration:**
- Supports all Spark SQL data sources (Parquet, JSON, JDBC, etc.)
- Pushdown optimizations for compatible sources
- Schema inference and evolution
### Monitoring and Metrics

Query execution is monitored through multiple mechanisms:

**Spark Listeners:**
```scala
// Queries are tracked through Spark's event system
sparkContext.addSparkListener(new HiveThriftServer2Listener(server, conf))
```

**Operation Statistics:**
- Query compilation time
- Execution duration
- Rows processed and returned
- Resource usage (CPU, memory, I/O)
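A minimal sketch of how such per-operation statistics might be accumulated, using hypothetical names rather than the server's actual listener classes:

```scala
import scala.collection.mutable

// Hypothetical per-operation stats record (not the server's actual classes)
final case class OpStats(var compileMs: Long = 0, var execMs: Long = 0, var rows: Long = 0)

final class StatsRegistry {
  private val stats = mutable.Map.empty[String, OpStats]

  // Apply an update to the stats record for opId, creating it if absent
  def record(opId: String)(update: OpStats => Unit): Unit =
    update(stats.getOrElseUpdate(opId, OpStats()))

  def get(opId: String): Option[OpStats] = stats.get(opId)
}

val statsRegistry = new StatsRegistry
statsRegistry.record("op-1")(_.compileMs = 12)
statsRegistry.record("op-1") { s => s.execMs = 340; s.rows = 5000 }
println(statsRegistry.get("op-1")) // Some(OpStats(12,340,5000))
```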
**Web UI Integration:**
- Real-time query status in Spark UI
- Query plans and execution details
- Performance metrics and bottleneck identification