Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL
—
This section covers the query plan system in Spark Catalyst, including logical and physical plan representations with tree-based transformations and optimization support. Query plans form the core of Catalyst's query processing pipeline.
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.plans.joinTypes._
import org.apache.spark.sql.catalyst.expressions._

Base class for all query plans (logical and physical), extending the tree framework.
/**
 * Base class for all query plans, logical and physical, extending the tree framework.
 * `PlanType` is F-bounded so tree transformations return the concrete plan type.
 */
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
// Attributes produced by this operator.
def output: Seq[Attribute]
// `output` as a set, for fast membership checks.
def outputSet: AttributeSet
// Attributes referenced by this operator's expressions.
def references: AttributeSet
// Attributes available from this operator's children.
def inputSet: AttributeSet
// Output schema derived from `output`.
def schema: StructType
// Ordering of the rows this operator produces, if any.
def outputOrdering: Seq[SortOrder]
// Upper bound on the number of rows produced, when statically known.
def maxRows: Option[Long]
// Apply `rule` to every expression in this plan (direction unspecified here).
def transformExpressions(rule: PartialFunction[Expression, Expression]): PlanType
// Apply `rule` to expressions top-down.
def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): PlanType
// Apply `rule` to expressions bottom-up.
def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): PlanType
}

Base class for all logical query plans representing the query structure before physical optimization.
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
// True when this operator and its expressions have been resolved.
def resolved: Boolean
// True when every child plan is resolved.
def childrenResolved: Boolean
// Estimated statistics for this plan subtree.
def statistics: Statistics
}

// Logical operator with no children (e.g. a base relation).
abstract class LeafNode extends LogicalPlan {
override final def children: Seq[LogicalPlan] = Nil
}
// Logical operator with exactly one child.
abstract class UnaryNode extends LogicalPlan {
def child: LogicalPlan
override final def children: Seq[LogicalPlan] = child :: Nil
}
// Logical operator with exactly two children (e.g. joins, set operations).
abstract class BinaryNode extends LogicalPlan {
def left: LogicalPlan
def right: LogicalPlan
override final def children: Seq[LogicalPlan] = left :: right :: Nil
}

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
// Create a simple query plan
// Builds the tree for: SELECT name FROM users WHERE age = 25
val relation = UnresolvedRelation(TableIdentifier("users"))
val filter = Filter(EqualTo(UnresolvedAttribute("age"), Literal(25)), relation)
val project = Project(Seq(UnresolvedAttribute("name")), filter)
// Check if plan is resolved (expected false here while the relation and
// attributes are still unresolved — TODO confirm against the analyzer)
val isResolved = project.resolved
// Get output schema
val schema = project.schema

sealed abstract class JoinType {
// SQL keyword for this join type (e.g. "INNER").
def sql: String
}
// NOTE(review): abridged listing — as written these case objects leave the
// abstract `sql` member unimplemented; the concrete overrides are omitted here.
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Cross extends JoinType

case class Join(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
// Optional join predicate; absent when no ON condition is given.
condition: Option[Expression]
) extends BinaryNode {
override def output: Seq[Attribute] = {
joinType match {
// Semi and anti joins only emit columns from the left side.
case LeftSemi | LeftAnti => left.output
// All other join types concatenate left then right columns.
case _ => left.output ++ right.output
}
}
}

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.joinTypes._
// Create join operation
// Builds: users INNER JOIN orders ON users.id = orders.user_id
val users = UnresolvedRelation(TableIdentifier("users"))
val orders = UnresolvedRelation(TableIdentifier("orders"))
val joinCondition = EqualTo(
UnresolvedAttribute("users.id"),
UnresolvedAttribute("orders.user_id")
)
val join = Join(users, orders, Inner, Some(joinCondition))

case class Project(
projectList: Seq[NamedExpression],
child: LogicalPlan
) extends UnaryNode {
// Output schema comes from the projected expressions, not the child.
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}

case class Filter(
condition: Expression,
child: LogicalPlan
) extends UnaryNode {
// Filtering rows never changes the schema.
override def output: Seq[Attribute] = child.output
}

case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan
) extends UnaryNode {
// Output is defined by the aggregate expressions alone.
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
}

