# Optimization

A rule-based optimization framework for Catalyst logical plans, with built-in rules that simplify expressions and restructure query plans for better performance.

## Capabilities

### Optimizer Framework

The main optimizer, which applies batches of optimization rules to a logical plan.

```scala { .api }
/**
 * Default optimizer for logical plans: a [[RuleExecutor]] whose batches hold the
 * built-in optimization rules.
 */
object Optimizer extends RuleExecutor[LogicalPlan] {
  /** Runs the optimization rule batches over `plan` and returns the rewritten plan. */
  def execute(plan: LogicalPlan): LogicalPlan

  /** The optimization rule batches this optimizer applies. */
  def batches: Seq[Batch]
}

/**
 * Base class for tree transformation rules.
 *
 * @tparam TreeType the kind of tree node the rule rewrites (e.g. `LogicalPlan`)
 */
abstract class Rule[TreeType <: TreeNode[TreeType]] extends Logging {
  /**
   * Rule name for debugging and logging.
   *
   * Rules are typically Scala `object`s, whose runtime class name ends in `$`
   * (e.g. `ConstantFolding$`), so that suffix is stripped. The name is derived from
   * `getName` rather than `getSimpleName`, which can throw
   * `InternalError: Malformed class name` for some nested Scala classes on JDK 8.
   */
  def ruleName: String = {
    val className = getClass.getName.stripSuffix("$")
    // Drop the package prefix; a nested class keeps its `Outer$Inner` form.
    className.substring(className.lastIndexOf('.') + 1)
  }

  /** Applies this rule to `plan` and returns the rewritten tree. */
  def apply(plan: TreeType): TreeType
}

/**
 * Executes batches of rules with different strategies.
 *
 * `Strategy`, `Once`, `FixedPoint` and `Batch` are members of the executor, so a batch's
 * rules are typed by the executor's own `TreeType`. Subclasses refer to them unqualified,
 * e.g. `Batch("Constant Folding", FixedPoint(100), ConstantFolding)`.
 *
 * @tparam TreeType the kind of tree node the rules rewrite (e.g. `LogicalPlan`)
 */
abstract class RuleExecutor[TreeType <: TreeNode[TreeType]] extends Logging {

  /** Execution strategy for a rule batch. */
  sealed abstract class Strategy

  /** Apply the batch's rules a single time. */
  case object Once extends Strategy

  /** Re-apply the batch's rules, at most `maxIterations` times (intended to reach a fixed point). */
  case class FixedPoint(maxIterations: Int) extends Strategy

  /**
   * Group of rules executed together with an execution strategy.
   *
   * @param name     batch name, for debugging and logging
   * @param strategy how many times the rules are applied
   * @param rules    the rules in this batch
   */
  case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*) {
    /** Rules in this batch, in the order they were given. */
    def rulesIterator: Iterator[Rule[TreeType]] = rules.iterator
  }

  /** Execute all rule batches on input */
  def execute(plan: TreeType): TreeType

  /** Sequence of rule batches to execute */
  def batches: Seq[Batch]

  /** Maximum number of iterations per batch */
  protected def maxIterations: Int = 100
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._

// Apply the optimizer to a logical plan.
// (Alias(...)() is used directly; `.as` is only available through the catalyst DSL implicits.)
val plan = Project(
  Seq(Alias(Add(Literal(1), Literal(2)), "sum")()), // 1 + 2 (can be folded)
  LocalRelation(AttributeReference("dummy", IntegerType, nullable = false)())
)

val optimizedPlan = Optimizer.execute(plan)
// Result: the "sum" alias wraps Literal(3) instead of Add(Literal(1), Literal(2))

// Create a custom optimization rule
object MyCustomRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    // A Project whose output is exactly its child's output does no work, so drop it.
    // Note that an *empty* Project must not be removed: it outputs zero columns, and
    // replacing it with its child would change the plan's schema.
    case p @ Project(_, child) if p.output == child.output => child
  }
}

// Create a custom optimizer
object MyOptimizer extends RuleExecutor[LogicalPlan] {
  def batches: Seq[Batch] = Seq(
    Batch("Custom Optimizations", Once, MyCustomRule),
    Batch("Constant Folding", FixedPoint(100), ConstantFolding)
  )
}

val customOptimized = MyOptimizer.execute(plan)
```

### Expression Optimization Rules

Rules that simplify individual expressions within a query plan.

