CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-catalyst

Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL

Pending
Overview
Eval results
Files

docs/query-plans.md

Query Plans

This section covers the query plan system in Spark Catalyst, including logical and physical plan representations with tree-based transformations and optimization support. Query plans form the core of Catalyst's query processing pipeline.

Core Imports

import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
// (join types such as Inner and LeftOuter live directly in org.apache.spark.sql.catalyst.plans, imported above)
import org.apache.spark.sql.catalyst.expressions._

Query Plan Hierarchy

QueryPlan (abstract class)

Base class for all query plans (logical and physical), extending the tree framework.

// Base contract shared by logical and physical plans. PlanType is the concrete
// subtype (F-bounded type parameter) so transformations return the right type.
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  def output: Seq[Attribute]          // attributes this operator produces
  def outputSet: AttributeSet         // `output` as a set, for fast membership tests
  def references: AttributeSet        // attributes consumed by this operator's expressions
  def inputSet: AttributeSet          // attributes available from the children
  def schema: StructType              // `output` rendered as a StructType
  def outputOrdering: Seq[SortOrder]  // ordering guaranteed on the output, if any
  def maxRows: Option[Long]           // upper bound on the row count, if known
  // Apply `rule` to every expression embedded in the plan
  // (Down = pre-order, Up = post-order; plain form is pre-order).
  def transformExpressions(rule: PartialFunction[Expression, Expression]): PlanType
  def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): PlanType
  def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): PlanType
}

LogicalPlan (abstract class)

Base class for all logical query plans representing the query structure before physical optimization.

// Base class for logical (pre-physical-planning) operators.
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  def resolved: Boolean          // true once all attributes and relations are bound
  def childrenResolved: Boolean  // true when every child plan is resolved
  def statistics: Statistics     // size/row-count estimates used by the optimizer
}

Plan Node Types

// Operator with no children, e.g. a table scan or an in-memory relation.
abstract class LeafNode extends LogicalPlan {
  override final def children: Seq[LogicalPlan] = Nil
}

// Operator with exactly one input plan, e.g. Filter, Project, Sort.
abstract class UnaryNode extends LogicalPlan {
  def child: LogicalPlan
  override final def children: Seq[LogicalPlan] = child :: Nil
}

// Operator with exactly two input plans, e.g. Join.
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan
  def right: LogicalPlan
  override final def children: Seq[LogicalPlan] = left :: right :: Nil
}

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._

// Create a simple query plan
// (NOTE: TableIdentifier lives in org.apache.spark.sql.catalyst — import it as well.)
val relation = UnresolvedRelation(TableIdentifier("users"))
val filter = Filter(EqualTo(UnresolvedAttribute("age"), Literal(25)), relation)
val project = Project(Seq(UnresolvedAttribute("name")), filter)

// Check if plan is resolved
// (false here: unresolved relations/attributes must first pass through the analyzer)
val isResolved = project.resolved

// Get output schema
val schema = project.schema

Join Operations

Join Types

// Enumeration of supported join kinds. `sql` is the SQL keyword form used when
// rendering the plan back to SQL text. As originally written the case objects
// never implemented the abstract `sql` member, so the snippet could not compile;
// the implementations below follow Spark's own definitions.
sealed abstract class JoinType {
  def sql: String
}

case object Inner extends JoinType { override def sql: String = "INNER" }
case object LeftOuter extends JoinType { override def sql: String = "LEFT OUTER" }
case object RightOuter extends JoinType { override def sql: String = "RIGHT OUTER" }
case object FullOuter extends JoinType { override def sql: String = "FULL OUTER" }
case object LeftSemi extends JoinType { override def sql: String = "LEFT SEMI" }
case object LeftAnti extends JoinType { override def sql: String = "LEFT ANTI" }
case object Cross extends JoinType { override def sql: String = "CROSS" }

Join Plan

