or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-driver.md · cli-services.md · index.md · metadata-operations.md · operation-management.md · server-management.md · session-management.md · sql-execution.md · web-ui.md

docs/sql-execution.md

0

# SQL Execution

1

2

SQL statement execution with Spark SQL engine integration, result handling, and comprehensive operation management for query processing.

3

4

## Capabilities

5

6

### SparkExecuteStatementOperation

7

8

Executes SQL statements using the Spark SQL engine with full result set management and cancellation support.

9

10

```scala { .api }

11

/**

12

* SQL statement execution operation using Spark SQL engine

13

* Handles query execution, result fetching, and cancellation

14

*/

15

class SparkExecuteStatementOperation extends ExecuteStatementOperation {

16

/**

17

* Fetch the next set of result rows

18

* @param order Fetch orientation (FETCH_NEXT, FETCH_PRIOR, etc.)

19

* @param maxRowsL Maximum number of rows to fetch

20

* @return TRowSet containing the fetched rows

21

*/

22

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet

23

24

/**

25

* Get the schema of the result set

26

* @return TTableSchema describing the result columns and types

27

*/

28

def getResultSetSchema: TTableSchema

29

30

/**

31

* Internal method that executes the SQL statement

32

* Called by the operation framework

33

*/

34

def runInternal(): Unit

35

36

/**

37

* Cancel the currently executing statement

38

*/

39

def cancel(): Unit

40

41

/**

42

* Cancel the statement due to timeout

43

*/

44

def timeoutCancel(): Unit

45

}

46

```

47

48

**Usage Examples:**

49

50

```scala

51

import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation

52

import org.apache.hive.service.cli.{FetchOrientation, OperationHandle}

53

54

// Execute a SQL statement (typically done through CLI service)

55

val statement = "SELECT name, age, department FROM employees WHERE age > 25"

56

val confOverlay = Map("spark.sql.adaptive.enabled" -> "true").asJava

57

58

// Get operation handle from session manager

59

val operationHandle = sessionManager.executeStatement(sessionHandle, statement, confOverlay)

// Look up the operation object behind the handle, then fetch results
val operation = operationManager.getOperation(operationHandle)
val resultSet = operation.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000)
val schema = operation.getResultSetSchema

64

65

// Process results

66

val columns = schema.getColumns().asScala

67

println(s"Result has ${columns.size} columns:")

68

columns.foreach { col =>

69

println(s" ${col.getColumnName()}: ${col.getTypeDesc()}")

70

}

71

```

72

73

### SparkSQLDriver

74

75

SQL driver that provides Hive Driver interface compatibility while using Spark SQL for execution.

76

77

```scala { .api }

78

/**

79

* SQL driver providing Hive Driver compatibility with Spark SQL execution

80

* @param context SQL context to use for query execution (defaults to SparkSQLEnv.sqlContext)

81

*/

82

class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext) extends Driver {

83

/**

84

* Execute a SQL command

85

* @param command SQL statement to execute

86

* @return CommandProcessorResponse indicating success/failure and details

87

*/

88

def run(command: String): CommandProcessorResponse

89

90

/**

91

* Get results from the last executed command

92

* @param res List to populate with result strings

93

* @return true if more results are available, false otherwise

94

*/

95

def getResults(res: JList[_]): Boolean

96

97

/**

98

* Get the schema of the last query result

99

* @return Schema object describing result structure

100

*/

101

def getSchema: Schema

102

103

/**

104

* Initialize the driver

105

*/

106

def init(): Unit

107

108

/**

109

* Close the driver and release resources

110

* @return 0 on success

111

*/

112

def close(): Int

113

114

/**

115

* Destroy the driver and clean up all resources

116

*/

117

def destroy(): Unit

118

}

119

```

120

121

**Usage Examples:**

122

123

```scala

124

import org.apache.spark.sql.hive.thriftserver.SparkSQLDriver

125

import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

126

127

// Create and initialize driver

128

val driver = new SparkSQLDriver()

129

driver.init()

130

131

try {

132

// Execute SQL command

133

val response = driver.run("SELECT COUNT(*) FROM sales WHERE year = 2023")

134

135

if (response.getResponseCode == 0) {

136

// Success - get results

137

val results = new java.util.ArrayList[String]()

138

while (driver.getResults(results)) {

139

results.asScala.foreach(println)

140

results.clear()

141

}

142

143

// Get schema information

144

val schema = driver.getSchema

145

if (schema != null && schema.getFieldSchemas != null) {

146

schema.getFieldSchemas.asScala.foreach { field =>

147

println(s"Column: ${field.getName}, Type: ${field.getType}")

148

}

149

}

150

} else {

151

// Error occurred

152

println(s"Query failed: ${response.getErrorMessage}")

153

}

154

155

} finally {

156

driver.close()

157

driver.destroy()

158

}

159

```

160

161

### Query Execution Process

162

163

The query execution process involves multiple stages from parsing to result delivery.

164

165

