or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-driver.md, cli-services.md, index.md, metadata-operations.md, operation-management.md, server-management.md, session-management.md, sql-execution.md, web-ui.md

docs/operation-management.md

0

# Operation Management

1

2

SQL operations and metadata operations management with session context mapping for the Spark Hive Thrift Server.

3

4

## Capabilities

5

6

### SparkSQLOperationManager

7

8

Manages the lifecycle of SQL operations and metadata operations, maintaining session-to-context mappings.

9

10

```scala { .api }

11

/**

12

* Executes queries using Spark SQL and maintains a list of handles to active queries

13

*/

14

class SparkSQLOperationManager extends OperationManager {

15

/**

16

* Map from session handles to their associated SQL contexts

17

*/

18

val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

19

20

/**

21

* Create a new SQL execution operation

22

* @param parentSession The session that will own this operation

23

* @param statement SQL statement to execute

24

* @param confOverlay Configuration overrides for this operation

25

* @param async Whether to run asynchronously

26

* @param queryTimeout Timeout for query execution in seconds (0 means no timeout)

27

* @return ExecuteStatementOperation handle

28

*/

29

override def newExecuteStatementOperation(

30

parentSession: HiveSession,

31

statement: String,

32

confOverlay: java.util.Map[String, String],

33

async: Boolean,

34

queryTimeout: Long

35

): ExecuteStatementOperation

36

37

/**

38

* Create a new get tables metadata operation

39

* @param parentSession The session that will own this operation

40

* @param catalogName Catalog name pattern (null for all catalogs)

41

* @param schemaName Schema name pattern (null for all schemas)

42

* @param tableName Table name pattern (null for all tables)

43

* @param tableTypes List of table types to include

44

* @return MetadataOperation handle

45

*/

46

override def newGetTablesOperation(

47

parentSession: HiveSession,

48

catalogName: String,

49

schemaName: String,

50

tableName: String,

51

tableTypes: java.util.List[String]

52

): MetadataOperation

53

54

/**

55

* Create a new get columns metadata operation

56

* @param parentSession The session that will own this operation

57

* @param catalogName Catalog name pattern

58

* @param schemaName Schema name pattern

59

* @param tableName Table name pattern

60

* @param columnName Column name pattern

61

* @return GetColumnsOperation handle

62

*/

63

override def newGetColumnsOperation(

64

parentSession: HiveSession,

65

catalogName: String,

66

schemaName: String,

67

tableName: String,

68

columnName: String

69

): GetColumnsOperation

70

71

/**

72

* Create a new get schemas metadata operation

73

* @param parentSession The session that will own this operation

74

* @param catalogName Catalog name pattern

75

* @param schemaName Schema name pattern

76

* @return GetSchemasOperation handle

77

*/

78

override def newGetSchemasOperation(

79

parentSession: HiveSession,

80

catalogName: String,

81

schemaName: String

82

): GetSchemasOperation

83

84

/**

85

* Create a new get functions metadata operation

86

* @param parentSession The session that will own this operation

87

* @param catalogName Catalog name pattern

88

* @param schemaName Schema name pattern

89

* @param functionName Function name pattern

90

* @return GetFunctionsOperation handle

91

*/

92

override def newGetFunctionsOperation(

93

parentSession: HiveSession,

94

catalogName: String,

95

schemaName: String,

96

functionName: String

97

): GetFunctionsOperation

98

99

/**

100

* Create a new get type info metadata operation

101

* @param parentSession The session that will own this operation

102

* @return GetTypeInfoOperation handle

103

*/

104

override def newGetTypeInfoOperation(parentSession: HiveSession): GetTypeInfoOperation

105

106

/**

107

* Create a new get catalogs metadata operation

108

* @param parentSession The session that will own this operation

109

* @return GetCatalogsOperation handle

110

*/

111

override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation

112

113

/**

114

* Create a new get table types metadata operation

115

* @param parentSession The session that will own this operation

116

* @return GetTableTypesOperation handle

117

*/

118

override def newGetTableTypesOperation(parentSession: HiveSession): GetTableTypesOperation

119

}

120

```