case class Sort(
order: Seq[SortOrder],
// presumably: true = total ordering across partitions, false = per-partition
// sort — TODO confirm against Catalyst docs
global: Boolean,
child: LogicalPlan
) extends UnaryNode {
// Sorting does not change the schema.
override def output: Seq[Attribute] = child.output
}
/**
 * An expression annotated with a sort direction and null ordering.
 * Marked Unevaluable: it describes ordering and is never executed directly.
 */
case class SortOrder(
child: Expression,
direction: SortDirection,
nullOrdering: NullOrdering,
sameOrderExpressions: Set[Expression] = Set.empty
) extends Expression with Unevaluable {
// Type and nullability are delegated to the wrapped child expression.
override def dataType: DataType = child.dataType
override def nullable: Boolean = child.nullable
}
// Ascending/descending marker for a SortOrder.
abstract class SortDirection {
def sql: String
}
// NOTE(review): abridged listing — the `sql` overrides for these case objects
// are omitted here.
case object Ascending extends SortDirection
case object Descending extends SortDirection
// Where NULLs sort relative to non-null values.
abstract class NullOrdering {
def sql: String
}
case object NullsFirst extends NullOrdering
case object NullsLast extends NullOrdering

case class Limit(
// Row-count limit held as an Expression (e.g. a Literal), not a plain Int.
limitExpr: Expression,
child: LogicalPlan
) extends UnaryNode {
// Limiting rows never changes the schema.
override def output: Seq[Attribute] = child.output
}

case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
override def output: Seq[Attribute] = children.head.output
}

case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
// Build a complex query plan: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10
// Operators are composed bottom-up: relation -> filter -> project -> sort -> limit.
val relation = UnresolvedRelation(TableIdentifier("users"))
val filter = Filter(
GreaterThan(UnresolvedAttribute("age"), Literal(18)),
relation
)
val project = Project(
Seq(UnresolvedAttribute("name")),
filter
)
val sort = Sort(
Seq(SortOrder(UnresolvedAttribute("name"), Ascending, NullsLast)),
global = true,
project
)
val limit = Limit(Literal(10), sort)
// The final plan represents: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10

case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
alias: Option[String] = None
) extends LeafNode {
// No output attributes until the relation is resolved against a catalog —
// TODO confirm this matches the analyzer's contract.
override def output: Seq[Attribute] = Nil
}
/** Identifies a table, optionally qualified by a database name. */
case class TableIdentifier(
  table: String,
  database: Option[String] = None
) {
  /** Dotted `db.table` form, or just `table` when no database is set. */
  def identifier: String = (database.toSeq :+ table).mkString(".")
  /** Like `identifier`, but with each part passed through `quoteIdentifier`. */
  def quotedString: String = (database.toSeq :+ table).map(quoteIdentifier).mkString(".")
  /** Alias for `identifier`. */
  def unquotedString: String = identifier
}
}

case class LocalRelation(
output: Seq[Attribute],
// In-memory rows backing the relation; empty by default.
data: Seq[InternalRow] = Nil,
isStreaming: Boolean = false
) extends LeafNode {
// Convenience constructor for a single row.
def this(output: Seq[Attribute], data: InternalRow) = this(output, data :: Nil)
}

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._
// Create a local relation with data
// NOTE(review): UTF8String (org.apache.spark.unsafe.types) and InternalRow are
// not covered by the imports shown here — verify before running this snippet.
val schema = Seq(
AttributeReference("id", IntegerType, nullable = false)(),
AttributeReference("name", StringType, nullable = true)()
)
val data = Seq(
InternalRow(1, UTF8String.fromString("Alice")),
InternalRow(2, UTF8String.fromString("Bob"))
)
val localRelation = LocalRelation(schema, data)