```scala { .api }

166

// Query execution stages

167

class QueryExecution {

168

def analyzed: LogicalPlan // Analyzed logical plan

169

def optimizedPlan: LogicalPlan // Optimized logical plan

170

def sparkPlan: SparkPlan // Physical execution plan

171

def executedPlan: SparkPlan // Executed physical plan

172

}

173

174

// Execution context and job management

175

object SQLExecution {

176

/**

177

* Execute code with a new execution ID for tracking

178

* @param queryExecution Query execution context

179

* @param name Optional name for the execution

180

* @param body Code block to execute

181

* @return Result of the code block

182

*/

183

def withNewExecutionId[T](queryExecution: QueryExecution, name: Option[String])(body: => T): T

184

}

185

```

186

187

### Result Set Management

188

189

Comprehensive result set handling with multiple format support and efficient data transfer.

190

191

```scala { .api }

192

/**

193

* Base class for result sets

194

*/

195

abstract class RowSet {

196

/**

197

* Add a row to the result set

198

* @param row Array of objects representing the row data

199

*/

200

def addRow(row: Array[Object]): Unit

201

202

/**

203

* Convert to Thrift row set format

204

* @return TRowSet for network transfer

205

*/

206

def toTRowSet(): TRowSet

207

208

/**

209

* Get the number of rows in the result set

210

* @return Row count

211

*/

212

def numRows(): Int

213

}

214

215

/**

216

* Row-based result set implementation

217

*/

218

class RowBasedSet extends RowSet {

219

/**

220

* Create row-based result set with schema

221

* @param schema Table schema defining columns and types

222

*/

223

def this(schema: TableSchema)

224

}

225

226

/**

227

* Column-based result set implementation (more efficient for large results)

228

*/

229

class ColumnBasedSet extends RowSet {

230

/**

231

* Create column-based result set with schema

232

* @param schema Table schema defining columns and types

233

*/

234

def this(schema: TableSchema)

235

}

236

```

237

238

**Usage Examples:**

239

240

```scala

241

import org.apache.spark.sql.hive.thriftserver.RowSetUtils
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema, ColumnDescriptor}
import org.apache.hive.service.rpc.thrift.TProtocolVersion

// Create schema for results
val schema = new TableSchema()
schema.addColumn(new ColumnDescriptor("name", "string", "Employee name"))
schema.addColumn(new ColumnDescriptor("age", "int", "Employee age"))
schema.addColumn(new ColumnDescriptor("salary", "decimal(10,2)", "Employee salary"))

// Create result set (row-based for small results)
val rowSet = RowSetFactory.create(schema, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, false)

252

253

// Add data rows

254

rowSet.addRow(Array("Alice", Integer.valueOf(30), new java.math.BigDecimal("75000.00")))

255

rowSet.addRow(Array("Bob", Integer.valueOf(25), new java.math.BigDecimal("65000.00")))

256

257

// Convert to Thrift format for network transfer

258

val tRowSet = rowSet.toTRowSet()

259

```

260

261

### FetchIterator

262

263

Iterator abstraction for efficient result fetching with support for different fetch orientations.

264

265

```scala { .api }

266

/**

267

* Base trait for result fetching iterators

268

*/

269

sealed trait FetchIterator[A] extends Iterator[A] {

270

/**

271

* Get the fetch type for this iterator

272

* @return FetchType indicating the data source

273

*/

274

def getFetchType(): FetchType

275

}

276

277

/**

278

* Iterator for query output results

279

*/

280

class ArrayFetchIterator[A](iter: Iterator[A]) extends FetchIterator[A] {

281

def hasNext: Boolean = iter.hasNext

282

def next(): A = iter.next()

283

def getFetchType(): FetchType = FetchType.QUERY_OUTPUT

284

}

285

286

/**

287

* Iterator for operation logs

288

*/

289

class IterableFetchIterator[A](iterable: Iterable[A]) extends FetchIterator[A] {

290

def getFetchType(): FetchType = FetchType.LOG

291

}

292

```

293

294

### Operation States and Lifecycle

295

296

Operations progress through well-defined states during their lifecycle.

297

298

```java { .api }

299

/**

300

* Possible states for operations

301

*/

302

enum OperationState {

303

INITIALIZED, // Operation created but not started

304

RUNNING, // Operation currently executing

305

FINISHED, // Operation completed successfully

306

CANCELED, // Operation was canceled

307

CLOSED, // Operation closed and resources cleaned up

308

ERROR, // Operation failed with error

309

UNKNOWN // State cannot be determined

310

}

311

312

/**

313

* Operation status information

314

*/

315

class OperationStatus {

316

/**

317

* Get the current state of the operation

318

* @return OperationState indicating current status

319

*/

320

public OperationState getState()

321

322

/**

323

* Get error information if operation failed

324

* @return HiveSQLException with error details, or null if no error

325

*/

326

public HiveSQLException getOperationException()

327

328

/**

329

* Get the operation start time

330

* @return Start time in milliseconds since epoch

331

*/

332

public long getOperationStarted()

333

334

/**

335

* Get the operation completion time

336

* @return Completion time in milliseconds since epoch, or 0 if not completed

337

*/

338

public long getOperationCompleted()

339

}

340

```

