Ctrl+K
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-catalyst-2-13

Catalyst is Spark's library for manipulating relational query plans and expressions

Pending
Overview
Eval results
Files

docs/query-plans.md

Query Plans

Catalyst's query planning system provides logical and physical representations of SQL queries through a tree-based structure. The planning framework enables sophisticated query optimization including predicate pushdown, join reordering, and cost-based optimization.

Core Imports

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._

Plan Hierarchy

Base Plan Classes

/**
 * Abstract base for logical plan nodes: a `QueryPlan` whose node type is fixed to
 * `LogicalPlan`. Every member in this summary is declared without a body.
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  // Attributes exposed by this node, as an ordered sequence.
  def output: Seq[Attribute]
  // Child plans of this node; LeafNode, UnaryNode and BinaryNode each fix this sequence.
  def children: Seq[LogicalPlan]  
  // Resolution flags for the node itself and for its children.
  // NOTE(review): presumably maintained by the analyzer — confirm against the library source.
  def resolved: Boolean
  def childrenResolved: Boolean
  // Attribute collections exposed as AttributeSet rather than Seq.
  def outputSet: AttributeSet
  def references: AttributeSet
  def inputSet: AttributeSet
  def producedAttributes: AttributeSet
  def missingInput: AttributeSet
  // Schema of the node as a StructType.
  def schema: StructType
  def allAttributes: AttributeSet
  // Streaming flag for this plan.
  def isStreaming: Boolean
  // Declared with parentheses and a Unit result, i.e. it is called for its side effect.
  def refresh(): Unit
}

/**
 * Generic plan base shared by logical and physical plans. `PlanType` is F-bounded
 * (`PlanType <: QueryPlan[PlanType]`), so the tree operations inherited from `TreeNode`
 * are typed with the concrete plan class.
 */
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  // Output attributes as a sequence and as a set.
  def output: Seq[Attribute]
  def outputSet: AttributeSet
  // Schema of the plan as a StructType.
  def schema: StructType
  // Side-effecting helper (returns Unit).
  def printSchema(): Unit
  // String rendering of the node.
  def simpleString: String
}

Plan Node Types by Arity

/** Logical plan node without children: `children` is always the empty sequence. */
abstract class LeafNode extends LogicalPlan {
  override final def children: Seq[LogicalPlan] = Seq.empty
}

/** Logical plan node with exactly one input, exposed as `child`. */
abstract class UnaryNode extends LogicalPlan {
  def child: LogicalPlan
  // Single-element sequence holding the only child.
  override final def children: Seq[LogicalPlan] = Seq(child)
}

/** Logical plan node with exactly two inputs, `left` and `right`, in that order. */
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan
  def right: LogicalPlan
  // Left input first, right input second.
  override final def children: Seq[LogicalPlan] = left :: right :: Nil
}

Leaf Nodes

Data Sources

// Leaf node that carries its own output attributes together with an in-memory sequence of rows.
// `data` defaults to Nil and `isStreaming` defaults to false.
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil, isStreaming: Boolean = false) extends LeafNode

// Parameterless leaf node whose output attribute list is empty.
case class OneRowRelation() extends LeafNode {
  override def output: Seq[Attribute] = Nil
}

// Leaf node described by start/end/step Long values, an optional slice count and
// explicit output attributes. `isStreaming` defaults to false.
// NOTE(review): whether `end` is inclusive is not visible here — confirm against the library.
case class Range(
  start: Long,
  end: Long, 
  step: Long,
  numSlices: Option[Int],
  output: Seq[Attribute],
  isStreaming: Boolean = false
) extends LeafNode

// Leaf node naming a relation by a multi-part identifier (e.g. Seq("db", "table")).
// `options` defaults to an empty case-insensitive map, `isStreaming` to false.
case class UnresolvedRelation(
  multipartIdentifier: Seq[String],
  options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
  isStreaming: Boolean = false
) extends LeafNode

Usage Example:

// In-memory relation: two non-nullable columns and two literal rows
val localRelation = LocalRelation(
  output = Seq(
    AttributeReference("id", IntegerType, false)(),
    AttributeReference("name", StringType, false)()
  ),
  data = Seq(
    InternalRow(1, UTF8String.fromString("Alice")),
    InternalRow(2, UTF8String.fromString("Bob"))
  )
)

