# Query Plans

Catalyst represents SQL queries as trees of plan nodes, in both logical and physical form. This planning framework is what enables sophisticated query optimizations such as predicate pushdown, join reordering, and cost-based optimization.
3
4
## Core Imports
5
6
```scala
7
import org.apache.spark.sql.catalyst.plans.logical._
8
import org.apache.spark.sql.catalyst.plans.physical._
9
import org.apache.spark.sql.catalyst.plans._
10
import org.apache.spark.sql.catalyst.expressions._
11
```
12
13
## Plan Hierarchy
14
15
### Base Plan Classes
16
17
```scala { .api }
18
/**
 * Abstract base of logical plan nodes: a [[QueryPlan]] specialised to a tree whose
 * children are themselves [[LogicalPlan]] nodes. Only member signatures are listed.
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  // Output attributes and child nodes.
  def output: Seq[Attribute]
  def children: Seq[LogicalPlan]

  // Resolution status flags for this node and for its children.
  def resolved: Boolean
  def childrenResolved: Boolean

  // Attribute-set accessors.
  def outputSet: AttributeSet
  def references: AttributeSet
  def inputSet: AttributeSet
  def producedAttributes: AttributeSet
  def missingInput: AttributeSet

  // Schema, full attribute set, streaming flag and refresh hook.
  def schema: StructType
  def allAttributes: AttributeSet
  def isStreaming: Boolean
  def refresh(): Unit
}
33
34
/**
 * Common base of plan trees. `PlanType` is F-bounded (`PlanType <: QueryPlan[PlanType]`),
 * so each concrete plan family is a [[TreeNode]] over its own node type.
 */
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  // Output attributes, as a sequence and as a set.
  def output: Seq[Attribute]
  def outputSet: AttributeSet

  // Schema and its printed form.
  def schema: StructType
  def printSchema(): Unit

  // String rendering of the node.
  def simpleString: String
}
41
```
42
43
### Plan Node Types by Arity
44
45
```scala { .api }
46
/** Logical plan node without children: `children` is fixed to `Nil`. */
abstract class LeafNode extends LogicalPlan {
  override final def children: Seq[LogicalPlan] = Nil
}
49
50
/** Logical plan node with exactly one child; `children` is the single-element list of `child`. */
abstract class UnaryNode extends LogicalPlan {
  def child: LogicalPlan
  override final def children: Seq[LogicalPlan] = child :: Nil
}
54
55
/** Logical plan node with two children; `children` lists `left` first, then `right`. */
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan
  def right: LogicalPlan
  override final def children: Seq[LogicalPlan] = Seq(left, right)
}
60
```
61
62
## Leaf Nodes
63
64
### Data Sources
65
66
```scala { .api }
67
/**
 * Leaf node that carries its output attributes together with in-memory rows.
 *
 * @param output      attributes of the relation
 * @param data        rows of the relation; empty by default
 * @param isStreaming streaming flag; `false` by default
 */
case class LocalRelation(
    output: Seq[Attribute],
    data: Seq[InternalRow] = Nil,
    isStreaming: Boolean = false
) extends LeafNode

/** Leaf node whose output attribute list is empty. */
case class OneRowRelation() extends LeafNode {
  override def output: Seq[Attribute] = Nil
}
72
73
/**
 * Leaf node describing a numeric range relation.
 *
 * @param start       first bound of the range
 * @param end         second bound of the range
 * @param step        increment between values
 * @param numSlices   optional slice count
 * @param output      attributes of the relation
 * @param isStreaming streaming flag; `false` by default
 */
case class Range(
    start: Long,
    end: Long,
    step: Long,
    numSlices: Option[Int],
    output: Seq[Attribute],
    isStreaming: Boolean = false
) extends LeafNode
81
82
/**
 * Leaf node that refers to a table by name only.
 *
 * @param multipartIdentifier name parts of the table, e.g. `Seq("my_database", "my_table")`
 * @param options             case-insensitive options; empty by default
 * @param isStreaming         streaming flag; `false` by default
 */
case class UnresolvedRelation(
    multipartIdentifier: Seq[String],
    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
    isStreaming: Boolean = false
) extends LeafNode
87
```
88
89
**Usage Example:**
90
```scala
91
// Create a local relation with data
val attributes = Seq(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = false)()
)
val data = Seq(
  InternalRow(1, UTF8String.fromString("Alice")),
  InternalRow(2, UTF8String.fromString("Bob"))
)
val localRelation = LocalRelation(attributes, data)

// Create a range relation (arguments named because four of them are bare numbers)
val rangeRelation = Range(
  start = 1,
  end = 100,
  step = 1,
  numSlices = Some(4),
  output = Seq(AttributeReference("id", LongType, nullable = false)())
)

// Reference a table by name
val tableRef = UnresolvedRelation(Seq("my_database", "my_table"))
108
```
109
110
## Unary Nodes
111
112
### Projection and Filtering
113
114
```scala { .api }
115
/**
 * Unary node that projects `projectList` over `child`.
 * Its output is each projected expression converted with `toAttribute`.
 */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}
118
119
/** Unary node that filters `child` by `condition`; the output attributes are the child's, unchanged. */
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
122
123
/**
 * Unary node that gives `child` an alias.
 * Every output attribute of the child is re-qualified with `Seq(identifier)`.
 */
case class SubqueryAlias(identifier: String, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] =
    child.output.map(_.withQualifier(Seq(identifier)))
}
126
127
/** Unary node that limits `child` by `limitExpr`; the output attributes are the child's, unchanged. */
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
130
```
131
132
**Usage Example:**
133
```scala
134
val baseRelation = UnresolvedRelation(Seq("users"))
val nameAttr = AttributeReference("name", StringType, nullable = false)()
val ageAttr = AttributeReference("age", IntegerType, nullable = true)()

// Project specific columns
val projection = Project(Seq(nameAttr, ageAttr), baseRelation)

// Filter rows
val filterCondition = GreaterThan(ageAttr, Literal(18))
val filteredPlan = Filter(filterCondition, projection)

// Add alias
val aliasedPlan = SubqueryAlias("u", filteredPlan)

// Limit results
val limitedPlan = Limit(Literal(100), aliasedPlan)
150
```
151
152
### Sorting and Grouping
153
154
```scala { .api }
155
/**
 * Unary node that sorts `child`.
 *
 * @param order  sort keys, in order
 * @param global flag passed as `global = true` in the usage example
 * @param child  plan to sort
 */
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode

/**
 * One sort key.
 *
 * @param child                expression to sort by
 * @param direction            sort direction, e.g. `Descending`
 * @param nullOrdering         placement of nulls, e.g. `NullsLast`
 * @param sameOrderExpressions additional expressions listed with this key; empty by default
 */
case class SortOrder(
    child: Expression,
    direction: SortDirection,
    nullOrdering: NullOrdering,
    sameOrderExpressions: Set[Expression] = Set.empty
)
158
159
/**
 * Unary node that groups `child` and computes aggregates.
 *
 * @param groupingExpressions  expressions to group by
 * @param aggregateExpressions named result expressions
 * @param child                plan to aggregate
 */
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan
) extends UnaryNode
164
165
/**
 * Unary node holding several projection lists over one child.
 *
 * @param projections one expression list per projection
 * @param output      attributes of the node, given explicitly
 * @param child       input plan
 */
case class Expand(
    projections: Seq[Seq[Expression]],
    output: Seq[Attribute],
    child: LogicalPlan
) extends UnaryNode
170
```
171
172
**Usage Example:**
173
```scala
174
val baseRelation = UnresolvedRelation(Seq("sales"))
val customerAttr = AttributeReference("customer_id", IntegerType, nullable = false)()
val amountAttr = AttributeReference("amount", DecimalType(10, 2), nullable = false)()
val dateAttr = AttributeReference("sale_date", DateType, nullable = false)()

// Sort by date descending
val sortOrder = SortOrder(dateAttr, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, baseRelation)

// Group by customer and aggregate amount
val groupingExprs = Seq(customerAttr)
val aggregateExprs = Seq(
  customerAttr,
  Alias(Sum(amountAttr), "total_amount")()
)
val aggregatePlan = Aggregate(groupingExprs, aggregateExprs, baseRelation)
190
```
191
192
### Window Operations
193
194
```scala { .api }
195
/**
 * Unary node that evaluates window expressions over `child`.
 *
 * @param windowExpressions named window expressions to compute
 * @param partitionSpec     partitioning expressions
 * @param orderSpec         ordering within a partition
 * @param child             input plan
 */
case class Window(
    windowExpressions: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: LogicalPlan
) extends UnaryNode
201
202
/**
 * Expression pairing a window function with the window it is evaluated over.
 *
 * @param windowFunction function to evaluate, e.g. `Rank(...)`
 * @param windowSpec     partitioning, ordering and frame of the window
 */
case class WindowExpression(
    windowFunction: Expression,
    windowSpec: WindowSpecDefinition
) extends Expression
206
207
/**
 * Definition of a window.
 *
 * @param partitionSpec      partitioning expressions
 * @param orderSpec          ordering within a partition
 * @param frameSpecification window frame, e.g. `UnspecifiedFrame`
 */
case class WindowSpecDefinition(
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frameSpecification: WindowFrame
)
212
```
213
214
**Usage Example:**
215
```scala
216
val baseRelation = UnresolvedRelation(Seq("employees"))
val deptAttr = AttributeReference("department", StringType, nullable = false)()
val salaryAttr = AttributeReference("salary", DecimalType(10, 2), nullable = false)()

// Window function: rank employees by salary within each department
val windowSpec = WindowSpecDefinition(
  partitionSpec = Seq(deptAttr),
  orderSpec = Seq(SortOrder(salaryAttr, Descending, NullsLast)),
  frameSpecification = UnspecifiedFrame
)
val rankExpr = WindowExpression(Rank(Seq(salaryAttr)), windowSpec)
val windowExprs = Seq(Alias(rankExpr, "salary_rank")())

// Reuse the spec's partitioning and ordering so the Window node cannot drift from the
// specification used inside the window expression.
val windowPlan = Window(
  windowExprs,
  windowSpec.partitionSpec,
  windowSpec.orderSpec,
  baseRelation
)
231
```
232
233
## Binary Nodes
234
235
### Join Operations
236
237
```scala { .api }
238
/**
 * Binary node joining `left` and `right`.
 *
 * @param left      left input
 * @param right     right input
 * @param joinType  kind of join, e.g. `Inner` or `LeftOuter`
 * @param condition optional join predicate; `None` in the cross-join example
 * @param hint      join hint; `JoinHint.NONE` by default
 */
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression],
    hint: JoinHint = JoinHint.NONE
) extends BinaryNode
245
246
/** Closed set of join kinds; each one supplies its SQL keyword text through `sql`. */
sealed abstract class JoinType {
  def sql: String
}

// Every case object implements the abstract `sql` member, so the hierarchy is complete.
case object Inner extends JoinType {
  override def sql: String = "INNER"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}

case object Cross extends JoinType {
  override def sql: String = "CROSS"
}
256
```
257
258
**Usage Example:**
259
```scala
260
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))

// One `user_id` attribute per side of the join.
val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()

// Inner join users and orders
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val innerJoin = Join(usersTable, ordersTable, Inner, Some(joinCondition))

// Left outer join
val leftJoin = Join(usersTable, ordersTable, LeftOuter, Some(joinCondition))

// Cross join (Cartesian product)
val crossJoin = Join(usersTable, ordersTable, Cross, None)
275
```
276
277
### Set Operations
278
279
```scala { .api }
280
/**
 * Set union over any number of child plans; extends `LogicalPlan` directly because the
 * child count is not fixed.
 *
 * @param children        plans to combine
 * @param byName          by-name flag; `false` by default
 * @param allowMissingCol missing-column flag; `false` by default
 */
case class Union(
    children: Seq[LogicalPlan],
    byName: Boolean = false,
    allowMissingCol: Boolean = false
) extends LogicalPlan

/** Binary node for the intersection of `left` and `right`; `isAll` is passed explicitly by callers. */
case class Intersect(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

/** Binary node for the rows of `left` that are not in `right`; `isAll` is passed explicitly by callers. */
case class Except(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode
285
```
286
287
**Usage Example:**
288
```scala
289
val currentUsers = UnresolvedRelation(Seq("current_users"))
val formerUsers = UnresolvedRelation(Seq("former_users"))
val activeUsers = UnresolvedRelation(Seq("active_users"))

// Union all users
val allUsers = Union(Seq(currentUsers, formerUsers))

// Find users in both current and active
val commonUsers = Intersect(currentUsers, activeUsers, isAll = false)

// Find current users who are not active
val inactiveUsers = Except(currentUsers, activeUsers, isAll = false)
301
```
302
303
## Advanced Logical Plans
304
305
### Subqueries
306
307
```scala { .api }
308
/**
 * Base of expressions that embed a logical plan.
 *
 * Declared as an abstract class rather than a case class: the concrete subquery kinds
 * below are case classes, and Scala does not allow one case class to extend another.
 *
 * @param plan     the embedded plan
 * @param children child expressions; empty by default
 * @param exprId   expression id; freshly generated by default
 * @param joinCond join conditions; empty by default
 * @param hint     optional hint; `None` by default
 */
abstract class SubqueryExpression(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId,
    joinCond: Seq[Expression] = Seq.empty,
    hint: Option[HintInfo] = None
) extends PlanExpression[LogicalPlan]

/** Subquery expression used as an EXISTS predicate (see the usage example). */
case class Exists(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

/** Subquery expression used as a scalar value (see the usage example). */
case class ScalarSubquery(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

/** Subquery expression taking the same three parameters as the two kinds above. */
case class ListQuery(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)
321
```
322
323
**Usage Example:**
324
```scala
325
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()

// `user_id` on the orders side, compared against the outer `userIdAttr` in both subqueries.
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()

// EXISTS subquery
val existsSubquery = Filter(
  EqualTo(orderUserIdAttr, userIdAttr),
  ordersTable
)
val usersWithOrders = Filter(Exists(existsSubquery), usersTable)

// Scalar subquery
val countSubquery = Aggregate(
  Seq.empty,
  Seq(Alias(Count(Literal(1)), "order_count")()),
  Filter(EqualTo(orderUserIdAttr, userIdAttr), ordersTable)
)
val usersWithOrderCount = Project(
  Seq(userIdAttr, Alias(ScalarSubquery(countSubquery), "order_count")()),
  usersTable
)
346
```
347
348
### Table Modification
349
350
```scala { .api }
351
/**
 * Command describing an insert into a table.
 *
 * @param table                target table plan
 * @param partition            partition column name mapped to an optional value
 * @param userSpecifiedCols    column names given by the user
 * @param query                plan producing the rows to insert
 * @param overwrite            overwrite flag
 * @param ifPartitionNotExists flag, `false` by default
 */
case class InsertIntoStatement(
    table: LogicalPlan,
    partition: Map[String, Option[String]],
    userSpecifiedCols: Seq[String],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean = false
) extends Command
359
360
/** Command describing a delete from `table`, with an optional `condition`. */
case class DeleteFromTable(table: LogicalPlan, condition: Option[Expression]) extends Command

/** Command describing an update of `table`: the `assignments` to apply and an optional `condition`. */
case class UpdateTable(
    table: LogicalPlan,
    assignments: Seq[Assignment],
    condition: Option[Expression]
) extends Command
363
364
/**
 * Command describing a merge of `sourceTable` into `targetTable`.
 *
 * @param targetTable       table being merged into
 * @param sourceTable       table supplying the rows
 * @param mergeCondition    condition relating the two tables
 * @param matchedActions    actions listed for the matched case
 * @param notMatchedActions actions listed for the not-matched case
 */
case class MergeIntoTable(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction]
) extends Command
371
```
372
373
### Data Definition
374
375
```scala { .api }
376
/**
 * Command describing table creation.
 *
 * @param tableDesc catalog description of the table
 * @param mode      save mode
 * @param query     optional plan supplying the table's rows
 */
case class CreateTable(
    tableDesc: CatalogTable,
    mode: SaveMode,
    query: Option[LogicalPlan]
) extends Command
381
382
/**
 * Command describing a table drop.
 *
 * @param identifier name parts of the table
 * @param ifExists   "if exists" flag
 * @param isView     view flag
 */
case class DropTable(
    identifier: Seq[String],
    ifExists: Boolean,
    isView: Boolean
) extends Command
387
388
/** Command describing an alteration of `table` as a list of `changes`. */
case class AlterTable(
    table: LogicalPlan,
    changes: Seq[TableChange]
) extends Command
392
```
393
394
## Physical Planning Concepts
395
396
### Physical Plan Base
397
398
```scala { .api }
399
/**
 * Abstract base of physical plan nodes: a [[QueryPlan]] specialised to a tree of
 * [[SparkPlan]] nodes. Only member signatures are listed.
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] {
  // Execution entry points, one per result shape.
  def execute(): RDD[InternalRow]
  def executeCollect(): Array[InternalRow]
  def executeBroadcast[T](): broadcast.Broadcast[T]
  def executeColumnar(): RDD[ColumnarBatch]

  // Requirements placed on the children.
  def requiredChildDistribution: Seq[Distribution]
  def requiredChildOrdering: Seq[Seq[SortOrder]]

  // Properties of this node's own output.
  def outputPartitioning: Partitioning
  def outputOrdering: Seq[SortOrder]

  // Metrics keyed by name.
  def metrics: Map[String, SQLMetric]
}
410
```
411
412
### Distribution and Partitioning
413
414
```scala { .api }
415
/** Closed set of distributions, the type used by `SparkPlan.requiredChildDistribution`. */
sealed abstract class Distribution

// Parameterless distributions.
case object UnspecifiedDistribution extends Distribution
case object AllTuples extends Distribution
case object BroadcastDistribution extends Distribution

/** Distribution described by a list of expressions. */
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution

/** Distribution described by a sort ordering. */
case class RangeDistribution(ordering: Seq[SortOrder]) extends Distribution
422
423
/**
 * Closed set of partitionings. Each one reports a partition count and whether it
 * satisfies a required [[Distribution]].
 */
sealed abstract class Partitioning {
  def numPartitions: Int
  def satisfies(required: Distribution): Boolean
}

/** Partitioning described by a list of expressions and a partition count. */
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning

/** Partitioning described by a sort ordering and a partition count. */
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning

/** Partitioning described by a partition count only. */
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

// NOTE(review): these two case objects take no `numPartitions` parameter, and this
// listing shows no implementation of the abstract members for them. Confirm their
// exact declarations against the targeted Spark version.
case object SinglePartition extends Partitioning
case object UnknownPartitioning extends Partitioning
433
```
434
435
## Plan Transformations
436
437
### Rule-Based Optimization
438
439
```scala { .api }
440
/**
 * A named transformation from a tree to a tree of the same type.
 *
 * @tparam TreeType the tree node type the rule operates on
 */
abstract class Rule[TreeType <: TreeNode[TreeType]] {
  val ruleName: String
  def apply(plan: TreeType): TreeType
}

/**
 * A named group of logical-plan rules together with the strategy for running them.
 *
 * @param name     batch name
 * @param strategy `Once` or `FixedPoint(maxIterations)`
 * @param rules    rules in the batch, as varargs
 */
case class Batch(name: String, strategy: Strategy, rules: Rule[LogicalPlan]*)
446
447
/** Closed set of execution strategies for a batch of rules. */
sealed abstract class Strategy

/** Strategy with no parameters. */
case object Once extends Strategy

/** Strategy carrying an iteration bound in `maxIterations`. */
case class FixedPoint(maxIterations: Int) extends Strategy
450
```
451
452
**Usage Example:**
453
```scala
454
// Custom optimization rule
455
/**
 * Example rule skeleton: finds a `Filter` sitting directly on top of a `Join` and hands
 * both to `pushFilterThroughJoin`.
 *
 * `transform` takes a partial function, so nodes that do not match are left as they are
 * and no catch-all case is needed.
 */
object PushDownFilters extends Rule[LogicalPlan] {
  override val ruleName: String = "PushDownFilters"

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, join: Join) =>
      // Logic to push filter conditions down past joins
      pushFilterThroughJoin(condition, join)
  }

  /**
   * Placeholder for the real pushdown logic.
   *
   * Until that logic is written, the filter is put back on top of the join. Returning the
   * bare join would silently discard the predicate and change the query's result.
   */
  private def pushFilterThroughJoin(filter: Expression, join: Join): LogicalPlan = {
    // Implementation details for filter pushdown
    Filter(filter, join)
  }
}
470
471
// Apply optimization rules in batches
472
// Batches are listed in order; each names its rules and the strategy for running them.
val optimizer = Seq(
  Batch("Filter Pushdown", FixedPoint(10), PushDownFilters),
  Batch("Constant Folding", Once, ConstantFolding)
)
476
```
477
478
## Common Query Plan Patterns
479
480
### Building Query Plans Programmatically
481
```scala
482
// Build a complex query plan
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))

val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val nameAttr = AttributeReference("name", StringType, nullable = false)()
val orderAmountAttr = AttributeReference("amount", DecimalType(10, 2), nullable = false)()
val activeAttr = AttributeReference("active", BooleanType, nullable = false)()

// SELECT u.name, SUM(o.amount) as total
// FROM users u JOIN orders o ON u.user_id = o.user_id
// WHERE u.active = true
// GROUP BY u.user_id, u.name
// HAVING SUM(o.amount) > 1000
// ORDER BY total DESC
// LIMIT 10

// JOIN ... ON u.user_id = o.user_id: both sides of the condition are user-id attributes.
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val joinedPlan = Join(usersTable, ordersTable, Inner, Some(joinCondition))

// WHERE u.active = true
val filterCondition = EqualTo(activeAttr, Literal(true))
val filteredPlan = Filter(filterCondition, joinedPlan)

// GROUP BY u.user_id, u.name with SUM(o.amount) AS total
val groupingExprs = Seq(userIdAttr, nameAttr)
val totalAlias = Alias(Sum(orderAmountAttr), "total")()
val aggregateExprs = Seq(
  nameAttr,
  totalAlias
)
val aggregatedPlan = Aggregate(groupingExprs, aggregateExprs, filteredPlan)

// HAVING SUM(o.amount) > 1000
val havingCondition = GreaterThan(Sum(orderAmountAttr), Literal(1000))
val havingPlan = Filter(havingCondition, aggregatedPlan)

// ORDER BY total DESC: sort on the attribute the alias produces, so the sort key refers
// to the aggregate's own output column rather than to an unrelated new attribute.
val sortOrder = SortOrder(totalAlias.toAttribute, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, havingPlan)

// LIMIT 10
val finalPlan = Limit(Literal(10), sortedPlan)
518
```

Catalyst's query planning system thus provides a comprehensive framework for representing and optimizing SQL queries: a rich tree-based plan structure on which sophisticated transformations and optimizations can be expressed.