The Expression APIs provide a comprehensive framework for creating and manipulating expressions in Apache Spark Catalyst. These APIs support everything from simple literals and column references to complex transformations, aggregations, and custom user-defined functions.
Base interface for all expressions in the connector expression API (`org.apache.spark.sql.connector.expressions`):
package org.apache.spark.sql.connector.expressions;
/**
* Contract shared by every expression in this package: an expression can describe
* itself, list its child expressions, and list the named references it uses.
*/
public interface Expression {
/** Shared zero-length array for callers that need an empty {@code Expression[]}. */
Expression[] EMPTY_EXPRESSION = new Expression[0];
/** Shared zero-length array for callers that need an empty {@code NamedReference[]}. */
NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
/**
* Human-readable description of this expression.
*
* @return the description text
*/
String describe();
/**
* Child expressions of this expression.
*
* @return the children as an array
*/
Expression[] children();
/**
* Named references used by this expression.
*
* @return the references as an array
*/
NamedReference[] references();
}Reference to a named field or column:
/**
* An expression that refers to a field or column by name.
*/
public interface NamedReference extends Expression {
/**
* Field name path (supporting nested fields).
*
* @return the parts of the name; NOTE(review): presumably ordered from the outermost
*         field to the innermost one — confirm
*/
String[] fieldNames();
}Represents transformation functions:
/**
* An expression that represents a transformation function applied to its arguments.
*/
public interface Transform extends Expression {
/**
* Arguments to this transformation.
*
* NOTE(review): upstream Spark appears to declare this method as
* {@code Expression[] arguments()} (so that non-column arguments such as a bucket
* count can be carried) and to also declare {@code String name()} — confirm against
* the Spark version this document targets.
*/
NamedReference[] arguments();
}Central factory for creating common expressions:
/**
* Static factory methods for literals, column references and partitioning transforms.
* This is a signature listing only; method bodies are omitted.
*/
public class Expressions {
// Literal values
public static Literal literal(Object value);
// Column references
public static NamedReference column(String name);
// Partitioning transforms
// Each factory takes the name(s) of the source column(s); bucket additionally takes the bucket count.
public static Transform identity(String column);
public static Transform bucket(int numBuckets, String... columns);
public static Transform years(String column);
public static Transform months(String column);
public static Transform days(String column);
public static Transform hours(String column);
}Basic Expression Usage:
// Create literal expressions from an int, a String, a boolean and null
Literal intLiteral = Expressions.literal(42);
Literal stringLiteral = Expressions.literal("hello");
Literal boolLiteral = Expressions.literal(true);
Literal nullLiteral = Expressions.literal(null);
// Create column references by name
NamedReference userIdCol = Expressions.column("user_id");
NamedReference nameCol = Expressions.column("name");
// Create nested column references (for struct fields) using a dotted name
NamedReference nestedField = Expressions.column("address.street");
// Create transformation expressions: identity, a bucket transform with 10 buckets,
// and year/month/day/hour transforms over the same "created_at" column
Transform identityTransform = Expressions.identity("partition_key");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");
Transform monthTransform = Expressions.months("created_at");
Transform dayTransform = Expressions.days("created_at");
Transform hourTransform = Expressions.hours("created_at");Container for aggregate operations:
package org.apache.spark.sql.connector.expressions.aggregate;
/**
* Pairs a set of aggregate functions with the expressions to group by.
* This is a signature listing only; method bodies are omitted.
*/
public class Aggregation {
/**
* Aggregate functions to apply.
*
* @return the aggregate functions as an array
*/
public AggregateFunc[] aggregateExpressions();
/**
* Expressions to group by.
*
* @return the grouping expressions as an array
*/
public Expression[] groupByExpressions();
}Base interface for aggregate functions:
/**
* Common supertype of the aggregate function classes listed after it.
* The class listings that follow show signatures only; method bodies are omitted.
*/
public interface AggregateFunc extends Expression {
// Marker interface for aggregate expressions
}public class Count implements AggregateFunc {
// Input expression of the aggregate
public Expression column();
// NOTE(review): presumably true when duplicates are ignored (COUNT(DISTINCT ...)) — confirm
public boolean isDistinct();
}public class CountStar implements AggregateFunc {
// Count all rows (COUNT(*))
}public class Sum implements AggregateFunc {
// Input expression of the aggregate
public Expression column();
// NOTE(review): presumably true for SUM(DISTINCT ...) — confirm
public boolean isDistinct();
}public class Avg implements AggregateFunc {
// Input expression of the aggregate
public Expression column();
// NOTE(review): presumably true for AVG(DISTINCT ...) — confirm
public boolean isDistinct();
}public class Max implements AggregateFunc {
// Input expression of the aggregate; no distinct flag is exposed
public Expression column();
}public class Min implements AggregateFunc {
// Input expression of the aggregate; no distinct flag is exposed
public Expression column();
}Aggregate Usage Examples:
// Create aggregate expressions
// NOTE(review): the boolean second argument of Count/Sum/Avg is presumably the isDistinct flag — confirm
Count countUsers = new Count(Expressions.column("user_id"), false);
CountStar countAll = new CountStar();
Sum totalRevenue = new Sum(Expressions.column("revenue"), false);
Avg avgAge = new Avg(Expressions.column("age"), false);
Max maxSalary = new Max(Expressions.column("salary"));
Min minDate = new Min(Expressions.column("start_date"));
// Create aggregation with grouping
// The grouping keys mix a plain column reference with a years(...) transform.
Expression[] groupBy = new Expression[] {
Expressions.column("department"),
Expressions.years("hire_date")
};
// Aggregate functions to compute for each group
AggregateFunc[] aggregates = new AggregateFunc[] {
new Count(Expressions.column("employee_id"), false),
new Avg(Expressions.column("salary"), false),
new Max(Expressions.column("salary")),
new Min(Expressions.column("salary"))
};
// Aggregate functions are passed first, grouping expressions second
Aggregation aggregation = new Aggregation(aggregates, groupBy);Base interface for filter predicates:
package org.apache.spark.sql.connector.expressions.filter;
/**
* Root type for filter predicates. The class listings in this section show
* signatures only; bodies are omitted.
*/
public interface Predicate extends Expression {
// Base interface for filter expressions
}public class AlwaysTrue implements Predicate {
// Predicate that always evaluates to true
}
public class AlwaysFalse implements Predicate {
// Predicate that always evaluates to false
}public class And implements Predicate {
// Binary predicate over two operand predicates
// Left operand
public Predicate left();
// Right operand
public Predicate right();
}public class Or implements Predicate {
// Binary predicate over two operand predicates
// Left operand
public Predicate left();
// Right operand
public Predicate right();
}public class Not implements Predicate {
// Unary predicate over a single operand predicate
public Predicate child();
}Complex Predicate Examples:
// Create basic predicates
// NOTE(review): EqualTo, GreaterThan and LessThan are not among the predicate classes
// listed in this document — confirm they exist in the targeted Spark version.
Predicate activeUsers = new EqualTo(
Expressions.column("status"),
Expressions.literal("active")
);
Predicate adultUsers = new GreaterThan(
Expressions.column("age"),
Expressions.literal(18)
);
Predicate seniorUsers = new LessThan(
Expressions.column("age"),
Expressions.literal(65)
);
// Combine with logical operators
// workingAge: age > 18 AND age < 65; activeWorkingAge additionally requires status = "active"
Predicate workingAge = new And(adultUsers, seniorUsers);
Predicate activeWorkingAge = new And(activeUsers, workingAge);
// Complex logical combinations
Predicate vipUsers = new EqualTo(
Expressions.column("tier"),
Expressions.literal("VIP")
);
// Eligible when active and of working age, or when in the VIP tier
Predicate eligibleUsers = new Or(activeWorkingAge, vipUsers);
// Negation
Predicate ineligibleUsers = new Not(eligibleUsers);Implement the Expression interface for custom logic:
/**
 * Example custom expression over a single column, described as
 * {@code string_length(<column>)}.
 */