```scala { .api }
/**
 * Fold constant expressions into literals.
 *
 * Every foldable expression is evaluated once, at optimization time, and replaced with a
 * `Literal` of the same data type. Expressions that are already literals are left as they
 * are, so the rule does not rebuild every literal in the plan on each pass.
 */
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    // Literals are foldable too, so this case must come before the generic one.
    case lit: Literal => lit
    case expr if expr.foldable => Literal.create(expr.eval(), expr.dataType)
  }
}

/**
 * Simplify boolean expressions that have a literal operand:
 * `TRUE AND x => x`, `FALSE AND x => FALSE`, `TRUE OR x => TRUE`, `FALSE OR x => x`,
 * `NOT TRUE => FALSE`, `NOT FALSE => TRUE`, `NOT NOT x => x`.
 * Expressions matching none of these shapes are returned unchanged.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case conj @ And(lhs, rhs) =>
      (lhs, rhs) match {
        case (TrueLiteral, _)  => rhs
        case (_, TrueLiteral)  => lhs
        case (FalseLiteral, _) => FalseLiteral
        case (_, FalseLiteral) => FalseLiteral
        case _                 => conj
      }

    case disj @ Or(lhs, rhs) =>
      (lhs, rhs) match {
        case (TrueLiteral, _)  => TrueLiteral
        case (_, TrueLiteral)  => TrueLiteral
        case (FalseLiteral, _) => rhs
        case (_, FalseLiteral) => lhs
        case _                 => disj
      }

    case neg @ Not(operand) =>
      operand match {
        case TrueLiteral  => FalseLiteral
        case FalseLiteral => TrueLiteral
        case Not(inner)   => inner
        case _            => neg
      }
  }
}

/**
 * Simplify LIKE expressions with a constant pattern into cheaper forms.
 *
 *  - `x LIKE '%'` matches every non-null string. It is rewritten to
 *    `x IS NOT NULL OR NULL`, which is TRUE for non-null `x` and NULL (not FALSE) for
 *    null `x`, matching the original LIKE under SQL three-valued logic.
 *  - A pattern with no wildcard (`%`, `_`) and no escape character (`\`) matches only
 *    itself and becomes an equality test. Patterns containing `\` are excluded because
 *    the escape changes what is matched: the pattern text `a\\b` matches the string
 *    `a\b`, so comparing for equality against `a\\b` would be wrong.
 *  - Any other LIKE is left unchanged.
 */
object LikeSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case like @ Like(input, Literal(pattern: UTF8String, StringType)) =>
      val text = pattern.toString
      if (text == "%") {
        Or(IsNotNull(input), Literal.create(null, BooleanType))
      } else if (!text.exists(c => c == '%' || c == '_' || c == '\\')) {
        // No wildcards and no escapes - convert to equality
        EqualTo(input, Literal(pattern, StringType))
      } else {
        // Keep the original LIKE node as is
        like
      }
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

// A relation the filter examples below operate on
val people = LocalRelation(
  AttributeReference("age", IntegerType, nullable = true)(),
  AttributeReference("active", BooleanType, nullable = true)(),
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("code", StringType, nullable = true)()
)
val age = people.output(0)
val active = people.output(1)
val name = people.output(2)
val code = people.output(3)

// Constant folding example
val exprPlan = Project(
  Seq(
    Alias(Add(Literal(10), Literal(5)), "sum")(),          // 10 + 5 -> 15
    Alias(Multiply(Literal(3), Literal(4)), "product")(),  // 3 * 4 -> 12
    Alias(Subtract(Literal(20), Literal(8)), "diff")()     // 20 - 8 -> 12
  ),
  LocalRelation(AttributeReference("dummy", IntegerType, nullable = false)())
)

val foldedPlan = ConstantFolding.apply(exprPlan)
// All arithmetic expressions become literals

// Boolean simplification example
val boolPlan = Filter(
  And(
    And(Literal(true), GreaterThan(age, Literal(18))),   // TRUE AND (age > 18) -> (age > 18)
    Or(Literal(false), EqualTo(active, Literal(true)))   // FALSE OR (active = TRUE) -> (active = TRUE)
  ),
  people
)

val simplifiedPlan = BooleanSimplification.apply(boolPlan)
// Results in: Filter(And(GreaterThan(age, Literal(18)), EqualTo(active, Literal(true))), people)

// LIKE simplification example
val likePlan = Filter(
  And(
    Like(name, Literal("%")),   // '%' matches every non-null name -> rewritten to a null check
    Like(code, Literal("ABC"))  // No wildcards -> EqualTo(code, "ABC")
  ),
  people
)

val likeSimplified = LikeSimplification.apply(likePlan)
```