// Logical join of two child plans. `condition` is None for an unconditioned
// (cross-style) join.
case class Join(
  left: LogicalPlan,
  right: LogicalPlan,
  joinType: JoinType,
  condition: Option[Expression]
) extends BinaryNode {
  override def output: Seq[Attribute] = {
    joinType match {
      // Semi/anti joins only filter the left side; no right-side columns escape.
      case LeftSemi | LeftAnti => left.output
      // NOTE(review): simplified — real Spark also marks the null-extended
      // side's attributes nullable for outer joins.
      case _ => left.output ++ right.output
    }
  }
}

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._

// Create join operation
// (EqualTo and UnresolvedAttribute require org.apache.spark.sql.catalyst.expressions._ in scope)
val users = UnresolvedRelation(TableIdentifier("users"))
val orders = UnresolvedRelation(TableIdentifier("orders"))
val joinCondition = EqualTo(
  UnresolvedAttribute("users.id"),
  UnresolvedAttribute("orders.user_id")
)
val join = Join(users, orders, Inner, Some(joinCondition))

Basic Logical Operators

Project (SELECT)

// SELECT list: the output is exactly the attributes of the projected expressions.
case class Project(
  projectList: Seq[NamedExpression],
  child: LogicalPlan
) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}

Filter (WHERE)

// WHERE clause: keeps rows for which `condition` evaluates to true.
// Filtering never changes the schema, so the output is the child's output.
case class Filter(
  condition: Expression,
  child: LogicalPlan
) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

Aggregate (GROUP BY)

// GROUP BY: `groupingExpressions` are the keys; `aggregateExpressions` is the
// full output list (grouping columns and/or aggregate functions).
case class Aggregate(
  groupingExpressions: Seq[Expression],
  aggregateExpressions: Seq[NamedExpression],
  child: LogicalPlan
) extends UnaryNode {
  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
}

Sort (ORDER BY)

// ORDER BY / SORT BY. `global = true` requests a total ordering across all
// partitions (ORDER BY); false sorts within each partition only (SORT BY).
case class Sort(
  order: Seq[SortOrder],
  global: Boolean,
  child: LogicalPlan
) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

// A single sort key: an expression plus direction and null placement.
// Unevaluable — it describes an ordering and is never executed directly.
case class SortOrder(
  child: Expression,
  direction: SortDirection,
  nullOrdering: NullOrdering,
  sameOrderExpressions: Set[Expression] = Set.empty
) extends Expression with Unevaluable {
  override def dataType: DataType = child.dataType
  override def nullable: Boolean = child.nullable
}

// Sort direction and null-placement enums. As originally written the case
// objects never implemented the abstract `sql` member, so the snippet could
// not compile; sealed + implementations below match Spark's definitions.
sealed abstract class SortDirection {
  def sql: String
}
case object Ascending extends SortDirection { override def sql: String = "ASC" }
case object Descending extends SortDirection { override def sql: String = "DESC" }

sealed abstract class NullOrdering {
  def sql: String
}
case object NullsFirst extends NullOrdering { override def sql: String = "NULLS FIRST" }
case object NullsLast extends NullOrdering { override def sql: String = "NULLS LAST" }

Limit

// LIMIT clause. The limit is an Expression (normally a literal) rather than
// a raw Int so it participates in expression resolution like everything else.
case class Limit(
  limitExpr: Expression,
  child: LogicalPlan
) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

Union

// UNION ALL of several plans. The output schema is taken from the first
// child; all children are assumed to have union-compatible schemas.
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
  override def output: Seq[Attribute] = children.head.output
}

Distinct

