# Execution Engine

The Apache Spark Hive integration execution engine provides specialized physical plans and execution strategies for Hive table operations, including table scanning, data insertion, and table creation, with performance optimizations for Hive-compatible storage formats.

## Physical Execution Plans

### HiveTableScanExec

Physical execution plan for scanning Hive tables, with partition pruning and predicate pushdown capabilities.

```scala { .api }
case class HiveTableScanExec(
    requestedAttributes: Seq[Attribute],
    relation: HiveTableRelation,
    partitionPruningPred: Seq[Expression]
  ) extends LeafExecNode with CodegenSupport {

  def doExecute(): RDD[InternalRow]
  def inputRDDs(): Seq[RDD[InternalRow]]
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
```

**Usage Example:**

A HiveTableScanExec node is generated automatically when a Hive table is queried:

```scala
// This query produces a HiveTableScanExec node in the physical plan
val result = spark.sql("""
  SELECT id, name, department
  FROM employee
  WHERE department = 'Engineering' AND hire_date > '2020-01-01'
""")

// View the execution plan
result.explain(true)
```

**Key Features:**

- **Partition Pruning**: Automatically eliminates irrelevant partitions
- **Predicate Pushdown**: Pushes filters to the storage layer when possible
- **Column Pruning**: Reads only the required columns
- **Code Generation**: Participates in whole-stage code generation for better performance

### HiveTableRelation

Logical representation of a Hive table used during physical planning.

```scala { .api }
case class HiveTableRelation(
    tableMeta: CatalogTable,
    dataCols: Seq[Attribute],
    partitionCols: Seq[Attribute]
  ) extends LeafNode with MultiInstanceRelation {

  def output: Seq[Attribute]
  def refresh(): Unit
  def newInstance(): HiveTableRelation
}
```

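A relation node can be located by pattern matching on the analyzed plan, for example to confirm that a table resolves to a Hive relation. A minimal sketch, assuming a Hive-enabled SparkSession and an existing `employee` table:

```scala
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation

// Collect HiveTableRelation nodes from the analyzed logical plan
val plan = spark.sql("SELECT * FROM employee").queryExecution.analyzed
val hiveRelations = plan.collect { case r: HiveTableRelation => r }
hiveRelations.foreach { r =>
  println(s"table=${r.tableMeta.identifier} partitionCols=${r.partitionCols.map(_.name)}")
}
```

An empty result here means the table was planned as a converted data-source relation rather than a native Hive relation.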
## Data Insertion Operations

### InsertIntoHiveTable

Command for inserting data into Hive tables, with support for static and dynamic partitioning.

```scala { .api }
case class InsertIntoHiveTable(
    table: CatalogTable,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean
  ) extends DataWritingCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}
```

83
**Usage Examples:**
84
85
**Static Partitioning:**
86
```scala
// Insert into a specific partition
spark.sql("""
  INSERT INTO TABLE partitioned_sales PARTITION (year = 2023, month = 12)
  SELECT transaction_id, amount, customer_id FROM daily_sales
  WHERE date_col = '2023-12-01'
""")

// Overwrite a partition
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_sales PARTITION (year = 2023, month = 12)
  SELECT transaction_id, amount, customer_id FROM corrected_sales
  WHERE date_col = '2023-12-01'
""")
```

**Dynamic Partitioning:**
```scala
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitioning; partition columns come last in the SELECT
spark.sql("""
  INSERT INTO TABLE partitioned_sales PARTITION (year, month)
  SELECT transaction_id, amount, customer_id,
         year(transaction_date), month(transaction_date)
  FROM raw_sales
""")
```

**Conditional Insert:**
```scala
// Write the partition only if it does not already exist
// (corresponds to the ifPartitionNotExists flag)
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_sales PARTITION (year = 2023, month = 11)
  IF NOT EXISTS
  SELECT transaction_id, amount, customer_id FROM source_data
  WHERE year = 2023 AND month = 11
""")
```

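The effect of a partitioned insert can be checked afterwards by listing the table's partitions, a sketch assuming the `partitioned_sales` table above:

```scala
// List existing partitions to confirm the insert's effect
spark.sql("SHOW PARTITIONS partitioned_sales").show(truncate = false)
```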
## Table Creation Operations

### CreateHiveTableAsSelectCommand

Command for creating Hive tables from query results, with configurable storage formats and table properties.

```scala { .api }
case class CreateHiveTableAsSelectCommand(
    tableDesc: CatalogTable,
    query: LogicalPlan,
    ignoreIfExists: Boolean
  ) extends DataWritingCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}
```

**Usage Examples:**

**Basic CTAS:**
```scala
spark.sql("""
  CREATE TABLE employee_summary AS
  SELECT department,
         COUNT(*) AS employee_count,
         AVG(salary) AS avg_salary
  FROM employee
  GROUP BY department
""")
```

**CTAS with Storage Format:**
```scala
// STORED AS implies a Hive table (USING and STORED AS cannot be combined)
spark.sql("""
  CREATE TABLE employee_orc
  STORED AS ORC
  AS SELECT * FROM employee
""")
```

**CTAS with Partitioning:**
```scala
spark.sql("""
  CREATE TABLE partitioned_employee_summary
  PARTITIONED BY (department)
  STORED AS PARQUET
  AS SELECT id, name, salary, department FROM employee
""")
```

**CTAS with Properties:**
```scala
spark.sql("""
  CREATE TABLE compressed_employee
  STORED AS ORC
  TBLPROPERTIES (
    'orc.compress' = 'SNAPPY',
    'orc.stripe.size' = '67108864'
  )
  AS SELECT * FROM employee
""")
```

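Properties set at creation time can be verified afterwards, a sketch assuming the `compressed_employee` table above:

```scala
// Inspect the stored table properties
spark.sql("SHOW TBLPROPERTIES compressed_employee").show(truncate = false)
```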
## Script Transformation

### ScriptTransformationExec

Execution plan for TRANSFORM queries that pipe rows through an external script (MapReduce-style processing).

```scala { .api }
case class ScriptTransformationExec(
    input: Seq[Expression],
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: HiveScriptIOSchema
  ) extends UnaryExecNode {

  def doExecute(): RDD[InternalRow]
  protected def withNewChildInternal(newChild: SparkPlan): ScriptTransformationExec
}
```

**Usage Example:**

```scala
// Transform rows using an external script
spark.sql("""
  SELECT TRANSFORM(id, name, salary)
  USING 'python process_employee.py'
  AS (processed_id INT, processed_name STRING, salary_grade STRING)
  FROM employee
""")

// Transform with custom input/output row formats
spark.sql("""
  SELECT TRANSFORM(name, department)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
  USING '/usr/bin/python3 transform_data.py'
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
  AS (transformed_name STRING, dept_code STRING)
  FROM employee
""")
```

## Hive Query Strategies

### HiveStrategies

Planning strategies specific to the Hive integration that map logical plans to physical execution plans.

```scala { .api }
248
object HiveStrategies extends Strategy {
249
def apply(plan: LogicalPlan): Seq[SparkPlan]
250
251
// Specific strategies
252
object Scripts extends Strategy
253
object DataSinks extends Strategy
254
object DDLStrategy extends Strategy
255
}
256
```
257
258
**Key Strategies:**

1. **Hive Table Scans**: Optimized scanning of Hive tables
2. **Script Transformations**: Execution of TRANSFORM queries
3. **Data Sinks**: Efficient writing to Hive tables
4. **DDL Operations**: Handling of Hive DDL commands

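The planner consults strategies in order, and additional strategies can be appended through the experimental API. A minimal sketch of a no-op strategy (the `LogPlan` name is hypothetical) registered alongside the built-in ones:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy that logs each logical plan it is offered
object LogPlan extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    println(s"planning: ${plan.nodeName}")
    Nil // returning Nil defers to the built-in strategies
  }
}

// Register the extra strategy on an existing SparkSession
spark.experimental.extraStrategies = Seq(LogPlan)
```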
## Optimization Features

### Predicate Pushdown

Automatic pushdown of filters to reduce the amount of data scanned:

```scala
// Partition filters are pushed into the scan
val optimizedQuery = spark.sql("""
  SELECT id, name
  FROM large_table
  WHERE year = 2023 AND month = 12 AND status = 'ACTIVE'
""")

// Check for pushed filters in the physical plan
optimizedQuery.queryExecution.executedPlan
```

### Partition Pruning

Elimination of unnecessary partition scans:

```scala
// Only the matching partitions are scanned
val prunedQuery = spark.sql("""
  SELECT COUNT(*)
  FROM partitioned_sales
  WHERE year IN (2022, 2023) AND month > 6
""")

// Verify partition pruning in the optimized plan
prunedQuery.queryExecution.optimizedPlan
```

### Column Pruning

Reading only the required columns from storage:

```scala
// Only the 'name' and 'salary' columns are read from storage
val columnPrunedQuery = spark.sql("""
  SELECT name, salary
  FROM employee_with_many_columns
  WHERE department = 'Engineering'
""")
```

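Which columns actually reach each operator can be confirmed by walking the physical plan, a sketch building on the query above:

```scala
// Each node's output should list only the referenced columns
columnPrunedQuery.queryExecution.executedPlan.foreach { node =>
  println(s"${node.nodeName}: output=${node.output.map(_.name).mkString(",")}")
}
```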
## Bucketed Table Support

### Bucketed Reads

Optimized reading from bucketed Hive tables:

```scala
// Create a bucketed table (manager_id added so the self-join below is valid)
spark.sql("""
  CREATE TABLE bucketed_employee (
    id INT, name STRING, department STRING, salary DOUBLE, manager_id INT
  )
  CLUSTERED BY (id) INTO 4 BUCKETS
  STORED AS ORC
""")

// Self-join; bucketing on id lets the side joined on id avoid a shuffle
val bucketedJoin = spark.sql("""
  SELECT e1.name, e2.name AS manager_name
  FROM bucketed_employee e1
  JOIN bucketed_employee e2 ON e1.manager_id = e2.id
""")
```

### Sort-Merge Bucket Joins

High-performance joins between compatibly bucketed tables:

```scala
// Allow joining tables whose bucket counts differ but are compatible
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Sort-merge bucket join for compatibly bucketed tables
val smbJoin = spark.sql("""
  SELECT o.order_id, c.customer_name
  FROM bucketed_orders o
  JOIN bucketed_customers c ON o.customer_id = c.customer_id
""")
```

## Configuration and Tuning

### Execution Configuration

```scala
// Configure Hive dynamic-partition behavior
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("hive.exec.max.dynamic.partitions", "5000")
spark.conf.set("hive.exec.max.dynamic.partitions.pernode", "2000")

// Hive-side small-file merge settings
spark.conf.set("hive.merge.tezfiles", "true")
spark.conf.set("hive.merge.smallfiles.avgsize", "16000000")
```

### Performance Tuning

```scala
// Let adaptive query execution re-optimize plans at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Configure join strategies
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Shuffle parallelism and serialization (spark.serializer is a static
// setting and must be supplied at session creation, e.g. via SparkConf)
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```

## Error Handling

### Common Execution Errors

**Partition Not Found:**

```scala
// Allow inserts to create partitions on demand
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Defensive filtering on partition columns
import spark.implicits._ // for the $ column syntax
val safeQuery = spark.sql("""
  SELECT * FROM partitioned_table
  WHERE year = 2023 AND month BETWEEN 1 AND 12
""").filter($"year".isNotNull && $"month".isNotNull)
```

**Schema Evolution:**
```scala
// Merge schemas across files when reading (effective for Parquet-backed tables)
val schemaTolerant = spark.read
  .option("mergeSchema", "true")
  .table("evolving_table")
```

**Resource Constraints:**
```scala
// Prefer sort-merge over shuffled-hash joins under memory pressure
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
```

## Monitoring and Debugging

### Execution Plan Analysis

```scala
val query = spark.sql("SELECT * FROM large_hive_table WHERE id > 1000")

// Logical and physical plans
query.explain(true)

// Generated code for whole-stage codegen
query.queryExecution.debug.codegen()

// Trigger execution so runtime metrics are populated
query.queryExecution.executedPlan.execute().count()
```

### Performance Metrics

```scala
// Ensure runtime statistics are collected and logged
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.logLevel", "INFO")

// Access metrics after the query has executed
val metrics = query.queryExecution.executedPlan.metrics
metrics.foreach { case (name, metric) =>
  println(s"$name: ${metric.value}")
}
```

## Types

```scala { .api }
// Base execution node
trait SparkPlan extends QueryPlan[SparkPlan] {
  def execute(): RDD[InternalRow]
  def executeCollect(): Array[InternalRow]
  def metrics: Map[String, SQLMetric]
}

// Leaf execution node
trait LeafExecNode extends SparkPlan {
  final override def children: Seq[SparkPlan] = Nil
  def doExecute(): RDD[InternalRow]
}

// Unary execution node
trait UnaryExecNode extends SparkPlan {
  def child: SparkPlan
  final override def children: Seq[SparkPlan] = child :: Nil
}

// Code generation support
trait CodegenSupport extends SparkPlan {
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}

// Data writing command
trait DataWritingCommand extends Command {
  def outputColumnNames: Seq[String]
  def outputColumns: Seq[Attribute]
}

// Command interface
trait Command extends LogicalPlan {
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
  def run(sparkSession: SparkSession): Seq[Row]
}

// Hive script I/O schema
case class HiveScriptIOSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    inputSerdeProps: Seq[(String, String)],
    outputSerdeProps: Seq[(String, String)],
    recordReaderClass: Option[String],
    recordWriterClass: Option[String],
    schemaLess: Boolean
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// SQL metric for execution statistics
class SQLMetric(
    val metricType: String,
    initValue: Long = 0L
) extends AccumulatorV2[Long, Long]

// Expression for column references and computations
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
}

// Attribute for column metadata
trait Attribute extends Expression with NamedExpression {
  def name: String
  def dataType: DataType
  def nullable: Boolean
  def qualifier: Seq[String]
}
```
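These node types can be pattern-matched while traversing a physical plan, for example to locate the Hive scan nodes produced for a query. A sketch, assuming a Hive-enabled session and an existing `employee` table:

```scala
import org.apache.spark.sql.hive.execution.HiveTableScanExec

// Collect all Hive scan nodes from the executed plan
val physicalPlan = spark.sql("SELECT * FROM employee").queryExecution.executedPlan
val scans = physicalPlan.collect { case s: HiveTableScanExec => s }
scans.foreach(s => println(s.relation.tableMeta.identifier))
```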