Catalyst is Spark's library for manipulating relational query plans and expressions.

Catalyst's query planning system provides logical and physical representations of SQL queries through a tree-based structure. The planning framework enables sophisticated query optimization including predicate pushdown, join reordering, and cost-based optimization.
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._

/**
 * Base class for logical query plan nodes. A logical plan describes WHAT to
 * compute; physical planning later turns it into an executable SparkPlan.
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  def output: Seq[Attribute]          // attributes this node produces
  def children: Seq[LogicalPlan]      // child plan nodes
  def resolved: Boolean               // true once names/expressions are bound by the analyzer
  def childrenResolved: Boolean       // true when every child is resolved
  def outputSet: AttributeSet
  def references: AttributeSet        // attributes referenced by this node's expressions
  def inputSet: AttributeSet
  def producedAttributes: AttributeSet
  def missingInput: AttributeSet      // referenced attributes not available from input
  def schema: StructType
  def allAttributes: AttributeSet
  def isStreaming: Boolean            // whether this subtree reads a streaming source
  def refresh(): Unit
}
/**
 * Shared base for logical and physical plans: a tree node that exposes
 * output attributes and a relational schema. `PlanType` is F-bounded so
 * tree transformations preserve the concrete plan type.
 */
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  def output: Seq[Attribute]
  def outputSet: AttributeSet
  def schema: StructType
  def printSchema(): Unit
  def simpleString: String
}

/** A logical plan with no children (e.g. a table scan or literal relation). */
abstract class LeafNode extends LogicalPlan {
  override final def children: Seq[LogicalPlan] = Nil
}
/** A logical plan with exactly one child. */
abstract class UnaryNode extends LogicalPlan {
  def child: LogicalPlan
  override final def children: Seq[LogicalPlan] = Seq(child)
}
/** A logical plan with exactly two children (joins, set operations). */
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan
  def right: LogicalPlan
  override final def children: Seq[LogicalPlan] = Seq(left, right)
}

/** Relation backed by an in-memory sequence of rows; empty by default. */
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil, isStreaming: Boolean = false) extends LeafNode

/** Relation producing a single row with no columns (e.g. `SELECT 1` with no FROM). */
case class OneRowRelation() extends LeafNode {
  override def output: Seq[Attribute] = Nil
}
// Leaf relation generating a sequence of longs from `start` to `end` in
// increments of `step`, optionally split across `numSlices` partitions.
// NOTE(review): presumably `end` is exclusive, matching spark.range(...) —
// confirm against the Spark API docs.
case class Range(
start: Long,
end: Long,
step: Long,
numSlices: Option[Int],
output: Seq[Attribute],
isStreaming: Boolean = false
) extends LeafNode
/**
 * A table referenced by (possibly multi-part) name that has not yet been
 * resolved against a catalog by the analyzer.
 */
case class UnresolvedRelation(
  multipartIdentifier: Seq[String],
  options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
  isStreaming: Boolean = false
) extends LeafNode

// Usage Example:
// Create a local relation with data
val attributes = Seq(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, false)()
)
val data = Seq(
  InternalRow(1, UTF8String.fromString("Alice")),
  InternalRow(2, UTF8String.fromString("Bob"))
)
val localRelation = LocalRelation(attributes, data)
// Create a range relation
val rangeRelation = Range(1, 100, 1, Some(4),
  Seq(AttributeReference("id", LongType, false)()))
// Reference a table by name
val tableRef = UnresolvedRelation(Seq("my_database", "my_table"))

/** Computes `projectList` (select/rename/derive columns) over the child's rows. */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}
/** Keeps only the child rows for which `condition` evaluates to true. */
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/** Assigns a qualifier (alias) to the child's output attributes. */
case class SubqueryAlias(identifier: String, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output.map(_.withQualifier(Seq(identifier)))
}

/** Restricts the result to at most `limitExpr` rows. */
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

// Usage Example:
val baseRelation = UnresolvedRelation(Seq("users"))
val nameAttr = AttributeReference("name", StringType, false)()
val ageAttr = AttributeReference("age", IntegerType, true)()
// Project specific columns
val projection = Project(Seq(nameAttr, ageAttr), baseRelation)
// Filter rows
val filterCondition = GreaterThan(ageAttr, Literal(18))
val filteredPlan = Filter(filterCondition, projection)
// Add alias
val aliasedPlan = SubqueryAlias("u", filteredPlan)
// Limit results
val limitedPlan = Limit(Literal(100), aliasedPlan)

