Logical and physical query plan representations, together with the transformation and optimization facilities for building and manipulating query execution trees.
Base class for all query plans providing common functionality.
/**
* Base class for all query plans, extends TreeNode
*/
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType =>
/** Attributes output by this node */
def output: Seq[Attribute]
/** Set of output attributes */
def outputSet: AttributeSet
/** Attributes referenced in expressions */
def references: AttributeSet
/** Attributes input from children */
def inputSet: AttributeSet
/** Referenced but missing attributes */
def missingInput: AttributeSet
/**
* Transform expressions in this plan using the given rule
*/
def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type
/**
* Transform expressions with specific traversal order
*/
def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type
def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type
}
Base class for logical query plans representing query semantics.
/**
* Base class for logical query plans
*/
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
/** Whether plan has been analyzed */
def analyzed: Boolean
/** Mark plan as analyzed */
private[catalyst] def setAnalyzed(): Unit
/**
* Apply transformation rules to operators, skipping analyzed sub-trees
*/
def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan
/**
* Transform expressions, skipping analyzed sub-trees
*/
def resolveExpressions(rule: PartialFunction[Expression, Expression]): LogicalPlan
/** Compute plan statistics */
def statistics: Statistics
}
Usage Examples:
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Create a simple logical plan
val relation = LocalRelation(
AttributeReference("id", IntegerType, false)(),
AttributeReference("name", StringType, true)()
)
// Access plan properties
val output = relation.output // Seq of attributes
val outputSet = relation.outputSet // AttributeSet
val analyzed = relation.analyzed // Boolean
// Create filter plan
val filterExpr = GreaterThan(
  relation.output.head, // reuse the relation's `id` attribute (same ExprId)
  Literal(10, IntegerType)
)
val filtered = Filter(filterExpr, relation)
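// Attribute bookkeeping: `references` collects the attributes used in this
// node's expressions; `missingInput` is whatever is referenced but not
// supplied by the children
val filterRefs = filtered.references     // AttributeSet containing `id`
val missing = filtered.missingInput      // empty: `id` comes from relation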
// Transform expressions in plan
val transformed = filtered.transformExpressions {
case Literal(value: Int, dataType) => Literal(value * 2, dataType)
}
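// Traversal order: transformExpressionsDown rewrites parent expressions before
// children; transformExpressionsUp visits children first. With a rule like this
// the result is identical either way, but the order matters once one rewrite
// produces nodes that another rule must still see.
val topDown = filtered.transformExpressionsDown {
  case Literal(v: Int, dt) => Literal(v + 1, dt)
}
val bottomUp = filtered.transformExpressionsUp {
  case Literal(v: Int, dt) => Literal(v + 1, dt)
}
// resolveOperators behaves like a transform but skips sub-trees already marked
// analyzed, so analyzer rules do not re-fire (shown here with a no-op rule)
val resolved = filtered.resolveOperators {
  case f: Filter => f
}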
Join type definitions and join plan operations.
/** Base trait for join types */
sealed abstract class JoinType {
def sql: String
}
/** Inner join */
case object Inner extends JoinType {
override def sql: String = "INNER"
}
/** Left outer join */
case object LeftOuter extends JoinType {
override def sql: String = "LEFT OUTER"
}
/** Right outer join */
case object RightOuter extends JoinType {
override def sql: String = "RIGHT OUTER"
}
/** Full outer join */
case object FullOuter extends JoinType {
override def sql: String = "FULL OUTER"
}
/** Left semi join */
case object LeftSemi extends JoinType {
override def sql: String = "LEFT SEMI"
}
/** Left anti join */
case object LeftAnti extends JoinType {
override def sql: String = "LEFT ANTI"
}
/**
* Join logical plan node
*/
case class Join(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
condition: Option[Expression]) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(left, right)
override def output: Seq[Attribute] = {
joinType match {
case Inner | LeftOuter | RightOuter | FullOuter =>
left.output ++ right.output
case LeftSemi | LeftAnti =>
left.output
}
}
}
Usage Examples:
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._ // IntegerType, StringType, DoubleType
// Create relations
val users = LocalRelation(
AttributeReference("user_id", IntegerType, false)(),
AttributeReference("name", StringType, true)()
)
val orders = LocalRelation(
AttributeReference("order_id", IntegerType, false)(),
AttributeReference("user_id", IntegerType, false)(),
AttributeReference("amount", DoubleType, true)()
)
// Create join condition
val joinCondition = EqualTo(
users.output.find(_.name == "user_id").get,
orders.output.find(_.name == "user_id").get
)
// Create different join types
val innerJoin = Join(users, orders, Inner, Some(joinCondition))
val leftJoin = Join(users, orders, LeftOuter, Some(joinCondition))
val rightJoin = Join(users, orders, RightOuter, Some(joinCondition))
val fullJoin = Join(users, orders, FullOuter, Some(joinCondition))
val semiJoin = Join(users, orders, LeftSemi, Some(joinCondition))
// Access join properties
val joinOutput = innerJoin.output // Combined output from both sides
val joinChildren = innerJoin.children // Seq(users, orders)
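// Semi and anti joins expose only the left side's columns, per Join.output
val antiJoin = Join(users, orders, LeftAnti, Some(joinCondition))
val semiOutput = semiJoin.output  // users.output only
val antiOutput = antiJoin.output  // users.output only
// JoinType.sql yields the SQL keyword form of each join type
val sqlName = LeftSemi.sql        // "LEFT SEMI"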
Fundamental logical plan operators for query construction.
/**
* Filter (WHERE clause) logical plan
*/
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
/**
* Projection (SELECT clause) logical plan
*/
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}
/**
* Aggregation (GROUP BY clause) logical plan
*/
case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
}
/**
* Sort (ORDER BY clause) logical plan
*/
case class Sort(
order: Seq[SortOrder],
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
/**
* Limit logical plan
*/
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
/**
* Union logical plan
*/
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
override def output: Seq[Attribute] = children.head.output
}
/**
* Local relation with in-memory data
*/
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) extends LeafNode {
// Leaf node with no children
}
Usage Examples:
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._ // Count, Average
import org.apache.spark.sql.catalyst.dsl.expressions._       // enables .as(...) on expressions
import org.apache.spark.sql.types._
// Base relation
val table = LocalRelation(
AttributeReference("id", IntegerType, false)(),
AttributeReference("name", StringType, true)(),
AttributeReference("age", IntegerType, true)(),
AttributeReference("score", DoubleType, true)()
)
// Filter: WHERE age > 18
val filterExpr = GreaterThan(table.output(2), Literal(18, IntegerType))
val filtered = Filter(filterExpr, table)
// Project: SELECT id, name
val projectList = Seq(table.output(0), table.output(1))
val projected = Project(projectList, filtered)
// Aggregate: SELECT age, COUNT(*), AVG(score) GROUP BY age
val groupExpr = Seq(table.output(2)) // age
val aggExprs = Seq(
  table.output(2).as("age"),
  Count(Literal(1)).toAggregateExpression().as("count"),
  Average(table.output(3)).toAggregateExpression().as("avg_score")
)
val aggregated = Aggregate(groupExpr, aggExprs, table)
// Sort: ORDER BY score DESC
val sortOrder = Seq(SortOrder(table.output(3), Descending))
val sorted = Sort(sortOrder, global = true, table)
// Limit: LIMIT 10
val limited = Limit(Literal(10, IntegerType), sorted)
// Union: Combine two relations
val table2 = LocalRelation(table.output.map(_.newInstance())) // same schema, fresh expression IDs
val unioned = Union(Seq(table, table2))
// Complex query: SELECT name FROM table WHERE age > 21 ORDER BY score LIMIT 5
// (Sort sits below Project so `score` is still available to order by)
val complexQuery = Limit(
  Literal(5, IntegerType),
  Project(
    Seq(table.output(1)), // name
    Sort(
      Seq(SortOrder(table.output(3), Ascending)), // ORDER BY score
      global = true,
      Filter(
        GreaterThan(table.output(2), Literal(21, IntegerType)), // age > 21
        table
      )
    )
  )
)
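// A minimal rewrite sketch over the composed tree: tighten strict Int
// comparisons via transformExpressions. (Real Catalyst performs rewrites like
// this through dedicated optimizer rules; this rule is only illustrative.)
val rewritten = complexQuery.transformExpressions {
  case GreaterThan(attr, Literal(v: Int, IntegerType)) =>
    GreaterThanOrEqual(attr, Literal(v + 1, IntegerType))
}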
Bridge between logical and physical planning.
/**
* Base class for physical execution plans
*/
abstract class SparkPlan extends QueryPlan[SparkPlan] {
/** Execute this plan and return RDD of results */
def execute(): RDD[InternalRow]
/** Prepare this plan for execution */
def prepare(): Unit
/** Reset statistics and metrics */
def resetMetrics(): Unit
}
/**
* Plan statistics (size and row count) used for optimization decisions
*/
case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None) {
/** Whether the plan's output is small enough to broadcast (threshold taken from the autoBroadcastJoinThreshold config) */
def isBroadcastable: Boolean = sizeInBytes <= autoBroadcastJoinThreshold
}
Usage Examples:
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// A logical plan that the planner later converts into a physical SparkPlan
val source = LocalRelation(AttributeReference("id", IntegerType, false)())
val logicalPlan = Project(source.output, source) // reuse the relation's attributes
// Statistics computation
val stats = Statistics(sizeInBytes = 1000, rowCount = Some(100))
val broadcastable = stats.isBroadcastable // Check if suitable for broadcast
// Physical properties like partitioning and ordering are preserved
// during logical to physical plan conversion
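// Hypothetical helper (not part of the API above): gate the join strategy on
// Statistics, the way a planner might prefer broadcast joins for small inputs
def chooseJoinStrategy(left: Statistics, right: Statistics): String =
  if (left.isBroadcastable || right.isBroadcastable) "BroadcastHashJoin"
  else "SortMergeJoin"
val strategy = chooseJoinStrategy(stats, Statistics(sizeInBytes = BigInt(1) << 30))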