public class CustomStringLength implements Expression {
    private final NamedReference column;

    /**
     * @param column the column this expression is applied to; must not be null
     * @throws NullPointerException if {@code column} is null
     */
    public CustomStringLength(NamedReference column) {
        // Fail here rather than later inside describe().
        if (column == null) {
            throw new NullPointerException("column");
        }
        this.column = column;
    }

    /** @return {@code string_length(...)} wrapped around the column's own description */
    @Override
    public String describe() {
        return String.format("string_length(%s)", column.describe());
    }

    /** @return a new single-element array holding the column */
    @Override
    public Expression[] children() {
        return new Expression[] { column };
    }

    /** @return a new single-element array holding the column */
    @Override
    public NamedReference[] references() {
        return new NamedReference[] { column };
    }

    /** @return the column this expression is applied to */
    public NamedReference getColumn() {
        return column;
    }
}

/**
 * Example custom aggregate function over a single column, described as
 * {@code median(<column>)}.
 */
public class CustomMedian implements AggregateFunc {
    private final NamedReference column;

    /**
     * @param column the column this aggregate is applied to; must not be null
     * @throws NullPointerException if {@code column} is null
     */
    public CustomMedian(NamedReference column) {
        // Fail here rather than later inside describe().
        if (column == null) {
            throw new NullPointerException("column");
        }
        this.column = column;
    }