/** Sorts child rows by `order`; `global` requests a total order across partitions. */
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode
/** A single sort key: expression, direction, and placement of nulls. */
case class SortOrder(child: Expression, direction: SortDirection, nullOrdering: NullOrdering, sameOrderExpressions: Set[Expression] = Set.empty)

/** Groups child rows by `groupingExpressions` and evaluates `aggregateExpressions`. */
case class Aggregate(
  groupingExpressions: Seq[Expression],
  aggregateExpressions: Seq[NamedExpression],
  child: LogicalPlan
) extends UnaryNode

/** Emits one output row per input row per entry in `projections` (GROUPING SETS / ROLLUP / CUBE). */
case class Expand(
  projections: Seq[Seq[Expression]],
  output: Seq[Attribute],
  child: LogicalPlan
) extends UnaryNode

// Usage Example:
val baseRelation = UnresolvedRelation(Seq("sales"))
val customerAttr = AttributeReference("customer_id", IntegerType, false)()
val amountAttr = AttributeReference("amount", DecimalType(10, 2), false)()
val dateAttr = AttributeReference("sale_date", DateType, false)()
// Sort by date descending
val sortOrder = SortOrder(dateAttr, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, baseRelation)
// Group by customer and aggregate amount
val groupingExprs = Seq(customerAttr)
val aggregateExprs = Seq(
  customerAttr,
  Alias(Sum(amountAttr), "total_amount")()
)
val aggregatePlan = Aggregate(groupingExprs, aggregateExprs, baseRelation)

/** Evaluates window functions over partitioned, ordered frames of the child's rows. */
case class Window(
  windowExpressions: Seq[NamedExpression],
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder],
  child: LogicalPlan
) extends UnaryNode
/** A window function paired with the window specification it runs over. */
case class WindowExpression(
  windowFunction: Expression,
  windowSpec: WindowSpecDefinition
) extends Expression

/** The PARTITION BY / ORDER BY / frame clauses of an OVER (...) specification. */
case class WindowSpecDefinition(
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder],
  frameSpecification: WindowFrame
)

// Usage Example:
val baseRelation = UnresolvedRelation(Seq("employees"))
val deptAttr = AttributeReference("department", StringType, false)()
val salaryAttr = AttributeReference("salary", DecimalType(10, 2), false)()
// Window function: rank employees by salary within each department
val windowSpec = WindowSpecDefinition(
  partitionSpec = Seq(deptAttr),
  orderSpec = Seq(SortOrder(salaryAttr, Descending, NullsLast)),
  frameSpecification = UnspecifiedFrame
)
val rankExpr = WindowExpression(Rank(Seq(salaryAttr)), windowSpec)
val windowExprs = Seq(Alias(rankExpr, "salary_rank")())
val windowPlan = Window(windowExprs, Seq(deptAttr),
  Seq(SortOrder(salaryAttr, Descending, NullsLast)), baseRelation)

/** Relational join of two child plans under `joinType`, with an optional ON `condition`. */
case class Join(
  left: LogicalPlan,
  right: LogicalPlan,
  joinType: JoinType,
  condition: Option[Expression],
  hint: JoinHint = JoinHint.NONE
) extends BinaryNode
/**
 * Algebraic join type; `sql` renders the SQL keyword form.
 * Each case object overrides `sql` — the original sketch left the abstract
 * member unimplemented, which does not compile.
 */
sealed abstract class JoinType {
  def sql: String
}
case object Inner extends JoinType { override def sql: String = "INNER" }
case object LeftOuter extends JoinType { override def sql: String = "LEFT OUTER" }
case object RightOuter extends JoinType { override def sql: String = "RIGHT OUTER" }
case object FullOuter extends JoinType { override def sql: String = "FULL OUTER" }
case object LeftSemi extends JoinType { override def sql: String = "LEFT SEMI" }
case object LeftAnti extends JoinType { override def sql: String = "LEFT ANTI" }
case object Cross extends JoinType { override def sql: String = "CROSS" }

// Usage Example:
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, false)()
val orderUserIdAttr = AttributeReference("user_id", IntegerType, false)()
// Inner join users and orders
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val innerJoin = Join(usersTable, ordersTable, Inner, Some(joinCondition))
// Left outer join
val leftJoin = Join(usersTable, ordersTable, LeftOuter, Some(joinCondition))
// Cross join (Cartesian product)
val crossJoin = Join(usersTable, ordersTable, Cross, None)

