
# Query Execution

SQL statement execution with result management and schema introspection. Handles SQL queries submitted by clients through the Thrift interface.

## Capabilities

### Statement Execution Operation

Core operation for executing SQL statements with result streaming and cancellation support.

```scala { .api }
private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean
  ) extends ExecuteStatementOperation {

  /**
   * Release execution resources and clean up operation state.
   * Should be called when the operation is no longer needed.
   */
  def close(): Unit

  /**
   * Fetch result rows from the executed query.
   * @param order Fetch direction (FETCH_NEXT, FETCH_PRIOR, etc.)
   * @param maxRowsL Maximum number of rows to return
   * @return RowSet containing the requested rows
   */
  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet

  /**
   * Get the schema of the query result set.
   * @return TableSchema describing column names and types
   */
  def getResultSetSchema: TableSchema

  /**
   * Cancel the running operation.
   * Attempts to stop query execution and clean up resources.
   */
  def cancel(): Unit
}
```

**Usage Example:**

```scala
import org.apache.hive.service.cli._
import scala.collection.JavaConverters._

// Create and execute a statement operation
val operation = new SparkExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT name, age FROM users WHERE age > 25",
  confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava,
  runInBackground = true
)

// Get result schema
val schema = operation.getResultSetSchema
println(s"Columns: ${schema.getColumns.asScala.map(_.getColumnName).mkString(", ")}")

// Fetch results in batches
val rowSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
val rows = rowSet.toTRowSet.getRows.asScala

// Process results
rows.foreach { row =>
  val values = row.getColVals.asScala
  println(s"Name: ${values(0)}, Age: ${values(1)}")
}

// Clean up
operation.close()
```

### SQL Driver

Low-level SQL command processor that executes queries against Spark SQL.

```scala { .api }
private[hive] class SparkSQLDriver(
    context: HiveContext = SparkSQLEnv.hiveContext
  ) extends Driver {

  /**
   * Initialize the driver instance.
   * Prepares the driver for query execution.
   */
  def init(): Unit

  /**
   * Execute a SQL command and return the response.
   * @param command SQL statement to execute
   * @return CommandProcessorResponse with execution status and metadata
   */
  def run(command: String): CommandProcessorResponse

  /**
   * Close the driver and return the final status.
   * @return Final status code (0 for success)
   */
  def close(): Int

  /**
   * Retrieve query results incrementally.
   * @param res List to populate with result rows (modified in-place)
   * @return true if more results are available, false if complete
   */
  def getResults(res: JList[_]): Boolean

  /**
   * Get the schema of the current result set.
   * @return Schema object describing result structure
   */
  def getSchema: Schema

  /**
   * Destroy the driver instance and release all resources.
   * More aggressive cleanup than close().
   */
  def destroy(): Unit
}
```

**Usage Example:**

```scala
import scala.collection.JavaConverters._

val driver = new SparkSQLDriver()
driver.init()

// Execute query
val response = driver.run("SELECT COUNT(*) FROM large_table")
if (response.getResponseCode == 0) {
  // Get schema information
  val schema = driver.getSchema
  val columns = schema.getFieldSchemas.asScala
  println(s"Result columns: ${columns.map(_.getName).mkString(", ")}")

  // Fetch results
  val results = new java.util.ArrayList[String]()
  while (driver.getResults(results)) {
    results.asScala.foreach(println)
    results.clear()
  }
} else {
  println(s"Query failed: ${response.getErrorMessage}")
}

// Cleanup
val exitCode = driver.close()
driver.destroy()
```

### Operation Management

Centralized management of all active query operations across sessions.

```scala { .api }
private[hive] class SparkSQLOperationManager extends OperationManager {

  /**
   * Map of operation handles to active operations.
   * Used for operation lifecycle tracking.
   */
  val handleToOperation: JMap[OperationHandle, Operation]

  /**
   * Map of sessions to their active scheduler pools.
   * Enables resource isolation between sessions.
   */
  val sessionToActivePool: Map[SessionHandle, String]

  /**
   * Map of sessions to their HiveContext instances.
   * Provides session-level context isolation.
   */
  val sessionToContexts: Map[SessionHandle, HiveContext]

  /**
   * Create a new statement execution operation.
   * @param parentSession Session that owns this operation
   * @param statement SQL statement to execute
   * @param confOverlay Session-specific configuration overrides
   * @param runInBackground Whether to execute asynchronously
   * @return New ExecuteStatementOperation instance
   */
  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      runInBackground: Boolean): ExecuteStatementOperation
}
```
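The handle-to-operation bookkeeping above can be modeled in plain Scala. This is a toy registry sketch: the class, its type parameters, and its method names are illustrative, not the actual `OperationManager` API.

```scala
import scala.collection.mutable

// Toy registry mirroring handleToOperation: register on create,
// look up for fetch/cancel, remove on close.
class OperationRegistry[H, Op] {
  private val handleToOperation = mutable.Map.empty[H, Op]

  // Track a newly created operation under its handle
  def register(h: H, op: Op): Unit = handleToOperation(h) = op

  // Resolve a handle back to its operation, if still active
  def lookup(h: H): Option[Op] = handleToOperation.get(h)

  // Drop the mapping when the operation is closed
  def remove(h: H): Option[Op] = handleToOperation.remove(h)

  // Number of operations currently tracked
  def activeCount: Int = handleToOperation.size
}
```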

### Asynchronous Execution

Operations can be executed synchronously or asynchronously:

**Synchronous Execution:**

```scala
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM small_table",
  confOverlay = Map.empty.asJava,
  runInBackground = false // Execute synchronously
)
// Operation completes before returning
val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 100)
```

**Asynchronous Execution:**

```scala
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM huge_table",
  confOverlay = Map.empty.asJava,
  runInBackground = true // Execute asynchronously
)

// Check operation status periodically
while (operation.getStatus.getState == OperationState.RUNNING) {
  Thread.sleep(1000)
  println(s"Query progress: ${operation.getStatus.getProgressText}")
}

// Fetch results when complete
if (operation.getStatus.getState == OperationState.FINISHED) {
  val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
}
```

### Result Streaming

Large result sets are streamed incrementally to avoid memory issues:

```scala
// Stream results in configurable batch sizes
var hasMore = true
var totalRows = 0

while (hasMore) {
  val rowSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
  val batchSize = rowSet.toTRowSet.getRows.size()

  if (batchSize > 0) {
    totalRows += batchSize
    // Process batch
    processResultBatch(rowSet)
  } else {
    hasMore = false
  }
}

println(s"Processed $totalRows total rows")
```

### Query Cancellation

Running queries can be cancelled gracefully:

```scala
import java.util.{Timer, TimerTask}

// Start long-running query
val operation = operationManager.newExecuteStatementOperation(
  parentSession = session,
  statement = "SELECT * FROM massive_table ORDER BY complex_calculation(col1)",
  confOverlay = Map.empty.asJava,
  runInBackground = true
)

// Cancel after timeout or user request
val timer = new Timer()
timer.schedule(new TimerTask {
  override def run(): Unit = {
    operation.cancel()
    println("Query cancelled due to timeout")
  }
}, 30000) // Cancel after 30 seconds
```
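Cancellation is cooperative: the running operation observes the cancel request between units of work rather than being killed mid-batch. A minimal plain-Scala sketch of that pattern (the class and its methods are illustrative, not Spark API):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Toy model of cooperative cancellation: the worker checks a shared
// flag between batches, mirroring how a cancelled operation stops
// fetching further results.
class CancellableWork {
  private val cancelled = new AtomicBoolean(false)

  // Request cancellation; takes effect at the next batch boundary
  def cancel(): Unit = cancelled.set(true)

  // Process items until exhausted or cancelled; returns count processed
  def run(batches: Iterator[Int]): Int = {
    var processed = 0
    while (batches.hasNext && !cancelled.get()) {
      batches.next()
      processed += 1
    }
    processed
  }
}
```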

### Error Handling

Comprehensive error handling for various failure scenarios:

**Query Compilation Errors:**

```scala
import org.apache.spark.sql.AnalysisException

val response = driver.run("SELECT invalid_column FROM non_existent_table")
if (response.getResponseCode != 0) {
  response.getException() match {
    case e: AnalysisException =>
      println(s"SQL Analysis Error: ${e.getMessage}")
    case e: Exception =>
      println(s"General Error: ${response.getErrorMessage}")
  }
}
```

**Runtime Execution Errors:**

```scala
try {
  val results = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
  // Process results
} catch {
  case e: HiveSQLException =>
    println(s"SQL Execution Error: ${e.getMessage}")
    operation.cancel() // Clean up failed operation
}
```

### Performance Optimization

Several optimizations are built into the query execution system:

**Incremental Collection:**
- Results are collected incrementally to reduce memory usage
- Configurable batch sizes for result streaming
- Lazy evaluation where possible

**Connection Pool Management:**
- Reuse of database connections across queries
- Connection pooling to reduce setup overhead
- Proper connection cleanup on operation completion

**Resource Isolation:**
- Per-session Spark job groups for resource tracking
- Configurable scheduler pools for workload isolation
- Memory management per operation
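The incremental-collection idea can be sketched in plain Scala: a toy iterator stands in for the operation's result iterator, and the batch size of 1000 mirrors the fetch examples above. The helper below is illustrative, not Spark API.

```scala
// Toy sketch of incremental collection: pull rows through an iterator
// in fixed-size batches instead of materializing the whole result set.
object IncrementalCollection {
  def collectInBatches[T](rows: Iterator[T], batchSize: Int)
                         (process: Seq[T] => Unit): Int = {
    var total = 0
    rows.grouped(batchSize).foreach { batch =>
      process(batch)      // handle one bounded batch at a time
      total += batch.size // running count of rows seen
    }
    total
  }
}
```

Because `grouped` pulls lazily from the underlying iterator, at most one batch is resident at a time.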

### Integration with Spark SQL

The execution system integrates deeply with Spark SQL:

**Catalyst Optimizer:**
- All queries go through Spark's Catalyst optimizer
- Cost-based optimization when statistics are available
- Adaptive query execution for dynamic optimization

**DataFrame API:**

```scala
// SQL queries are converted to DataFrame operations internally
val sql = "SELECT name, COUNT(*) FROM users GROUP BY name"
val dataframe = hiveContext.sql(sql)
val results = dataframe.collect() // Execute and collect results
```

**Data Source Integration:**
- Supports all Spark SQL data sources (Parquet, JSON, JDBC, etc.)
- Pushdown optimizations for compatible sources
- Schema inference and evolution

### Monitoring and Metrics

Query execution is monitored through multiple mechanisms:

**Spark Listeners:**

```scala
// Queries are tracked through Spark's event system
sparkContext.addSparkListener(new HiveThriftServer2Listener(server, conf))
```

**Operation Statistics:**
- Query compilation time
- Execution duration
- Rows processed and returned
- Resource usage (CPU, memory, I/O)
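One way to picture these statistics is as a simple per-operation record; the field names here are assumptions for illustration, not the actual listener's data model.

```scala
// Illustrative per-operation statistics record; field names are
// assumptions, not the actual HiveThriftServer2Listener schema.
case class OperationStats(
  statement: String,
  compileTimeMs: Long,
  execTimeMs: Long,
  rowsReturned: Long
) {
  // Total wall-clock time attributed to the operation
  def totalTimeMs: Long = compileTimeMs + execTimeMs
}
```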

**Web UI Integration:**
- Real-time query status in the Spark UI
- Query plans and execution details
- Performance metrics and bottleneck identification