# Query Operations

Query operations manage SQL statement execution, result handling, and asynchronous processing within the Spark Hive Thrift Server.

## Operation Manager

### SparkSQLOperationManager

Central manager for all SQL query execution operations with session context management.
```scala { .api }
private[thriftserver] class SparkSQLOperationManager extends OperationManager with Logging {
  val handleToOperation: JMap[OperationHandle, Operation]
  val sessionToActivePool: ConcurrentHashMap[SessionHandle, String]
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

  def newExecuteStatementOperation(parentSession: HiveSession, statement: String,
      confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation
  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit
}
```

#### Operation Creation

The `newExecuteStatementOperation` method creates new query execution operations:

**Usage Example:**

```scala
import java.util.{Map => JMap}
import org.apache.hive.service.cli.session.HiveSession

val confOverlay: JMap[String, String] = new java.util.HashMap()
confOverlay.put("spark.sql.adaptive.enabled", "true")

val operation = operationManager.newExecuteStatementOperation(
  parentSession = hiveSession,
  statement = "SELECT * FROM users WHERE age > 21",
  confOverlay = confOverlay,
  async = true
)
```

**Operation Creation Process:**

1. **Session Validation**: Ensures the session has a valid SQL context
2. **Configuration Merging**: Applies session- and query-specific configuration
3. **Background Execution**: Determines whether the query should run asynchronously
4. **Operation Creation**: Creates a `SparkExecuteStatementOperation` instance
5. **Registration**: Registers the operation handle for tracking

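The registration step can be sketched with a plain map keyed by an opaque handle. The `Handle` and `Op` types below are hypothetical stand-ins for Hive's `OperationHandle` and `Operation`:

```scala
import scala.collection.mutable
import java.util.UUID

// Hypothetical stand-ins for OperationHandle and Operation.
case class Handle(id: UUID)
case class Op(statement: String)

// Handle-to-operation registry, analogous to handleToOperation above.
val handleToOperation = mutable.Map[Handle, Op]()

def register(statement: String): Handle = {
  val handle = Handle(UUID.randomUUID())
  handleToOperation(handle) = Op(statement)
  handle
}

val h = register("SELECT 1")
println(handleToOperation(h).statement) // SELECT 1
```

Keeping every live operation in a handle-keyed registry is what lets the server later look up, cancel, or close a statement from just the handle the client holds.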
#### Configuration Management

The `setConfMap` method applies configuration overrides to SQL contexts:

```scala
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf.setConfString(kv.getKey, kv.getValue)
  }
}
```

**Configuration Sources:**
- **Session State**: Hive session override configurations
- **Session Variables**: User-set Hive variables
- **Query Overlay**: Statement-specific configuration parameters

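Assuming the sources are applied in the order listed above (an assumption about precedence, not shown in the code), the merge behaves like chained map updates where later entries win:

```scala
// Hypothetical sketch: each source as a plain map, merged in application order.
val sessionState = Map("spark.sql.shuffle.partitions" -> "200")
val sessionVars  = Map("spark.sql.shuffle.partitions" -> "100")
val queryOverlay = Map("spark.sql.adaptive.enabled" -> "true")

// Later maps override earlier ones for duplicate keys.
val effective = sessionState ++ sessionVars ++ queryOverlay

println(effective("spark.sql.shuffle.partitions")) // "100": the session variable wins
println(effective("spark.sql.adaptive.enabled"))   // "true": from the query overlay
```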
## Statement Execution

### SparkExecuteStatementOperation

Individual query execution operation with result management and data type handling.

```scala { .api }
private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean = true
  )(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
  extends ExecuteStatementOperation with Logging {

  def close(): Unit
  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit
}
```

#### Query Execution Process

**Synchronous Execution:**
```scala
// Execute immediately and wait for results
val operation = new SparkExecuteStatementOperation(
  session, "SELECT COUNT(*) FROM large_table", confOverlay, runInBackground = false
)(sqlContext, sessionToActivePool)
```

**Asynchronous Execution:**
```scala
// Execute in a background thread
val operation = new SparkExecuteStatementOperation(
  session, "SELECT * FROM large_table", confOverlay, runInBackground = true
)(sqlContext, sessionToActivePool)
```

#### Result Processing

The operation handles different data types in query results:

```scala
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
  dataTypes(ordinal) match {
    case StringType => to += from.getString(ordinal)
    case IntegerType => to += from.getInt(ordinal)
    case BooleanType => to += from.getBoolean(ordinal)
    case DoubleType => to += from.getDouble(ordinal)
    case FloatType => to += from.getFloat(ordinal)
    case DecimalType() => to += from.getDecimal(ordinal)
    case LongType => to += from.getLong(ordinal)
    case ByteType => to += from.getByte(ordinal)
    case ShortType => to += from.getShort(ordinal)
    case DateType => to += from.getAs[Date](ordinal)
    case TimestampType => to += from.getAs[Timestamp](ordinal)
    // Additional type handling...
  }
}
```

**Supported Data Types:**
- **Primitive Types**: String, Integer, Boolean, Double, Float, Long, Byte, Short
- **Decimal Types**: High-precision decimal numbers
- **Date/Time Types**: Date and Timestamp values
- **Complex Types**: Arrays, Maps, Structs (via nested processing)

#### Resource Management

Operations include comprehensive resource cleanup:

```scala
def close(): Unit = {
  // RDDs will be cleaned automatically upon garbage collection
  logDebug(s"CLOSING $statementId")
  cleanup(OperationState.CLOSED)
  sqlContext.sparkContext.clearJobGroup()
}
```

**Cleanup Process:**
1. **State Update**: Marks the operation as closed
2. **Job Cancellation**: Cancels any running Spark jobs
3. **Memory Cleanup**: Releases cached result data
4. **Resource Release**: Frees system resources

## Execution Tracking

### ExecutionInfo

Detailed tracking of query execution lifecycle and performance metrics.

```scala { .api }
private[thriftserver] class ExecutionInfo(
    statement: String,
    sessionId: String,
    startTimestamp: Long,
    userName: String) {
  var finishTimestamp: Long = 0L
  var executePlan: String = ""
  var detail: String = ""
  var state: ExecutionState.Value = ExecutionState.STARTED
  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
  var groupId: String = ""
  def totalTime: Long
}
```

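The `totalTime` accessor is declared but not shown above. A plausible sketch, assuming `finishTimestamp == 0L` marks a query that is still running:

```scala
// Sketch: total duration of a finished execution, or elapsed time so far
// if the query is still running (finishTimestamp == 0L is an assumption).
class ExecutionTiming(val startTimestamp: Long) {
  var finishTimestamp: Long = 0L

  def totalTime: Long =
    if (finishTimestamp == 0L) System.currentTimeMillis - startTimestamp
    else finishTimestamp - startTimestamp
}

val timing = new ExecutionTiming(startTimestamp = 1000L)
timing.finishTimestamp = 4500L
println(timing.totalTime) // 3500
```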
#### Execution States

```scala { .api }
private[thriftserver] object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
  type ExecutionState = Value
}
```

**State Transitions:**
1. **STARTED**: Query execution begins
2. **COMPILED**: SQL parsed and execution plan generated
3. **FAILED**: Query execution encountered an error
4. **FINISHED**: Query completed successfully

#### Event Tracking

**Query Start:**
```scala
def onStatementStart(id: String, sessionId: String, statement: String,
    groupId: String, userName: String = "UNKNOWN"): Unit = synchronized {
  val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
  info.state = ExecutionState.STARTED
  executionList.put(id, info)
  trimExecutionIfNecessary()
  sessionList(sessionId).totalExecution += 1
  executionList(id).groupId = groupId
  totalRunning += 1
}
```

**Query Completion:**
```scala
def onStatementFinish(id: String): Unit = synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.FINISHED
  totalRunning -= 1
  trimExecutionIfNecessary()
}
```

220
**Error Handling:**
221
```scala
222
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
223
synchronized {
224
executionList(id).finishTimestamp = System.currentTimeMillis
225
executionList(id).detail = errorMessage
226
executionList(id).state = ExecutionState.FAILED
227
totalRunning -= 1
228
trimExecutionIfNecessary()
229
}
230
}
231
```
232
233
## Performance Optimization

### Result Caching

Operations support different result collection strategies:

**Incremental Collection:**
```scala
// Enable incremental result fetching (the property can also be set
// via --conf or spark-defaults.conf):
sqlContext.setConf("spark.sql.thriftServer.incrementalCollect", "true")
```

- Results streamed as they become available
- Lower memory usage for large result sets
- Better responsiveness for interactive queries

**Full Collection:**
```scala
// Cache complete results in memory (the default)
sqlContext.setConf("spark.sql.thriftServer.incrementalCollect", "false")
```

- All results cached in memory
- Supports FETCH_FIRST operations
- Higher memory usage but better performance for small results

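The memory tradeoff between the two strategies can be illustrated without Spark: an iterator produces rows on demand, while a materialized collection holds every row at once and therefore supports re-reads such as FETCH_FIRST. A plain-Scala sketch, not the server's actual code path:

```scala
val totalRows = 100000

// Incremental: rows produced lazily, one at a time.
def incremental: Iterator[Int] = Iterator.range(0, totalRows)

// Full collection: everything materialized up front; can be re-read from the start.
def full: Vector[Int] = Vector.range(0, totalRows)

val it = incremental
println(it.take(10).sum) // 45: only the first 10 rows were ever produced

val all = full
println(all.take(10).sum) // 45: same answer, but all rows are resident in memory
```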
### Asynchronous Processing

Background execution prevents client blocking:

```scala
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
```

265
**Benefits:**
266
- Client remains responsive during long queries
267
- Multiple concurrent query execution
268
- Better resource utilization
269
270
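The hand-off can be sketched with a plain thread pool: the caller gets a `Future` handle back immediately and collects the result later. This is a simplification; the actual server runs background operations on its own async pool:

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

val pool = Executors.newFixedThreadPool(2)

// Submit the "statement" and return immediately with a Future handle.
val future = pool.submit(new Callable[Long] {
  def call(): Long = (1L to 100L).sum // stand-in for the query's work
})

// The client thread stays free for other work, then collects the result.
val result = future.get(10, TimeUnit.SECONDS)
println(result) // 5050

pool.shutdown()
```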
### Job Group Management

Operations use Spark job groups for resource management:

```scala
sqlContext.sparkContext.setJobGroup(statementId, statement, interruptOnCancel = true)
```

**Features:**
- **Query Identification**: Link Spark jobs to SQL statements
- **Cancellation Support**: Enable query interruption
- **Resource Tracking**: Monitor resource usage per query
- **Performance Monitoring**: Collect execution metrics