Catalyst is Spark's library for manipulating relational query plans and expressions.
—
Catalyst's expression system provides a tree-based representation for all SQL operations, functions, and computations. The expression framework supports type-safe evaluation, code generation, and optimization transformations essential for query processing.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

/**
 * The base Expression class provides evaluation capabilities, type information, and code
 * generation support for all SQL expressions.
 *
 * Expressions form a tree: each node exposes its direct `children`, and the tree can either be
 * evaluated against an input row (`eval`) or turned into generated code (`genCode`).
 */
abstract class Expression extends TreeNode[Expression] {
  /** Data type of the value this expression produces. */
  def dataType: DataType

  /** Whether evaluating this expression may produce null. */
  def nullable: Boolean

  /** Evaluates this expression against a single input row. */
  def eval(input: InternalRow): Any

  /** Generates code that computes this expression, using `ctx` for names and helpers. */
  def genCode(ctx: CodegenContext): ExprCode

  /** Direct child expressions of this node. */
  def children: Seq[Expression]

  /** Attributes referenced by this expression. */
  def references: AttributeSet

  /** Human-readable name of the expression. */
  def prettyName: String

  /** SQL text representation of this expression. */
  def sql: String

  /** Whether this expression performs the same computation as `other`. */
  def semanticEquals(other: Expression): Boolean

  /** Whether repeated evaluation on the same input always gives the same result. */
  def deterministic: Boolean

  /** Whether the expression can be evaluated without an input row (e.g. constant subtrees). */
  def foldable: Boolean

  /** Hash of the computation this expression performs; intended to pair with `semanticEquals`. */
  def semanticHash(): Int
}
/** Expressions with no child expressions, such as literals and column references. */
abstract class LeafExpression extends Expression {
  final override def children: Seq[Expression] = Nil
}
/** Expressions with one child expression, such as mathematical functions and type casts. */
abstract class UnaryExpression extends Expression {
  /** The single input of this expression. */
  def child: Expression
  final override def children: Seq[Expression] = child :: Nil
}
/** Expressions with two child expressions, such as arithmetic and comparison operators. */
abstract class BinaryExpression extends Expression {
  /** Left-hand operand. */
  def left: Expression
  /** Right-hand operand. */
  def right: Expression
  final override def children: Seq[Expression] = Seq(left, right)
}
/**
 * Expressions with three child expressions, such as conditional expressions and substring
 * operations.
 */
abstract class TernaryExpression extends Expression {
  def first: Expression
  def second: Expression
  def third: Expression
  // Children are exposed in positional order: first, second, third.
  final override def children: Seq[Expression] = Seq(first, second, third)
}
/**
 * A constant value together with its data type.
 */
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
// Auxiliary constructor: the data type is derived from the value via Literal.inferType.
def this(v: Any) = this(v, Literal.inferType(v))
}
/** Factory methods and shared constants for Literal. */
object Literal {
// Builds a literal from a plain value, e.g. Literal(42) or Literal("hello").
def apply(v: Any): Literal
// Builds a literal with an explicitly supplied type; used for typed nulls such as
// Literal.create(null, StringType), where no type can be derived from the value.
def create(v: Any, dataType: DataType): Literal
// NOTE(review): presumably returns the default value for `dataType` — confirm.
def default(dataType: DataType): Literal
// NOTE(review): presumably builds a literal from an arbitrary object; confirm how it differs from apply.
def fromObject(obj: Any): Literal
// Shared boolean literal instances.
val TrueLiteral: Literal
val FalseLiteral: Literal
}
/**
 * A reference to a named column of a given type.
 *
 * The second parameter list (expression ID and qualifier) is fully defaulted, so a new
 * reference is created with a trailing empty `()`, e.g.
 * `AttributeReference("age", IntegerType)()`.
 */
case class AttributeReference(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty
)(val exprId: ExprId = NamedExpression.newExprId,
  val qualifier: Seq[String] = Seq.empty) extends Attribute