### Structural Optimization Rules

Rules that change the structure of a query plan: which operators it contains and where they sit.

```scala { .api }
/**
 * Remove unused columns from query plans.
 *
 * For every `Project`, collects the attributes its expressions reference and asks
 * `pruneChild` to narrow the child to those columns. The `Project` itself is kept
 * (it defines the output schema); only its child may be replaced.
 */
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case p @ Project(projectList, child) =>
      // AttributeSet compares attributes by exprId, so the same column carrying a
      // different qualifier or metadata is not mistaken for an unused one.
      val usedColumns = AttributeSet(projectList.flatMap(_.references))
      val prunedChild = pruneChild(child, usedColumns)
      // Keep the original node when pruning changed nothing.
      if (prunedChild.output != child.output) Project(projectList, prunedChild) else p
  }

  /**
   * Returns `plan` with columns outside `requiredColumns` removed where possible.
   * Implementation elided in this overview.
   */
  private def pruneChild(plan: LogicalPlan, requiredColumns: AttributeSet): LogicalPlan = ???
}

/**
 * Push filter predicates below joins, toward the data sources.
 *
 * The filter condition is split into its conjuncts. A deterministic conjunct that only
 * references one side's output can be evaluated below the join on that side, but only
 * when that side is not null-extended by the join:
 *  - Inner: both sides
 *  - LeftOuter / LeftSemi / LeftAnti: left side only
 *  - RightOuter: right side only
 *  - any other join type (e.g. FullOuter): nothing is pushed
 * Filtering the null-supplying side of an outer join before the join would turn rows the
 * filter should remove into null-extended rows that survive. Conjuncts that cannot be
 * pushed stay in a Filter above the join.
 */
object FilterPushdown extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Filter(condition, Join(left, right, joinType, joinCondition)) =>
      val conjuncts = splitConjunctivePredicates(condition)
      val (leftOnly, notLeft) = conjuncts.partition(evaluableOn(_, left))
      val (rightOnly, common) = notLeft.partition(evaluableOn(_, right))

      val (canPushLeft, canPushRight) = joinType match {
        case Inner => (true, true)
        case LeftOuter | LeftSemi | LeftAnti => (true, false)
        case RightOuter => (false, true)
        case _ => (false, false)
      }

      val toLeft = if (canPushLeft) leftOnly else Nil
      val toRight = if (canPushRight) rightOnly else Nil
      val kept =
        (if (canPushLeft) Nil else leftOnly) ++ (if (canPushRight) Nil else rightOnly) ++ common

      val newLeft = toLeft.reduceOption(And).map(Filter(_, left)).getOrElse(left)
      val newRight = toRight.reduceOption(And).map(Filter(_, right)).getOrElse(right)
      val newJoin = Join(newLeft, newRight, joinType, joinCondition)

      kept.reduceOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
  }

  /** True if `predicate` is deterministic and uses only columns produced by `side`. */
  private def evaluableOn(predicate: Expression, side: LogicalPlan): Boolean =
    predicate.deterministic && predicate.references.subsetOf(side.outputSet)
}

/**
 * Collapse adjacent projections.
 *
 * `Project(outer, Project(inner, child))` becomes `Project(outer', child)`: every
 * reference in `outer` to an alias defined by `inner` is replaced by that alias's
 * expression. A top-level outer attribute that gets substituted is re-aliased with its
 * original name and exprId, so the collapsed node has exactly the outer Project's output
 * and operators above it still resolve.
 *
 * Collapsing is skipped when `inner` contains non-deterministic expressions, because
 * substitution could make such an expression be evaluated a different number of times.
 */
object ProjectCollapsing extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Project(outerList, Project(innerList, child)) if innerList.forall(_.deterministic) =>
      // exprId of each alias produced by the inner Project -> the expression it names
      val aliased: Map[ExprId, Expression] = innerList.collect {
        case a: Alias => a.exprId -> a.child
      }.toMap

      val collapsed: Seq[NamedExpression] = outerList.map {
        case attr: Attribute =>
          // Preserve the outer output column (name + exprId) even though its definition moves.
          aliased.get(attr.exprId)
            .map(definition => Alias(definition, attr.name)(exprId = attr.exprId))
            .getOrElse(attr)
        case other =>
          // transform keeps the top-level node (e.g. an Alias stays an Alias with its exprId).
          other.transform {
            case ref: Attribute => aliased.getOrElse(ref.exprId, ref)
          }.asInstanceOf[NamedExpression]
      }
      Project(collapsed, child)
  }
}

