or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis.md expressions.md index.md optimization.md query-planning.md row-operations.md tree-operations.md types.md

docs/optimization.md

0

# Optimization

1

2

Rule-based optimization system with built-in optimizations for query plan improvement and performance enhancement.

3

4

## Capabilities

5

6

### Optimizer Framework

7

8

The main optimizer containing collections of optimization rules.

9

10

```scala { .api }

11

/**
 * Logical-plan optimizer: a [[RuleExecutor]] specialised to [[LogicalPlan]] that
 * holds the collection of optimization rules.
 */
object Optimizer extends RuleExecutor[LogicalPlan] {

  /**
   * Execute all optimization rules on a plan.
   *
   * @param plan the logical plan to optimize
   * @return the optimized logical plan
   */
  def execute(plan: LogicalPlan): LogicalPlan

  /** The sequence of optimization rule batches. */
  def batches: Seq[Batch]
}

21

22

/**
 * Base class for transformation rules.
 *
 * @tparam TreeType the type of tree node this rule rewrites
 */
abstract class Rule[TreeType <: TreeNode[TreeType]] extends Logging {

  /**
   * Rule name for debugging and logging.
   *
   * Rules are normally declared as Scala `object`s, whose runtime class is named
   * `Name$`; the trailing `$` is stripped so the reported name matches the source.
   */
  def ruleName: String = this.getClass.getSimpleName.stripSuffix("$")

  /**
   * Apply the rule to a tree node.
   *
   * @param plan the tree to rewrite
   * @return the rewritten tree
   */
  def apply(plan: TreeType): TreeType
}

32

33

/**
 * Executes batches of rules with different strategies.
 *
 * @tparam TreeType the type of tree the rules operate on
 */
abstract class RuleExecutor[TreeType <: TreeNode[TreeType]] extends Logging {

  /**
   * Group of rules executed together with an execution strategy.
   *
   * Declared inside the executor so that `TreeType` is in scope for the rule type;
   * subclasses construct batches as `Batch(name, strategy, rule1, rule2, ...)`.
   *
   * @param name     name of the batch
   * @param strategy execution strategy for the batch
   * @param rules    the rules belonging to the batch
   */
  case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*) {
    /** Rules in this batch */
    def rulesIterator: Iterator[Rule[TreeType]] = rules.iterator
  }

  /** Execute all rule batches on input */
  def execute(plan: TreeType): TreeType

  /** Sequence of rule batches to execute */
  def batches: Seq[Batch]

  /** Maximum number of iterations per batch */
  protected def maxIterations: Int = 100
}

/** Execution strategies for rule batches */
sealed abstract class Strategy

/** Strategy selecting a single run of a batch. */
case object Once extends Strategy

/**
 * Strategy carrying an iteration cap for a batch.
 *
 * @param maxIterations upper bound on the number of iterations
 */
case class FixedPoint(maxIterations: Int) extends Strategy

59

```

60

61

**Usage Examples:**

62

63

```scala

64

import org.apache.spark.sql.catalyst.dsl.expressions._ // provides `.as(...)` on expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._ // IntegerType

// Apply optimizer to a logical plan
val plan = Project(
  Seq(Add(Literal(1), Literal(2)).as("sum")), // 1 + 2 (can be folded)
  LocalRelation(AttributeReference("dummy", IntegerType, nullable = false)())
)

val optimizedPlan = Optimizer.execute(plan)
// Result: Literal(3) replaces Add(Literal(1), Literal(2))

// Create custom optimization rule
object MyCustomRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Project(projectList, child) if projectList.isEmpty =>
      child // Remove empty projections
  }
}

// Create custom optimizer
object MyOptimizer extends RuleExecutor[LogicalPlan] {
  def batches: Seq[Batch] = Seq(
    Batch("Custom Optimizations", Once, MyCustomRule),
    Batch("Constant Folding", FixedPoint(100), ConstantFolding)
  )
}

val customOptimized = MyOptimizer.execute(plan)

95

```