// Range relation with an explicit slice count and a single non-nullable Long column "id"
val rangeOutput = Seq(AttributeReference("id", LongType, false)())
val rangeRelation = Range(start = 1, end = 100, step = 1, numSlices = Some(4), output = rangeOutput)

// Table referenced through a two-part identifier
val tableRef = UnresolvedRelation(multipartIdentifier = Seq("my_database", "my_table"))

Unary Nodes

Projection and Filtering

// Unary node whose output is the projection list, each entry converted with `toAttribute`.
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}

// Unary node holding a condition expression; its output is the child's output unchanged.
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

// Unary node that re-exposes the child's attributes with the qualifier set to Seq(identifier).
case class SubqueryAlias(identifier: String, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output.map(_.withQualifier(Seq(identifier)))
}

// Unary node holding a limit expression; its output is the child's output unchanged.
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

Usage Example:

val usersRelation = UnresolvedRelation(Seq("users"))
val nameColumn = AttributeReference("name", StringType, false)()
val ageColumn = AttributeReference("age", IntegerType, true)()

// Each step wraps the previous plan: project -> filter -> alias -> limit

// Keep only the name and age columns
val projection = Project(projectList = Seq(nameColumn, ageColumn), child = usersRelation)

// Keep rows with age > 18
val filteredPlan = Filter(condition = GreaterThan(ageColumn, Literal(18)), child = projection)

// Expose the result under the alias "u"
val aliasedPlan = SubqueryAlias(identifier = "u", child = filteredPlan)

// Limit expression is the literal 100
val limitedPlan = Limit(limitExpr = Literal(100), child = aliasedPlan)

Sorting and Grouping

// Unary node holding a list of sort orders and a `global` flag.
// NOTE(review): `global` presumably selects a total ordering versus a per-partition one — confirm.
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode

// One sort key: the expression to sort by, its direction and its null ordering.
// `sameOrderExpressions` defaults to the empty set.
case class SortOrder(child: Expression, direction: SortDirection, nullOrdering: NullOrdering, sameOrderExpressions: Set[Expression] = Set.empty)

// Unary node pairing grouping expressions with the named expressions to produce.
case class Aggregate(
  groupingExpressions: Seq[Expression],
  aggregateExpressions: Seq[NamedExpression], 
  child: LogicalPlan
) extends UnaryNode

// Unary node carrying several projection lists (one inner Seq per projection) and an
// explicitly supplied output attribute list.
case class Expand(
  projections: Seq[Seq[Expression]],
  output: Seq[Attribute],
  child: LogicalPlan
) extends UnaryNode

Usage Example:

val salesRelation = UnresolvedRelation(Seq("sales"))
val customerId = AttributeReference("customer_id", IntegerType, false)()
val amount = AttributeReference("amount", DecimalType(10, 2), false)()
val saleDate = AttributeReference("sale_date", DateType, false)()

// Global sort on sale_date, descending, nulls last
val sortedPlan = Sort(
  order = Seq(SortOrder(saleDate, Descending, NullsLast)),
  global = true,
  child = salesRelation
)

// One output row per customer: the customer id plus the summed amount as "total_amount"
val totalAmount = Alias(Sum(amount), "total_amount")()
val aggregatePlan = Aggregate(
  groupingExpressions = Seq(customerId),
  aggregateExpressions = Seq(customerId, totalAmount),
  child = salesRelation
)

Window Operations

// Unary node carrying the window expressions to compute together with the partitioning
// expressions and sort orders they share.
case class Window(
  windowExpressions: Seq[NamedExpression],
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder], 
  child: LogicalPlan
) extends UnaryNode

// Expression pairing a window function with the window specification it runs over.
case class WindowExpression(
  windowFunction: Expression,
  windowSpec: WindowSpecDefinition
) extends Expression

// Window specification: partitioning expressions, ordering and frame.
case class WindowSpecDefinition(
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder],
  frameSpecification: WindowFrame
)

Usage Example:

val employees = UnresolvedRelation(Seq("employees"))
val department = AttributeReference("department", StringType, false)()
val salary = AttributeReference("salary", DecimalType(10, 2), false)()

// Partitioning and ordering shared by the window spec and the Window node
val byDepartment = Seq(department)
val bySalaryDesc = Seq(SortOrder(salary, Descending, NullsLast))

// Window function: rank employees by salary within each department
val rankExpr = WindowExpression(
  Rank(Seq(salary)),
  WindowSpecDefinition(
    partitionSpec = byDepartment,
    orderSpec = bySalaryDesc,
    frameSpecification = UnspecifiedFrame
  )
)