// SELECT DISTINCT: removes duplicate rows without changing the schema.
case class Distinct(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._

// Build a complex query plan: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10
// Plans are built bottom-up: source -> Filter -> Project -> Sort -> Limit.
val relation = UnresolvedRelation(TableIdentifier("users"))

val filter = Filter(
  GreaterThan(UnresolvedAttribute("age"), Literal(18)),
  relation
)

val project = Project(
  Seq(UnresolvedAttribute("name")),
  filter
)

val sort = Sort(
  Seq(SortOrder(UnresolvedAttribute("name"), Ascending, NullsLast)),
  global = true,  // total ordering across partitions, i.e. SQL ORDER BY
  project
)

val limit = Limit(Literal(10), sort)

// The final plan represents: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10

Data Sources

UnresolvedRelation

// Placeholder for a table reference before analysis. Its schema is unknown
// until the analyzer resolves it against the catalog, hence the empty output.
case class UnresolvedRelation(
  tableIdentifier: TableIdentifier,
  alias: Option[String] = None
) extends LeafNode {
  override def output: Seq[Attribute] = Nil
}

// Fully-qualified table name: an optional database plus the table name.
// `quotedString` backtick-quotes each part the way Spark SQL renders
// identifiers. The original snippet referenced an undefined `quoteIdentifier`
// helper; it is defined here so the example is self-contained.
case class TableIdentifier(
  table: String,
  database: Option[String] = None
) {
  // Backtick-quote one identifier part; embedded backticks are escaped by doubling.
  private def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"
  def identifier: String = database.map(_ + ".").getOrElse("") + table
  def quotedString: String = database.map(quoteIdentifier).map(_ + ".").getOrElse("") + quoteIdentifier(table)
  def unquotedString: String = identifier
}

LocalRelation

// In-memory relation holding its rows directly; used for literal data,
// tests, and plans the optimizer has fully constant-folded.
case class LocalRelation(
  output: Seq[Attribute],
  data: Seq[InternalRow] = Nil,
  isStreaming: Boolean = false
) extends LeafNode {
  // Constructor for single row
  def this(output: Seq[Attribute], data: InternalRow) = this(output, data :: Nil)
}

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

// Create a local relation with data
// (UTF8String comes from org.apache.spark.unsafe.types — import it as well.
//  The trailing () on AttributeReference supplies its implicit second
//  parameter list, yielding a fresh expression ID.)
val schema = Seq(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)()
)

val data = Seq(
  InternalRow(1, UTF8String.fromString("Alice")),
  InternalRow(2, UTF8String.fromString("Bob"))
)

val localRelation = LocalRelation(schema, data)

Statistics and Cost Information

Statistics

// Cost estimates attached to a logical plan. sizeInBytes is the only
// mandatory statistic; row counts and per-column stats are optional refinements.
case class Statistics(
  sizeInBytes: BigInt,
  rowCount: Option[BigInt] = None,
  attributeStats: AttributeMap[ColumnStat] = AttributeMap.empty,
  hints: HintInfo = HintInfo()
) {
  // One-line human-readable summary (Utils.bytesToString formats e.g. "1024.0 KB").
  def simpleString: String = {
    s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}, " +
    s"rowCount=${rowCount.map(_.toString).getOrElse("unknown")}"
  }
}

// Per-column statistics; every field is optional because stats may be
// partially collected. min/max are stored untyped (Any) to cover all data types.
case class ColumnStat(
  distinctCount: Option[BigInt] = None,
  min: Option[Any] = None,
  max: Option[Any] = None,
  nullCount: Option[BigInt] = None,
  avgLen: Option[Long] = None,    // average value length in bytes
  maxLen: Option[Long] = None,    // maximum value length in bytes
  histogram: Option[Histogram] = None
)

// User-supplied planner hints carried alongside statistics
// (e.g. broadcast = prefer a broadcast join for this relation).
case class HintInfo(
  broadcast: Boolean = false,
  cartesianProduct: Boolean = false
)

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._

// Create statistics for a relation
val stats = Statistics(
  sizeInBytes = 1024 * 1024, // 1MB
  rowCount = Some(1000),
  hints = HintInfo(broadcast = true)
)