96

97

### Expression Optimization Rules

98

99

Rules for optimizing expressions within query plans.

100

101

```scala { .api }

102

/**
 * Fold constant expressions into literals.
 *
 * Every foldable expression in the plan is evaluated once and replaced by a
 * literal of the same data type.
 */
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformAllExpressions {
      // A literal is already folded; return it as-is instead of re-evaluating it
      // and allocating an identical literal on every pass of a fixed-point batch.
      case literal: Literal => literal
      case expr if expr.foldable => Literal.create(expr.eval(), expr.dataType)
    }
  }
}

112

113

/**
 * Simplify boolean expressions by removing literal TRUE/FALSE operands of
 * AND/OR and by collapsing negations of literals and double negations.
 */
object BooleanSimplification extends Rule[LogicalPlan] {

  // Rewrites are tried top to bottom; the first matching case wins.
  private val simplify: PartialFunction[Expression, Expression] = {
    // AND: TRUE is the identity element, FALSE absorbs
    case And(TrueLiteral, rhs) => rhs
    case And(lhs, TrueLiteral) => lhs
    case And(FalseLiteral, _) | And(_, FalseLiteral) => FalseLiteral

    // OR: TRUE absorbs, FALSE is the identity element
    case Or(TrueLiteral, _) | Or(_, TrueLiteral) => TrueLiteral
    case Or(FalseLiteral, rhs) => rhs
    case Or(lhs, FalseLiteral) => lhs

    // NOT over literals and double negation
    case Not(TrueLiteral) => FalseLiteral
    case Not(FalseLiteral) => TrueLiteral
    case Not(Not(inner)) => inner
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions(simplify)
}

133

134

/**
 * Simplify LIKE expressions to more efficient forms.
 *
 * Only LIKE with a string-literal pattern is considered:
 *  - the pattern `%` becomes `IsNotNull(input)`;
 *  - a pattern without `%` or `_` becomes an equality test;
 *  - everything else, including any pattern that uses the backslash escape
 *    character, is left untouched.
 */
object LikeSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformAllExpressions {
      case like @ Like(input, Literal(pattern: UTF8String, StringType)) =>
        pattern.toString match {
          // A backslash escapes the following character, so `\%` and `\_` stand for
          // literal characters and the raw pattern text is not the text to match.
          // Leave such patterns to the regular LIKE evaluation.
          case text if text.contains("\\") => like

          // LIKE '%' is always true for non-null strings.
          // NOTE(review): for a NULL input LIKE yields NULL while IsNotNull yields
          // false; the two agree only where NULL and false are treated alike
          // (e.g. a Filter condition) - confirm this holds for all callers.
          case "%" => IsNotNull(input)

          // No wildcards - convert to equality
          case text if !text.contains("%") && !text.contains("_") =>
            EqualTo(input, Literal(pattern, StringType))

          // Keep original LIKE
          case _ => like
        }
    }
  }
}

154

```

155

156

**Usage Examples:**

157

158

