# Query Execution

Specialized execution plans and strategies for Hive table operations and query processing.

## Core Imports

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.HiveStrategies
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.Expression
```
## Capabilities

### Hive Table Scanning

Physical execution plan for scanning Hive tables with partition pruning and predicate pushdown.

```scala { .api }
case class HiveTableScanExec(
    requestedAttributes: Seq[Attribute],
    relation: HiveTableRelation,
    partitionPruningPred: Seq[Expression]
)(@transient private val sparkSession: SparkSession) extends LeafExecNode {

  /** Attributes produced by this execution plan */
  override def output: Seq[Attribute]

  /** Execute the table scan and produce an RDD of internal rows */
  override protected def doExecute(): RDD[InternalRow]

  /** Statistics for query optimization */
  override def computeStats(): Statistics

  /** String representation for explain plans */
  override def simpleString(maxFields: Int): String
}
```
### Script Transformation Execution

Execute Hive script transformations by piping rows through an external process.

```scala { .api }
case class HiveScriptTransformationExec(
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: ScriptTransformationIOSchema
) extends BaseScriptTransformationExec {

  /** Execute the script transformation on the input data */
  override protected def doExecute(): RDD[InternalRow]
}
```
### Hive Table Insert Operations

Command for inserting data into Hive tables with partition support.

```scala { .api }
case class InsertIntoHiveTable(
    table: CatalogTable,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean,
    outputColumnNames: Seq[String]
) extends DataWritingCommand {

  /** Execute the insert operation */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

  /** Metrics for monitoring insert performance */
  override lazy val metrics: Map[String, SQLMetric]

  /** Resolved output attributes of the query being written */
  override def outputColumns: Seq[Attribute]
}
```
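A minimal usage sketch: a static-partition insert that Spark plans as `InsertIntoHiveTable` (table and column names are illustrative).

```scala
// The PARTITION clause pins year and month, so the SELECT supplies
// only the remaining data columns.
spark.sql("""
  INSERT OVERWRITE TABLE sales_partitioned
  PARTITION (year = 2023, month = 11)
  SELECT customer_id, order_total, order_date
  FROM raw_sales
  WHERE YEAR(order_date) = 2023 AND MONTH(order_date) = 11
""")
```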
### Create Table As Select

Command for creating Hive tables from SELECT query results.

```scala { .api }
case class CreateHiveTableAsSelectCommand(
    tableDesc: CatalogTable,
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    mode: SaveMode
) extends DataWritingCommand {

  /** Create the table if needed, validate it against the query, and insert the results */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

  /** Attributes of the written query, in outputColumnNames order */
  override def outputColumns: Seq[Attribute]
}
```
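A sketch of a statement planned as this command; with plain `CREATE TABLE` (no `IF NOT EXISTS`) the mode is `SaveMode.ErrorIfExists`, so the command fails if the table already exists (table name is illustrative).

```scala
// CTAS into a managed Hive table stored as ORC.
spark.sql("""
  CREATE TABLE customer_totals STORED AS ORC
  AS SELECT customer_id, SUM(order_total) AS total_spent
  FROM orders
  GROUP BY customer_id
""")
```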
### Insert into Directory

Command for inserting query results into a file system directory (local or HDFS) using a Hive serde format.

```scala { .api }
case class InsertIntoHiveDirCommand(
    isLocal: Boolean,
    storage: CatalogStorageFormat,
    query: LogicalPlan,
    overwrite: Boolean,
    outputColumnNames: Seq[String]
) extends DataWritingCommand {

  /** Resolve the output path and execute the directory insert */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}
```
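A sketch of the SQL shape that maps to this command; the output path and delimiter are illustrative.

```scala
// INSERT OVERWRITE [LOCAL] DIRECTORY is planned as InsertIntoHiveDirCommand.
spark.sql("""
  INSERT OVERWRITE LOCAL DIRECTORY '/tmp/top_customers'
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  SELECT customer_id, total_spent
  FROM customer_totals
  ORDER BY total_spent DESC
  LIMIT 100
""")
```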
## Query Planning Strategies

### Hive-Specific Query Strategies

```scala { .api }
private[hive] trait HiveStrategies {

  /** Strategy for handling script transformations */
  object HiveScripts extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan]
  }

  /** Strategy for Hive table scans with optimization */
  object HiveTableScans extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan]
  }
}
```
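These strategies are injected by the session state when Hive support is enabled. One way to confirm that `HiveTableScans` planned a query is to look for its physical node in the executed plan (a sketch, reusing the `sales_partitioned` table from the examples below):

```scala
import org.apache.spark.sql.hive.execution.HiveTableScanExec

val plan = spark.sql("SELECT * FROM sales_partitioned").queryExecution.executedPlan
val usedHiveScan = plan.find(_.isInstanceOf[HiveTableScanExec]).isDefined
println(s"Planned by HiveTableScans: $usedHiveScan")
```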
### Analysis Rules

Rules for analyzing and converting Hive-specific logical plans.

```scala { .api }
/** Convert generic operations to Hive-specific variants */
object HiveAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Convert Hive serde relations to native data source relations for better performance */
case class RelationConversions(
    sessionCatalog: HiveSessionCatalog
) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Resolve Hive SerDe table properties */
class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Determine table statistics from the file system when the metastore has none */
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}
```
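Whether `RelationConversions` rewrites a Hive serde relation into a native data source scan is controlled by session configuration; a sketch of toggling it for Parquet-backed tables (table name is illustrative):

```scala
// With conversion disabled, the executed plan keeps HiveTableScanExec;
// with it enabled (the default), a native file source scan appears instead.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM parquet_backed_hive_table").explain()
```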
## Usage Examples

### Custom Table Scan with Partition Pruning

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.execution.HiveTableScanExec

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Query that allows pruning on the year and month partition columns
val partitionedQuery = spark.sql("""
  SELECT customer_id, order_total
  FROM sales_partitioned
  WHERE year = 2023 AND month >= 10
""")

// Examine the execution plan
partitionedQuery.explain(true)

// Inspect the partition pruning predicates pushed into the scan
partitionedQuery.queryExecution.executedPlan.collect {
  case scan: HiveTableScanExec =>
    println(s"Partition filters: ${scan.partitionPruningPred}")
}
```
### Script Transformation Example

```scala
// Pipe rows through an external transformation script
spark.sql("""
  SELECT TRANSFORM(name, age)
  USING 'python3 /path/to/transform_script.py'
  AS (processed_name STRING, age_group STRING)
  FROM users
""").show()

// Alternative: an inline awk script. TRANSFORM input/output is
// tab-delimited by default; \" and \\ are unescaped by the SQL parser.
spark.sql("""
  FROM users
  SELECT TRANSFORM(name, age)
  USING "awk 'BEGIN { FS = OFS = \"\\t\" } { print $1, ($2 > 30 ? \"senior\" : \"junior\") }'"
  AS (name STRING, category STRING)
""").show()
```
### Dynamic Partition Insert

```scala
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitions; the partition columns come last in the SELECT
spark.sql("""
  INSERT INTO TABLE sales_partitioned
  PARTITION (year, month)
  SELECT customer_id, order_total, order_date,
         YEAR(order_date) AS year,
         MONTH(order_date) AS month
  FROM raw_sales
""")

// Check the created partitions
spark.sql("SHOW PARTITIONS sales_partitioned").show()
```
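Strict mode (the default) rejects inserts where every partition column is dynamic; mixing a static and a dynamic column satisfies it without changing `hive.exec.dynamic.partition.mode`. A sketch:

```scala
// Mixed static/dynamic partition spec: the static year = 2023
// satisfies strict mode, while month stays dynamic.
spark.sql("""
  INSERT INTO TABLE sales_partitioned
  PARTITION (year = 2023, month)
  SELECT customer_id, order_total, order_date,
         MONTH(order_date) AS month
  FROM raw_sales
  WHERE YEAR(order_date) = 2023
""")
```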
### Create External Table As Select

```scala
// Create a table from a query at an explicit location; a table created
// with an explicit LOCATION is registered as EXTERNAL
spark.sql("""
  CREATE TABLE external_summary
  STORED AS PARQUET
  LOCATION '/user/warehouse/external/summary'
  AS SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(order_total) AS total_spent
  FROM orders
  GROUP BY customer_id
""")
```
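A quick check that the catalog registered the table as external; this sketch relies on the `Type` row that `DESCRIBE EXTENDED` emits:

```scala
spark.sql("DESCRIBE EXTENDED external_summary")
  .filter("col_name = 'Type'")
  .show(truncate = false)  // expect Type = EXTERNAL
```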
## Performance Optimization

### Predicate Pushdown

```scala
import org.apache.spark.sql.catalyst.plans.logical.Filter

// Query demonstrating filter pushdown
val optimizedQuery = spark.sql("""
  SELECT product_name, sales_amount
  FROM product_sales
  WHERE category = 'electronics'
    AND sale_date >= '2023-01-01'
    AND region IN ('us-west', 'us-east')
""")

// Inspect the filters remaining in the optimized logical plan
optimizedQuery.queryExecution.optimizedPlan.collect {
  case Filter(condition, _) =>
    println(s"Filter condition: $condition")
}
```
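The formatted explain mode shows pushed-down filters directly on the scan node, which is often easier to read than the full plan dump:

```scala
// Spark 3.x explain modes: "simple", "extended", "codegen", "cost", "formatted"
optimizedQuery.explain("formatted")
```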
### Vectorized ORC Reading

```scala
// Enable vectorized reading for better performance
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096")

// Query will use vectorized reader
val vectorizedQuery = spark.sql("""
  SELECT SUM(sales_amount), AVG(quantity)
  FROM large_orc_table
  WHERE year = 2023
""")
```
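Vectorization applies to Spark's native ORC reader, so metastore ORC tables must also take the converted data source path; a sketch of the relevant setting (it defaults to true in Spark 3.x):

```scala
// Keep Hive metastore ORC tables on the native reader path so the
// vectorized reader can apply.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
```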
## Error Handling

Common execution exceptions:

- **AnalysisException**: schema resolution or table access errors raised while the query is analyzed
- **SparkException**: general execution failures, including failed tasks; inspect `getCause` for the root error
- **MetaException** (`org.apache.hadoop.hive.metastore.api`): Hive metastore access errors, typically surfaced wrapped in one of the above

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException

try {
  val result = spark.sql("""
    INSERT INTO non_existent_table
    SELECT * FROM source_table
  """)
  result.collect()
} catch {
  case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
    println("Target table does not exist")
  case e: SparkException if e.getMessage.contains("Task failed") =>
    println(s"Execution failed: ${e.getCause}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
    throw e
}
```
## Types

### Execution Plan Types

```scala { .api }
trait DataWritingCommand extends Command {
  def query: LogicalPlan
  def outputColumnNames: Seq[String]
  def outputColumns: Seq[Attribute]
  lazy val metrics: Map[String, SQLMetric]
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}

case class ScriptTransformationIOSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    inputSerdeProps: Seq[(String, String)],
    outputSerdeProps: Seq[(String, String)],
    recordReaderClass: Option[String],
    recordWriterClass: Option[String],
    schemaLess: Boolean
)
```
### Table and Partition Types

```scala { .api }
case class HiveTableRelation(
    tableMeta: CatalogTable,
    dataCols: Seq[AttributeReference],
    partitionCols: Seq[AttributeReference],
    tableStats: Option[Statistics],
    prunedPartitions: Option[Seq[CatalogTablePartition]]
) extends LeafNode with MultiInstanceRelation {

  def isPartitioned: Boolean
  /** Data columns followed by partition columns */
  override def output: Seq[AttributeReference]
  override def computeStats(): Statistics
}

case class CatalogTablePartition(
    spec: TablePartitionSpec,
    storage: CatalogStorageFormat,
    parameters: Map[String, String]
) {
  /** Partition location; fails if the storage format defines none */
  def location: URI
  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow
}
```
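`CatalogTablePartition` values can be listed through the session catalog; a sketch, assuming the partitioned table from the usage examples (`sessionState` is marked as unstable API):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val parts = spark.sessionState.catalog.listPartitions(TableIdentifier("sales_partitioned"))
parts.foreach(p => println(s"${p.spec} -> ${p.storage.locationUri}"))
```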