/** N-ary UNION; `byName` matches columns by name instead of by position. */
case class Union(children: Seq[LogicalPlan], byName: Boolean = false, allowMissingCol: Boolean = false) extends LogicalPlan
/** INTERSECT set operation; `isAll` selects INTERSECT ALL (bag) semantics. */
case class Intersect(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

/** EXCEPT (MINUS) set operation; `isAll` selects EXCEPT ALL semantics. */
case class Except(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

// Usage Example:
val currentUsers = UnresolvedRelation(Seq("current_users"))
val formerUsers = UnresolvedRelation(Seq("former_users"))
val activeUsers = UnresolvedRelation(Seq("active_users"))
// Union all users
val allUsers = Union(Seq(currentUsers, formerUsers))
// Find users in both current and active
val commonUsers = Intersect(currentUsers, activeUsers, isAll = false)
// Find current users who are not active
val inactiveUsers = Except(currentUsers, activeUsers, isAll = false)

/**
 * Base class for expressions that embed a logical subquery plan.
 * Declared as an abstract class rather than a case class: Scala forbids a
 * case class from extending another case class, and Exists / ScalarSubquery /
 * ListQuery elsewhere in this file extend this type.
 */
abstract class SubqueryExpression(
  plan: LogicalPlan,
  children: Seq[Expression] = Seq.empty,
  exprId: ExprId = NamedExpression.newExprId,
  joinCond: Seq[Expression] = Seq.empty,
  hint: Option[HintInfo] = None
) extends PlanExpression[LogicalPlan]
/** EXISTS (...) predicate over a subquery plan. */
case class Exists(plan: LogicalPlan, children: Seq[Expression] = Seq.empty, exprId: ExprId = NamedExpression.newExprId) extends SubqueryExpression(plan, children, exprId)

/** Subquery expected to return a single scalar value (one row, one column). */
case class ScalarSubquery(plan: LogicalPlan, children: Seq[Expression] = Seq.empty, exprId: ExprId = NamedExpression.newExprId) extends SubqueryExpression(plan, children, exprId)

/** Subquery used as the value list of an IN (...) predicate. */
case class ListQuery(plan: LogicalPlan, children: Seq[Expression] = Seq.empty, exprId: ExprId = NamedExpression.newExprId) extends SubqueryExpression(plan, children, exprId)

// Usage Example:
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, false)()
// EXISTS subquery
val existsSubquery = Filter(
  EqualTo(AttributeReference("user_id", IntegerType, false)(), userIdAttr),
  ordersTable
)
val usersWithOrders = Filter(Exists(existsSubquery), usersTable)
// Scalar subquery
val countSubquery = Aggregate(
  Seq.empty,
  Seq(Alias(Count(Literal(1)), "order_count")()),
  Filter(EqualTo(AttributeReference("user_id", IntegerType, false)(), userIdAttr), ordersTable)
)
val usersWithOrderCount = Project(
  Seq(userIdAttr, Alias(ScalarSubquery(countSubquery), "order_count")()),
  usersTable
)

/** INSERT INTO / INSERT OVERWRITE statement prior to resolution against a table. */
case class InsertIntoStatement(
  table: LogicalPlan,
  partition: Map[String, Option[String]],
  userSpecifiedCols: Seq[String],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean = false
) extends Command
/** DELETE FROM `table` with an optional WHERE `condition`. */
case class DeleteFromTable(table: LogicalPlan, condition: Option[Expression]) extends Command

/** UPDATE `table` SET `assignments` with an optional WHERE `condition`. */
case class UpdateTable(table: LogicalPlan, assignments: Seq[Assignment], condition: Option[Expression]) extends Command

/** MERGE INTO target USING source ON condition, with WHEN (NOT) MATCHED actions. */
case class MergeIntoTable(
  targetTable: LogicalPlan,
  sourceTable: LogicalPlan,
  mergeCondition: Expression,
  matchedActions: Seq[MergeAction],
  notMatchedActions: Seq[MergeAction]
) extends Command

/** CREATE TABLE; acts as CREATE TABLE AS SELECT when `query` is present. */
case class CreateTable(
  tableDesc: CatalogTable,
  mode: SaveMode,
  query: Option[LogicalPlan]
) extends Command
/** DROP TABLE / DROP VIEW command. */
case class DropTable(
  identifier: Seq[String],
  ifExists: Boolean,
  isView: Boolean
) extends Command

/** ALTER TABLE applying a sequence of catalog `TableChange`s. */
case class AlterTable(
  table: LogicalPlan,
  changes: Seq[TableChange]
) extends Command

/**
 * Base class for physical (executable) plan nodes. Unlike a LogicalPlan,
 * a SparkPlan describes HOW to compute: `execute` produces an RDD of rows.
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] {
  def execute(): RDD[InternalRow]                   // row-based execution
  def executeCollect(): Array[InternalRow]          // collect all results to the driver
  def executeBroadcast[T](): broadcast.Broadcast[T]
  def executeColumnar(): RDD[ColumnarBatch]         // columnar execution path
  def requiredChildDistribution: Seq[Distribution]  // distribution required of each child's output
  def requiredChildOrdering: Seq[Seq[SortOrder]]    // ordering required of each child's output
  def outputPartitioning: Partitioning
  def outputOrdering: Seq[SortOrder]
  def metrics: Map[String, SQLMetric]
}

/** Data-distribution requirement a physical operator places on its input. */
sealed abstract class Distribution
case object UnspecifiedDistribution extends Distribution  // no requirement
case object AllTuples extends Distribution                // all rows co-located in one partition
case object BroadcastDistribution extends Distribution
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution
case class RangeDistribution(ordering: Seq[SortOrder]) extends Distribution

/**
 * How an operator's output is physically partitioned; `satisfies` reports
 * whether this partitioning meets a required Distribution.
 * NOTE(review): the subtypes below do not implement `satisfies` — this sketch
 * omits the concrete logic found in Spark's physical partitioning classes.
 */
sealed abstract class Partitioning {
  def numPartitions: Int
  def satisfies(required: Distribution): Boolean
}
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning
case object UnknownPartitioning extends Partitioning

/** A named transformation over plan trees, applied by the analyzer/optimizer. */
abstract class Rule[TreeType <: TreeNode[TreeType]] {
  val ruleName: String
  def apply(plan: TreeType): TreeType
}
/** A named group of rules executed together under an application `strategy`. */
case class Batch(name: String, strategy: Strategy, rules: Rule[LogicalPlan]*)

/** How many times a batch's rules are applied. */
sealed abstract class Strategy
case object Once extends Strategy                           // single pass
case class FixedPoint(maxIterations: Int) extends Strategy  // iterate up to maxIterations

// Usage Example:
// Custom optimization rule
object PushDownFilters extends Rule[LogicalPlan] {
  val ruleName = "PushDownFilters"
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, Join(left, right, joinType, joinCondition, hint)) =>
      // Logic to push filter conditions down past joins
      pushFilterThroughJoin(condition, Join(left, right, joinType, joinCondition, hint))
    case other => other
  }
  private def pushFilterThroughJoin(filter: Expression, join: Join): LogicalPlan = {
    // Implementation details for filter pushdown
    join
  }
}
// Apply optimization rules in batches
val optimizer = Seq(
  Batch("Filter Pushdown", FixedPoint(10), PushDownFilters),
  Batch("Constant Folding", Once, ConstantFolding)
)