```scala

159

import org.apache.spark.sql.catalyst.dsl.expressions._ // provides `.as(...)` on expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

// Columns and relation shared by the filter examples below
val name = AttributeReference("name", StringType, nullable = true)()
val code = AttributeReference("code", StringType, nullable = true)()
val age = AttributeReference("age", IntegerType, nullable = true)()
val active = AttributeReference("active", BooleanType, nullable = true)()
val someRelation = LocalRelation(name, code, age, active)

// Constant folding example
val exprPlan = Project(
  Seq(
    Add(Literal(10), Literal(5)).as("sum"), // 10 + 5 -> 15
    Multiply(Literal(3), Literal(4)).as("product"), // 3 * 4 -> 12
    Subtract(Literal(20), Literal(8)).as("diff") // 20 - 8 -> 12
  ),
  LocalRelation(AttributeReference("dummy", IntegerType, nullable = false)())
)

val foldedPlan = ConstantFolding.apply(exprPlan)
// All arithmetic expressions become literals

// Boolean simplification example
val boolPlan = Filter(
  And(
    And(Literal(true), GreaterThan(age, Literal(18))), // TRUE AND (age > 18) -> (age > 18)
    Or(Literal(false), EqualTo(active, Literal(true))) // FALSE OR (active = TRUE) -> (active = TRUE)
  ),
  someRelation
)

val simplifiedPlan = BooleanSimplification.apply(boolPlan)
// Results in: Filter(And(GreaterThan(age, Literal(18)), EqualTo(active, Literal(true))), someRelation)

// LIKE simplification example
val likePlan = Filter(
  And(
    Like(name, Literal("%")), // Always true for non-null -> IsNotNull(name)
    Like(code, Literal("ABC")) // No wildcards -> EqualTo(code, "ABC")
  ),
  someRelation
)

val likeSimplified = LikeSimplification.apply(likePlan)

199

```

200

201

### Structural Optimization Rules

202

203

Rules for optimizing the structure of query plans.

204

205

```scala { .api }

206

/**
 * Remove unused columns from query plans.
 *
 * For every projection, the attributes referenced by the project list are
 * collected and handed to `pruneChild`; the pruned child replaces the original
 * one only when its output actually differs.
 */
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      case Project(projectList, child) =>
        val required = projectList.flatMap(_.references).toSet
        val candidate = pruneChild(child, required)
        // Keep the original child when pruning did not change the output.
        val newChild = if (candidate.output == child.output) child else candidate
        Project(projectList, newChild)
    }
  }

  /**
   * Produce a version of `plan` restricted to the required columns.
   *
   * @param plan            the child plan to prune
   * @param requiredColumns attributes referenced by the parent projection
   */
  private def pruneChild(plan: LogicalPlan, requiredColumns: Set[Attribute]): LogicalPlan = {
    // Implementation to remove unused columns
  }
}

226

227

/**
 * Push filter predicates down towards the data sources.
 *
 * A filter sitting on top of an inner join is split into its conjuncts. Each
 * deterministic conjunct that only reads columns of one join side is moved
 * below the join onto that side; the rest stays above the join.
 */
object FilterPushdown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transform {
      // Restricted to inner joins: filtering one side of an outer join before the
      // join changes which rows get null-extended, so the rewrite is not safe there.
      case Filter(condition, Join(left, right, Inner, joinCondition)) =>
        // splitConjunctivePredicates yields the flat list of AND-ed predicates;
        // classify each one by the side whose output covers its references.
        val conjuncts = splitConjunctivePredicates(condition)
        val (leftFilters, notLeft) = conjuncts.partition { predicate =>
          predicate.deterministic && predicate.references.subsetOf(left.outputSet)
        }
        val (rightFilters, remainingFilters) = notLeft.partition { predicate =>
          predicate.deterministic && predicate.references.subsetOf(right.outputSet)
        }

        // reduceOption covers the "nothing to push" case without a separate emptiness check.
        val newLeft = leftFilters.reduceOption(And).map(Filter(_, left)).getOrElse(left)
        val newRight = rightFilters.reduceOption(And).map(Filter(_, right)).getOrElse(right)
        val newJoin = Join(newLeft, newRight, Inner, joinCondition)

        remainingFilters.reduceOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
    }
  }
}

249

250

/**
 * Collapse adjacent projections.
 *
 * The expressions defined by aliases in the inner projection are substituted
 * into the outer project list, so a single projection over the inner child
 * remains. Output names and expression ids of the outer projection are kept.
 */
object ProjectCollapsing extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      case Project(projectList1, Project(projectList2, child)) =>
        // Only aliases define new expressions; plain attributes of the inner
        // projection pass through unchanged and need no substitution.
        val innerAliases = projectList2.collect { case alias: Alias => alias.exprId -> alias }.toMap

        val substituted: Seq[NamedExpression] = projectList1.map {
          // A bare reference to an inner alias: reuse the alias itself so the
          // column keeps its name and expression id.
          case attr: AttributeReference if innerAliases.contains(attr.exprId) =>
            innerAliases(attr.exprId)

          case outer =>
            // Substitute expressions from inner projection into outer projection
            val rewritten = outer.transform {
              case attr: AttributeReference =>
                innerAliases.get(attr.exprId).map(_.child).getOrElse(attr)
            }
            rewritten match {
              case named: NamedExpression => named
              // Re-wrap if the rewrite produced an unnamed expression, so the
              // result is still a valid project-list entry.
              case other => Alias(other, outer.name)(exprId = outer.exprId)
            }
        }
        Project(substituted, child)
    }
  }
}

266

267

/**
 * Combine adjacent limit operations into a single limit whose expression
 * selects the smaller of the two limit expressions.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      case Limit(outerLimit, Limit(innerLimit, child)) =>
        // Take minimum of the two limits
        val smallerLimit = If(LessThan(outerLimit, innerLimit), outerLimit, innerLimit)
        Limit(smallerLimit, child)
    }
  }
}

282

283

/**
 * Convert small relations to local relations.
 *
 * Every node accepted by `isSmallRelation` is replaced by the result of
 * `materializeAsLocalRelation`.
 */
object ConvertToLocalRelation extends Rule[LogicalPlan] {

  // Convert to LocalRelation with data materialized in memory
  private val convert: PartialFunction[LogicalPlan, LogicalPlan] = {
    case node if isSmallRelation(node) => materializeAsLocalRelation(node)
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp(convert)

  /** Decides whether `plan` qualifies for local materialization. */
  private def isSmallRelation(plan: LogicalPlan): Boolean = {
    // Check if relation is small enough to materialize locally
  }

  /** Builds the `LocalRelation` that replaces `plan`. */
  private def materializeAsLocalRelation(plan: LogicalPlan): LocalRelation = {
    // Convert to LocalRelation
  }
}

303

```

