or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

Files in docs/: configuration-utilities.md, file-formats.md, hive-client.md, index.md, metastore-integration.md, query-execution.md, session-configuration.md, udf-support.md

docs/query-execution.md

0

# Query Execution

1

2

Specialized execution plans and strategies for Hive table operations and query processing.

3

4

## Core Imports

5

6

```scala

7

import org.apache.spark.sql.execution.SparkPlan

8

import org.apache.spark.sql.hive.execution._

9

import org.apache.spark.sql.hive.HiveStrategies

10

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

11

import org.apache.spark.sql.catalyst.rules.Rule

12

import org.apache.spark.sql.catalyst.expressions.Expression

13

```

14

15

## Capabilities

16

17

### Hive Table Scanning

18

19

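Physical execution operator for scanning Hive SerDe tables. Column pruning is applied through `requestedAttributes` and partition pruning through `partitionPruningPred` (predicates that reference only partition columns); any remaining filters are evaluated by a separate `Filter` operator above the scan.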

20

21

```scala { .api }

22

case class HiveTableScanExec(

23

requestedAttributes: Seq[Attribute],

24

relation: HiveTableRelation,

25

partitionPruningPred: Seq[Expression]

26

)(@transient private val sparkSession: SparkSession) extends LeafExecNode {

27

28

/** Attributes produced by this execution plan */

29

override def output: Seq[Attribute]

30

31

/** Execute the table scan and produce RDD of internal rows */

32

override protected def doExecute(): RDD[InternalRow]

33

34

/** Statistics for query optimization */

35

override def computeStats(): Statistics

36

37

/** String representation for explain plans */

38

override def simpleString(maxFields: Int): String

39

}

40

```

41

42

### Script Transformation Execution

43

44

Execute Hive script transformations using external processes.

45

46

```scala { .api }

47

case class HiveScriptTransformationExec(
  script: String,                      // the command from the USING clause
  output: Seq[Attribute],
  child: SparkPlan,
  ioschema: ScriptTransformationIOSchema
) extends BaseScriptTransformationExec { // BaseScriptTransformationExec is a UnaryExecNode

  /** Execute script transformation on input data */
  override protected def doExecute(): RDD[InternalRow]
}

63

```

64

65

### Hive Table Insert Operations

66

67

Command for inserting data into Hive tables with partition support.

68

69

```scala { .api }

70

case class InsertIntoHiveTable(

71

table: CatalogTable,

72

partition: Map[String, Option[String]],

73

query: LogicalPlan,

74

overwrite: Boolean,

75

ifPartitionNotExists: Boolean,

76

outputColumnNames: Seq[String]

77

) extends DataWritingCommand {

78

79

/** Execute the insert operation */

80

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

81

82

/** Metrics for monitoring insert performance */

83

override lazy val metrics: Map[String, SQLMetric]

84

85

/** Output attributes after insert */

86

override def outputColumns: Seq[Attribute]

87

}

88

```

89

90

### Create Table As Select

91

92

Command for creating Hive tables from SELECT query results.

93

94

```scala { .api }

95

case class CreateHiveTableAsSelectCommand(

96

tableDesc: CatalogTable,

97

query: LogicalPlan,

98

outputColumnNames: Seq[String],

99

mode: SaveMode

100

) extends DataWritingCommand {

101

102

/** Execute table creation and data insertion */

103

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

104

105

/** Check if table already exists */

106

def tableExists(sparkSession: SparkSession): Boolean

107

108

/** Validate table and query compatibility */

109

def validateTable(sparkSession: SparkSession): Unit

110

}

111

```

112

113

### Insert into Directory

114

115

Command for inserting data into HDFS directories using Hive format.

116

117

```scala { .api }

118

case class InsertIntoHiveDirCommand(

119

isLocal: Boolean,

120

storage: CatalogStorageFormat,

121

query: LogicalPlan,

122

overwrite: Boolean,

123

outputColumnNames: Seq[String]

124

) extends DataWritingCommand {

125

126

/** Execute directory insert operation */

127

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

128

129

/** Resolve output path for directory insert */

130

def resolveOutputPath(): Path

131

}

132

```

133

134

## Query Planning Strategies

135

136

### Hive-Specific Query Strategies

137

138

```scala { .api }

139

private[hive] trait HiveStrategies {

140

141

/** Strategy for handling script transformations */

142

object HiveScripts extends Strategy {

143

def apply(plan: LogicalPlan): Seq[SparkPlan]

144

}

145

146

/** Strategy for Hive table scans with optimization */

147

object HiveTableScans extends Strategy {

148

def apply(plan: LogicalPlan): Seq[SparkPlan]

149

}

150

}

151

```

152

153

### Analysis Rules

154

155

Rules for analyzing and converting Hive-specific logical plans.

156

157

```scala { .api }

158

/** Convert generic operations to Hive-specific variants */

159

object HiveAnalysis extends Rule[LogicalPlan] {

160

override def apply(plan: LogicalPlan): LogicalPlan

161

}

162

163

/** Convert relations for better performance */

164

case class RelationConversions(

165

sessionCatalog: HiveSessionCatalog

166

) extends Rule[LogicalPlan] {

167

override def apply(plan: LogicalPlan): LogicalPlan

168

}

169

170

/** Resolve Hive SerDe table properties */

171

class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {

172

override def apply(plan: LogicalPlan): LogicalPlan

173

}

174

175

/** Determine table statistics from HDFS */

176

class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {

177

override def apply(plan: LogicalPlan): LogicalPlan

178

}

179

```

180

181

## Usage Examples

182

183

### Custom Table Scan with Partition Pruning

184

185

```scala

186

import org.apache.spark.sql.SparkSession

187

import org.apache.spark.sql.catalyst.expressions._

188

189

val spark = SparkSession.builder()

190

.enableHiveSupport()

191

.getOrCreate()

192

193

// Query with partition pruning

194

val partitionedQuery = spark.sql("""

195

SELECT customer_id, order_total

196

FROM sales_partitioned

197

WHERE year = 2023 AND month >= 10

198

""")

199

200

// Examine execution plan

201

partitionedQuery.explain(true)

202

203

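// Print the Hive scan node(s); their argument string includes partitionPruningPred.
// HiveTableScanExec is private[hive], so match on the node name instead of the class.
// Note: Parquet/ORC Hive tables are usually converted to a file-source scan
// (spark.sql.hive.convertMetastoreParquet / spark.sql.hive.convertMetastoreOrc),
// in which case no HiveTableScan node appears and explain() shows PartitionFilters instead.
partitionedQuery.queryExecution.executedPlan.collect {
  case scan if scan.nodeName == "HiveTableScan" =>
    println(s"Hive scan: ${scan.simpleString(25)}")
}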

208

```

209

210

### Script Transformation Example

211

212

```scala

213

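// Pipe each input row through an external script (the script must exist at the
// same path on every executor, or be shipped to them with ADD FILE)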

214

spark.sql("""

215

SELECT TRANSFORM(name, age)

216

USING 'python3 /path/to/transform_script.py'

217

AS (processed_name STRING, age_group STRING)

218

FROM users

219

""").show()

220

221

// Alternative: ship the script with ADD FILE and use Hive's FROM-first syntax.
// Rows are passed to the script tab-separated by default.
spark.sql("ADD FILE /path/to/categorize.py")
spark.sql("""
FROM users
SELECT TRANSFORM(name, age)
USING 'python3 categorize.py'
AS (name STRING, category STRING)
""").show()

228

```

229

230

### Dynamic Partition Insert

231

232

```scala

233

// Enable dynamic partitioning

234

spark.conf.set("hive.exec.dynamic.partition", "true")

235

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

236

237

// Insert with dynamic partitions

238

spark.sql("""

239

INSERT INTO TABLE sales_partitioned

240

PARTITION (year, month)

241

SELECT customer_id, order_total, order_date,

242

YEAR(order_date) as year,

243

MONTH(order_date) as month

244

FROM raw_sales

245

""")

246

247

// Check created partitions

248

spark.sql("SHOW PARTITIONS sales_partitioned").show()

249

```

250

251

### Create External Table As Select

252

253

```scala

254

// Create external table from query with custom location

255

spark.sql("""

256

CREATE TABLE external_summary

257

USING HIVE

258

OPTIONS (

259

path '/user/warehouse/external/summary'

260

)

261

AS SELECT

262

customer_id,

263

COUNT(*) as order_count,

264

SUM(order_total) as total_spent

265

FROM orders

266

GROUP BY customer_id

267

""")

268

```

269

270

## Performance Optimization

271

272

### Predicate Pushdown

273

274

```scala

275

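import org.apache.spark.sql.catalyst.plans.logical.Filter

// Query demonstrating filter pushdown
val optimizedQuery = spark.sql("""
SELECT product_name, sales_amount
FROM product_sales
WHERE category = 'electronics'
AND sale_date >= '2023-01-01'
AND region IN ('us-west', 'us-east')
""")

// Filter conditions remaining in the optimized logical plan
optimizedQuery.queryExecution.optimizedPlan.collect {
  case Filter(condition, _) =>
    println(s"Filter condition: $condition")
}

// The physical plan shows which predicates actually reached the scan
// (partition pruning predicates for Hive scans, PushedFilters for converted Parquet/ORC tables)
optimizedQuery.explain()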

289

```

290

291

### Vectorized ORC Reading

292

293

```scala

294

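// Enable vectorized reading for better performance. For Hive ORC tables this only
// takes effect when they are read through Spark's native ORC reader, i.e. when
// spark.sql.hive.convertMetastoreOrc is true (the default since Spark 2.4).
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096")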

297

298

// Query will use vectorized reader

299

val vectorizedQuery = spark.sql("""

300

SELECT SUM(sales_amount), AVG(quantity)

301

FROM large_orc_table

302

WHERE year = 2023

303

""")

304

```

305

306

## Error Handling

307

308

Common execution exceptions:

309

310

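- **SparkException**: Runtime job failures. Individual task failures are retried first; once a task exhausts its retries, the job is aborted and the failure surfaces to the caller as a `SparkException`.
- **AnalysisException**: Schema or table resolution errors. These are raised when the query is analyzed, which happens inside `spark.sql(...)` itself, before any action runs.
- **Hive metastore errors**: Failures from the Hive metastore client (e.g. Hive's `MetaException`) are rethrown by Spark's Hive catalog as `AnalysisException`.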

314

315

```scala

316

import org.apache.spark.SparkException

317

import org.apache.spark.sql.AnalysisException

318

319

try {

320

val result = spark.sql("""

321

INSERT INTO non_existent_table

322

SELECT * FROM source_table

323

""")

324

result.collect()

325

} catch {

326

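// Wording varies by Spark version: "Table not found", "Table or view not found", "[TABLE_OR_VIEW_NOT_FOUND]"
case e: AnalysisException if e.getMessage.contains("not found") || e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND") =>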

327

println("Target table does not exist")

328

case e: SparkException if e.getMessage.contains("Task failed") =>

329

println(s"Execution failed: ${e.getCause}")

330

case e: Exception =>

331

println(s"Unexpected error: ${e.getMessage}")

332

throw e

333

}

334

```

335

336

## Types

337

338

### Execution Plan Types

339

340

```scala { .api }

341

trait DataWritingCommand extends Command {

342

def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

343

def outputColumns: Seq[Attribute]

344

def outputOrdering: Seq[SortOrder]

345

def metrics: Map[String, SQLMetric]

346

}

347

348

case class ScriptTransformationIOSchema(

349

inputRowFormat: Seq[(String, String)],

350

outputRowFormat: Seq[(String, String)],

351

inputSerdeClass: Option[String],

352

outputSerdeClass: Option[String],

353

inputSerdeProps: Seq[(String, String)],

354

outputSerdeProps: Seq[(String, String)],

355

recordReaderClass: Option[String],

356

recordWriterClass: Option[String],

357

schemaLess: Boolean

358

)

359

```

360

361

### Table and Partition Types

362

363

```scala { .api }

364

case class HiveTableRelation(

365

tableMeta: CatalogTable,

366

dataCols: Seq[AttributeReference],

367

partitionCols: Seq[AttributeReference],

368

tableStats: Option[Statistics],

369

prunedPartitions: Option[Seq[CatalogTablePartition]]

370

) extends LeafNode with MultiInstanceRelation {

371

372

def isPartitioned: Boolean

373

def partitionSpec: Map[String, String]

374

def computeStats(): Statistics

375

}

376

377

case class CatalogTablePartition(

378

spec: TablePartitionSpec,

379

storage: CatalogStorageFormat,

380

parameters: Map[String, String]

381

) {

382

def location: Option[URI]

383

def toRow: InternalRow

384

}

385

```