// Usage Example:
// Create literals
val intLit = Literal(42)
val stringLit = Literal("hello")
// A typed null: no type can be derived from `null`, so it is supplied explicitly.
val nullLit = Literal.create(null, StringType)
// Create attribute references (note the trailing `()` for the second parameter list)
val nameAttr = AttributeReference("name", StringType, nullable = false)()
val ageAttr = AttributeReference("age", IntegerType, nullable = true)()

/** Addition of two numeric expressions. */
case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
  override def inputType: AbstractDataType = NumericType
  protected override def nullSafeEval(input1: Any, input2: Any): Any
}
// Remaining binary arithmetic operators.
case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic
case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic
case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic
// Unary arithmetic: negation, unary plus, absolute value.
case class UnaryMinus(child: Expression) extends UnaryArithmetic
case class UnaryPlus(child: Expression) extends UnaryArithmetic
case class Abs(child: Expression) extends UnaryMathExpression

// Usage Example:
val col1 = AttributeReference("a", IntegerType, nullable = false)()
val col2 = AttributeReference("b", IntegerType, nullable = false)()
val addExpr = Add(col1, col2)                   // a + b
val subtractExpr = Subtract(col1, Literal(10))  // a - 10
val multiplyExpr = Multiply(col1, col2)         // a * b
val absExpr = Abs(UnaryMinus(col1))             // abs(-a)

// Comparison operators.
case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComparison
case class LessThan(left: Expression, right: Expression) extends BinaryComparison
case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison
case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison
case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison
// Membership tests: `In` checks against a list of expressions, `InSet` against a set of values.
case class In(value: Expression, list: Seq[Expression]) extends Predicate
case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with Predicate

// Usage Example:
val nameCol = AttributeReference("name", StringType, nullable = false)()
val ageCol = AttributeReference("age", IntegerType, nullable = true)()
val equalExpr = EqualTo(nameCol, Literal("Alice"))
val rangeExpr = And(GreaterThan(ageCol, Literal(18)), LessThan(ageCol, Literal(65)))  // 18 < age < 65
val inExpr = In(nameCol, Seq(Literal("Alice"), Literal("Bob"), Literal("Charlie")))

/** Logical conjunction of two boolean expressions. */
case class And(left: Expression, right: Expression) extends BinaryExpression with Predicate {
  override def dataType: DataType = BooleanType
}
/** Logical disjunction of two boolean expressions. */
case class Or(left: Expression, right: Expression) extends BinaryExpression with Predicate {
  override def dataType: DataType = BooleanType
}
/** Logical negation of a boolean expression. */
case class Not(child: Expression) extends UnaryExpression with Predicate {
  override def dataType: DataType = BooleanType
}
// Null checks.
case class IsNull(child: Expression) extends UnaryExpression with Predicate
case class IsNotNull(child: Expression) extends UnaryExpression with Predicate

// Usage Example:
val nameCol = AttributeReference("name", StringType, nullable = true)()
val ageCol = AttributeReference("age", IntegerType, nullable = true)()
val notNullName = IsNotNull(nameCol)
val validAge = And(GreaterThan(ageCol, Literal(0)), LessThan(ageCol, Literal(150)))
// name IS NOT NULL AND (age > 0 AND age < 150)
val condition = And(notNullName, validAge)