304

305

**Usage Examples:**

306

307

```scala

308

import org.apache.spark.sql.catalyst.dsl.expressions._ // provides `.as(...)` on expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans._ // Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._ // IntegerType, StringType, DoubleType

// Column pruning example
val relation = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("age", IntegerType, nullable = true)(),
  AttributeReference("unused", StringType, nullable = true)()
)

val projectPlan = Project(
  Seq(relation.output(0), relation.output(1)), // Only id and name used
  relation
)

val prunedPlan = ColumnPruning.apply(projectPlan)
// Unused columns (age, unused) are removed from the scan

// Filter pushdown example
val leftRelation = LocalRelation(
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

val rightRelation = LocalRelation(
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("order_total", DoubleType, nullable = true)()
)

val joinPlan = Join(leftRelation, rightRelation, Inner,
  Some(EqualTo(leftRelation.output(0), rightRelation.output(0))))

val filterAfterJoin = Filter(
  And(
    GreaterThan(leftRelation.output(0), Literal(100)), // Can push to left
    GreaterThan(rightRelation.output(1), Literal(50.0)) // Can push to right
  ),
  joinPlan
)

val pushedDown = FilterPushdown.apply(filterAfterJoin)
// Filters are pushed down before the join

// Project collapsing example
val innerProject = Project(
  Seq(
    relation.output(0).as("user_id"),
    relation.output(1).as("user_name")
  ),
  relation
)

val outerProject = Project(
  Seq(innerProject.output(1)), // Only user_name
  innerProject
)

val collapsed = ProjectCollapsing.apply(outerProject)
// Results in a single projection placed directly on top of `relation`

// Limit combining example
val innerLimit = Limit(Literal(100), relation)
val outerLimit = Limit(Literal(50), innerLimit)

val combinedLimit = CombineLimits.apply(outerLimit)
// Results in: Limit(Literal(50), relation) - takes minimum

376

```

