0
# Query Plans
1
2
This section covers the query plan system in Spark Catalyst, including logical and physical plan representations with tree-based transformations and optimization support. Query plans form the core of Catalyst's query processing pipeline.
3
4
## Core Imports
5
6
```scala
7
import org.apache.spark.sql.catalyst.plans._
8
import org.apache.spark.sql.catalyst.plans.logical._
9
import org.apache.spark.sql.catalyst.plans.physical._
10
// Join types (Inner, LeftOuter, ...) are declared in joinTypes.scala but belong to the org.apache.spark.sql.catalyst.plans package imported above
11
import org.apache.spark.sql.catalyst.expressions._
12
```
13
14
## Query Plan Hierarchy
15
16
### QueryPlan (abstract class)
17
18
Base class for all query plans (logical and physical), extending the tree framework.
19
20
```scala { .api }
21
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
22
def output: Seq[Attribute]
23
def outputSet: AttributeSet
24
def references: AttributeSet
25
def inputSet: AttributeSet
26
def schema: StructType
27
def outputOrdering: Seq[SortOrder]
28
def maxRows: Option[Long]
29
def transformExpressions(rule: PartialFunction[Expression, Expression]): PlanType
30
def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): PlanType
31
def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): PlanType
32
}
33
```
34
35
### LogicalPlan (abstract class)
36
37
Base class for all logical query plans, which describe the structure of a query before it is converted into a physical plan.
38
39
```scala { .api }
40
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
41
def resolved: Boolean
42
def childrenResolved: Boolean
43
def statistics: Statistics
44
}
45
```
46
47
#### Plan Node Types
48
49
```scala { .api }
50
abstract class LeafNode extends LogicalPlan {
51
override final def children: Seq[LogicalPlan] = Nil
52
}
53
54
abstract class UnaryNode extends LogicalPlan {
55
def child: LogicalPlan
56
override final def children: Seq[LogicalPlan] = child :: Nil
57
}
58
59
abstract class BinaryNode extends LogicalPlan {
60
def left: LogicalPlan
61
def right: LogicalPlan
62
override final def children: Seq[LogicalPlan] = left :: right :: Nil
63
}
64
```
65
66
#### Usage Example
67
68
```scala
69
import org.apache.spark.sql.catalyst.plans.logical._
70
import org.apache.spark.sql.catalyst.expressions._
71
72
// Create a simple query plan
73
val relation = UnresolvedRelation(TableIdentifier("users"))
74
val filter = Filter(EqualTo(UnresolvedAttribute("age"), Literal(25)), relation)
75
val project = Project(Seq(UnresolvedAttribute("name")), filter)
76
77
// Check if plan is resolved
78
val isResolved = project.resolved
79
80
// Get output schema
81
val schema = project.schema
82
```
83
84
## Join Operations
85
86
### Join Types
87
88
```scala { .api }
89
sealed abstract class JoinType {
90
def sql: String
91
}
92
93
case object Inner extends JoinType
94
case object LeftOuter extends JoinType
95
case object RightOuter extends JoinType
96
case object FullOuter extends JoinType
97
case object LeftSemi extends JoinType
98
case object LeftAnti extends JoinType
99
case object Cross extends JoinType
100
```
101
102
### Join Plan
103
104
```scala { .api }
105
case class Join(
106
left: LogicalPlan,
107
right: LogicalPlan,
108
joinType: JoinType,
109
condition: Option[Expression]
110
) extends BinaryNode {
111
override def output: Seq[Attribute] = {
112
joinType match {
113
case LeftSemi | LeftAnti => left.output
114
case _ => left.output ++ right.output
115
}
116
}
117
}
118
```
119
120
#### Usage Example
121
122
```scala
123
import org.apache.spark.sql.catalyst.plans.logical._
124
import org.apache.spark.sql.catalyst.plans._ // JoinType, Inner, LeftOuter, ... (joinTypes.scala is a file, not a package)
125
126
// Create join operation
127
val users = UnresolvedRelation(TableIdentifier("users"))
128
val orders = UnresolvedRelation(TableIdentifier("orders"))
129
val joinCondition = EqualTo(
130
UnresolvedAttribute("users.id"),
131
UnresolvedAttribute("orders.user_id")
132
)
133
val join = Join(users, orders, Inner, Some(joinCondition))
134
```
135
136
## Basic Logical Operators
137
138
### Project (SELECT)
139
140
```scala { .api }
141
case class Project(
142
projectList: Seq[NamedExpression],
143
child: LogicalPlan
144
) extends UnaryNode {
145
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
146
}
147
```
148
149
### Filter (WHERE)
150
151
```scala { .api }
152
case class Filter(
153
condition: Expression,
154
child: LogicalPlan
155
) extends UnaryNode {
156
override def output: Seq[Attribute] = child.output
157
}
158
```
159
160
### Aggregate (GROUP BY)
161
162
```scala { .api }
163
case class Aggregate(
164
groupingExpressions: Seq[Expression],
165
aggregateExpressions: Seq[NamedExpression],
166
child: LogicalPlan
167
) extends UnaryNode {
168
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
169
}
170
```
171
172
### Sort (ORDER BY)
173
174
```scala { .api }
175
case class Sort(
176
order: Seq[SortOrder],
177
global: Boolean,
178
child: LogicalPlan
179
) extends UnaryNode {
180
override def output: Seq[Attribute] = child.output
181
}
182
183
case class SortOrder(
184
child: Expression,
185
direction: SortDirection,
186
nullOrdering: NullOrdering,
187
sameOrderExpressions: Set[Expression] = Set.empty
188
) extends Expression with Unevaluable {
189
override def dataType: DataType = child.dataType
190
override def nullable: Boolean = child.nullable
191
}
192
193
abstract class SortDirection {
194
def sql: String
195
}
196
case object Ascending extends SortDirection
197
case object Descending extends SortDirection
198
199
abstract class NullOrdering {
200
def sql: String
201
}
202
case object NullsFirst extends NullOrdering
203
case object NullsLast extends NullOrdering
204
```
205
206
### Limit
207
208
```scala { .api }
209
case class Limit(
210
limitExpr: Expression,
211
child: LogicalPlan
212
) extends UnaryNode {
213
override def output: Seq[Attribute] = child.output
214
}
215
```
216
217
### Union
218
219
```scala { .api }
220
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
221
override def output: Seq[Attribute] = children.head.output
222
}
223
```
224
225
### Distinct
226
227
```scala { .api }
228
case class Distinct(child: LogicalPlan) extends UnaryNode {
229
override def output: Seq[Attribute] = child.output
230
}
231
```
232
233
#### Usage Example
234
235
```scala
236
import org.apache.spark.sql.catalyst.plans.logical._
237
import org.apache.spark.sql.catalyst.expressions._
238
239
// Build a complex query plan: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10
240
val relation = UnresolvedRelation(TableIdentifier("users"))
241
242
val filter = Filter(
243
GreaterThan(UnresolvedAttribute("age"), Literal(18)),
244
relation
245
)
246
247
val project = Project(
248
Seq(UnresolvedAttribute("name")),
249
filter
250
)
251
252
val sort = Sort(
253
Seq(SortOrder(UnresolvedAttribute("name"), Ascending, NullsLast)),
254
global = true,
255
project
256
)
257
258
val limit = Limit(Literal(10), sort)
259
260
// The final plan represents: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10
261
```
262
263
## Data Sources
264
265
### UnresolvedRelation
266
267
```scala { .api }
268
case class UnresolvedRelation(
269
tableIdentifier: TableIdentifier,
270
alias: Option[String] = None
271
) extends LeafNode {
272
override def output: Seq[Attribute] = Nil
273
}
274
275
case class TableIdentifier(
276
table: String,
277
database: Option[String] = None
278
) {
279
def identifier: String = database.map(_ + ".").getOrElse("") + table
280
def quotedString: String = database.map(quoteIdentifier).map(_ + ".").getOrElse("") + quoteIdentifier(table)
281
def unquotedString: String = identifier
282
}
283
```
284
285
### LocalRelation
286
287
```scala { .api }
288
case class LocalRelation(
289
output: Seq[Attribute],
290
data: Seq[InternalRow] = Nil,
291
isStreaming: Boolean = false
292
) extends LeafNode {
293
// Constructor for single row
294
def this(output: Seq[Attribute], data: InternalRow) = this(output, data :: Nil)
295
}
296
```
297
298
#### Usage Example
299
300
```scala
301
import org.apache.spark.sql.catalyst.plans.logical._
302
import org.apache.spark.sql.types._
303
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
304
305
// Create a local relation with data
306
val schema = Seq(
307
AttributeReference("id", IntegerType, nullable = false)(),
308
AttributeReference("name", StringType, nullable = true)()
309
)
310
311
val data = Seq(
312
InternalRow(1, UTF8String.fromString("Alice")),
313
InternalRow(2, UTF8String.fromString("Bob"))
314
)
315
316
val localRelation = LocalRelation(schema, data)
317
```
318
319
## Statistics and Cost Information
320
321
### Statistics
322
323
```scala { .api }
324
case class Statistics(
325
sizeInBytes: BigInt,
326
rowCount: Option[BigInt] = None,
327
attributeStats: AttributeMap[ColumnStat] = AttributeMap.empty,
328
hints: HintInfo = HintInfo()
329
) {
330
def simpleString: String = {
331
s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}, " +
332
s"rowCount=${rowCount.map(_.toString).getOrElse("unknown")}"
333
}
334
}
335
336
case class ColumnStat(
337
distinctCount: Option[BigInt] = None,
338
min: Option[Any] = None,
339
max: Option[Any] = None,
340
nullCount: Option[BigInt] = None,
341
avgLen: Option[Long] = None,
342
maxLen: Option[Long] = None,
343
histogram: Option[Histogram] = None
344
)
345
346
case class HintInfo(
347
broadcast: Boolean = false,
348
cartesianProduct: Boolean = false
349
)
350
```
351
352
#### Usage Example
353
354
```scala
355
import org.apache.spark.sql.catalyst.plans.logical._
356
357
// Create statistics for a relation
358
val stats = Statistics(
359
sizeInBytes = 1024 * 1024, // 1MB
360
rowCount = Some(1000),
361
hints = HintInfo(broadcast = true)
362
)
363
364
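// Attach statistics to a plan (illustrative pseudocode: `schema` must be defined first, and `withStats` does not appear to be a LogicalPlan method - verify against your Spark version)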
365
val relation = LocalRelation(schema)
366
val relationWithStats = relation.copy().withNewChildren(relation.children).transform {
367
case plan => plan.withStats(stats)
368
}
369
```
370
371
## Physical Plans
372
373
Physical plans live in the execution engine (`org.apache.spark.sql.execution`, in the spark-sql module) rather than in Catalyst itself; their base class, `SparkPlan`, builds on Catalyst's `QueryPlan`:
374
375
### Physical Plan Base
376
377
```scala { .api }
378
abstract class SparkPlan extends QueryPlan[SparkPlan] {
379
def execute(): RDD[InternalRow]
380
def executeCollect(): Array[InternalRow]
381
def executeTake(n: Int): Array[InternalRow]
382
def executeToIterator(): Iterator[InternalRow]
383
384
def metrics: Map[String, SQLMetric]
385
def longMetric(name: String): SQLMetric
386
}
387
```
388
389
## Tree Transformations
390
391
Query plans inherit tree transformation capabilities:
392
393
### Common Transformation Patterns
394
395
```scala
396
// Transform all expressions in a plan (bottom-up, so the newly created Upper(...) nodes are not revisited and the rewrite terminates)
396
val transformedPlan = plan.transformExpressionsUp {
398
case expr if expr.dataType == StringType =>
399
Upper(expr)
400
}
401
402
// Transform the plan structure
403
val optimizedPlan = plan.transform {
404
case Filter(condition, child) if condition == Literal(true) =>
405
child // Remove always-true filters
406
}
407
408
// Collect information from the plan
409
val allFilters = plan.collect {
410
case filter: Filter => filter.condition
411
}
412
```
413
414
#### Usage Example
415
416
```scala
417
import org.apache.spark.sql.catalyst.plans.logical._
418
import org.apache.spark.sql.catalyst.expressions._
419
420
// Original plan with nested filters
421
val relation = UnresolvedRelation(TableIdentifier("users"))
422
val filter1 = Filter(GreaterThan(UnresolvedAttribute("age"), Literal(18)), relation)
423
val filter2 = Filter(EqualTo(UnresolvedAttribute("active"), Literal(true)), filter1)
424
425
// Combine filters into a single AND condition
426
val combinedPlan = filter2.transform {
427
case Filter(condition1, Filter(condition2, child)) =>
428
Filter(And(condition1, condition2), child)
429
}
430
431
// Result: Filter(And(active = true, age > 18), UnresolvedRelation(users))
432
```
433
434
## Plan Validation
435
436
### Resolution Status
437
438
```scala
439
// Check if a plan is fully resolved
440
def isFullyResolved(plan: LogicalPlan): Boolean = {
441
plan.resolved && plan.children.forall(isFullyResolved)
442
}
443
444
// Find unresolved references
445
def findUnresolvedReferences(plan: LogicalPlan): Seq[UnresolvedAttribute] = {
446
plan.flatMap(_.expressions).flatMap(_.collect {
447
case expr: UnresolvedAttribute => expr
448
})
449
}
450
```
451
452
This comprehensive query plan system enables Catalyst to represent, transform, and optimize SQL queries through a flexible tree-based structure that supports both logical planning and physical execution strategies.