341

342

**Usage Examples:**

343

344

```scala

345

import org.apache.hive.service.cli.{OperationState, OperationStatus}

346

347

// Check operation status

348

val operationStatus = cliService.getOperationStatus(operationHandle)

349

350

operationStatus.getState match {

351

case OperationState.RUNNING =>

352

println("Query is still executing...")

353

case OperationState.FINISHED =>

354

println("Query completed successfully")

355

val duration = operationStatus.getOperationCompleted - operationStatus.getOperationStarted

356

println(s"Execution time: ${duration}ms")

357

case OperationState.ERROR =>

358

val exception = operationStatus.getOperationException

359

println(s"Query failed: ${exception.getMessage}")

360

case OperationState.CANCELED =>

361

println("Query was canceled")

362

case _ =>

363

println(s"Query state: ${operationStatus.getState}")

364

}

365

```

366

367

### Error Handling and Exceptions

368

369

Comprehensive error handling for SQL execution with detailed error information.

370

371

```scala { .api }

372

/**

373

* Spark-specific throwable with SQL state information

374

*/

375

trait SparkThrowable extends Throwable {

376

def getSqlState: String

377

def getErrorClass: String

378

}

379

380

/**

381

* Command processor response containing execution results and errors

382

*/

383

class CommandProcessorResponse {

384

/**

385

* Create response indicating success

386

* @param responseCode 0 for success, non-zero for failure

387

*/

388

def this(responseCode: Int)

389

390

/**

391

* Create response with error information

392

* @param responseCode Non-zero error code

393

* @param errorMessage Error message

394

* @param sqlState SQL state code

395

* @param exception Underlying exception

396

*/

397

def this(responseCode: Int, errorMessage: String, sqlState: String, exception: Throwable)

398

399

def getResponseCode(): Int

400

def getErrorMessage(): String

401

def getSQLState(): String

402

def getException(): Throwable

403

}

404

```

405

406

**Error Handling Examples:**

407

408

```scala

409

import org.apache.spark.SparkThrowable

410

import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

411

412

try {

413

val response = driver.run("SELECT * FROM non_existent_table")

414

415

if (response.getResponseCode != 0) {

416

val exception = response.getException

417

exception match {

418

case st: SparkThrowable =>

419

println(s"Spark SQL Error: ${st.getMessage}")

420

println(s"Error Class: ${st.getErrorClass}")

421

println(s"SQL State: ${st.getSqlState}")

422

case _ =>

423

println(s"General Error: ${exception.getMessage}")

424

println(s"SQL State: ${response.getSQLState}")

425

}

426

}

427

} catch {

428

case e: Exception =>

429

println(s"Unexpected error during query execution: ${e.getMessage}")

430

}

431

```

432

433

### Query Cancellation

434

435

Support for canceling long-running queries with proper resource cleanup.

436

437

```scala { .api }

438

// Cancel operation through CLI service

439

cliService.cancelOperation(operationHandle)

440

441

// Cancel through operation object

442

operation.cancel()

443

444

// Timeout-based cancellation

445

operation.timeoutCancel()

446

447

// Check if operation supports cancellation

448

val operation = operationManager.getOperation(operationHandle)

449

if (operation.shouldRunAsync()) {

450

// Async operations support cancellation

451

operation.cancel()

452

}

453

```

454

455

**Cancellation Examples:**

456

457

```scala

458

import java.util.concurrent.{Executors, TimeUnit}

459

460

// Start a long-running query

461

val operationHandle = cliService.executeStatementAsync(

462

sessionHandle,

463

"SELECT * FROM large_table ORDER BY column1",

464

Map.empty.asJava

465

)

466

467

// Set up cancellation after timeout

468

val executor = Executors.newSingleThreadScheduledExecutor()

469

executor.schedule(new Runnable {

470

def run(): Unit = {

471

try {

472

cliService.cancelOperation(operationHandle)

473

println("Query canceled due to timeout")

474

} catch {

475

case e: Exception => println(s"Error canceling query: ${e.getMessage}")

476

}

477

}

478

}, 30, TimeUnit.SECONDS)

479

480

// Monitor query progress.
// Note: Scala has no `break` statement; use a loop flag (or scala.util.control.Breaks)
// to exit the polling loop.
var done = false
while (!done) {
  val status = cliService.getOperationStatus(operationHandle)
  status.getState match {
    case OperationState.FINISHED =>
      println("Query completed")
      done = true
    case OperationState.CANCELED =>
      println("Query was canceled")
      done = true
    case OperationState.ERROR =>
      println("Query failed")
      done = true
    case _ =>
      Thread.sleep(1000) // Poll every second
  }
}

497

498

executor.shutdown()

499

```