// Build a complex query plan
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, false)()
// Separate attribute for orders.user_id: the original example joined
// users.user_id against orders.amount, contradicting the SQL below.
val orderUserIdAttr = AttributeReference("user_id", IntegerType, false)()
val nameAttr = AttributeReference("name", StringType, false)()
val orderAmountAttr = AttributeReference("amount", DecimalType(10, 2), false)()
// SELECT u.name, SUM(o.amount) as total
// FROM users u JOIN orders o ON u.user_id = o.user_id
// WHERE u.active = true
// GROUP BY u.user_id, u.name
// HAVING SUM(o.amount) > 1000
// ORDER BY total DESC
// LIMIT 10
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val joinedPlan = Join(usersTable, ordersTable, Inner, Some(joinCondition))
val filterCondition = EqualTo(AttributeReference("active", BooleanType, false)(), Literal(true))
val filteredPlan = Filter(filterCondition, joinedPlan)
val groupingExprs = Seq(userIdAttr, nameAttr)
val aggregateExprs = Seq(
  nameAttr,
  Alias(Sum(orderAmountAttr), "total")()
)
val aggregatedPlan = Aggregate(groupingExprs, aggregateExprs, filteredPlan)
val havingCondition = GreaterThan(Sum(orderAmountAttr), Literal(1000))
val havingPlan = Filter(havingCondition, aggregatedPlan)
val sortOrder = SortOrder(AttributeReference("total", DecimalType(10, 2), false)(), Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, havingPlan)
val finalPlan = Limit(Literal(10), sortedPlan)

The query planning system in Catalyst provides a comprehensive framework for representing and optimizing SQL queries through a rich tree-based structure that enables sophisticated transformations and optimizations.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13