377

378

### Advanced Optimization Strategies

379

380

Complex optimization patterns and aggregate optimizations.

381

382

```scala { .api }

383

/**
 * Optimize aggregate operations.
 *
 * Every `Aggregate` node is handed, decomposed into its three parts, to
 * `optimizeAggregate`, whose result replaces the node.
 */
object AggregateOptimize extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      // Optimize aggregations - remove redundant grouping, etc.
      case Aggregate(grouping, aggregates, input) =>
        optimizeAggregate(grouping, aggregates, input)
    }
  }

  /**
   * Build the optimized replacement for one aggregate node.
   *
   * @param groupExpr grouping expressions of the aggregate
   * @param aggExpr   aggregate (output) expressions
   * @param child     input plan of the aggregate
   */
  private def optimizeAggregate(
      groupExpr: Seq[Expression],
      aggExpr: Seq[NamedExpression],
      child: LogicalPlan): LogicalPlan = {
    // Implementation for aggregate optimizations
  }
}

402

403

/**
 * Optimize set operations (UNION, INTERSECT, EXCEPT).
 *
 * A filter or projection placed on top of a union is applied to each union
 * child instead, and the union is rebuilt over the results.
 *
 * NOTE(review): the same condition / project list is reused verbatim for every
 * child - confirm that its attribute references resolve against each child's
 * output, not only the first one.
 */
object SetOperationPushDown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      // Push filter into all union children
      case Filter(condition, Union(children)) =>
        val filteredBranches = children.map(branch => Filter(condition, branch))
        Union(filteredBranches)

      // Push projection into all union children
      case Project(projectList, Union(children)) =>
        val projectedBranches = children.map(branch => Project(projectList, branch))
        Union(projectedBranches)
    }
  }
}

419

420

/**
 * Eliminate common sub-expressions.
 *
 * For each plan node, `findCommonSubexpressions` is run over the node's
 * expressions; if it reports anything, the node is rewritten by
 * `eliminateCommonSubexpressions`, otherwise it is returned unchanged.
 */
object EliminateSubexpressions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformUp {
      case node =>
        findCommonSubexpressions(node.expressions) match {
          // Replace common subexpressions with references
          case shared if shared.nonEmpty => eliminateCommonSubexpressions(node, shared)
          case _ => node
        }
    }
  }
}

437

```

438

439

**Usage Examples:**

440

441

```scala

442

import org.apache.spark.sql.catalyst.dsl.expressions._ // provides `.as(...)` on expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._ // StringType, DoubleType, IntegerType

// Aggregate optimization example
val relation = LocalRelation(
  AttributeReference("category", StringType, nullable = false)(),
  AttributeReference("amount", DoubleType, nullable = false)(),
  AttributeReference("quantity", IntegerType, nullable = false)()
)

val aggregate = Aggregate(
  Seq(relation.output(0)), // GROUP BY category
  Seq(
    // Aggregate functions are wrapped into aggregate expressions before aliasing
    Sum(relation.output(1)).toAggregateExpression().as("total_amount"),
    Count(Literal(1)).toAggregateExpression().as("count"),
    Average(relation.output(1)).toAggregateExpression().as("avg_amount")
  ),
  relation
)

val optimizedAgg = AggregateOptimize.apply(aggregate)

// Set operation pushdown example
val relation1 = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("value", StringType, nullable = true)()
)

val relation2 = LocalRelation(relation1.output) // Same schema

val unionPlan = Union(Seq(relation1, relation2))
val filteredUnion = Filter(
  GreaterThan(relation1.output(0), Literal(10)),
  unionPlan
)

val pushedFilter = SetOperationPushDown.apply(filteredUnion)
// Results in: Union(Seq(Filter(..., relation1), Filter(..., relation2)))

// Common subexpression elimination example
// quantity is an integer column; cast it so both operands of the addition are doubles
val complexExpr1 = Add(relation.output(1), Cast(relation.output(2), DoubleType)) // amount + quantity
val complexExpr2 = Multiply(complexExpr1, Literal(0.1)) // (amount + quantity) * 0.1

val planWithDuplicates = Project(
  Seq(
    complexExpr1.as("sum"),
    complexExpr2.as("discounted"),
    Add(complexExpr1, Literal(5.0)).as("sum_plus_five") // Reuses complexExpr1
  ),
  relation
)

val optimizedPlan = EliminateSubexpressions.apply(planWithDuplicates)
// Common subexpression (amount + quantity) is computed once and reused

498

```