val windowPlan = Window(
  windowExpressions = Seq(Alias(rankExpr, "salary_rank")()),
  partitionSpec = byDepartment,
  orderSpec = bySalaryDesc,
  child = employees
)

Binary Nodes

Join Operations

// Binary node joining `left` and `right`. The join condition is optional (None means no
// condition is attached) and `hint` defaults to JoinHint.NONE.
case class Join(
  left: LogicalPlan,
  right: LogicalPlan, 
  joinType: JoinType,
  condition: Option[Expression],
  hint: JoinHint = JoinHint.NONE
) extends BinaryNode

/**
 * Kind of join carried by a `Join` node. The class is sealed, so the objects below are the
 * only variants listed in this summary.
 */
sealed abstract class JoinType {
  /** SQL keyword(s) that name this join type. */
  def sql: String
}

// Every variant has to implement the abstract `sql` member; a bare
// `case object X extends JoinType` does not compile.
case object Inner extends JoinType {
  override def sql: String = "INNER"
}
case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}
case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}
case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}
case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}
case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
case object Cross extends JoinType {
  override def sql: String = "CROSS"
}

Usage Example:

val users = UnresolvedRelation(Seq("users"))
val orders = UnresolvedRelation(Seq("orders"))

// One user_id reference per side of the join
val usersUserId = AttributeReference("user_id", IntegerType, false)()
val ordersUserId = AttributeReference("user_id", IntegerType, false)()

// Equality condition shared by the inner and left outer joins
val sameUser: Option[Expression] = Some(EqualTo(usersUserId, ordersUserId))

// Inner join users and orders
val innerJoin = Join(left = users, right = orders, joinType = Inner, condition = sameUser)

// Left outer join
val leftJoin = Join(left = users, right = orders, joinType = LeftOuter, condition = sameUser)

// Cross join (Cartesian product): no condition
val crossJoin = Join(left = users, right = orders, joinType = Cross, condition = None)

Set Operations

// Set union over any number of child plans; extends LogicalPlan directly rather than
// BinaryNode because `children` is an arbitrary sequence. Both flags default to false.
case class Union(children: Seq[LogicalPlan], byName: Boolean = false, allowMissingCol: Boolean = false) extends LogicalPlan

