# Operation Management

SQL operations and metadata operations management with session context mapping for the Spark Hive Thrift Server.

## Capabilities

### SparkSQLOperationManager

Manages the lifecycle of SQL operations and metadata operations, maintaining session-to-context mappings.
```scala { .api }
/**
 * Executes queries using Spark SQL and maintains a list of handles to active queries
 */
class SparkSQLOperationManager extends OperationManager {
  /**
   * Map from session handles to their associated SQL contexts
   */
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

  /**
   * Create a new SQL execution operation
   * @param parentSession The session that will own this operation
   * @param statement SQL statement to execute
   * @param confOverlay Configuration overrides for this operation
   * @param async Whether to run asynchronously
   * @param queryTimeout Timeout for query execution in milliseconds
   * @return ExecuteStatementOperation handle
   */
  override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: java.util.Map[String, String],
    async: Boolean,
    queryTimeout: Long
  ): ExecuteStatementOperation

  /**
   * Create a new get tables metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern (null for all catalogs)
   * @param schemaName Schema name pattern (null for all schemas)
   * @param tableName Table name pattern (null for all tables)
   * @param tableTypes List of table types to include
   * @return MetadataOperation handle
   */
  override def newGetTablesOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    tableName: String,
    tableTypes: java.util.List[String]
  ): MetadataOperation

  /**
   * Create a new get columns metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @param tableName Table name pattern
   * @param columnName Column name pattern
   * @return GetColumnsOperation handle
   */
  override def newGetColumnsOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String
  ): GetColumnsOperation

  /**
   * Create a new get schemas metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @return GetSchemasOperation handle
   */
  override def newGetSchemasOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String
  ): GetSchemasOperation

  /**
   * Create a new get functions metadata operation
   * @param parentSession The session that will own this operation
   * @param catalogName Catalog name pattern
   * @param schemaName Schema name pattern
   * @param functionName Function name pattern
   * @return GetFunctionsOperation handle
   */
  override def newGetFunctionsOperation(
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    functionName: String
  ): GetFunctionsOperation

  /**
   * Create a new get type info metadata operation
   * @param parentSession The session that will own this operation
   * @return GetTypeInfoOperation handle
   */
  override def newGetTypeInfoOperation(parentSession: HiveSession): GetTypeInfoOperation

  /**
   * Create a new get catalogs metadata operation
   * @param parentSession The session that will own this operation
   * @return GetCatalogsOperation handle
   */
  override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation

  /**
   * Create a new get table types metadata operation
   * @param parentSession The session that will own this operation
   * @return GetTableTypesOperation handle
   */
  override def newGetTableTypesOperation(parentSession: HiveSession): GetTableTypesOperation
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.hive.service.cli.session.HiveSession
import scala.collection.JavaConverters._

// Create operation manager
val operationManager = new SparkSQLOperationManager()

// Assuming we have a session and session handle
val session: HiveSession = // ... obtained from session manager
val sessionHandle = session.getSessionHandle()

// Associate a SQL context with the session
operationManager.sessionToContexts.put(sessionHandle, SparkSQLEnv.sqlContext)

// Create a SQL execution operation
val statement = "SELECT * FROM my_table LIMIT 10"
val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava
val executeOp = operationManager.newExecuteStatementOperation(
  session, statement, confOverlay, async = true, queryTimeout = 30000L
)

// Create metadata operations
val tablesOp = operationManager.newGetTablesOperation(
  session, null, "default", "%", List("TABLE", "VIEW").asJava
)

val columnsOp = operationManager.newGetColumnsOperation(
  session, null, "default", "my_table", null
)
```

### Operation Lifecycle

Operations follow a standard lifecycle managed by the operation manager.

```scala { .api }
// Operation states and lifecycle
abstract class Operation {
  /**
   * Get the handle identifying this operation
   * @return OperationHandle for this operation
   */
  def getHandle(): OperationHandle

  /**
   * Get the current state of this operation
   * @return OperationStatus describing the current state
   */
  def getStatus(): OperationStatus

  /**
   * Cancel this operation if it's running
   */
  def cancel(): Unit

  /**
   * Close this operation and clean up resources
   */
  def close(): Unit
}

/**
 * Base class for metadata operations
 */
abstract class MetadataOperation extends Operation {
  /**
   * Get the result set for this metadata operation
   * @return RowSet containing the metadata results
   */
  def getResultSet(): RowSet

  /**
   * Get the schema for the result set
   * @return TableSchema describing the result columns
   */
  def getResultSetSchema(): TableSchema
}
```