499

500

### Optimization Pipeline and Configuration

501

502

Complete optimization pipeline with configurable batches.

503

504

```scala { .api }

505

/**
 * Complete optimization pipeline: the full list of rule batches, in the order
 * in which they are declared below.
 */
object DefaultOptimizer extends RuleExecutor[LogicalPlan] {

  // Strategy shared by all iterative batches below.
  val fixedPoint = FixedPoint(100)

  def batches: Seq[Batch] = Seq(
    // Finish Analysis
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog)),

    // Substitution
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),

    // Constant Folding and Strength Reduction
    Batch("Constant Folding", fixedPoint,
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyConditionals,
      RemoveDispensableExpressions,
      SimplifyBinaryComparison,
      LikeSimplification),

    // Operator Optimizations
    Batch("Operator Optimizations", fixedPoint,
      SetOperationPushDown,
      SamplePushDown,
      PushDownPredicate,
      PushDownLeftSemiAntiJoin,
      LimitPushDown,
      ColumnPruning,
      InferFiltersFromConstraints,
      CollapseRepartition,
      CollapseProject,
      CombineFilters,
      CombineLimits,
      CombineUnions,
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      RemoveRedundantProject,
      SimplifyCreateStructOps,
      SimplifyCreateArrayOps,
      SimplifyCreateMapOps),

    // Join Reorder
    Batch("Join Reorder", Once,
      CostBasedJoinReorder),

    // Local Relation Optimization
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation)
  )
}

567

```

568

569

**Usage Examples:**

570

571

```scala

572

import org.apache.spark.sql.catalyst.dsl.expressions._ // provides `.as(...)` on expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._ // RuleExecutor
import org.apache.spark.sql.types._

// Base relation and its columns used by the plan below
val id = AttributeReference("id", IntegerType, nullable = false)()
val name = AttributeReference("name", StringType, nullable = true)()
val age = AttributeReference("age", IntegerType, nullable = true)()
val unused = AttributeReference("unused", StringType, nullable = true)()
val someBaseRelation = LocalRelation(id, name, age, unused)

// Use complete optimization pipeline
val complexPlan = Project(
  Seq(
    Add(Literal(1), Literal(2)).as("const_sum"), // Constant folding
    name.as("name") // Column pruning opportunity
  ),
  Filter(
    And(Literal(true), GreaterThan(age, Literal(18))), // Boolean simplification
    Project(
      Seq(id, name, age, unused), // Column pruning
      someBaseRelation
    )
  )
)

val fullyOptimized = DefaultOptimizer.execute(complexPlan)
// Applies all optimization rules in proper order:
// 1. Boolean simplification: TRUE AND (age > 18) -> (age > 18)
// 2. Constant folding: 1 + 2 -> 3
// 3. Column pruning: removes "unused" column and "id" (not referenced above)
// 4. Project collapsing: merges adjacent projections
// 5. Other applicable optimizations

// Custom optimization pipeline
object MinimalOptimizer extends RuleExecutor[LogicalPlan] {
  def batches: Seq[Batch] = Seq(
    Batch("Basic", FixedPoint(50),
      ConstantFolding,
      BooleanSimplification,
      ColumnPruning
    )
  )
}

val minimalOptimized = MinimalOptimizer.execute(complexPlan)
// Applies only basic optimizations

611

```