121

122

**Usage Examples:**

123

124

```scala

125

import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager

126

import org.apache.hive.service.cli.session.HiveSession

127

import scala.collection.JavaConverters._

128

129

// Create operation manager

130

val operationManager = new SparkSQLOperationManager()

131

132

// Assuming we have a session and session handle

133

val session: HiveSession = // ... obtained from session manager

134

val sessionHandle = session.getSessionHandle()

135

136

// Associate a SQL context with the session

137

operationManager.sessionToContexts.put(sessionHandle, SparkSQLEnv.sqlContext)

138

139

// Create a SQL execution operation

140

val statement = "SELECT * FROM my_table LIMIT 10"

141

val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava

142

val executeOp = operationManager.newExecuteStatementOperation(

143

session, statement, confOverlay, async = true, queryTimeout = 30L

144

)

145

146

// Create metadata operations

147

val tablesOp = operationManager.newGetTablesOperation(

148

session, null, "default", "%", List("TABLE", "VIEW").asJava

149

)

150

151

val columnsOp = operationManager.newGetColumnsOperation(

152

session, null, "default", "my_table", null

153

)

154

```

155

156

### Operation Lifecycle

157

158

Operations follow a standard lifecycle managed by the operation manager.

159

160

```scala { .api }

161

// Operation states and lifecycle

162

abstract class Operation {

163

/**

164

* Get the handle identifying this operation

165

* @return OperationHandle for this operation

166

*/

167

def getHandle(): OperationHandle

168

169

/**

170

* Get the current state of this operation

171

* @return OperationState indicating current status

172

*/

173

def getStatus(): OperationStatus

174

175

/**

176

* Cancel this operation if it's running

177

*/

178

def cancel(): Unit

179

180

/**

181

* Close this operation and clean up resources

182

*/

183

def close(): Unit

184

}

185

186

/**

187

* Base class for metadata operations

188

*/

189

abstract class MetadataOperation extends Operation {

190

/**

191

* Get the result set for this metadata operation

192

* @return RowSet containing the metadata results

193

*/

194

def getResultSet(): RowSet

195

196

/**

197

* Get the schema for the result set

198

* @return TableSchema describing the result columns

199

*/

200

def getResultSetSchema(): TableSchema

201

}

202

```

203

204

### Session Context Management

205

206

The operation manager maintains the critical mapping between sessions and their SQL contexts.

207

208

```scala { .api }

209

// Session to context mapping

210

val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

211

212

// Context lifecycle management

213

def associateContext(sessionHandle: SessionHandle, sqlContext: SQLContext): Unit = {

214

sessionToContexts.put(sessionHandle, sqlContext)

215

}

216

217

def getContextForSession(sessionHandle: SessionHandle): SQLContext = {

218

val context = sessionToContexts.get(sessionHandle)

219

require(context != null, s"Session handle: $sessionHandle has not been initialized or had already closed.")

220

context

221

}

222

223

def removeContextForSession(sessionHandle: SessionHandle): SQLContext = {

224

sessionToContexts.remove(sessionHandle)

225

}

226

```

227

228

**Usage Examples:**

229

230

```scala

231

// Session context lifecycle

232

val sessionHandle = // ... from session manager

233

val sqlContext = SparkSQLEnv.sqlContext.newSession()

234

235

// Associate context when session opens

236

operationManager.sessionToContexts.put(sessionHandle, sqlContext)

237

238

// Use context for operations

239

val context = operationManager.sessionToContexts.get(sessionHandle)

240

require(context != null, "Session not found")

241

242

// Clean up when session closes

243

operationManager.sessionToContexts.remove(sessionHandle)

244

```

245

246

### Background Execution

247

248