// Statistics are not mutated onto an existing plan: each operator *derives*
// its own estimates, exposed through the `statistics` member on LogicalPlan.
// A custom leaf node supplies them by overriding that member; non-leaf
// operators estimate from their children.
case class RelationWithStats(output: Seq[Attribute]) extends LeafNode {
  override def statistics: Statistics = stats
}
val relationWithStats = RelationWithStats(schema)

Physical Plans

While most physical plan details are in the execution engine, Catalyst defines the base physical plan structure:

Physical Plan Base

// Root of the physical operator hierarchy: a query plan that can actually run.
abstract class SparkPlan extends QueryPlan[SparkPlan] {
  def execute(): RDD[InternalRow]              // lazily produce the result RDD
  def executeCollect(): Array[InternalRow]     // run and gather all rows to the driver
  def executeTake(n: Int): Array[InternalRow]  // run just enough to return n rows
  def executeToIterator(): Iterator[InternalRow]
  
  // Runtime metrics (rows produced, spill sizes, ...) keyed by metric name.
  def metrics: Map[String, SQLMetric]
  def longMetric(name: String): SQLMetric
}

Tree Transformations

Query plans inherit tree transformation capabilities:

Common Transformation Patterns

// Transform all expressions in a plan (bottom-up). The guard prevents the rule
// from matching an Upper node that is already present. Note the bottom-up
// (...Up) traversal: a plain top-down transformExpressions would descend into
// the children of the Upper it just produced, match the same child again, and
// recurse forever.
val transformedPlan = plan.transformExpressionsUp {
  case expr if expr.dataType == StringType && !expr.isInstanceOf[Upper] =>
    Upper(expr)
}

// Transform the plan structure
val optimizedPlan = plan.transform {
  case Filter(condition, child) if condition == Literal(true) =>
    child  // Remove always-true filters
}

// Collect information from the plan
val allFilters = plan.collect {
  case filter: Filter => filter.condition
}

Usage Example

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._

// Original plan with nested filters
val relation = UnresolvedRelation(TableIdentifier("users"))
val filter1 = Filter(GreaterThan(UnresolvedAttribute("age"), Literal(18)), relation)
val filter2 = Filter(EqualTo(UnresolvedAttribute("active"), Literal(true)), filter1)

// Combine filters into a single AND condition.
// transform is pre-order, so the outer Filter matches first: condition1 is the
// outer predicate (active = true) and condition2 the inner (age > 18).
// NOTE(review): with 3+ nested filters a single pass only merges one pair at
// each match site; apply the rule to a fixpoint to collapse a whole chain.
val combinedPlan = filter2.transform {
  case Filter(condition1, Filter(condition2, child)) =>
    Filter(And(condition1, condition2), child)
}

// Result: Filter(And(active = true, age > 18), UnresolvedRelation(users))

Plan Validation

Resolution Status

// Check if a plan is fully resolved.
// NOTE(review): per the LogicalPlan contract above, `resolved` already implies
// `childrenResolved`, so the explicit recursion is redundant — kept here only
// to make the tree traversal visible.
def isFullyResolved(plan: LogicalPlan): Boolean = {
  plan.resolved && plan.children.forall(isFullyResolved)
}

// Find unresolved attribute references anywhere in the plan.
// QueryPlan.collect visits *plan nodes*, not the expressions inside them, so
// matching UnresolvedAttribute there (as the original did) can never fire.
// Instead, visit every plan node and collect from each node's expression trees.
def findUnresolvedReferences(plan: LogicalPlan): Seq[UnresolvedAttribute] = {
  plan.flatMap(_.expressions.flatMap(_.collect {
    case attr: UnresolvedAttribute => attr
  }))
}

This comprehensive query plan system enables Catalyst to represent, transform, and optimize SQL queries through a flexible tree-based structure that supports both logical planning and physical execution strategies.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst

docs

analysis.md

code-generation.md

data-types.md

expressions.md

index.md

optimization.md

parsing.md

query-plans.md

utilities.md

tile.json