or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

analysis.md, expressions.md, index.md, optimization.md, query-planning.md, row-operations.md, tree-operations.md, types.md
tile.json

docs/query-planning.md

Query Planning

Logical and physical query plan representations with transformation and optimization capabilities for building and manipulating query execution trees.

Capabilities

QueryPlan Base Class

Base class for all query plans providing common functionality.

/**
 * Common base for logical and physical query plans.
 *
 * A query plan is a [[TreeNode]] whose nodes additionally expose the attributes they
 * produce, receive and reference, together with helpers for rewriting the expressions
 * they hold.
 *
 * @tparam PlanType the concrete plan family (for example `LogicalPlan` or `SparkPlan`);
 *                  the self-type below guarantees every instance is also a `PlanType`
 */
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
  self: PlanType =>

  // ---- Attribute bookkeeping ------------------------------------------------------

  /** Attributes produced by this node, in output order. */
  def output: Seq[Attribute]

  /** The attributes of `output`, as a set. */
  def outputSet: AttributeSet

  /** Attributes supplied to this node by its children. */
  def inputSet: AttributeSet

  /** Attributes referred to by the expressions of this node. */
  def references: AttributeSet

  /** Attributes this node references but its children do not supply. */
  def missingInput: AttributeSet

  // ---- Expression rewriting ---------------------------------------------------------

  /** Rewrites the expressions held by this plan node with `rule`. */
  def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type

  /** Rewrites the expressions held by this plan node with `rule`, visiting them top-down. */
  def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type

  /** Rewrites the expressions held by this plan node with `rule`, visiting them bottom-up. */
  def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type
}

LogicalPlan Class

Base class for logical query plans representing query semantics.

/**
 * Base class for logical query plans: trees that describe the semantics of a query
 * rather than how it is executed.
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

  // ---- Analysis state ------------------------------------------------------------

  /** True once this plan has been marked as analyzed. */
  def analyzed: Boolean

  /** Marks this plan as analyzed; visible only inside the `catalyst` package. */
  private[catalyst] def setAnalyzed(): Unit

  // ---- Analysis-aware rewriting ----------------------------------------------------

  /** Applies `rule` to the operators of this plan, skipping sub-trees already analyzed. */
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan

  /** Applies `r` to the expressions of this plan, skipping sub-trees already analyzed. */
  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan

  // ---- Estimation ------------------------------------------------------------------

  /** Size and row-count estimates computed for this plan. */
  def statistics: Statistics
}

Usage Examples:

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Create a simple logical plan
val relation = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
)

// Access plan properties
val output = relation.output          // Seq of attributes: id, name
val outputSet = relation.outputSet    // the same attributes as an AttributeSet
val analyzed = relation.analyzed      // Boolean

// Create filter plan: WHERE id > 10.
// Reference the relation's own `id` attribute. A newly built
// AttributeReference("id", ...)() would get a fresh ExprId that the relation
// does not output, leaving the Filter with a missing input.
val idAttr = relation.output.head
val filterExpr = GreaterThan(idAttr, Literal(10, IntegerType))
val filtered = Filter(filterExpr, relation)

// Transform expressions in plan: double every Int literal (id > 10 becomes id > 20)
val transformed = filtered.transformExpressions {
  case Literal(value: Int, dataType) => Literal(value * 2, dataType)
}

Join Types and Operations

Join type definitions and join plan operations.

/**
 * Kind of join performed by a `Join` node.
 *
 * `sql` is the keyword sequence that names this join type in SQL text.
 */
sealed abstract class JoinType {
  def sql: String
}

/** Inner join (`INNER`). */
case object Inner extends JoinType { override val sql: String = "INNER" }

/** Left outer join (`LEFT OUTER`). */
case object LeftOuter extends JoinType { override val sql: String = "LEFT OUTER" }

/** Right outer join (`RIGHT OUTER`). */
case object RightOuter extends JoinType { override val sql: String = "RIGHT OUTER" }

/** Full outer join (`FULL OUTER`). */
case object FullOuter extends JoinType { override val sql: String = "FULL OUTER" }

/** Left semi join (`LEFT SEMI`). */
case object LeftSemi extends JoinType { override val sql: String = "LEFT SEMI" }

/** Left anti join (`LEFT ANTI`). */
case object LeftAnti extends JoinType { override val sql: String = "LEFT ANTI" }

/**
 * Join logical plan node.
 *
 * @param left      left input plan
 * @param right     right input plan
 * @param joinType  kind of join; decides which sides contribute output columns and
 *                  which of those columns may be null-padded
 * @param condition optional join predicate
 */
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression]) extends LogicalPlan {

  override def children: Seq[LogicalPlan] = Seq(left, right)

  override def output: Seq[Attribute] = joinType match {
    case Inner =>
      left.output ++ right.output
    // Outer joins emit unmatched rows from the preserved side with nulls for the other
    // side's columns, so those attributes must be reported as nullable even when the
    // corresponding input columns are not.
    case LeftOuter =>
      left.output ++ right.output.map(_.withNullability(true))
    case RightOuter =>
      left.output.map(_.withNullability(true)) ++ right.output
    case FullOuter =>
      (left.output ++ right.output).map(_.withNullability(true))
    // Semi and anti joins only test whether a match exists; only left columns are emitted.
    case LeftSemi | LeftAnti =>
      left.output
  }
}

Usage Examples:

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._  // IntegerType, StringType, DoubleType

// Create relations
val users = LocalRelation(
  AttributeReference("user_id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
)

val orders = LocalRelation(
  AttributeReference("order_id", IntegerType, false)(),
  AttributeReference("user_id", IntegerType, false)(),
  AttributeReference("amount", DoubleType, true)()
)

// Create join condition from each relation's own `user_id` attribute
val joinCondition = EqualTo(
  users.output.find(_.name == "user_id").get,
  orders.output.find(_.name == "user_id").get
)

// Create different join types
val innerJoin = Join(users, orders, Inner, Some(joinCondition))
val leftJoin = Join(users, orders, LeftOuter, Some(joinCondition))
val rightJoin = Join(users, orders, RightOuter, Some(joinCondition))
val fullJoin = Join(users, orders, FullOuter, Some(joinCondition))
val semiJoin = Join(users, orders, LeftSemi, Some(joinCondition))

// Access join properties
val joinOutput = innerJoin.output    // Combined output from both sides
val joinChildren = innerJoin.children // Seq(users, orders)

Basic Logical Plan Operators

Fundamental logical plan operators for query construction.

/**
 * Logical operator for a `WHERE` clause: keeps the rows of `child` for which `condition`
 * holds. Filtering does not change the schema.
 *
 * @param condition predicate evaluated for each input row
 * @param child     input plan
 */
case class Filter(
    condition: Expression,
    child: LogicalPlan) extends UnaryNode {

  /** Same attributes as the child. */
  override def output: Seq[Attribute] = child.output
}

/**
 * Logical operator for a `SELECT` list: computes each named expression of `projectList`
 * over the rows of `child`.
 *
 * @param projectList named expressions to output, in order
 * @param child       input plan
 */
case class Project(
    projectList: Seq[NamedExpression],
    child: LogicalPlan) extends UnaryNode {

  /** One attribute per projected expression, in projection order. */
  override def output: Seq[Attribute] =
    for (named <- projectList) yield named.toAttribute
}

/**
 * Logical operator for a `GROUP BY` clause.
 *
 * @param groupingExpressions  expressions that define the groups
 * @param aggregateExpressions named results emitted for each group
 * @param child                input plan
 */
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan) extends UnaryNode {

  /** Each aggregate expression contributes exactly one output attribute. */
  override def output: Seq[Attribute] =
    aggregateExpressions.map(aggregate => aggregate.toAttribute)
}

/**
 * Logical operator for an `ORDER BY` clause.
 *
 * @param order  sort keys, most significant first
 * @param global `true` to order the whole result, `false` to order only within each
 *               partition
 * @param child  input plan
 */
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode {
  // Sorting reorders rows but never changes the schema.
  override def output: Seq[Attribute] = child.output
}

/**
 * Logical operator for a `LIMIT` clause.
 *
 * @param limitExpr expression giving the maximum number of rows to return
 * @param child     input plan
 */
case class Limit(
    limitExpr: Expression,
    child: LogicalPlan) extends UnaryNode {

  /** Limiting does not change the schema. */
  override def output: Seq[Attribute] = child.output
}

/**
 * Union logical plan: concatenates the rows of all `children`, matching columns by
 * position (`UNION ALL` semantics, no de-duplication).
 *
 * @param children input plans; expected to produce the same number of columns
 */
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {

  /**
   * Names, types and expression IDs come from the first child. A column is nullable if it
   * is nullable in any child, since every child may contribute rows to it.
   *
   * With no children the output is empty, rather than failing on `head`. Children that
   * disagree on column count make `transpose` throw `IllegalArgumentException`.
   */
  override def output: Seq[Attribute] =
    children.map(_.output).transpose.map { column =>
      column.head.withNullability(column.exists(_.nullable))
    }
}

/**
 * Leaf logical plan (no children) backed by rows held in memory.
 *
 * @param output attributes of the relation
 * @param data   rows of the relation; empty by default
 */
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) extends LeafNode

/** Convenience constructors for [[LocalRelation]]. */
object LocalRelation {
  /**
   * Builds an empty relation from individually listed attributes, e.g.
   * `LocalRelation(AttributeReference("id", IntegerType, false)())`. Without this overload
   * only the `Seq[Attribute]` form generated for the case class would be available.
   */
  def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
}

Usage Examples:

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Average, Count}
import org.apache.spark.sql.types._

// Base relation
val table = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)(),
  AttributeReference("age", IntegerType, true)(),
  AttributeReference("score", DoubleType, true)()
)

// Name the relation's own attributes once; every operator below references these
val id = table.output(0)
val name = table.output(1)
val age = table.output(2)
val score = table.output(3)

// Filter: WHERE age > 18
val filterExpr = GreaterThan(age, Literal(18, IntegerType))
val filtered = Filter(filterExpr, table)

// Project: SELECT id, name
val projected = Project(Seq(id, name), filtered)

// Aggregate: SELECT age, COUNT(*) AS count, AVG(score) AS avg_score GROUP BY age
// Aggregate functions must be wrapped in an AggregateExpression to appear in an
// Aggregate, and Alias names each computed result (an Attribute is already named).
val groupExpr = Seq(age)
val aggExprs: Seq[NamedExpression] = Seq(
  age,
  Alias(Count(Literal(1)).toAggregateExpression(), "count")(),
  Alias(Average(score).toAggregateExpression(), "avg_score")()
)
val aggregated = Aggregate(groupExpr, aggExprs, table)

// Sort: ORDER BY score DESC
val sortOrder = Seq(SortOrder(score, Descending))
val sorted = Sort(sortOrder, global = true, table)

// Limit: LIMIT 10
val limited = Limit(Literal(10, IntegerType), sorted)

// Union: combine two relations with the same schema. newInstance() gives the second
// relation fresh expression IDs, so its attributes are not confused with table's.
val table2 = LocalRelation(table.output.map(_.newInstance()))
val unioned = Union(Seq(table, table2))

// Complex query: SELECT name FROM table WHERE age > 21 ORDER BY score LIMIT 5
// Sort sits below the Project because it references `score`, which the Project drops.
val complexQuery = Limit(
  Literal(5, IntegerType),
  Project(
    Seq(name),
    Sort(
      Seq(SortOrder(score, Ascending)),
      global = true,
      Filter(
        GreaterThan(age, Literal(21, IntegerType)),  // age > 21
        table
      )
    )
  )
)

Physical Plan Integration

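Types that bridge logical and physical planning: the base class for physical (executable) plans, and the size statistics the planner consults, for example, to decide whether a join input is small enough to broadcast.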

/**
 * Base class for physical plans: executable operators that produce rows.
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] {

  /** Prepares this plan for execution. */
  def prepare(): Unit

  /** Executes this plan and returns its result rows as an `RDD[InternalRow]`. */
  def execute(): RDD[InternalRow]

  /** Resets the statistics and metrics of this plan. */
  def resetMetrics(): Unit
}

/**
 * Size and cardinality estimates for a plan (the type returned by
 * `LogicalPlan.statistics`).
 *
 * @param sizeInBytes estimated size of the plan's output, in bytes
 * @param rowCount    estimated number of output rows, when known
 */
case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None) {

  /**
   * Default broadcast threshold in bytes (10 MiB), the documented default of Spark's
   * `spark.sql.autoBroadcastJoinThreshold` setting.
   */
  def defaultBroadcastThreshold: BigInt = BigInt(10L * 1024 * 1024)

  /**
   * Whether these statistics are small enough to broadcast under
   * [[defaultBroadcastThreshold]]. Use [[canBroadcastWithin]] to apply a configured threshold.
   */
  def isBroadcastable: Boolean = canBroadcastWithin(defaultBroadcastThreshold)

  /**
   * Whether the estimated size is at most `threshold` bytes.
   *
   * @param threshold maximum broadcast size in bytes; a negative value disables broadcasting
   * @return false for a negative threshold or a negative size estimate
   */
  def canBroadcastWithin(threshold: BigInt): Boolean =
    threshold >= 0 && sizeInBytes >= 0 && sizeInBytes <= threshold
}

Usage Examples:

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Logical plan can be converted to physical plan.
// The Project reuses the relation's own `id` attribute. A newly built
// AttributeReference("id", ...)() would carry a fresh ExprId that the relation
// does not output, leaving the Project with a missing input.
val relation = LocalRelation(AttributeReference("id", IntegerType, false)())
val logicalPlan = Project(relation.output, relation)

// Statistics computation
val stats = Statistics(sizeInBytes = 1000, rowCount = Some(100))
val broadcastable = stats.isBroadcastable  // Check if suitable for broadcast

// Physical properties like partitioning and ordering are preserved
// during logical to physical plan conversion