/**
 * Combine adjacent limit operations.
 *
 * `LIMIT a` directly over `LIMIT b` keeps at most min(a, b) rows, so the two nodes are
 * merged into one whose limit expression picks the smaller value. The merged limit is an
 * `If` expression; when both limits are literals, constant folding can reduce it further.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Limit(outerLimit, Limit(innerLimit, grandChild)) =>
      val smaller = If(LessThan(outerLimit, innerLimit), outerLimit, innerLimit)
      Limit(smaller, grandChild)
  }
}

/**
 * Convert small relations to local relations.
 *
 * A subtree accepted by `isSmallRelation` is replaced with a `LocalRelation` holding its
 * data in memory. Nodes that already are a `LocalRelation` are left untouched, so the rule
 * does not re-materialize its own output on each fixed-point iteration.
 */
object ConvertToLocalRelation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case local: LocalRelation => local
    case relation if isSmallRelation(relation) =>
      // Convert to LocalRelation with data materialized in memory
      materializeAsLocalRelation(relation)
  }

  /** Whether `plan` is small enough to materialize locally (implementation elided here). */
  private def isSmallRelation(plan: LogicalPlan): Boolean = ???

  /** Materializes `plan` as an in-memory `LocalRelation` (implementation elided here). */
  private def materializeAsLocalRelation(plan: LogicalPlan): LocalRelation = ???
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Column pruning example
val relation = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("age", IntegerType, nullable = true)(),
  AttributeReference("unused", StringType, nullable = true)()
)

val projectPlan = Project(
  Seq(relation.output(0), relation.output(1)), // Only id and name used
  relation
)

val prunedPlan = ColumnPruning.apply(projectPlan)
// Only id and name are referenced, so ColumnPruning may narrow the Project's child to
// those columns; age and unused are not needed by anything above it.

// Filter pushdown example
val leftRelation = LocalRelation(
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

val rightRelation = LocalRelation(
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("order_total", DoubleType, nullable = true)()
)

val joinPlan = Join(leftRelation, rightRelation, Inner,
  Some(EqualTo(leftRelation.output(0), rightRelation.output(0))))

val filterAfterJoin = Filter(
  And(
    GreaterThan(leftRelation.output(0), Literal(100)),    // Can push to left
    GreaterThan(rightRelation.output(1), Literal(50.0))   // Can push to right
  ),
  joinPlan
)

val pushedDown = FilterPushdown.apply(filterAfterJoin)
// Inner join: each filter is moved below the join, onto the side whose columns it uses

// Project collapsing example
val innerProject = Project(
  Seq(
    Alias(relation.output(0), "user_id")(),
    Alias(relation.output(1), "user_name")()
  ),
  relation
)

val outerProject = Project(
  Seq(innerProject.output(1)), // Only user_name
  innerProject
)

val collapsed = ProjectCollapsing.apply(outerProject)
// Results in a single Project directly over `relation` whose only column is taken
// from relation.output(1)

// Limit combining example
val innerLimit = Limit(Literal(100), relation)
val outerLimit = Limit(Literal(50), innerLimit)

val combinedLimit = CombineLimits.apply(outerLimit)
// Results in: Limit(If(LessThan(Literal(50), Literal(100)), Literal(50), Literal(100)), relation),
// which constant folding then reduces to Limit(Literal(50), relation) - the minimum
```

### Advanced Optimization Strategies

More complex rewrites: aggregate optimization, pushing operators through set operations, and common-subexpression elimination.

```scala { .api }
/**
 * Optimize aggregate operations (e.g. removing redundant grouping).
 */
object AggregateOptimize extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Aggregate(groupExpr, aggExpr, child) =>
      optimizeAggregate(groupExpr, aggExpr, child)
  }

  /**
   * Rewrites one `Aggregate` node.
   *
   * @param groupExpr grouping expressions
   * @param aggExpr   aggregate output list
   * @param child     input plan
   * @return the optimized replacement for the `Aggregate` (implementation elided here)
   */
  private def optimizeAggregate(
      groupExpr: Seq[Expression],
      aggExpr: Seq[NamedExpression],
      child: LogicalPlan): LogicalPlan = ???
}

