# Query Planning

Logical and physical query plan representations with transformation and optimization capabilities for building and manipulating query execution trees.

## Capabilities

### QueryPlan Base Class

Base class for all query plans providing common functionality.

```scala { .api }
/**
 * Base class for all query plans, extends TreeNode
 */
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
  self: PlanType =>

  /** Attributes output by this node */
  def output: Seq[Attribute]

  /** Set of output attributes */
  def outputSet: AttributeSet

  /** Attributes referenced in expressions */
  def references: AttributeSet

  /** Attributes input from children */
  def inputSet: AttributeSet

  /** Referenced but missing attributes */
  def missingInput: AttributeSet

  /**
   * Transform expressions in this plan using the given rule
   */
  def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type

  /**
   * Transform expressions with specific traversal order
   */
  def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type
  def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type
}
```
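
**Usage Examples:**

A minimal sketch of the attribute bookkeeping `QueryPlan` provides. Note that attribute identity is by `exprId`, not by name, so `missingInput` flags references that no child actually supplies:

```scala
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

val relation = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

// Filter referencing an attribute the child actually produces
val wellFormed = Filter(IsNotNull(relation.output.head), relation)
val refs = wellFormed.references       // AttributeSet(id)
val input = wellFormed.inputSet        // AttributeSet(id, name), from the child
val missing = wellFormed.missingInput  // empty: every reference is supplied

// Filter referencing a fresh attribute: same name, different exprId
val dangling = Filter(
  IsNotNull(AttributeReference("id", IntegerType, nullable = false)()),
  relation
)
val unresolved = dangling.missingInput // non-empty: the new "id" is not in inputSet
```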

### LogicalPlan Class

Base class for logical query plans representing query semantics.

```scala { .api }
/**
 * Base class for logical query plans
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

  /** Whether plan has been analyzed */
  def analyzed: Boolean

  /** Mark plan as analyzed */
  private[catalyst] def setAnalyzed(): Unit

  /**
   * Apply transformation rules to operators, skipping analyzed sub-trees
   */
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan

  /**
   * Transform expressions, skipping analyzed sub-trees
   */
  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan

  /** Compute plan statistics */
  def statistics: Statistics
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Create a simple logical plan
val relation = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

// Access plan properties
val output = relation.output       // Seq of attributes
val outputSet = relation.outputSet // AttributeSet
val analyzed = relation.analyzed   // Boolean

// Create a filter plan: WHERE id > 10.
// Reuse the relation's own attribute; a fresh AttributeReference("id", ...)
// would carry a different exprId and would not refer to the same column.
val filterExpr = GreaterThan(relation.output.head, Literal(10, IntegerType))
val filtered = Filter(filterExpr, relation)

// Transform expressions in the plan
val transformed = filtered.transformExpressions {
  case Literal(value: Int, dataType) => Literal(value * 2, dataType)
}
```
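
`resolveOperators` behaves like a plain transform but skips sub-trees already marked as analyzed, which keeps analyzer rules from re-firing on resolved plans. A minimal sketch continuing the example above; the particular rewrite rule is illustrative, not a built-in:

```scala
// Example rule: rewrite `id > 10` into `NOT (id <= 10)` on operators that
// have not been analyzed yet (analyzed sub-trees are skipped)
val rewritten = filtered.resolveOperators {
  case Filter(GreaterThan(l, r), child) =>
    Filter(Not(LessThanOrEqual(l, r)), child)
}

// Plan statistics (e.g. size estimates) are derived bottom-up from children
val planStats = filtered.statistics
```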

### Join Types and Operations

Join type definitions and join plan operations.

```scala { .api }
/** Base trait for join types */
sealed abstract class JoinType {
  def sql: String
}

/** Inner join */
case object Inner extends JoinType {
  override def sql: String = "INNER"
}

/** Left outer join */
case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

/** Right outer join */
case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

/** Full outer join */
case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

/** Left semi join */
case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

/** Left anti join */
case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}

/**
 * Join logical plan node
 */
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression]) extends LogicalPlan {

  override def children: Seq[LogicalPlan] = Seq(left, right)

  // Outer joins produce nulls on the non-preserved side, so those
  // attributes must be marked nullable in the output
  override def output: Seq[Attribute] = joinType match {
    case Inner =>
      left.output ++ right.output
    case LeftOuter =>
      left.output ++ right.output.map(_.withNullability(true))
    case RightOuter =>
      left.output.map(_.withNullability(true)) ++ right.output
    case FullOuter =>
      (left.output ++ right.output).map(_.withNullability(true))
    case LeftSemi | LeftAnti =>
      left.output
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Create relations
val users = LocalRelation(
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

val orders = LocalRelation(
  AttributeReference("order_id", IntegerType, nullable = false)(),
  AttributeReference("user_id", IntegerType, nullable = false)(),
  AttributeReference("amount", DoubleType, nullable = true)()
)

// Create the join condition: users.user_id = orders.user_id
val joinCondition = EqualTo(
  users.output.find(_.name == "user_id").get,
  orders.output.find(_.name == "user_id").get
)

// Create different join types
val innerJoin = Join(users, orders, Inner, Some(joinCondition))
val leftJoin = Join(users, orders, LeftOuter, Some(joinCondition))
val rightJoin = Join(users, orders, RightOuter, Some(joinCondition))
val fullJoin = Join(users, orders, FullOuter, Some(joinCondition))
val semiJoin = Join(users, orders, LeftSemi, Some(joinCondition))

// Access join properties
val joinOutput = innerJoin.output     // Combined output from both sides
val joinChildren = innerJoin.children // Seq(users, orders)
```
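
With the `Join.output` definition above, the output schema reflects the join semantics: semi and anti joins expose only the left side, and outer joins mark the non-preserved side nullable. A short check, continuing the example:

```scala
// Semi join keeps only the left side's attributes
val semiNames = semiJoin.output.map(_.name) // Seq(user_id, name)

// Left outer join: orders columns become nullable, users columns keep theirs
val leftNullability = leftJoin.output.map(_.nullable)
// Seq(false, true, true, true, true)
```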

### Basic Logical Plan Operators

Fundamental logical plan operators for query construction.

```scala { .api }
/**
 * Filter (WHERE clause) logical plan
 */
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/**
 * Projection (SELECT clause) logical plan
 */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}

/**
 * Aggregation (GROUP BY clause) logical plan
 */
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan) extends UnaryNode {

  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
}

/**
 * Sort (ORDER BY clause) logical plan
 */
case class Sort(
    order: Seq[SortOrder],
    global: Boolean,
    child: LogicalPlan) extends UnaryNode {

  override def output: Seq[Attribute] = child.output
}

/**
 * Limit logical plan
 */
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/**
 * Union logical plan
 */
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
  override def output: Seq[Attribute] = children.head.output
}

/**
 * Local relation with in-memory data; a leaf node with no children
 */
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) extends LeafNode
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._  // Count, Average
import org.apache.spark.sql.catalyst.dsl.expressions._        // provides .as(...) on expressions
import org.apache.spark.sql.types._

// Base relation
val table = LocalRelation(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("age", IntegerType, nullable = true)(),
  AttributeReference("score", DoubleType, nullable = true)()
)

// Filter: WHERE age > 18
val filterExpr = GreaterThan(table.output(2), Literal(18, IntegerType))
val filtered = Filter(filterExpr, table)

// Project: SELECT id, name
val projectList = Seq(table.output(0), table.output(1))
val projected = Project(projectList, filtered)

// Aggregate: SELECT age, COUNT(*), AVG(score) GROUP BY age
val groupExpr = Seq(table.output(2)) // age
val aggExprs = Seq(
  table.output(2).as("age"),
  Count(Literal(1)).as("count"),
  Average(table.output(3)).as("avg_score")
)
val aggregated = Aggregate(groupExpr, aggExprs, table)

// Sort: ORDER BY score DESC
val sortOrder = Seq(SortOrder(table.output(3), Descending))
val sorted = Sort(sortOrder, global = true, table)

// Limit: LIMIT 10
val limited = Limit(Literal(10, IntegerType), sorted)

// Union: combine two relations with the same schema
val table2 = LocalRelation(table.output)
val unioned = Union(Seq(table, table2))

// Complex query: SELECT name FROM table WHERE age > 21 ORDER BY score LIMIT 5.
// Sort sits below Project so the ORDER BY column is still available;
// sorting above Project(name) would reference a column the projection dropped.
val complexQuery = Limit(
  Literal(5, IntegerType),
  Project(
    Seq(table.output(1)), // name
    Sort(
      Seq(SortOrder(table.output(3), Ascending)), // score
      global = true,
      Filter(
        GreaterThan(table.output(2), Literal(21, IntegerType)), // age > 21
        table
      )
    )
  )
)
```
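
Plans are immutable trees, so rewrites go through the transformation machinery inherited from `TreeNode`. A minimal optimizer-style sketch continuing the example above, assuming the `transform` method that `TreeNode` provides; the filter-collapsing rule itself is illustrative:

```scala
// Two stacked filters: WHERE age > 21 AND id > 0 written as nested Filters
val stacked = Filter(
  GreaterThan(table.output(0), Literal(0, IntegerType)),
  Filter(GreaterThan(table.output(2), Literal(21, IntegerType)), table)
)

// Optimizer-style rewrite: merge adjacent Filters into a single conjunction
val collapsed = stacked.transform {
  case Filter(outer, Filter(inner, child)) => Filter(And(inner, outer), child)
}
// collapsed: Filter(And(age > 21, id > 0), table)
```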

### Physical Plan Integration

Bridge between logical and physical planning.

```scala { .api }
/**
 * Base class for physical execution plans
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] {

  /** Execute this plan and return RDD of results */
  def execute(): RDD[InternalRow]

  /** Prepare this plan for execution */
  def prepare(): Unit

  /** Reset statistics and metrics */
  def resetMetrics(): Unit
}

/**
 * Plan statistics used for optimization decisions
 */
case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None) {

  /** Whether the plan's output is small enough to broadcast */
  def isBroadcastable: Boolean = sizeInBytes <= autoBroadcastJoinThreshold
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// A logical plan that a planner can convert into a physical plan.
// Reuse the relation's output attribute so the Project refers to the
// same column (same exprId) rather than a fresh, unresolved attribute.
val relation = LocalRelation(AttributeReference("id", IntegerType, nullable = false)())
val logicalPlan = Project(Seq(relation.output.head), relation)

// Statistics computation
val stats = Statistics(sizeInBytes = 1000, rowCount = Some(100))
val broadcastable = stats.isBroadcastable // Check if suitable for broadcast

// Physical properties like partitioning and ordering are preserved
// during logical-to-physical plan conversion
```
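
Statistics feed planner decisions such as join-strategy selection. A sketch of how a planner might consume them; `chooseJoinStrategy` and the explicit `threshold` parameter are hypothetical helpers, not part of the API above:

```scala
// Hypothetical helper: pick a join strategy from size estimates
def chooseJoinStrategy(left: Statistics, right: Statistics, threshold: BigInt): String =
  if (left.sizeInBytes <= threshold || right.sizeInBytes <= threshold) "broadcast hash join"
  else "sort-merge join"

val strategy = chooseJoinStrategy(
  Statistics(sizeInBytes = 1000),
  Statistics(sizeInBytes = BigInt(10) * 1024 * 1024 * 1024),
  threshold = 10L * 1024 * 1024 // e.g. 10 MB, mirroring autoBroadcastJoinThreshold
)
// strategy == "broadcast hash join": the small side fits under the threshold
```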