    /** @return {@code median(...)} wrapped around the column's own description */
    @Override
    public String describe() {
        return String.format("median(%s)", column.describe());
    }

    /** @return a new single-element array holding the column */
    @Override
    public Expression[] children() {
        return new Expression[] { column };
    }

    /** @return a new single-element array holding the column */
    @Override
    public NamedReference[] references() {
        return new NamedReference[] { column };
    }

    /** @return the column this aggregate is applied to */
    public NamedReference getColumn() {
        return column;
    }
}

/**
 * Example custom transform over one or more columns plus an integer seed, described as
 * {@code custom_hash(<seed>, <col1>, <col2>, ...)}.
 */
public class CustomHashTransform implements Transform {
    private final NamedReference[] columns;
    private final int seed;

    /**
     * @param seed    value included in the description of this transform
     * @param columns the columns this transform is applied to; must not be null
     * @throws NullPointerException if {@code columns} is null
     */
    public CustomHashTransform(int seed, NamedReference... columns) {
        if (columns == null) {
            throw new NullPointerException("columns");
        }
        this.seed = seed;
        // Copy the varargs array: the accessors below hand out clones, so the constructor
        // must not keep a reference the caller can still mutate.
        this.columns = columns.clone();
    }

    /** @return {@code custom_hash(seed, ...)} with the column descriptions joined by ", " */
    @Override
    public String describe() {
        return String.format("custom_hash(%d, %s)", seed,
            Arrays.stream(columns)
                .map(NamedReference::describe)
                .collect(Collectors.joining(", ")));
    }

    /** @return a copy of the column array */
    @Override
    public NamedReference[] arguments() {
        return columns.clone();
    }

    /** @return a copy of the column array */
    @Override
    public Expression[] children() {
        return columns.clone();
    }

    /** @return a copy of the column array */
    @Override
    public NamedReference[] references() {
        return columns.clone();
    }
}

For advanced custom expressions, you can extend Catalyst's internal expression hierarchy:
// Scala internal expression interfaces
package org.apache.spark.sql.catalyst.expressions
/** Root of the internal expression tree: a typed, evaluable node with child expressions. */
abstract class Expression {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def children: Seq[Expression]
}

/** Expression with no children. */
abstract class LeafExpression extends Expression {
  override final def children: Seq[Expression] = Nil
}

/** Expression with exactly one child. */
abstract class UnaryExpression extends Expression {
  def child: Expression
  override final def children: Seq[Expression] = child :: Nil
}

/** Expression with exactly two children, `left` then `right`. */
abstract class BinaryExpression extends Expression {
  def left: Expression
  def right: Expression
  override final def children: Seq[Expression] = left :: right :: Nil
}

/** Upper-cases the value produced by `child`; a null input yields a null output. */
case class CustomUpper(child: Expression) extends UnaryExpression {
  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable

  override def eval(input: InternalRow): Any = {
    val value = child.eval(input)
    // StringType values are carried as UTF8String (see Literal.apply below), so the result
    // must be a UTF8String rather than a java.lang.String.
    if (value == null) null else UTF8String.fromString(value.toString).toUpperCase
  }

  // Code generation for performance
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childGen = child.genCode(ctx)
    // The generated helper works on UTF8String, matching the declared StringType.
    val upperFunc = ctx.addNewFunction("upperCase",
      s"""
         |private UTF8String upperCase(UTF8String input) {
         |  return input == null ? null : input.toUpperCase();
         |}
       """.stripMargin)
    ev.copy(code = s"""
         |${childGen.code}
         |boolean ${ev.isNull} = ${childGen.isNull};
         |UTF8String ${ev.value} = ${ev.isNull} ? null : $upperFunc(${childGen.value});
       """.stripMargin)
  }
}

/** Constant-valued leaf expression; nullable only when the wrapped value is null. */
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def nullable: Boolean = value == null
  override def eval(input: InternalRow): Any = value
}

object Literal {
  val TrueLiteral: Literal = Literal(true, BooleanType)
  val FalseLiteral: Literal = Literal(false, BooleanType)

  /** Picks the data type from the runtime type of `v`; strings are converted to UTF8String. */
  def apply(v: Any): Literal = v match {
    case null => Literal(null, NullType)
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case s: String => Literal(UTF8String.fromString(s), StringType)
    // ... other types
  }
}

Efficient map keyed by expression attributes:
// Map from Attribute to A. The backing baseMap is keyed by ExprId and stores the
// (Attribute, value) pair for each entry. Signature listing only; bodies are omitted.
class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
extends Map[Attribute, A] {
// Lookup and membership test by attribute
def get(k: Attribute): Option[A]
def contains(k: Attribute): Boolean
// Both return an AttributeMap: + takes one (attribute, value) pair, ++ takes another AttributeMap
def +(kv: (Attribute, A)): AttributeMap[A]
def ++(other: AttributeMap[A]): AttributeMap[A]
}Set collection with semantic equality for expressions:
// Set of expressions wrapping a plain Set[Expression] (baseSet). Signature listing only.
class ExpressionSet(val baseSet: Set[Expression]) extends Set[Expression] {
// Provides set operations with expression semantic equality
}High-performance row implementation:
/**
 * High-performance row implementation. This is a signature listing only; method bodies
 * are omitted.
 */
public final class UnsafeRow extends InternalRow {
    public static final int WORD_SIZE = 8;

    /** Creates a row with the given number of fields (used by the row-building example in this document). */
    public UnsafeRow(int numFields);

    public static int calculateBitSetWidthInBytes(int numFields);
    public static boolean isFixedLength(DataType dt);
    public static boolean isMutable(DataType dt);

    /** Points this row at a backing byte array (used by the row-building example in this document). */
    public void pointTo(byte[] buf, int sizeInBytes);

    // Efficient field access methods
    public boolean isNullAt(int ordinal);
    public int getInt(int ordinal);
    public long getLong(int ordinal);
    public UTF8String getUTF8String(int ordinal);
    // ... other typed getters
}

For high-performance expression evaluation:
/**
 * Growable byte buffer associated with an {@code UnsafeRow}. This is a signature listing
 * only; method bodies are omitted.
 */
public final class BufferHolder {
    public BufferHolder(UnsafeRow row, int initialSize);
    public void grow(int neededSize);
    public byte[] getBuffer();

    /** Size value passed to {@code UnsafeRow.pointTo} by the row-building example in this document. */
    public int totalSize();
}
/**
 * Writes typed field values into the buffer of a {@code BufferHolder}. This is a
 * signature listing only; method bodies are omitted.
 */
public final class UnsafeRowWriter {
    public UnsafeRowWriter(BufferHolder holder, int numFields);
    public void initialize();

    /** Marks the field at {@code ordinal} as null (used by the row-building example in this document). */
    public void setNullAt(int ordinal);

    public void write(int ordinal, boolean value);
    public void write(int ordinal, int value);
    public void write(int ordinal, long value);
    public void write(int ordinal, UTF8String value);
    // ... other write methods
}

Code Generation Example:
/**
 * Example of evaluating a list of expressions straight into an {@code UnsafeRow}.
 */
public class OptimizedExpressionEvaluator {
    /**
     * Evaluates every expression against {@code input} and writes the results, in order,
     * into a new row with one field per expression.
     *
     * @param expressions expressions to evaluate; field i of the result comes from expressions[i]
     * @param input       row the expressions are evaluated against
     * @return the populated row
     */
    public static InternalRow evaluateRow(Expression[] expressions, InternalRow input) {
        final int fieldCount = expressions.length;
        UnsafeRow row = new UnsafeRow(fieldCount);
        BufferHolder holder = new BufferHolder(row, 64);
        UnsafeRowWriter rowWriter = new UnsafeRowWriter(holder, fieldCount);
        rowWriter.initialize();

        int ordinal = 0;
        for (Expression expression : expressions) {
            Object evaluated = expression.eval(input);
            if (evaluated != null) {
                // Write typed value based on expression data type
                // (writeTypedValue is not defined in this snippet).
                writeTypedValue(rowWriter, ordinal, evaluated, expression.dataType());
            } else {
                rowWriter.setNullAt(ordinal);
            }
            ordinal++;
        }

        row.pointTo(holder.getBuffer(), holder.totalSize());
        return row;
    }
}

Build complex expression trees:
/**
 * Builds filter predicates from a map of column names to required values.
 */
public class ExpressionTreeBuilder {
    /**
     * Turns each entry of {@code criteria} into a predicate and combines them with AND.
     * A {@code List} value becomes an {@code In} predicate over its elements; any other
     * value becomes an {@code EqualTo} predicate.
     *
     * @param criteria column name mapped to the required value, or to a list of accepted values
     * @return the conjunction of all predicates, or {@code AlwaysTrue} when the map is empty
     */
    public static Expression buildComplexFilter(Map<String, Object> criteria) {
        List<Predicate> clauses = new ArrayList<>();
        for (Map.Entry<String, Object> criterion : criteria.entrySet()) {
            clauses.add(toPredicate(criterion.getKey(), criterion.getValue()));
        }
        // Combine all predicates with AND
        return clauses.stream()
            .reduce((lhs, rhs) -> new And(lhs, rhs))
            .orElse(new AlwaysTrue());
    }

    /** Builds the predicate for a single column/value criterion. */
    private static Predicate toPredicate(String column, Object value) {
        if (!(value instanceof List)) {
            // Equality predicate
            return new EqualTo(Expressions.column(column), Expressions.literal(value));
        }
        // IN predicate for multiple values
        List<?> accepted = (List<?>) value;
        NamedReference target = Expressions.column(column);
        Expression[] literals = accepted.stream()
            .map(Expressions::literal)
            .toArray(Expression[]::new);
        return new In(target, literals);
    }
}

/**
 * Dispatches on the runtime type of an expression and forwards to a type-specific hook.
 *
 * @param <T> result type produced by the visit
 */
public abstract class ExpressionVisitor<T> {
    /**
     * Routes {@code expr} to the first matching hook, checked in this order:
     * named reference, literal, AND, OR, then the default hook.
     *
     * @param expr expression to visit
     * @return whatever the selected hook returns
     */
    public T visit(Expression expr) {
        if (expr instanceof NamedReference) {
            return visitNamedReference((NamedReference) expr);
        }
        if (expr instanceof Literal) {
            return visitLiteral((Literal) expr);
        }
        if (expr instanceof And) {
            return visitAnd((And) expr);
        }
        if (expr instanceof Or) {
            return visitOr((Or) expr);
        }
        // ... handle other expression types
        return visitDefault(expr);
    }

    /** Hook for named references. */
    protected abstract T visitNamedReference(NamedReference ref);

    /** Hook for literals. */
    protected abstract T visitLiteral(Literal literal);

    /** Hook for AND predicates. */
    protected abstract T visitAnd(And and);

    /** Hook for OR predicates. */
    protected abstract T visitOr(Or or);

    /** Hook for every expression type without a dedicated hook. */
    protected abstract T visitDefault(Expression expr);
}
// Example: Extract all column references
/**
 * Visitor that collects the dotted names of all named references in an expression tree.
 */
public class ColumnExtractor extends ExpressionVisitor<Set<String>> {
    /** @return a single-element set holding the reference's field names joined with "." */
    @Override
    protected Set<String> visitNamedReference(NamedReference ref) {
        return Set.of(String.join(".", ref.fieldNames()));
    }

    /**
     * Required by {@code ExpressionVisitor}, which declares this hook abstract; without it
     * this class does not compile. Literals get no special treatment and are handled like
     * any other expression.
     */
    @Override
    protected Set<String> visitLiteral(Literal literal) {
        return visitDefault(literal);
    }

    /** @return the union of the names found on both sides of the AND */
    @Override
    protected Set<String> visitAnd(And and) {
        Set<String> result = new HashSet<>();
        result.addAll(visit(and.left()));
        result.addAll(visit(and.right()));
        return result;
    }

    /** @return the union of the names found on both sides of the OR */
    @Override
    protected Set<String> visitOr(Or or) {
        Set<String> result = new HashSet<>();
        result.addAll(visit(or.left()));
        result.addAll(visit(or.right()));
        return result;
    }

    /** @return the union of the names found in every child of {@code expr} */
    @Override
    protected Set<String> visitDefault(Expression expr) {
        Set<String> result = new HashSet<>();
        for (Expression child : expr.children()) {
            result.addAll(visit(child));
        }
        return result;
    }
}

/**
 * Evaluates a fixed list of expressions per row, evaluating constant expressions only once
 * at construction time.
 */
public class OptimizedExpressionEvaluator {
    private final Expression[] expressions;
    private final boolean[] isConstant;
    private final Object[] constantValues;

    /**
     * @param expressions expressions to evaluate; the array is copied
     */
    public OptimizedExpressionEvaluator(Expression[] expressions) {
        // Copy the array: the cached constant flags and values below are only valid for the
        // expressions present at construction time, so later changes by the caller must not
        // be visible here.
        this.expressions = expressions.clone();
        this.isConstant = new boolean[this.expressions.length];
        this.constantValues = new Object[this.expressions.length];
        // Pre-evaluate constant expressions
        // (isConstantExpression is not defined in this snippet).
        for (int i = 0; i < this.expressions.length; i++) {
            if (isConstantExpression(this.expressions[i])) {
                isConstant[i] = true;
                constantValues[i] = this.expressions[i].eval(EmptyRow.INSTANCE);
            }
        }
    }

    /**
     * Evaluates all expressions against {@code row}.
     *
     * @param row input row for the non-constant expressions
     * @return a new array whose element i is the value of expression i
     */
    public Object[] evaluate(InternalRow row) {
        Object[] result = new Object[expressions.length];
        for (int i = 0; i < expressions.length; i++) {
            if (isConstant[i]) {
                result[i] = constantValues[i];
            } else {
                result[i] = expressions[i].eval(row);
            }
        }
        return result;
    }
}

The Expression APIs provide a comprehensive, extensible framework for building sophisticated data processing logic with high performance and type safety. They form the foundation for Spark's powerful SQL optimization and execution capabilities.