/**
 * Push filters and projections through UNION.
 *
 * A `Union`'s output attributes are those of its first child; every other child produces
 * positionally corresponding attributes with different exprIds. Before a condition or
 * projection list is copied into a child, each reference to the union output is
 * rewritten to that child's attribute at the same position. Without this, the copies
 * would reference columns the child does not produce. Aliases copied into children other
 * than the first get fresh exprIds so that no exprId is defined twice in the plan.
 *
 * Only deterministic filters and projections are pushed. Evaluating a non-deterministic
 * expression separately in each child is not equivalent to evaluating it once over the
 * union.
 */
object SetOperationPushDown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Filter(condition, Union(children)) if children.nonEmpty && condition.deterministic =>
      // Push filter into all union children
      Union(children.map { child =>
        Filter(rewrite(condition, outputMapping(children.head, child)), child)
      })

    case Project(projectList, Union(children))
        if children.nonEmpty && projectList.forall(_.deterministic) =>
      // Push projection into all union children
      val first = children.head
      val others = children.tail.map { child =>
        val mapping = outputMapping(first, child)
        Project(projectList.map(e => freshAlias(rewrite(e, mapping))), child)
      }
      Union(Project(projectList, first) +: others)
  }

  /** exprId of each of `first`'s output attributes -> `child`'s attribute at the same position. */
  private def outputMapping(first: LogicalPlan, child: LogicalPlan): Map[ExprId, Attribute] =
    first.output.map(_.exprId).zip(child.output).toMap

  /** Replaces every attribute found in `mapping`; other nodes are kept. */
  private def rewrite[T <: Expression](e: T, mapping: Map[ExprId, Attribute]): T =
    e.transform { case a: Attribute => mapping.getOrElse(a.exprId, a) }.asInstanceOf[T]

  /** Re-creates an alias with a new exprId; other named expressions are returned as is. */
  private def freshAlias(e: NamedExpression): NamedExpression = e match {
    case Alias(aliasChild, name) => Alias(aliasChild, name)()
    case other => other
  }
}

/**
 * Eliminate common sub-expressions.
 *
 * For each plan node (bottom-up), asks `findCommonSubexpressions` for the common
 * sub-expressions among the node's expressions. If any are found, the node is rewritten
 * by `eliminateCommonSubexpressions` so they are referenced instead of recomputed;
 * otherwise the node is returned unchanged. Both helpers are defined outside this snippet.
 */
object EliminateSubexpressions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { case node =>
    val shared = findCommonSubexpressions(node.expressions)
    if (shared.isEmpty) node else eliminateCommonSubexpressions(node, shared)
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.types._

// Aggregate optimization example
val relation = LocalRelation(
  AttributeReference("category", StringType, nullable = false)(),
  AttributeReference("amount", DoubleType, nullable = false)(),
  AttributeReference("quantity", IntegerType, nullable = false)()
)

// Aggregate functions must be wrapped in an AggregateExpression (toAggregateExpression)
// before they can appear in an Aggregate node's output list.
val aggregate = Aggregate(
  Seq(relation.output(0)), // GROUP BY category
  Seq(
    Alias(Sum(relation.output(1)).toAggregateExpression(), "total_amount")(),
    Alias(Count(Literal(1)).toAggregateExpression(), "count")(),
    Alias(Average(relation.output(1)).toAggregateExpression(), "avg_amount")()
  ),
  relation
)

val optimizedAgg = AggregateOptimize.apply(aggregate)

// Set operation pushdown example
val relation1 = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("value", StringType, nullable = true)()
)

// Same schema but fresh attributes: reusing relation1.output would give both union
// branches the same exprIds, making their columns indistinguishable.
val relation2 = LocalRelation(relation1.output.map(_.newInstance()))

val unionPlan = Union(Seq(relation1, relation2))
val filteredUnion = Filter(
  GreaterThan(relation1.output(0), Literal(10)),
  unionPlan
)

val pushedFilter = SetOperationPushDown.apply(filteredUnion)
// Results in: Union(Seq(Filter(..., relation1), Filter(..., relation2)))

// Common subexpression elimination example.
// quantity is cast to DOUBLE so both operands of each addition have the same type.
val complexExpr1 = Add(relation.output(1), Cast(relation.output(2), DoubleType)) // amount + quantity
val complexExpr2 = Multiply(complexExpr1, Literal(0.1)) // (amount + quantity) * 0.1

val planWithDuplicates = Project(
  Seq(
    Alias(complexExpr1, "sum")(),
    Alias(complexExpr2, "discounted")(),
    Alias(Add(complexExpr1, Literal(5.0)), "sum_plus_five")() // Reuses complexExpr1
  ),
  relation
)