case class Statistics(
// Estimated physical size of the data; BigInt to avoid overflow on huge tables.
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
// Per-attribute column statistics, when available.
attributeStats: AttributeMap[ColumnStat] = AttributeMap.empty,
hints: HintInfo = HintInfo()
) {
// One-line human-readable summary of size and row count.
def simpleString: String = {
s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}, " +
s"rowCount=${rowCount.map(_.toString).getOrElse("unknown")}"
}
}
/** Per-column statistics; every field is optional because stats may be partial. */
case class ColumnStat(
distinctCount: Option[BigInt] = None,
min: Option[Any] = None,
max: Option[Any] = None,
nullCount: Option[BigInt] = None,
// Average and maximum value lengths in bytes — TODO confirm units.
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
histogram: Option[Histogram] = None
)
/** Planner hints carried alongside a plan's statistics. */
case class HintInfo(
broadcast: Boolean = false,
cartesianProduct: Boolean = false
)

import org.apache.spark.sql.catalyst.plans.logical._
// Create statistics for a relation
val stats = Statistics(
sizeInBytes = 1024 * 1024, // 1MB
rowCount = Some(1000),
hints = HintInfo(broadcast = true)
)
// Attach statistics to a plan
// NOTE(review): `schema` is not defined in this snippet (it comes from the
// LocalRelation example above), and `withStats` is not part of the LogicalPlan
// API shown in this document — verify against the actual Catalyst version.
val relation = LocalRelation(schema)
val relationWithStats = relation.copy().withNewChildren(relation.children).transform {
case plan => plan.withStats(stats)
}

While most physical plan details are in the execution engine, Catalyst defines the base physical plan structure:
abstract class SparkPlan extends QueryPlan[SparkPlan] {
// Produce the result as an RDD of internal-format rows.
def execute(): RDD[InternalRow]
// Execute and collect all rows to the driver.
def executeCollect(): Array[InternalRow]
// Execute and collect at most `n` rows.
def executeTake(n: Int): Array[InternalRow]
// Execute and expose the result as an iterator.
def executeToIterator(): Iterator[InternalRow]
// Runtime SQL metrics keyed by name.
def metrics: Map[String, SQLMetric]
def longMetric(name: String): SQLMetric
}

Query plans inherit tree transformation capabilities:
// Transform all expressions in a plan
// NOTE(review): this rule matches every string-typed expression, including the
// `Upper` nodes it introduces — confirm the transform's termination semantics
// before using the pattern as-is.
val transformedPlan = plan.transformExpressions {
case expr if expr.dataType == StringType =>
Upper(expr)
}
// Transform the plan structure
val optimizedPlan = plan.transform {
case Filter(condition, child) if condition == Literal(true) =>
child // Remove always-true filters
}
// Collect information from the plan
val allFilters = plan.collect {
case filter: Filter => filter.condition
}

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
// Original plan with nested filters
val relation = UnresolvedRelation(TableIdentifier("users"))
val filter1 = Filter(GreaterThan(UnresolvedAttribute("age"), Literal(18)), relation)
val filter2 = Filter(EqualTo(UnresolvedAttribute("active"), Literal(true)), filter1)
// Combine filters into a single AND condition
// The rule matches two adjacent Filter nodes and merges their predicates,
// keeping the outer condition first in the And.
val combinedPlan = filter2.transform {
case Filter(condition1, Filter(condition2, child)) =>
Filter(And(condition1, condition2), child)
}
// Result: Filter(And(active = true, age > 18), UnresolvedRelation(users))

// Check if a plan is fully resolved
def isFullyResolved(plan: LogicalPlan): Boolean = {
// Recursively require this node and every descendant to be resolved.
plan.resolved && plan.children.forall(isFullyResolved)
}
// Find unresolved attribute references anywhere in the plan.
// FIX: `plan.collect` only visits plan nodes, so the original pattern
// `case expr: UnresolvedAttribute` (an Expression, not a LogicalPlan) could
// never match and the function always returned Nil. Walk each node's
// expression trees instead (`expressions` is QueryPlan's expression accessor
// in Catalyst — confirm it is available in the targeted version).
def findUnresolvedReferences(plan: LogicalPlan): Seq[UnresolvedAttribute] = {
plan.collect { case node => node }
.flatMap(_.expressions)
.flatMap(_.collect { case attr: UnresolvedAttribute => attr })
}

This comprehensive query plan system enables Catalyst to represent, transform, and optimize SQL queries through a flexible tree-based structure that supports both logical planning and physical execution strategies.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst