Catalyst is a library for manipulating relational query plans within Apache Spark SQL.
The Expression APIs provide a comprehensive framework for creating and manipulating expressions in Apache Spark Catalyst. These APIs support everything from simple literals and column references to complex transformations, aggregations, and custom user-defined functions.
Base interface for all expressions in the Catalyst system:
package org.apache.spark.sql.connector.expressions;
public interface Expression {
Expression[] EMPTY_EXPRESSION = new Expression[0];
NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
/**
* Human-readable description of this expression
*/
String describe();
/**
* Child expressions of this expression
*/
Expression[] children();
/**
* Named references used by this expression
*/
NamedReference[] references();
}
Reference to a named field or column:
public interface NamedReference extends Expression {
/**
* Field name path (supporting nested fields)
*/
String[] fieldNames();
}
Represents transformation functions:
public interface Transform extends Expression {
/**
* Arguments to this transformation
*/
NamedReference[] arguments();
}
Central factory for creating common expressions:
public class Expressions {
// Literal values
public static Literal literal(Object value);
// Column references
public static NamedReference column(String name);
// Partitioning transforms
public static Transform identity(String column);
public static Transform bucket(int numBuckets, String... columns);
public static Transform years(String column);
public static Transform months(String column);
public static Transform days(String column);
public static Transform hours(String column);
}
Basic Expression Usage:
// Create literal expressions
Literal intLiteral = Expressions.literal(42);
Literal stringLiteral = Expressions.literal("hello");
Literal boolLiteral = Expressions.literal(true);
Literal nullLiteral = Expressions.literal(null);
// Create column references
NamedReference userIdCol = Expressions.column("user_id");
NamedReference nameCol = Expressions.column("name");
// Create nested column references (for struct fields)
NamedReference nestedField = Expressions.column("address.street");
// Create transformation expressions
Transform identityTransform = Expressions.identity("partition_key");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");
Transform monthTransform = Expressions.months("created_at");
Transform dayTransform = Expressions.days("created_at");
Transform hourTransform = Expressions.hours("created_at");
Container for aggregate operations:
package org.apache.spark.sql.connector.expressions.aggregate;
public class Aggregation {
/**
* Aggregate functions to apply
*/
public AggregateFunc[] aggregateExpressions();
/**
* Expressions to group by
*/
public Expression[] groupByExpressions();
}
Base interface for aggregate functions:
public interface AggregateFunc extends Expression {
// Marker interface for aggregate expressions
}
public class Count implements AggregateFunc {
public Expression column();
public boolean isDistinct();
}
public class CountStar implements AggregateFunc {
// Count all rows (COUNT(*))
}
public class Sum implements AggregateFunc {
public Expression column();
public boolean isDistinct();
}
public class Avg implements AggregateFunc {
public Expression column();
public boolean isDistinct();
}
public class Max implements AggregateFunc {
public Expression column();
}
public class Min implements AggregateFunc {
public Expression column();
}
Aggregate Usage Examples:
// Create aggregate expressions
Count countUsers = new Count(Expressions.column("user_id"), false);
CountStar countAll = new CountStar();
Sum totalRevenue = new Sum(Expressions.column("revenue"), false);
Avg avgAge = new Avg(Expressions.column("age"), false);
Max maxSalary = new Max(Expressions.column("salary"));
Min minDate = new Min(Expressions.column("start_date"));
// Create aggregation with grouping
Expression[] groupBy = new Expression[] {
Expressions.column("department"),
Expressions.years("hire_date")
};
AggregateFunc[] aggregates = new AggregateFunc[] {
new Count(Expressions.column("employee_id"), false),
new Avg(Expressions.column("salary"), false),
new Max(Expressions.column("salary")),
new Min(Expressions.column("salary"))
};
Aggregation aggregation = new Aggregation(aggregates, groupBy);
Base interface for filter predicates:
package org.apache.spark.sql.connector.expressions.filter;
public interface Predicate extends Expression {
// Base interface for filter expressions
}
public class AlwaysTrue implements Predicate {
// Predicate that always evaluates to true
}
public class AlwaysFalse implements Predicate {
// Predicate that always evaluates to false
}
public class And implements Predicate {
public Predicate left();
public Predicate right();
}
public class Or implements Predicate {
public Predicate left();
public Predicate right();
}
public class Not implements Predicate {
public Predicate child();
}
Complex Predicate Examples:
// Create basic predicates
Predicate activeUsers = new EqualTo(
Expressions.column("status"),
Expressions.literal("active")
);
Predicate adultUsers = new GreaterThan(
Expressions.column("age"),
Expressions.literal(18)
);
Predicate seniorUsers = new LessThan(
Expressions.column("age"),
Expressions.literal(65)
);
// Combine with logical operators
Predicate workingAge = new And(adultUsers, seniorUsers);
Predicate activeWorkingAge = new And(activeUsers, workingAge);
// Complex logical combinations
Predicate vipUsers = new EqualTo(
Expressions.column("tier"),
Expressions.literal("VIP")
);
Predicate eligibleUsers = new Or(activeWorkingAge, vipUsers);
// Negation
Predicate ineligibleUsers = new Not(eligibleUsers);
Implement the Expression interface for custom logic:
public class CustomStringLength implements Expression {
private final NamedReference column;
public CustomStringLength(NamedReference column) {
this.column = column;
}
@Override
public String describe() {
return String.format("string_length(%s)", column.describe());
}
@Override
public Expression[] children() {
return new Expression[] { column };
}
@Override
public NamedReference[] references() {
return new NamedReference[] { column };
}
public NamedReference getColumn() {
return column;
}
}
public class CustomMedian implements AggregateFunc {
private final NamedReference column;
public CustomMedian(NamedReference column) {
this.column = column;
}
@Override
public String describe() {
return String.format("median(%s)", column.describe());
}
@Override
public Expression[] children() {
return new Expression[] { column };
}
@Override
public NamedReference[] references() {
return new NamedReference[] { column };
}
public NamedReference getColumn() {
return column;
}
}
public class CustomHashTransform implements Transform {
private final NamedReference[] columns;
private final int seed;
public CustomHashTransform(int seed, NamedReference... columns) {
this.seed = seed;
this.columns = columns;
}
@Override
public String describe() {
return String.format("custom_hash(%d, %s)", seed,
Arrays.stream(columns)
.map(NamedReference::describe)
.collect(Collectors.joining(", ")));
}
@Override
public NamedReference[] arguments() {
return columns.clone();
}
@Override
public Expression[] children() {
return columns.clone();
}
@Override
public NamedReference[] references() {
return columns.clone();
}
}
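These custom classes plug into the same Expression API as the built-in implementations, so they can be constructed and described like any other expression. A minimal sketch (called from Scala; the column names and seed are illustrative):
import org.apache.spark.sql.connector.expressions.Expressions

// Sketch: instantiating the custom expressions defined above
val length = new CustomStringLength(Expressions.column("name"))
println(length.describe())        // e.g. string_length(name)

val median = new CustomMedian(Expressions.column("salary"))
println(median.describe())        // e.g. median(salary)

val hashed = new CustomHashTransform(42, Expressions.column("user_id"), Expressions.column("region"))
println(hashed.describe())        // e.g. custom_hash(42, user_id, region)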
For advanced custom expressions, you can extend Catalyst's internal expression hierarchy:
// Scala internal expression interfaces
package org.apache.spark.sql.catalyst.expressions
abstract class Expression {
def dataType: DataType
def nullable: Boolean
def eval(input: InternalRow): Any
def children: Seq[Expression]
}
abstract class LeafExpression extends Expression {
override final def children: Seq[Expression] = Nil
}
abstract class UnaryExpression extends Expression {
def child: Expression
override final def children: Seq[Expression] = child :: Nil
}
abstract class BinaryExpression extends Expression {
def left: Expression
def right: Expression
override final def children: Seq[Expression] = left :: right :: Nil
}
case class CustomUpper(child: Expression) extends UnaryExpression {
override def dataType: DataType = StringType
override def nullable: Boolean = child.nullable
override def eval(input: InternalRow): Any = {
val value = child.eval(input)
// StringType values are represented as UTF8String at runtime, not java.lang.String
if (value == null) null else UTF8String.fromString(value.toString.toUpperCase)
}
// Code generation for performance
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val childGen = child.genCode(ctx)
val upperFunc = ctx.addNewFunction("upperCase",
s"""
|private UTF8String upperCase(UTF8String input) {
| return input == null ? null : input.toUpperCase();
|}
""".stripMargin)
ev.copy(code = s"""
|${childGen.code}
|boolean ${ev.isNull} = ${childGen.isNull};
|UTF8String ${ev.value} = ${ev.isNull} ? null : $upperFunc(${childGen.value});
""".stripMargin)
}
}
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
override def nullable: Boolean = value == null
override def eval(input: InternalRow): Any = value
}
object Literal {
val TrueLiteral: Literal = Literal(true, BooleanType)
val FalseLiteral: Literal = Literal(false, BooleanType)
def apply(v: Any): Literal = v match {
case null => Literal(null, NullType)
case i: Int => Literal(i, IntegerType)
case l: Long => Literal(l, LongType)
case d: Double => Literal(d, DoubleType)
case s: String => Literal(UTF8String.fromString(s), StringType)
// ... other types
}
}
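A brief sketch of the factory in use (interpreted evaluation shown for illustration):
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal

// Sketch: the factory infers the Catalyst data type from the JVM value
val answer = Literal(42)            // Literal(42, IntegerType)
val greeting = Literal("hello")     // stored internally as UTF8String, StringType
answer.eval(InternalRow.empty)      // 42 -- literals ignore the input row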
Efficient map keyed by expression attributes:
class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
extends Map[Attribute, A] {
def get(k: Attribute): Option[A]
def contains(k: Attribute): Boolean
def +(kv: (Attribute, A)): AttributeMap[A]
def ++(other: AttributeMap[A]): AttributeMap[A]
}
Set collection with semantic equality for expressions:
class ExpressionSet(val baseSet: Set[Expression]) extends Set[Expression] {
// Provides set operations with expression semantic equality
}
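A minimal sketch of both utilities, assuming the internal Catalyst classes (AttributeReference, Add, Literal); the attribute names and counts are illustrative:
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Sketch: AttributeMap keys on exprId, so lookups survive renames and case changes
val userId = AttributeReference("user_id", IntegerType)()
val name = AttributeReference("name", StringType)()
val rowCounts = AttributeMap(Seq(userId -> 100L, name -> 100L))
rowCounts.get(userId)                                     // Some(100)

// Sketch: semantically equal expressions collapse to a single entry
val set = ExpressionSet(Seq(Add(userId, Literal(1)), Add(Literal(1), userId)))
set.size                                                  // 1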
High-performance row implementation:
public final class UnsafeRow extends InternalRow {
public static final int WORD_SIZE = 8;
public static int calculateBitSetWidthInBytes(int numFields);
public static boolean isFixedLength(DataType dt);
public static boolean isMutable(DataType dt);
// Efficient field access methods
public boolean isNullAt(int ordinal);
public int getInt(int ordinal);
public long getLong(int ordinal);
public UTF8String getUTF8String(int ordinal);
// ... other typed getters
}
For high-performance expression evaluation:
public final class BufferHolder {
public BufferHolder(UnsafeRow row, int initialSize);
public void grow(int neededSize);
public byte[] getBuffer();
}
public final class UnsafeRowWriter {
public UnsafeRowWriter(BufferHolder holder, int numFields);
public void initialize();
public void write(int ordinal, boolean value);
public void write(int ordinal, int value);
public void write(int ordinal, long value);
public void write(int ordinal, UTF8String value);
// ... other write methods
}
Code Generation Example:
public class OptimizedExpressionEvaluator {
public static InternalRow evaluateRow(Expression[] expressions, InternalRow input) {
UnsafeRow result = new UnsafeRow(expressions.length);
BufferHolder bufferHolder = new BufferHolder(result, 64);
UnsafeRowWriter writer = new UnsafeRowWriter(bufferHolder, expressions.length);
writer.initialize();
for (int i = 0; i < expressions.length; i++) {
Object value = expressions[i].eval(input);
if (value == null) {
writer.setNullAt(i);
} else {
// Write typed value based on expression data type
DataType dataType = expressions[i].dataType();
writeTypedValue(writer, i, value, dataType);
}
}
result.pointTo(bufferHolder.getBuffer(), bufferHolder.totalSize());
return result;
}
}
Build complex expression trees:
public class ExpressionTreeBuilder {
public static Expression buildComplexFilter(Map<String, Object> criteria) {
List<Predicate> predicates = new ArrayList<>();
for (Map.Entry<String, Object> entry : criteria.entrySet()) {
String column = entry.getKey();
Object value = entry.getValue();
if (value instanceof List) {
// IN predicate for multiple values
List<?> values = (List<?>) value;
predicates.add(new In(
Expressions.column(column),
values.stream()
.map(Expressions::literal)
.toArray(Expression[]::new)
));
} else {
// Equality predicate
predicates.add(new EqualTo(
Expressions.column(column),
Expressions.literal(value)
));
}
}
// Combine all predicates with AND
return predicates.stream()
.reduce((p1, p2) -> new And(p1, p2))
.orElse(new AlwaysTrue());
}
}
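A short usage sketch for the builder above (called from Scala); the criteria keys and values are illustrative, and the describe() output shown depends on the predicate classes used:
// Sketch: invoking the builder above with illustrative criteria
val criteria = new java.util.LinkedHashMap[String, Object]()
criteria.put("status", "active")
criteria.put("country", java.util.Arrays.asList("US", "CA"))

val filter = ExpressionTreeBuilder.buildComplexFilter(criteria)
println(filter.describe())   // e.g. (status = 'active') AND (country IN ('US', 'CA'))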
public abstract class ExpressionVisitor<T> {
public T visit(Expression expr) {
if (expr instanceof NamedReference) {
return visitNamedReference((NamedReference) expr);
} else if (expr instanceof Literal) {
return visitLiteral((Literal) expr);
} else if (expr instanceof And) {
return visitAnd((And) expr);
} else if (expr instanceof Or) {
return visitOr((Or) expr);
}
// ... handle other expression types
return visitDefault(expr);
}
protected abstract T visitNamedReference(NamedReference ref);
protected abstract T visitLiteral(Literal literal);
protected abstract T visitAnd(And and);
protected abstract T visitOr(Or or);
protected abstract T visitDefault(Expression expr);
}
// Example: Extract all column references
public class ColumnExtractor extends ExpressionVisitor<Set<String>> {
@Override
protected Set<String> visitNamedReference(NamedReference ref) {
return Set.of(String.join(".", ref.fieldNames()));
}
@Override
protected Set<String> visitLiteral(Literal literal) {
// Literals reference no columns
return Set.of();
}
@Override
protected Set<String> visitAnd(And and) {
Set<String> result = new HashSet<>();
result.addAll(visit(and.left()));
result.addAll(visit(and.right()));
return result;
}
@Override
protected Set<String> visitOr(Or or) {
Set<String> result = new HashSet<>();
result.addAll(visit(or.left()));
result.addAll(visit(or.right()));
return result;
}
@Override
protected Set<String> visitDefault(Expression expr) {
Set<String> result = new HashSet<>();
for (Expression child : expr.children()) {
result.addAll(visit(child));
}
return result;
}
}
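A sketch of the visitor in action (called from Scala), reusing the predicate classes from the filter examples above; the expected result is illustrative:
import org.apache.spark.sql.connector.expressions.Expressions

// Sketch: collect every column referenced by a predicate tree
val filter = new And(
  new EqualTo(Expressions.column("status"), Expressions.literal("active")),
  new GreaterThan(Expressions.column("age"), Expressions.literal(18)))

val columns = new ColumnExtractor().visit(filter)   // e.g. Set(status, age)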
public class ConstantFoldingEvaluator {
private final Expression[] expressions;
private final boolean[] isConstant;
private final Object[] constantValues;
public ConstantFoldingEvaluator(Expression[] expressions) {
this.expressions = expressions;
this.isConstant = new boolean[expressions.length];
this.constantValues = new Object[expressions.length];
// Pre-evaluate constant expressions
for (int i = 0; i < expressions.length; i++) {
if (isConstantExpression(expressions[i])) {
isConstant[i] = true;
// Constant (foldable) expressions ignore their input row
constantValues[i] = expressions[i].eval(null);
}
}
}
public Object[] evaluate(InternalRow row) {
Object[] result = new Object[expressions.length];
for (int i = 0; i < expressions.length; i++) {
if (isConstant[i]) {
result[i] = constantValues[i];
} else {
result[i] = expressions[i].eval(row);
}
}
return result;
}
}
The Expression APIs provide a comprehensive, extensible framework for building sophisticated data processing logic with high performance and type safety. They form the foundation for Spark's powerful SQL optimization and execution capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-12