Operations can be executed asynchronously when configured appropriately.

249

250

```scala { .api }

251

// Asynchronous execution configuration

252

val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

253

254

// Background execution is determined by:

255

// 1. Client request (async parameter)

256

// 2. Server configuration (spark.sql.hive.thriftServer.async)

257

// 3. Operation type (some operations are always synchronous)

258

```

259

260

**Configuration Examples:**

261

262

```scala

263

// Enable async execution globally

264

spark.conf.set("spark.sql.hive.thriftServer.async", "true")

265

266

// Client requests async execution

267

val executeOp = operationManager.newExecuteStatementOperation(

268

session, statement, confOverlay,

269

async = true, // Request async execution

269

queryTimeout = 30L // Timeout in seconds

271

)

272

273

// Check if operation is running in background

274

val status = executeOp.getStatus()

275

if (status.getState() == OperationState.RUNNING) {

276

println("Operation is running asynchronously")

277

}

278

```

279

280

### Error Handling

281

282

Comprehensive error handling for operation lifecycle management.

283

284

```java { .api }

285

/**

286

* Exception thrown during operation management

287

*/

288

class HiveSQLException extends SQLException {

289

public HiveSQLException(String reason, String sqlState, int vendorCode)

290

public HiveSQLException(String reason, String sqlState, Throwable cause)

291

}

292

293

// Common error scenarios

294

public static final String INVALID_SESSION_HANDLE = "HY000";

295

public static final String OPERATION_NOT_FOUND = "HY000";

296

public static final String OPERATION_ALREADY_CLOSED = "HY010";

297

public static final String QUERY_TIMEOUT = "HYT00";

298

```

299

300

**Error Handling Examples:**

301

302

```scala

303

import org.apache.hive.service.cli.HiveSQLException

304

305

try {

306

val executeOp = operationManager.newExecuteStatementOperation(

307

session, statement, confOverlay, async = false, queryTimeout = 30L

308

)

309

310

// Operation execution...

311

312

} catch {

313

case e: HiveSQLException if e.getSqlState == "HY000" =>

314

println("Invalid session or operation handle")

315

case e: HiveSQLException if e.getSqlState == "HYT00" =>

316

println("Query timeout exceeded")

317

case e: HiveSQLException =>

318

println(s"Operation error: ${e.getMessage} (${e.getSqlState})")

319

case e: IllegalArgumentException =>

320

println(s"Session not initialized: ${e.getMessage}")

321

}

322

```

323

324

### Operation Handle Management

325

326

The operation manager maintains handles to all active operations.

327

328

```scala { .api }

329

// Handle to operation mapping (inherited from OperationManager)

330

val handleToOperation: java.util.Map[OperationHandle, Operation]

331

332

// Operation handle lifecycle

333

def addOperation(operation: Operation): Unit = {

334

handleToOperation.put(operation.getHandle(), operation)

335

}

336

337

def getOperation(operationHandle: OperationHandle): Operation = {

338

handleToOperation.get(operationHandle)

339

}

340

341

def removeOperation(operationHandle: OperationHandle): Operation = {

342

handleToOperation.remove(operationHandle)

343

}

344

```

345

346

**Operation Tracking Examples:**

347

348

```scala

349

// Track all operations for a session

350

def getOperationsForSession(sessionHandle: SessionHandle): List[Operation] = {

351

handleToOperation.values().asScala.filter { op =>

352

op.getParentSession().getSessionHandle() == sessionHandle

353

}.toList

354

}

355

356

// Clean up operations when session closes

357

def closeAllOperationsForSession(sessionHandle: SessionHandle): Unit = {

358

getOperationsForSession(sessionHandle).foreach { operation =>

359

try {

360

operation.cancel()

361

operation.close()

362

} catch {

363

case e: Exception =>

364

logWarning(s"Error closing operation ${operation.getHandle()}", e)

365

}

366

}

367

}

368

```