// Binary set intersection of `left` and `right`; `isAll` has no default and must be supplied.
case class Intersect(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

// Binary set difference (`left` minus `right`); `isAll` has no default and must be supplied.
case class Except(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

Usage Example:

val current = UnresolvedRelation(Seq("current_users"))
val former = UnresolvedRelation(Seq("former_users"))
val active = UnresolvedRelation(Seq("active_users"))

// Union of current and former users
val allUsers = Union(children = Seq(current, former))

// Users present in both current and active
val commonUsers = Intersect(left = current, right = active, isAll = false)

// Current users that do not appear among the active ones
val inactiveUsers = Except(left = current, right = active, isAll = false)

Advanced Logical Plans

Subqueries

/**
 * Base for expressions that embed a logical plan.
 *
 * Declared as an abstract class rather than a case class: the case classes below extend
 * it, and Scala prohibits a case class from inheriting from another case class.
 * `children` and `joinCond` default to empty, `exprId` to a newly generated id and
 * `hint` to None.
 */
abstract class SubqueryExpression(
  plan: LogicalPlan,
  children: Seq[Expression] = Seq.empty,
  exprId: ExprId = NamedExpression.newExprId,
  joinCond: Seq[Expression] = Seq.empty,
  hint: Option[HintInfo] = None
) extends PlanExpression[LogicalPlan]

// EXISTS subquery over `plan`.
case class Exists(
  plan: LogicalPlan,
  children: Seq[Expression] = Seq.empty,
  exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

// Subquery used as a scalar value.
case class ScalarSubquery(
  plan: LogicalPlan,
  children: Seq[Expression] = Seq.empty,
  exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

// Subquery producing a list of values.
// NOTE(review): presumably the right-hand side of IN — confirm against the library.
case class ListQuery(
  plan: LogicalPlan,
  children: Seq[Expression] = Seq.empty,
  exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

Usage Example:

val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, false)()

// Orders filtered on user_id = the outer user_id. A `def`, so every use builds a new
// AttributeReference for the orders-side column.
def ordersOfUser: LogicalPlan = {
  val orderUserId = AttributeReference("user_id", IntegerType, false)()
  Filter(EqualTo(orderUserId, userIdAttr), ordersTable)
}

// EXISTS subquery
val usersWithOrders = Filter(Exists(ordersOfUser), usersTable)

// Scalar subquery: count of matching orders, aliased as "order_count"
val countSubquery = Aggregate(
  groupingExpressions = Seq.empty,
  aggregateExpressions = Seq(Alias(Count(Literal(1)), "order_count")()),
  child = ordersOfUser
)
val orderCount = Alias(ScalarSubquery(countSubquery), "order_count")()
val usersWithOrderCount = Project(Seq(userIdAttr, orderCount), usersTable)

Table Modification

// Command describing an insert of `query` into `table`.
// `partition` maps a partition column name to an optional value; `ifPartitionNotExists`
// defaults to false.
case class InsertIntoStatement(
  table: LogicalPlan,
  partition: Map[String, Option[String]],
  userSpecifiedCols: Seq[String],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean = false
) extends Command

// Command describing a delete from `table` with an optional condition.
case class DeleteFromTable(table: LogicalPlan, condition: Option[Expression]) extends Command

// Command describing an update of `table`: a list of assignments plus an optional condition.
case class UpdateTable(table: LogicalPlan, assignments: Seq[Assignment], condition: Option[Expression]) extends Command

// Command describing a merge of `sourceTable` into `targetTable` on `mergeCondition`,
// with separate action lists for matched and not-matched rows.
case class MergeIntoTable(
  targetTable: LogicalPlan,
  sourceTable: LogicalPlan, 
  mergeCondition: Expression,
  matchedActions: Seq[MergeAction],
  notMatchedActions: Seq[MergeAction]
) extends Command

Data Definition

// Command describing table creation from a catalog table description and a save mode.
// `query` is optional.
case class CreateTable(
  tableDesc: CatalogTable,
  mode: SaveMode,
  query: Option[LogicalPlan]
) extends Command

// Command describing a drop of the object named by the multi-part `identifier`.
// Both flags have no default and must be supplied.
case class DropTable(
  identifier: Seq[String],
  ifExists: Boolean,
  isView: Boolean
) extends Command

// Command describing a list of changes to apply to `table`.
case class AlterTable(
  table: LogicalPlan,
  changes: Seq[TableChange]
) extends Command

Physical Planning Concepts

Physical Plan Base

/**
 * Abstract base for physical plan nodes: a `QueryPlan` whose node type is fixed to
 * `SparkPlan`. Every member in this summary is declared without a body.
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] {
  // Execution entry points, differing in result type: an RDD of rows, a collected array
  // of rows, a broadcast value, or an RDD of columnar batches.
  def execute(): RDD[InternalRow]
  def executeCollect(): Array[InternalRow]
  def executeBroadcast[T](): broadcast.Broadcast[T]
  def executeColumnar(): RDD[ColumnarBatch]
  // Requirements on the children.
  // NOTE(review): presumably one entry per child, in child order — confirm.
  def requiredChildDistribution: Seq[Distribution]
  def requiredChildOrdering: Seq[Seq[SortOrder]]
  // Partitioning and ordering of this node's own output.
  def outputPartitioning: Partitioning
  def outputOrdering: Seq[SortOrder]
  // Metrics keyed by name.
  def metrics: Map[String, SQLMetric]
}

Distribution and Partitioning

// Closed (sealed) family of distribution variants; SparkPlan.requiredChildDistribution
// is typed as a sequence of these.
sealed abstract class Distribution

// Parameterless variants.
case object UnspecifiedDistribution extends Distribution
case object AllTuples extends Distribution  
case object BroadcastDistribution extends Distribution
// Variants parameterised by clustering expressions or by a sort ordering.
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution
case class RangeDistribution(ordering: Seq[SortOrder]) extends Distribution

// Closed family of partitionings. Each reports its partition count and whether it
// satisfies a required Distribution.
sealed abstract class Partitioning {
  def numPartitions: Int
  def satisfies(required: Distribution): Boolean
}

// Variants that take `numPartitions` as a constructor parameter.
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
// NOTE(review): these two objects show no implementation of `numPartitions` or `satisfies`
// in this summary — confirm their actual definitions against the library source.
case object SinglePartition extends Partitioning
case object UnknownPartitioning extends Partitioning

Plan Transformations

Rule-Based Optimization

/**
 * A named transformation from a tree to a tree of the same type. `TreeType` is F-bounded
 * by `TreeNode`, so a rule is typed for one kind of tree (e.g. `Rule[LogicalPlan]`).
 */
abstract class Rule[TreeType <: TreeNode[TreeType]] {
  // Name identifying the rule.
  val ruleName: String
  // Applies the rule to `plan` and returns the resulting tree.
  def apply(plan: TreeType): TreeType
}

// A named group of logical-plan rules paired with an execution strategy.
// `rules` is a varargs parameter.
case class Batch(name: String, strategy: Strategy, rules: Rule[LogicalPlan]*)

// Closed family of execution strategies for a batch: `Once`, or `FixedPoint` with an
// iteration cap.
sealed abstract class Strategy
case object Once extends Strategy  
case class FixedPoint(maxIterations: Int) extends Strategy

Usage Example:

// Custom optimization rule
object PushDownFilters extends Rule[LogicalPlan] {
  val ruleName = "PushDownFilters"
  
  /** Rewrites every `Filter` that sits directly on top of a `Join`; other nodes are kept. */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Bind the join node itself instead of destructuring it and rebuilding an equal copy
    case Filter(condition, join: Join) =>
      // Logic to push filter conditions down past joins
      pushFilterThroughJoin(condition, join)
    case other => other
  }
  
  /**
   * Placeholder for the actual pushdown logic.
   *
   * Until real pushdown is implemented the predicate is kept above the join. Returning
   * the bare join would silently drop the filter and change the query result.
   */
  private def pushFilterThroughJoin(filter: Expression, join: Join): LogicalPlan = {
    // Implementation details for filter pushdown
    Filter(filter, join)
  }
}

// Apply optimization rules in batches
val optimizer = Seq(
  Batch("Filter Pushdown", FixedPoint(10), PushDownFilters),
  Batch("Constant Folding", Once, ConstantFolding)
)

Common Query Plan Patterns

Building Query Plans Programmatically

// Build a complex query plan
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))

// users-side columns
val userIdAttr = AttributeReference("user_id", IntegerType, false)()
val nameAttr = AttributeReference("name", StringType, false)()
// orders-side columns; orders.user_id gets its own reference so the join can compare
// it with users.user_id
val orderUserIdAttr = AttributeReference("user_id", IntegerType, false)()
val orderAmountAttr = AttributeReference("amount", DecimalType(10, 2), false)()

// SELECT u.name, SUM(o.amount) as total
// FROM users u JOIN orders o ON u.user_id = o.user_id  
// WHERE u.active = true
// GROUP BY u.user_id, u.name
// HAVING SUM(o.amount) > 1000
// ORDER BY total DESC
// LIMIT 10

// ON u.user_id = o.user_id (the join key is the orders user id, not the order amount)
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val joinedPlan = Join(usersTable, ordersTable, Inner, Some(joinCondition))

// WHERE u.active = true
val filterCondition = EqualTo(AttributeReference("active", BooleanType, false)(), Literal(true))
val filteredPlan = Filter(filterCondition, joinedPlan)

// GROUP BY u.user_id, u.name with SUM(o.amount) AS total
val groupingExprs = Seq(userIdAttr, nameAttr)
// Keep the alias in a val so ORDER BY can refer to the attribute it produces
val totalAlias = Alias(Sum(orderAmountAttr), "total")()
val aggregateExprs = Seq(
  nameAttr,
  totalAlias
)
val aggregatedPlan = Aggregate(groupingExprs, aggregateExprs, filteredPlan)

// HAVING SUM(o.amount) > 1000
val havingCondition = GreaterThan(Sum(orderAmountAttr), Literal(1000))
val havingPlan = Filter(havingCondition, aggregatedPlan)

// ORDER BY total DESC: sort on the attribute of the "total" alias rather than on a
// newly created, unrelated AttributeReference with the same name
val sortOrder = SortOrder(totalAlias.toAttribute, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, havingPlan)

// LIMIT 10
val finalPlan = Limit(Literal(10), sortedPlan)

The query planning system in Catalyst provides a comprehensive framework for representing and optimizing SQL queries through a rich tree-based structure that enables sophisticated transformations and optimizations.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13

docs

connectors.md

data-types.md

expressions.md

index.md

query-plans.md

tile.json