### Session Context Management

The operation manager maintains the critical mapping between sessions and their SQL contexts.

```scala { .api }
// Session to context mapping
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

// Context lifecycle management
def associateContext(sessionHandle: SessionHandle, sqlContext: SQLContext): Unit = {
  sessionToContexts.put(sessionHandle, sqlContext)
}

def getContextForSession(sessionHandle: SessionHandle): SQLContext = {
  val context = sessionToContexts.get(sessionHandle)
  require(context != null, s"Session handle: $sessionHandle has not been initialized or had already closed.")
  context
}

def removeContextForSession(sessionHandle: SessionHandle): SQLContext = {
  sessionToContexts.remove(sessionHandle)
}
```

**Usage Examples:**

```scala
// Session context lifecycle
val sessionHandle = // ... from session manager
val sqlContext = SparkSQLEnv.sqlContext.newSession()

// Associate context when session opens
operationManager.sessionToContexts.put(sessionHandle, sqlContext)

// Use context for operations
val context = operationManager.sessionToContexts.get(sessionHandle)
require(context != null, "Session not found")

// Clean up when session closes
operationManager.sessionToContexts.remove(sessionHandle)
```

### Background Execution

Operations can be executed asynchronously when configured appropriately.

```scala { .api }
// Asynchronous execution configuration
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

// Background execution is determined by:
// 1. Client request (async parameter)
// 2. Server configuration (spark.sql.hive.thriftServer.async)
// 3. Operation type (some operations are always synchronous)
```

**Configuration Examples:**

```scala
// Enable async execution globally
spark.conf.set("spark.sql.hive.thriftServer.async", "true")

// Client requests async execution
val executeOp = operationManager.newExecuteStatementOperation(
  session, statement, confOverlay,
  async = true, // Request async execution
  queryTimeout = 30000L
)

// Check if operation is running in background
val status = executeOp.getStatus()
if (status.getState() == OperationState.RUNNING) {
  println("Operation is running asynchronously")
}
```

### Error Handling

Comprehensive error handling for operation lifecycle management.

```java { .api }
/**
 * Exception thrown during operation management
 */
class HiveSQLException extends SQLException {
  public HiveSQLException(String reason, String sqlState, int vendorCode)
  public HiveSQLException(String reason, String sqlState, Throwable cause)
}

// Common error scenarios
public static final String INVALID_SESSION_HANDLE = "HY000";
public static final String OPERATION_NOT_FOUND = "HY000";
public static final String OPERATION_ALREADY_CLOSED = "HY010";
public static final String QUERY_TIMEOUT = "HYT00";
```

**Error Handling Examples:**

```scala
import org.apache.hive.service.cli.HiveSQLException

try {
  val executeOp = operationManager.newExecuteStatementOperation(
    session, statement, confOverlay, async = false, queryTimeout = 30000L
  )

  // Operation execution...

} catch {
  case e: HiveSQLException if e.getSQLState == "HY000" =>
    println("Invalid session or operation handle")
  case e: HiveSQLException if e.getSQLState == "HYT00" =>
    println("Query timeout exceeded")
  case e: HiveSQLException =>
    println(s"Operation error: ${e.getMessage} (${e.getSQLState})")
  case e: IllegalArgumentException =>
    println(s"Session not initialized: ${e.getMessage}")
}
```

### Operation Handle Management

The operation manager maintains handles to all active operations.

```scala { .api }
// Handle to operation mapping (inherited from OperationManager)
val handleToOperation: java.util.Map[OperationHandle, Operation]

// Operation handle lifecycle
def addOperation(operation: Operation): Unit = {
  handleToOperation.put(operation.getHandle(), operation)
}

def getOperation(operationHandle: OperationHandle): Operation = {
  handleToOperation.get(operationHandle)
}

def removeOperation(operationHandle: OperationHandle): Operation = {
  handleToOperation.remove(operationHandle)
}
```

**Operation Tracking Examples:**

```scala
// Track all operations for a session
def getOperationsForSession(sessionHandle: SessionHandle): List[Operation] = {
  handleToOperation.values().asScala.filter { op =>
    op.getParentSession().getSessionHandle() == sessionHandle
  }.toList
}

// Clean up operations when session closes
def closeAllOperationsForSession(sessionHandle: SessionHandle): Unit = {
  getOperationsForSession(sessionHandle).foreach { operation =>
    try {
      operation.cancel()
      operation.close()
    } catch {
      case e: Exception =>
        logWarning(s"Error closing operation ${operation.getHandle()}", e)
    }
  }
}
```