// String functions.
case class Upper(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Lower(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Length(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Substring(str: Expression, pos: Expression, len: Expression) extends TernaryExpression with ImplicitCastInputTypes
case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes
case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expression) extends TernaryExpression with ImplicitCastInputTypes
// String predicates.
case class StartsWith(left: Expression, right: Expression) extends BinaryExpression with Predicate
case class EndsWith(left: Expression, right: Expression) extends BinaryExpression with Predicate
case class Contains(left: Expression, right: Expression) extends BinaryExpression with Predicate

// Usage Example:
val nameCol = AttributeReference("name", StringType, nullable = false)()
val descCol = AttributeReference("description", StringType, nullable = true)()
val upperName = Upper(nameCol)
val nameLength = Length(nameCol)
val substring = Substring(nameCol, Literal(1), Literal(3))
val concat = Concat(Seq(nameCol, Literal(" - "), descCol))
val startsWithA = StartsWith(nameCol, Literal("A"))

// Date/time functions: current date/time, field extraction, and date arithmetic.
case class CurrentDate(timeZoneId: Option[String] = None) extends LeafExpression with ImplicitCastInputTypes
case class CurrentTimestamp() extends LeafExpression with ImplicitCastInputTypes
case class Year(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Month(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Hour(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Minute(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class Second(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
case class DateAdd(startDate: Expression, days: Expression) extends BinaryExpression with ImplicitCastInputTypes
case class DateSub(startDate: Expression, days: Expression) extends BinaryExpression with ImplicitCastInputTypes

// Usage Example:
val dateCol = AttributeReference("created_at", DateType, nullable = false)()
val timestampCol = AttributeReference("updated_at", TimestampType, nullable = true)()
val currentDate = CurrentDate()
val extractYear = Year(dateCol)
val extractMonth = Month(dateCol)
val addDays = DateAdd(dateCol, Literal(30))
val hourFromTimestamp = Hour(timestampCol)

// Conditional expressions.
case class If(predicate: Expression, trueValue: Expression, falseValue: Expression) extends TernaryExpression
// Each branch is a (condition, value) pair; `elseValue` is optional.
case class CaseWhen(branches: Seq[(Expression, Expression)], elseValue: Option[Expression] = None) extends Expression
case class Coalesce(children: Seq[Expression]) extends Expression
case class Greatest(children: Seq[Expression]) extends Expression
case class Least(children: Seq[Expression]) extends Expression

// Usage Example:
val ageCol = AttributeReference("age", IntegerType, nullable = true)()
val statusCol = AttributeReference("status", StringType, nullable = true)()
val ifExpr = If(GreaterThan(ageCol, Literal(18)), Literal("Adult"), Literal("Minor"))
// Branches are (condition, value) pairs; `elseValue` supplies the fallback result.
val caseWhen = CaseWhen(
  Seq(
    (EqualTo(statusCol, Literal("A")), Literal("Active")),
    (EqualTo(statusCol, Literal("I")), Literal("Inactive"))
  ),
  elseValue = Some(Literal("Unknown")))
val coalesce = Coalesce(Seq(statusCol, Literal("Default")))

// Type conversion expressions.
case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String] = None) extends UnaryExpression with TimeZoneAwareExpression
case class CheckOverflow(child: Expression, dataType: DecimalType, nullOnOverflow: Boolean) extends UnaryExpression
case class PromotePrecision(child: Expression) extends UnaryExpression

// Usage Example:
val stringCol = AttributeReference("value", StringType, nullable = false)()
val intCol = AttributeReference("count", IntegerType, nullable = false)()
val castToInt = Cast(stringCol, IntegerType)
val castToDecimal = Cast(intCol, DecimalType(10, 2))
val castToString = Cast(intCol, StringType)

// Aggregate functions.
case class Sum(child: Expression) extends DeclarativeAggregate
case class Count(children: Seq[Expression]) extends DeclarativeAggregate
case class Average(child: Expression) extends DeclarativeAggregate
case class Min(child: Expression) extends DeclarativeAggregate
case class Max(child: Expression) extends DeclarativeAggregate
case class First(child: Expression, ignoreNulls: Expression) extends DeclarativeAggregate
case class Last(child: Expression, ignoreNulls: Expression) extends DeclarativeAggregate
// CollectList/CollectSet are parameterized by Scala mutable collection types, so `mutable`
// must be in scope for these declarations.
import scala.collection.mutable
case class CollectList(child: Expression) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
case class CollectSet(child: Expression) extends TypedImperativeAggregate[mutable.Set[Any]]

// Usage Example:
val salaryCol = AttributeReference("salary", DecimalType(10, 2), nullable = true)()
val nameCol = AttributeReference("name", StringType, nullable = false)()
val sumSalary = Sum(salaryCol)
val countNames = Count(Seq(nameCol))
val avgSalary = Average(salaryCol)
val minSalary = Min(salaryCol)
val collectNames = CollectList(nameCol)

// Window functions: ranking, offset (lead/lag), and first/last value.
case class RowNumber() extends RowNumberLike
case class Rank(children: Seq[Expression]) extends RankLike
case class DenseRank(children: Seq[Expression]) extends RankLike
case class Lead(input: Expression, offset: Expression, default: Expression) extends OffsetWindowFunction
case class Lag(input: Expression, offset: Expression, default: Expression) extends OffsetWindowFunction
case class FirstValue(child: Expression, ignoreNulls: Expression) extends AggregateWindowFunction
case class LastValue(child: Expression, ignoreNulls: Expression) extends AggregateWindowFunction

// Usage Example:
val salaryCol = AttributeReference("salary", DecimalType(10, 2), nullable = false)()
val dateCol = AttributeReference("hire_date", DateType, nullable = false)()
val rowNum = RowNumber()
val rankBySalary = Rank(Seq(salaryCol))
val denseRankBySalary = DenseRank(Seq(salaryCol))
val prevSalary = Lag(salaryCol, Literal(1), Literal(0))  // offset 1, default 0
val nextSalary = Lead(salaryCol, Literal(1), salaryCol)  // offset 1, default is the salary column itself

// Array functions.
case class CreateArray(children: Seq[Expression]) extends Expression
case class ArrayContains(left: Expression, right: Expression) extends BinaryExpression with Predicate
case class GetArrayItem(child: Expression, ordinal: Expression) extends BinaryExpression
case class Size(child: Expression) extends UnaryExpression
case class ArraySort(child: Expression) extends UnaryExpression
case class ArrayMin(child: Expression) extends UnaryExpression
case class ArrayMax(child: Expression) extends UnaryExpression

// Usage Example:
val arrayCol = AttributeReference("tags", ArrayType(StringType), nullable = true)()
val indexCol = AttributeReference("index", IntegerType, nullable = false)()
val createArray = CreateArray(Seq(Literal("tag1"), Literal("tag2"), Literal("tag3")))
val arrayContains = ArrayContains(arrayCol, Literal("important"))
val getItem = GetArrayItem(arrayCol, indexCol)
val arraySize = Size(arrayCol)
val sortedArray = ArraySort(arrayCol)

// Map functions.
case class CreateMap(children: Seq[Expression]) extends Expression
case class GetMapValue(child: Expression, key: Expression) extends BinaryExpression
case class MapKeys(child: Expression) extends UnaryExpression
case class MapValues(child: Expression) extends UnaryExpression

// Usage Example:
val mapCol = AttributeReference("properties", MapType(StringType, StringType), nullable = true)()
val keyCol = AttributeReference("key", StringType, nullable = false)()
// CreateMap takes a flat sequence of alternating keys and values: k1, v1, k2, v2, ...
val createMap = CreateMap(Seq(
  Literal("name"), Literal("John"),
  Literal("age"), Literal("30")
))
val getValue = GetMapValue(mapCol, keyCol)
val getKeys = MapKeys(mapCol)
val getValues = MapValues(mapCol)

/** Builds an `ExpressionSet` from a sequence of expressions. */
object ExpressionSet {
  def apply(expressions: Seq[Expression]): ExpressionSet
}
/**
 * A set of attributes supporting set-algebra operations.
 *
 * Extends `Iterable` rather than `Traversable`: since Scala 2.13 `Traversable` is only a
 * deprecated alias for `Iterable`.
 */
case class AttributeSet(baseSet: Set[Attribute]) extends Iterable[Attribute] {
  def +(attribute: Attribute): AttributeSet
  def ++(other: AttributeSet): AttributeSet
  def -(attribute: Attribute): AttributeSet
  def --(other: AttributeSet): AttributeSet
  def contains(attribute: Attribute): Boolean
  def intersect(other: AttributeSet): AttributeSet
  def subsetOf(other: AttributeSet): Boolean
}

// Result of generating code for an expression (see Expression.genCode).
// NOTE(review): newer Spark releases type these fields as codegen `Block`/`ExprValue` rather
// than `String` — confirm against the targeted Spark version.
case class ExprCode(code: String, isNull: String, value: String)
/** State shared while generating code for an expression tree (names, objects, helper functions). */
class CodegenContext {
  /** Returns a new variable name derived from `name`. */
  def freshName(name: String): String
  /** Registers `obj` for use by generated code and returns the name to reference it by. */
  def addReferenceObj(objName: String, obj: Any, className: String = null): String
  /** Adds a helper function to the generated code and returns the name to call it by. */
  def addNewFunction(funcName: String, funcCode: String): String
}

val nameCol = AttributeReference("name", StringType, nullable = false)()
val ageCol = AttributeReference("age", IntegerType, nullable = true)()
val salaryCol = AttributeReference("salary", DecimalType(10, 2), nullable = true)()
// Complex filtering condition:
// name IS NOT NULL AND (age > 21 AND (salary > 50000 OR name starts with "Senior"))
val complexFilter = And(
  IsNotNull(nameCol),
  And(
    GreaterThan(ageCol, Literal(21)),
    Or(
      GreaterThan(salaryCol, Literal(50000)),
      StartsWith(nameCol, Literal("Senior"))
    )
  )
)
// Computed column: salary increased by 10%, or 0 when salary is null.
// The 10% factor is a decimal literal: a Double literal (0.1) would make the decimal
// salary be computed in floating point and lose exactness.
val computedSalary = If(
  IsNull(salaryCol),
  Literal(0),
  Add(salaryCol, Multiply(salaryCol, Literal(BigDecimal("0.1"))))
)
// Transform expressions using pattern matching
/**
 * Removes casts that convert an expression to the type it already has.
 *
 * The tree is rewritten bottom-up (`transformUp`), so an inner cast is removed before its
 * parent is examined. This lets stacked redundant casts such as `Cast(Cast(x, T), T)` with
 * `x: T` collapse all the way to `x`. A top-down `transform` would replace the outer cast
 * with the inner one, then only visit the inner cast's children, leaving `Cast(x, T)` behind.
 *
 * Nodes the rule does not match are left unchanged by `transformUp`, so no catch-all case
 * is needed.
 *
 * @param expr the expression tree to simplify
 * @return `expr` with every identity cast removed
 */
def removeRedundantCasts(expr: Expression): Expression =
  expr.transformUp {
    case Cast(child, dataType, _) if child.dataType == dataType => child
  }
/**
 * Folds constant expressions: replaces each maximal foldable subtree with a literal holding
 * its value.
 *
 * The rewrite is top-down (`transform`), so the largest foldable subtree is evaluated once
 * instead of its pieces being folded separately. Existing literals are returned as-is:
 * re-evaluating them would only allocate an equivalent `Literal`. Unmatched nodes are left
 * unchanged by `transform`, so no catch-all case is needed.
 *
 * @param expr the expression tree to simplify
 * @return `expr` with foldable subtrees replaced by literals
 */
def foldConstants(expr: Expression): Expression =
  expr.transform {
    case l: Literal => l
    case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)
  }

The expression system in Catalyst provides a comprehensive framework for representing and evaluating all SQL operations with support for optimization, code generation, and type safety.
Install with the Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13