# Query Operations

Query operations manage SQL statement execution, result handling, and asynchronous processing within the Spark Hive Thrift Server.

## Operation Manager

### SparkSQLOperationManager

Central manager for all SQL query execution operations, with session context management.

```scala { .api }
private[thriftserver] class SparkSQLOperationManager extends OperationManager with Logging {
  val handleToOperation: JMap[OperationHandle, Operation]
  val sessionToActivePool: ConcurrentHashMap[SessionHandle, String]
  val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]

  def newExecuteStatementOperation(parentSession: HiveSession, statement: String,
      confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation
  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit
}
```

#### Operation Creation

The `newExecuteStatementOperation` method creates new query execution operations:

**Usage Example:**

```scala
import java.util.{Map => JMap}
import org.apache.hive.service.cli.session.HiveSession

val confOverlay: JMap[String, String] = new java.util.HashMap()
confOverlay.put("spark.sql.adaptive.enabled", "true")

val operation = operationManager.newExecuteStatementOperation(
  parentSession = hiveSession,
  statement = "SELECT * FROM users WHERE age > 21",
  confOverlay = confOverlay,
  async = true
)
```

**Operation Creation Process:**

1. **Session Validation**: Ensures the session has a valid SQL context
2. **Configuration Merging**: Applies session- and query-specific configuration
3. **Background Execution**: Determines whether the query should run asynchronously
4. **Operation Creation**: Creates a `SparkExecuteStatementOperation` instance
5. **Registration**: Registers the operation handle for tracking
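
The five steps above can be sketched with stand-in types (all names here are hypothetical; the real manager wires `HiveSession`, `SQLConf`, and `SparkExecuteStatementOperation` together):

```scala
import scala.collection.mutable

// Stand-in for a created operation (hypothetical type).
case class Operation(statement: String, async: Boolean)

// Stand-ins for the manager's tracking maps.
val sessionToContexts = mutable.Map("sess-1" -> "sqlContext-1")
val handleToOperation = mutable.Map[String, Operation]()

def newExecuteStatementOperation(sessionId: String, statement: String,
    confOverlay: Map[String, String], async: Boolean): String = {
  // 1. Session validation: the session must have a registered SQL context.
  require(sessionToContexts.contains(sessionId), s"no SQLContext for session $sessionId")
  // 2. Configuration merging would apply confOverlay to the session's conf here.
  // 3.-4. Decide on background execution and create the operation.
  val op = Operation(statement, async)
  // 5. Registration: track the operation under a fresh handle.
  val handle = s"op-${handleToOperation.size + 1}"
  handleToOperation(handle) = op
  handle
}

val handle = newExecuteStatementOperation("sess-1", "SELECT 1", Map.empty, async = true)
```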

#### Configuration Management

The `setConfMap` method applies configuration overrides to SQL contexts:

```scala
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf.setConfString(kv.getKey, kv.getValue)
  }
}
```

**Configuration Sources:**

- **Session State**: Hive session override configurations
- **Session Variables**: User-set Hive variables
- **Query Overlay**: Statement-specific configuration parameters
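
The loop in `setConfMap` is a plain Java-iterator traversal; it can be exercised with a mutable map standing in for `SQLConf` (the stand-in is hypothetical, the iteration pattern is the same):

```scala
import scala.collection.mutable

// Stand-in for SQLConf: a plain mutable key-value store.
val conf = mutable.Map[String, String]()

// Same traversal as setConfMap: walk the java.util.Map and apply each entry.
def setConfMap(conf: mutable.Map[String, String], confMap: java.util.Map[String, String]): Unit = {
  val iterator = confMap.entrySet().iterator()
  while (iterator.hasNext) {
    val kv = iterator.next()
    conf(kv.getKey) = kv.getValue
  }
}

val overlay = new java.util.HashMap[String, String]()
overlay.put("spark.sql.shuffle.partitions", "64")
overlay.put("spark.sql.adaptive.enabled", "true")
setConfMap(conf, overlay)
```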

## Statement Execution

### SparkExecuteStatementOperation

Individual query execution operation with result management and data type handling.

```scala { .api }
private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean = true
  )(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
  extends ExecuteStatementOperation with Logging {

  def close(): Unit
  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit
}
```

#### Query Execution Process

**Synchronous Execution:**

```scala
// Execute immediately and wait for results
val operation = new SparkExecuteStatementOperation(
  session, "SELECT COUNT(*) FROM large_table", confOverlay, runInBackground = false
)(sqlContext, sessionToActivePool)
```

**Asynchronous Execution:**

```scala
// Execute in a background thread
val operation = new SparkExecuteStatementOperation(
  session, "SELECT * FROM large_table", confOverlay, runInBackground = true
)(sqlContext, sessionToActivePool)
```

#### Result Processing

The operation handles different data types in query results:

```scala
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
  dataTypes(ordinal) match {
    case StringType => to += from.getString(ordinal)
    case IntegerType => to += from.getInt(ordinal)
    case BooleanType => to += from.getBoolean(ordinal)
    case DoubleType => to += from.getDouble(ordinal)
    case FloatType => to += from.getFloat(ordinal)
    case DecimalType() => to += from.getDecimal(ordinal)
    case LongType => to += from.getLong(ordinal)
    case ByteType => to += from.getByte(ordinal)
    case ShortType => to += from.getShort(ordinal)
    case DateType => to += from.getAs[Date](ordinal)
    case TimestampType => to += from.getAs[Timestamp](ordinal)
    // Additional type handling...
  }
}
```

**Supported Data Types:**

- **Primitive Types**: String, Integer, Boolean, Double, Float, Long, Byte, Short
- **Decimal Types**: High-precision decimal numbers
- **Date/Time Types**: Date and Timestamp values
- **Complex Types**: Arrays, Maps, Structs (via nested processing)
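
The per-column dispatch can be tried without Spark by matching on declared column types (the row representation and type names here are simplified stand-ins, not Spark's):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins: a row as Seq[Any] plus the declared type of each column.
val dataTypes = Seq("string", "int", "boolean", "double")

def addNonNullColumnValue(from: Seq[Any], to: ArrayBuffer[Any], ordinal: Int): Unit =
  dataTypes(ordinal) match {
    case "string"  => to += from(ordinal).asInstanceOf[String]
    case "int"     => to += from(ordinal).asInstanceOf[Int]
    case "boolean" => to += from(ordinal).asInstanceOf[Boolean]
    case "double"  => to += from(ordinal).asInstanceOf[Double]
  }

val row: Seq[Any] = Seq("alice", 42, true, 3.14)
val out = ArrayBuffer[Any]()
row.indices.foreach(i => addNonNullColumnValue(row, out, i))
```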

#### Resource Management

Operations include comprehensive resource cleanup:

```scala
def close(): Unit = {
  // RDDs will be cleaned automatically upon garbage collection
  logDebug(s"CLOSING $statementId")
  cleanup(OperationState.CLOSED)
  sqlContext.sparkContext.clearJobGroup()
}
```

**Cleanup Process:**

1. **State Update**: Marks the operation as closed
2. **Job Cancellation**: Cancels any running Spark jobs
3. **Memory Cleanup**: Releases cached result data
4. **Resource Release**: Frees system resources

## Execution Tracking

### ExecutionInfo

Detailed tracking of the query execution lifecycle and performance metrics.

```scala { .api }
private[thriftserver] class ExecutionInfo(
    statement: String,
    sessionId: String,
    startTimestamp: Long,
    userName: String
  ) {
  var finishTimestamp: Long = 0L
  var executePlan: String = ""
  var detail: String = ""
  var state: ExecutionState.Value = ExecutionState.STARTED
  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
  var groupId: String = ""
  def totalTime: Long
}
```

#### Execution States

```scala { .api }
private[thriftserver] object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
  type ExecutionState = Value
}
```

**State Transitions:**

1. **STARTED**: Query execution begins
2. **COMPILED**: SQL parsed and execution plan generated
3. **FAILED**: Query execution encountered an error
4. **FINISHED**: Query completed successfully
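
`totalTime` is only declared in the API listing; a plausible reading (an assumption, not confirmed by the source) is the elapsed time between start and finish, measuring against the current time while the query is still running:

```scala
object ExecutionState extends Enumeration {
  val STARTED, COMPILED, FAILED, FINISHED = Value
}

class ExecutionInfo(val statement: String, val startTimestamp: Long) {
  var finishTimestamp: Long = 0L
  var state: ExecutionState.Value = ExecutionState.STARTED

  // Assumed semantics: elapsed wall-clock time; unfinished queries measure to "now".
  def totalTime: Long =
    if (finishTimestamp == 0L) System.currentTimeMillis - startTimestamp
    else finishTimestamp - startTimestamp
}

val info = new ExecutionInfo("SELECT 1", startTimestamp = 1000L)
info.state = ExecutionState.COMPILED
info.finishTimestamp = 1750L
info.state = ExecutionState.FINISHED
```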

#### Event Tracking

**Query Start:**

```scala
def onStatementStart(id: String, sessionId: String, statement: String,
    groupId: String, userName: String = "UNKNOWN"): Unit = synchronized {
  val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
  info.state = ExecutionState.STARTED
  executionList.put(id, info)
  trimExecutionIfNecessary()
  sessionList(sessionId).totalExecution += 1
  executionList(id).groupId = groupId
  totalRunning += 1
}
```

**Query Completion:**

```scala
def onStatementFinish(id: String): Unit = synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.FINISHED
  totalRunning -= 1
  trimExecutionIfNecessary()
}
```

**Error Handling:**

```scala
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
  synchronized {
    executionList(id).finishTimestamp = System.currentTimeMillis
    executionList(id).detail = errorMessage
    executionList(id).state = ExecutionState.FAILED
    totalRunning -= 1
    trimExecutionIfNecessary()
  }
}
```
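
The three callbacks maintain shared state (`executionList`, `totalRunning`). Their bookkeeping can be sketched with a plain map (single-threaded here; the real listener wraps each callback in `synchronized`):

```scala
import scala.collection.mutable

object State extends Enumeration { val STARTED, FAILED, FINISHED = Value }

class Info(val statement: String) {
  var state: State.Value = State.STARTED
  var detail: String = ""
  var finishTimestamp: Long = 0L
}

val executionList = mutable.Map[String, Info]()
var totalRunning = 0

def onStatementStart(id: String, statement: String): Unit = {
  executionList(id) = new Info(statement)
  totalRunning += 1
}

def onStatementFinish(id: String): Unit = {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = State.FINISHED
  totalRunning -= 1
}

def onStatementError(id: String, errorMessage: String): Unit = {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).detail = errorMessage
  executionList(id).state = State.FAILED
  totalRunning -= 1
}

onStatementStart("q1", "SELECT 1")
onStatementStart("q2", "SELECT * FROM missing_table")
onStatementFinish("q1")
onStatementError("q2", "table not found")
```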

## Performance Optimization

### Result Caching

Operations support different result collection strategies, controlled by the `spark.sql.thriftServer.incrementalCollect` setting:

**Incremental Collection:**

```scala
// Enable incremental result fetching
sqlContext.setConf("spark.sql.thriftServer.incrementalCollect", "true")
```

- Results are streamed as they become available
- Lower memory usage for large result sets
- Better responsiveness for interactive queries

**Full Collection:**

```scala
// Cache complete results (default)
sqlContext.setConf("spark.sql.thriftServer.incrementalCollect", "false")
```

- All results are cached in memory
- Supports FETCH_FIRST operations
- Higher memory usage, but better performance for small result sets
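
The trade-off between the two strategies can be illustrated with plain collections: full collection materializes everything up front, while incremental collection drains one fetch batch at a time from a live iterator (a sketch, not the Thrift server's actual fetch path):

```scala
import scala.collection.mutable.ArrayBuffer

// Full collection: materialize all rows (supports re-reads such as FETCH_FIRST).
def fullCollect(rows: Iterator[Int]): Array[Int] = rows.toArray

// Incremental collection: drain at most batchSize rows per fetch call.
def fetchBatch(rows: Iterator[Int], batchSize: Int): Seq[Int] = {
  val batch = ArrayBuffer[Int]()
  while (batch.size < batchSize && rows.hasNext) batch += rows.next()
  batch.toSeq
}

val full = fullCollect((1 to 10).iterator)   // all ten rows in memory at once
val stream = (1 to 10).iterator              // nothing materialized yet
val first = fetchBatch(stream, 3)            // rows 1-3
val second = fetchBatch(stream, 3)           // rows 4-6
```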

### Asynchronous Processing

Background execution prevents client blocking:

```scala
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
```

**Benefits:**

- The client remains responsive during long-running queries
- Multiple queries can execute concurrently
- Better resource utilization
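
A minimal sketch of the background path using the standard library's `Future` (the "execution" here is a placeholder string, not a real Spark job):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Background execution: submit work to a thread pool and return a handle immediately,
// so the calling (client) thread never blocks on the query itself.
def executeStatement(statement: String, runInBackground: Boolean): Future[String] =
  if (runInBackground) Future(s"result of: $statement") // runs on a pool thread
  else Future.successful(s"result of: $statement")      // ran on the caller's thread

val handle = executeStatement("SELECT COUNT(*) FROM large_table", runInBackground = true)
val result = Await.result(handle, 5.seconds) // the client fetches results later
```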

### Job Group Management

Operations use Spark job groups for resource management:

```scala
sqlContext.sparkContext.setJobGroup(statementId, statement, interruptOnCancel = true)
```

**Features:**

- **Query Identification**: Links Spark jobs to SQL statements
- **Cancellation Support**: Enables query interruption
- **Resource Tracking**: Monitors resource usage per query
- **Performance Monitoring**: Collects execution metrics
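
The cancellation idea can be sketched without Spark: work is registered under a statement id, and cancelling the group flips a shared flag that running tasks check cooperatively (all names here are illustrative; Spark itself interrupts the group's tasks):

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Registry of cancellation flags, keyed by statement id.
val groups = mutable.Map[String, AtomicBoolean]()

def setJobGroup(statementId: String): AtomicBoolean = {
  val cancelled = new AtomicBoolean(false)
  groups(statementId) = cancelled
  cancelled
}

def cancelJobGroup(statementId: String): Unit =
  groups.get(statementId).foreach(_.set(true))

val flag = setJobGroup("stmt-42")
cancelJobGroup("stmt-42")

// A cooperative task checks the flag between units of work and stops early.
var processed = 0
for (_ <- 1 to 100 if !flag.get()) processed += 1
```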