val optimizedPlan = EliminateSubexpressions.apply(planWithDuplicates)
// Intended result: (amount + quantity) is computed once and reused
```

### Optimization Pipeline and Configuration

A complete optimization pipeline assembled from configurable rule batches.

```scala { .api }
/**
 * Complete optimization pipeline.
 *
 * Each batch is built by its own helper below, and `batches` assembles them in pipeline
 * order. Fixed-point batches share `fixedPoint` (at most 100 iterations).
 *
 * NOTE(review): `sessionCatalog` and `conf` are not defined in this snippet and must be
 * supplied by the surrounding context. The "Substitution" batch lists rules that Spark
 * normally runs in the analyzer; confirm they belong in the optimizer.
 */
object DefaultOptimizer extends RuleExecutor[LogicalPlan] {
  val fixedPoint = FixedPoint(100)

  def batches: Seq[Batch] = Seq(
    finishAnalysis,
    substitution,
    constantFolding,
    operatorOptimizations,
    joinReorder,
    localRelation
  )

  /** Finish Analysis: a single pass of the remaining analysis-time rewrites. */
  private def finishAnalysis: Batch = Batch("Finish Analysis", Once,
    EliminateSubqueryAliases,
    ReplaceExpressions,
    ComputeCurrentTime,
    GetCurrentDatabase(sessionCatalog))

  /** Substitution: CTE, WINDOW-clause, single-child union and ordinal substitution. */
  private def substitution: Batch = Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf))

  /** Constant folding and strength reduction of expressions. */
  private def constantFolding: Batch = Batch("Constant Folding", fixedPoint,
    NullPropagation,
    ConstantFolding,
    BooleanSimplification,
    SimplifyConditionals,
    RemoveDispensableExpressions,
    SimplifyBinaryComparison,
    LikeSimplification)

  /** Operator rewrites: pushdowns, pruning, and combining/collapsing adjacent operators. */
  private def operatorOptimizations: Batch = Batch("Operator Optimizations", fixedPoint,
    SetOperationPushDown,
    SamplePushDown,
    PushDownPredicate,
    PushDownLeftSemiAntiJoin,
    LimitPushDown,
    ColumnPruning,
    InferFiltersFromConstraints,
    CollapseRepartition,
    CollapseProject,
    CombineFilters,
    CombineLimits,
    CombineUnions,
    NullPropagation,
    ConstantFolding,
    BooleanSimplification,
    RemoveRedundantProject,
    SimplifyCreateStructOps,
    SimplifyCreateArrayOps,
    SimplifyCreateMapOps)

  /** Join Reorder: a single pass of cost-based join reordering. */
  private def joinReorder: Batch = Batch("Join Reorder", Once,
    CostBasedJoinReorder)

  /** Local Relation Optimization: turns small relations into in-memory LocalRelations. */
  private def localRelation: Batch = Batch("LocalRelation", fixedPoint,
    ConvertToLocalRelation)
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._

// Base relation for the pipeline example
val someBaseRelation = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("age", IntegerType, nullable = true)(),
  AttributeReference("unused", StringType, nullable = true)()
)
val id = someBaseRelation.output(0)
val name = someBaseRelation.output(1)
val age = someBaseRelation.output(2)
val unused = someBaseRelation.output(3)

// Use the complete optimization pipeline
val complexPlan = Project(
  Seq(
    Alias(Add(Literal(1), Literal(2)), "const_sum")(), // Constant folding
    Alias(name, "name")()                              // Column pruning opportunity
  ),
  Filter(
    And(Literal(true), GreaterThan(age, Literal(18))), // Boolean simplification
    Project(
      Seq(id, name, age, unused), // Column pruning
      someBaseRelation
    )
  )
)

val fullyOptimized = DefaultOptimizer.execute(complexPlan)
// Rewrites applied along the way include:
// - Constant folding: 1 + 2 -> 3
// - Boolean simplification: TRUE AND (age > 18) -> (age > 18)
// - Column pruning: drops "unused" and "id", which nothing above the inner Project references
// - Project collapsing: merges projections once they become adjacent
//   (e.g. after the Filter is pushed below the inner Project)
// - Other applicable optimizations

// Custom optimization pipeline
object MinimalOptimizer extends RuleExecutor[LogicalPlan] {
  def batches: Seq[Batch] = Seq(
    Batch("Basic", FixedPoint(50),
      ConstantFolding,
      BooleanSimplification,
      ColumnPruning
    )
  )
}

val minimalOptimized = MinimalOptimizer.execute(complexPlan)
// Applies only basic optimizations
```