or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-apis.md data-source-v2-apis.md distributions-api.md expression-apis.md index.md legacy-data-source-v1.md metrics-api.md streaming-apis.md utilities-helpers.md vectorized-processing.md
tile.json

docs/expression-apis.md

Expression APIs

The Expression APIs provide a comprehensive framework for creating and manipulating expressions in Apache Spark Catalyst. These APIs support everything from simple literals and column references to complex transformations, aggregations, and custom user-defined functions.

Core Expression Interfaces

Expression

Base interface for all expressions in the Catalyst system:

package org.apache.spark.sql.connector.expressions;

/**
 * Root type of the expression API shown in this document; NamedReference,
 * Transform, AggregateFunc and Predicate all extend it.
 */
public interface Expression {
    /** Shared zero-length {@code Expression[]} constant. */
    Expression[] EMPTY_EXPRESSION = new Expression[0];
    /** Shared zero-length {@code NamedReference[]} constant. */
    NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
    
    /**
     * Human-readable description of this expression
     *
     * @return the description text
     */
    String describe();
    
    /**
     * Child expressions of this expression
     *
     * @return the direct children as an array
     */
    Expression[] children();
    
    /**
     * Named references used by this expression
     *
     * @return the referenced names as an array
     */
    NamedReference[] references();
}

NamedReference

Reference to a named field or column:

/**
 * An {@link Expression} that names a field or column.
 */
public interface NamedReference extends Expression {
    /**
     * Field name path (supporting nested fields)
     *
     * @return the name parts as an array; presumably one element per nesting
     *         level, outermost first — confirm against the implementation
     */
    String[] fieldNames();
}

Transform

Represents transformation functions:

/**
 * An {@link Expression} representing a transformation function over named references.
 *
 * NOTE(review): upstream Spark's Transform appears to also declare name() and to
 * type arguments() as Expression[] — confirm against the Spark version in use.
 */
public interface Transform extends Expression {
    /**
     * Arguments to this transformation
     *
     * @return the argument references as an array
     */
    NamedReference[] arguments();
}

Expression Factory

Expressions Class

Central factory for creating common expressions:

/**
 * Static factory methods for literals, column references and partitioning
 * transforms. This is a signature listing only: method bodies are omitted.
 */
public class Expressions {
    // Literal values
    // Wraps an arbitrary Java value; the usage example passes int, String, boolean and null.
    public static Literal literal(Object value);
    
    // Column references
    // Builds a reference from a column name given as a single string.
    public static NamedReference column(String name);
    
    // Partitioning transforms
    // Each factory takes the source column name(s); bucket additionally takes the bucket count.
    public static Transform identity(String column);
    public static Transform bucket(int numBuckets, String... columns);
    public static Transform years(String column);
    public static Transform months(String column);
    public static Transform days(String column);
    public static Transform hours(String column);
}

Basic Expression Usage:

// Create literal expressions (one per Java value kind, including null)
Literal intLiteral = Expressions.literal(42);
Literal stringLiteral = Expressions.literal("hello");
Literal boolLiteral = Expressions.literal(true);
Literal nullLiteral = Expressions.literal(null);

// Create column references by name
NamedReference userIdCol = Expressions.column("user_id");
NamedReference nameCol = Expressions.column("name");

// Create nested column references (for struct fields)
// NOTE(review): presumably the dotted name is split into the path returned by
// NamedReference.fieldNames() — confirm against the implementation.
NamedReference nestedField = Expressions.column("address.street");

// Create transformation expressions, one per partitioning factory
Transform identityTransform = Expressions.identity("partition_key");
Transform bucketTransform = Expressions.bucket(10, "user_id"); // 10 buckets over user_id
Transform yearTransform = Expressions.years("created_at");
Transform monthTransform = Expressions.months("created_at");
Transform dayTransform = Expressions.days("created_at");
Transform hourTransform = Expressions.hours("created_at");

Aggregate Expressions

Aggregation

Container for aggregate operations:

package org.apache.spark.sql.connector.expressions.aggregate;

/**
 * Pairs a set of aggregate functions with the expressions to group by.
 * Signature listing only; the usage example constructs it as
 * {@code new Aggregation(aggregates, groupBy)}, a constructor not shown here.
 */
public class Aggregation {
    /**
     * Aggregate functions to apply
     *
     * @return the aggregate functions as an array
     */
    public AggregateFunc[] aggregateExpressions();
    
    /**
     * Expressions to group by
     *
     * @return the grouping expressions as an array
     */
    public Expression[] groupByExpressions();
}

AggregateFunc

Base interface for aggregate functions:

/**
 * Common supertype of all aggregate functions; adds no members beyond
 * those inherited from {@link Expression}.
 */
public interface AggregateFunc extends Expression {
    // Marker interface for aggregate expressions
}

Built-in Aggregate Functions

Count

/**
 * COUNT aggregate (signature listing only). The usage example constructs it as
 * {@code new Count(column, false)}; the constructor is not shown here.
 */
public class Count implements AggregateFunc {
    // The expression being aggregated.
    public Expression column();
    // Presumably true for COUNT(DISTINCT ...) — confirm.
    public boolean isDistinct();
}

CountStar

/**
 * Row-count aggregate with no column argument; the usage example creates it
 * with a no-argument constructor.
 */
public class CountStar implements AggregateFunc {
    // Count all rows (COUNT(*))
}

Sum

/**
 * SUM aggregate (signature listing only). The usage example constructs it as
 * {@code new Sum(column, false)}; the constructor is not shown here.
 */
public class Sum implements AggregateFunc {
    // The expression being aggregated.
    public Expression column();
    // Presumably true for SUM(DISTINCT ...) — confirm.
    public boolean isDistinct();
}

Avg

/**
 * AVG aggregate (signature listing only). The usage example constructs it as
 * {@code new Avg(column, false)}; the constructor is not shown here.
 */
public class Avg implements AggregateFunc {
    // The expression being aggregated.
    public Expression column();
    // Presumably true for AVG(DISTINCT ...) — confirm.
    public boolean isDistinct();
}

Max

/**
 * MAX aggregate (signature listing only); unlike Count/Sum/Avg it exposes no
 * distinct flag. The usage example constructs it as {@code new Max(column)}.
 */
public class Max implements AggregateFunc {
    // The expression being aggregated.
    public Expression column();
}

Min

/**
 * MIN aggregate (signature listing only); unlike Count/Sum/Avg it exposes no
 * distinct flag. The usage example constructs it as {@code new Min(column)}.
 */
public class Min implements AggregateFunc {
    // The expression being aggregated.
    public Expression column();
}

Aggregate Usage Examples:

// Create aggregate expressions; the boolean argument is the distinct flag (false here)
Count countUsers = new Count(Expressions.column("user_id"), false);
CountStar countAll = new CountStar();
Sum totalRevenue = new Sum(Expressions.column("revenue"), false);
Avg avgAge = new Avg(Expressions.column("age"), false);
Max maxSalary = new Max(Expressions.column("salary"));
Min minDate = new Min(Expressions.column("start_date"));

// Create aggregation with grouping: group by a plain column and by a years(...) transform
Expression[] groupBy = new Expression[] {
    Expressions.column("department"),
    Expressions.years("hire_date")
};

// Aggregates computed per group: employee count plus average, max and min salary
AggregateFunc[] aggregates = new AggregateFunc[] {
    new Count(Expressions.column("employee_id"), false),
    new Avg(Expressions.column("salary"), false),
    new Max(Expressions.column("salary")),
    new Min(Expressions.column("salary"))
};

// Bundle the aggregate functions with their grouping expressions
Aggregation aggregation = new Aggregation(aggregates, groupBy);

Filter Predicates

Predicate

Base interface for filter predicates:

package org.apache.spark.sql.connector.expressions.filter;

/**
 * Common supertype of all filter predicates; adds no members beyond those
 * inherited from {@link Expression}.
 */
public interface Predicate extends Expression {
    // Base interface for filter expressions
}

Basic Predicates

AlwaysTrue and AlwaysFalse

/** Constant predicate that takes no operands. */
public class AlwaysTrue implements Predicate {
    // Predicate that always evaluates to true
}

/** Constant predicate that takes no operands. */
public class AlwaysFalse implements Predicate {  
    // Predicate that always evaluates to false
}

Logical Predicates

And

/**
 * Logical conjunction of two predicates (signature listing only). The examples
 * construct it as {@code new And(left, right)}.
 */
public class And implements Predicate {
    // First operand.
    public Predicate left();
    // Second operand.
    public Predicate right();
}

Or

/**
 * Logical disjunction of two predicates (signature listing only). The examples
 * construct it as {@code new Or(left, right)}.
 */
public class Or implements Predicate {
    // First operand.
    public Predicate left();
    // Second operand.
    public Predicate right();
}

Not

/**
 * Logical negation of a single predicate (signature listing only). The examples
 * construct it as {@code new Not(child)}.
 */
public class Not implements Predicate {
    // The predicate being negated.
    public Predicate child();
}

Complex Predicate Examples:

// Create basic predicates
// NOTE(review): EqualTo, GreaterThan and LessThan are used here but are not
// declared in the listings above — confirm they exist in the target API version.
Predicate activeUsers = new EqualTo(
    Expressions.column("status"), 
    Expressions.literal("active")
);

// Presumably a strict comparison (age > 18), which would exclude 18 itself — confirm intent.
Predicate adultUsers = new GreaterThan(
    Expressions.column("age"), 
    Expressions.literal(18)
);

// Presumably a strict comparison (age < 65) — confirm intent.
Predicate seniorUsers = new LessThan(
    Expressions.column("age"), 
    Expressions.literal(65)
);

// Combine with logical operators: both age bounds, then additionally the status check
Predicate workingAge = new And(adultUsers, seniorUsers);
Predicate activeWorkingAge = new And(activeUsers, workingAge);

// Complex logical combinations
Predicate vipUsers = new EqualTo(
    Expressions.column("tier"), 
    Expressions.literal("VIP")
);

// Either branch qualifies: active working-age users, or VIP-tier users
Predicate eligibleUsers = new Or(activeWorkingAge, vipUsers);

// Negation: wraps the combined predicate built above
Predicate ineligibleUsers = new Not(eligibleUsers);

Custom Expression Implementation

Creating Custom Expressions

Implement the Expression interface for custom logic:

/**
 * Example custom {@link Expression} that wraps one column reference and
 * describes itself as {@code string_length(<column description>)}.
 */
public class CustomStringLength implements Expression {
    private final NamedReference column;

    /**
     * @param column the column reference this expression wraps
     */
    public CustomStringLength(NamedReference column) {
        this.column = column;
    }

    /** @return the wrapped column reference */
    public NamedReference getColumn() {
        return column;
    }

    /** @return {@code "string_length(" + column description + ")"} */
    @Override
    public String describe() {
        return "string_length(" + column.describe() + ")";
    }

    /** @return a fresh single-element array holding the wrapped column */
    @Override
    public Expression[] children() {
        Expression[] kids = new Expression[1];
        kids[0] = column;
        return kids;
    }

    /** @return a fresh single-element array holding the wrapped column */
    @Override
    public NamedReference[] references() {
        NamedReference[] refs = { column };
        return refs;
    }
}

Custom Aggregate Function

/**
 * Example custom {@link AggregateFunc} over a single column reference; it
 * describes itself as {@code median(<column description>)}.
 */
public class CustomMedian implements AggregateFunc {
    private final NamedReference column;

    /**
     * @param column the column reference this aggregate is applied to
     */
    public CustomMedian(NamedReference column) {
        this.column = column;
    }

    /** @return the column reference passed to the constructor */
    public NamedReference getColumn() {
        return this.column;
    }

    /** @return a new one-element array containing the column reference */
    @Override
    public NamedReference[] references() {
        NamedReference[] single = new NamedReference[1];
        single[0] = this.column;
        return single;
    }

    /** @return a new one-element array containing the column reference */
    @Override
    public Expression[] children() {
        Expression[] single = { this.column };
        return single;
    }

    /** @return the text {@code median(...)} wrapping the column's own description */
    @Override
    public String describe() {
        String inner = this.column.describe();
        return "median(" + inner + ")";
    }
}

Custom Transform Function

/**
 * Example custom {@link Transform} identified by a seed and a list of column
 * references. It describes itself as {@code custom_hash(<seed>, <columns>)}.
 *
 * <p>The column array is copied on the way in and on the way out, so neither
 * the caller's array nor any returned array aliases this object's state.
 */
public class CustomHashTransform implements Transform {
    private final NamedReference[] columns;
    private final int seed;

    /**
     * @param seed    value reported first in {@link #describe()}
     * @param columns column references this transform applies to
     */
    public CustomHashTransform(int seed, NamedReference... columns) {
        this.seed = seed;
        // Varargs may be backed by a caller-owned array; copy it so later writes
        // to that array cannot change this transform.
        this.columns = columns.clone();
    }

    /** @return {@code custom_hash(seed, col1, col2, ...)} using each column's description */
    @Override
    public String describe() {
        return String.format("custom_hash(%d, %s)", seed,
            Arrays.stream(columns)
                  .map(NamedReference::describe)
                  .collect(Collectors.joining(", ")));
    }

    /** @return a copy of the column references */
    @Override
    public NamedReference[] arguments() {
        return columns.clone();
    }

    /** @return a copy of the column references, typed as a true {@code Expression[]} */
    @Override
    public Expression[] children() {
        // clone() would keep the runtime type NamedReference[], so a caller storing
        // any other Expression into the result would hit ArrayStoreException.
        return Arrays.copyOf(columns, columns.length, Expression[].class);
    }

    /** @return a copy of the column references */
    @Override
    public NamedReference[] references() {
        return columns.clone();
    }
}

Working with Catalyst Internal Expressions (Scala)

Expression Base Classes

For advanced custom expressions, you can extend Catalyst's internal expression hierarchy:

// Scala internal expression interfaces
package org.apache.spark.sql.catalyst.expressions

/**
 * Simplified view of the Catalyst expression base class: every expression
 * exposes a result type, a nullability flag, a row-level evaluator and its
 * child expressions.
 */
abstract class Expression {
  /** Type of the value produced by this expression. */
  def dataType: DataType
  /** Whether this expression can produce null. */
  def nullable: Boolean
  /** Evaluates this expression against one input row. */
  def eval(input: InternalRow): Any
  /** Direct child expressions. */
  def children: Seq[Expression]
}

/** Expression with no inputs: its child list is fixed to the empty sequence. */
abstract class LeafExpression extends Expression {
  override final def children: Seq[Expression] = Seq.empty
}

/** Expression with exactly one input, exposed as `child`. */
abstract class UnaryExpression extends Expression {
  def child: Expression

  /** The child list is fixed to the single `child`. */
  override final def children: Seq[Expression] = Seq(child)
}

/** Expression with exactly two inputs, exposed as `left` and `right`. */
abstract class BinaryExpression extends Expression {
  def left: Expression
  def right: Expression

  /** The child list is fixed to `left` followed by `right`. */
  override final def children: Seq[Expression] = Seq(left, right)
}

Custom Catalyst Expression Example

/**
 * Example unary Catalyst expression that upper-cases its child's value.
 *
 * The result type is StringType, whose values are carried as UTF8String
 * (the same representation the Literal factory in this document uses for
 * strings), so both the interpreted and the generated path produce UTF8String.
 *
 * NOTE(review): the generated code assumes the child itself yields a
 * UTF8String (i.e. is StringType) — confirm or add an input type check.
 */
case class CustomUpper(child: Expression) extends UnaryExpression {
  override def dataType: DataType = StringType
  // Null in, null out: nullability follows the child.
  override def nullable: Boolean = child.nullable

  /** Interpreted path: returns null for null input, otherwise the upper-cased text. */
  override def eval(input: InternalRow): Any = {
    val value = child.eval(input)
    if (value == null) null
    // Wrap the result so the value matches the declared StringType representation.
    else UTF8String.fromString(value.toString.toUpperCase)
  }

  // Code generation for performance
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childGen = child.genCode(ctx)
    // A fresh name keeps two CustomUpper instances in one generated class from
    // registering helpers under the same identifier.
    val helperName = ctx.freshName("upperCase")
    val upperFunc = ctx.addNewFunction(helperName,
      s"""
         |private UTF8String $helperName(UTF8String input) {
         |  return input == null ? null : input.toUpperCase();
         |}
       """.stripMargin)

    // NOTE(review): newer Spark versions type ExprCode.code as a Block built with
    // the code"..." interpolator rather than a plain String — confirm for the
    // Spark version in use.
    ev.copy(code = s"""
      |${childGen.code}
      |boolean ${ev.isNull} = ${childGen.isNull};
      |UTF8String ${ev.value} = ${ev.isNull} ? null : $upperFunc(${childGen.value});
    """.stripMargin)
  }
}

Literal Expressions

/** Leaf expression holding a constant `value` tagged with its `dataType`. */
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  // Nullable exactly when the stored constant is null.
  override def nullable: Boolean = value == null
  // Evaluation ignores the input row and returns the stored constant.
  override def eval(input: InternalRow): Any = value
}

/** Companion with shared boolean constants and a type-inferring factory. */
object Literal {
  // Pre-built boolean literals.
  val TrueLiteral: Literal = Literal(true, BooleanType)
  val FalseLiteral: Literal = Literal(false, BooleanType)
  
  /**
   * Builds a Literal by choosing the data type from the runtime type of `v`.
   * Only the cases listed below are shown; the remaining ones are elided in
   * this listing.
   */
  def apply(v: Any): Literal = v match {
    case null => Literal(null, NullType)
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    // Strings are converted to UTF8String before being stored.
    case s: String => Literal(UTF8String.fromString(s), StringType)
    // ... other types
  }
}

Expression Utilities

AttributeMap

Efficient map keyed by expression attributes:

/**
 * Map from Attribute to A, backed by `baseMap`, which is keyed by ExprId and
 * stores each attribute alongside its value. Signature listing only.
 */
class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) 
    extends Map[Attribute, A] {
  
  // Lookup and membership by attribute.
  def get(k: Attribute): Option[A]
  def contains(k: Attribute): Boolean
  // Both additions return an AttributeMap rather than a plain Map.
  def +(kv: (Attribute, A)): AttributeMap[A]
  def ++(other: AttributeMap[A]): AttributeMap[A]
}

ExpressionSet

Set collection with semantic equality for expressions:

/** Set of expressions backed by `baseSet`; members are not shown in this listing. */
class ExpressionSet(val baseSet: Set[Expression]) extends Set[Expression] {
  // Provides set operations with expression semantic equality
}

UnsafeRow

High-performance row implementation:

/**
 * Row implementation extending InternalRow (signature listing only; bodies and
 * the remaining typed getters are omitted).
 */
public final class UnsafeRow extends InternalRow {
    // Presumably the word size in bytes — confirm.
    public static final int WORD_SIZE = 8;
    
    // Static helpers keyed by field count or data type.
    public static int calculateBitSetWidthInBytes(int numFields);
    public static boolean isFixedLength(DataType dt);
    public static boolean isMutable(DataType dt);
    
    // Efficient field access methods
    // Each accessor takes the field's ordinal position.
    public boolean isNullAt(int ordinal);
    public int getInt(int ordinal);
    public long getLong(int ordinal);
    public UTF8String getUTF8String(int ordinal);
    // ... other typed getters
}

Code Generation Support

BufferHolder and UnsafeRowWriter

For high-performance expression evaluation:

/**
 * Byte buffer associated with an UnsafeRow (signature listing only).
 *
 * NOTE(review): the evaluation example in this document also calls totalSize(),
 * which is not listed here — confirm the full API for the Spark version in use.
 */
public final class BufferHolder {
    // Binds the holder to a row and takes an initial size.
    public BufferHolder(UnsafeRow row, int initialSize);
    // Requests room for neededSize more; exact growth policy is not shown here.
    public void grow(int neededSize);
    // Exposes the backing byte array.
    public byte[] getBuffer();
}

/**
 * Writer for typed field values addressed by ordinal (signature listing only).
 *
 * NOTE(review): the evaluation example in this document also calls setNullAt(int),
 * which is not listed here — confirm the full API for the Spark version in use.
 */
public final class UnsafeRowWriter {
    // Takes the buffer holder to write into and the number of fields.
    public UnsafeRowWriter(BufferHolder holder, int numFields);
    // Called once before writing in the evaluation example.
    public void initialize();
    // One overload per value type; the first argument is the field ordinal.
    public void write(int ordinal, boolean value);
    public void write(int ordinal, int value);
    public void write(int ordinal, long value);
    public void write(int ordinal, UTF8String value);
    // ... other write methods
}

Code Generation Example:

/**
 * Example helper that evaluates a list of expressions against one input row
 * and packs the results into an {@code UnsafeRow}.
 */
public class OptimizedExpressionEvaluator {
    /**
     * Evaluates every expression against {@code input} and writes each result
     * into the field with the same index.
     *
     * @param expressions expressions to evaluate, one per output field
     * @param input       row each expression is evaluated against
     * @return the populated row, pointed at the holder's buffer
     */
    public static InternalRow evaluateRow(Expression[] expressions, InternalRow input) {
        final int fieldCount = expressions.length;
        UnsafeRow row = new UnsafeRow(fieldCount);
        BufferHolder holder = new BufferHolder(row, 64);
        UnsafeRowWriter rowWriter = new UnsafeRowWriter(holder, fieldCount);
        rowWriter.initialize();

        int ordinal = 0;
        for (Expression expression : expressions) {
            Object evaluated = expression.eval(input);
            if (evaluated != null) {
                // Non-null results are written according to the expression's data type.
                DataType type = expression.dataType();
                writeTypedValue(rowWriter, ordinal, evaluated, type);
            } else {
                rowWriter.setNullAt(ordinal);
            }
            ordinal++;
        }

        row.pointTo(holder.getBuffer(), holder.totalSize());
        return row;
    }
}

Advanced Expression Patterns

Expression Trees

Build complex expression trees:

/**
 * Example builder that turns a column-to-value map into one combined filter.
 */
public class ExpressionTreeBuilder {
    /**
     * Builds one predicate per map entry and chains them with {@code And}.
     * A {@code List} value yields an {@code In} predicate over its elements;
     * any other value yields an {@code EqualTo} predicate.
     *
     * @param criteria column name mapped to the value, or list of values, to match
     * @return the left-nested conjunction of all predicates, or {@code AlwaysTrue}
     *         when the map is empty
     */
    public static Expression buildComplexFilter(Map<String, Object> criteria) {
        List<Predicate> parts = new ArrayList<>();
        for (Map.Entry<String, Object> criterion : criteria.entrySet()) {
            parts.add(toPredicate(criterion.getKey(), criterion.getValue()));
        }

        if (parts.isEmpty()) {
            return new AlwaysTrue();
        }

        // Fold left to right so the result nests as And(And(p1, p2), p3) ...
        Predicate combined = parts.get(0);
        for (int i = 1; i < parts.size(); i++) {
            combined = new And(combined, parts.get(i));
        }
        return combined;
    }

    /** Maps one criterion to an IN predicate (list values) or an equality predicate. */
    private static Predicate toPredicate(String column, Object value) {
        if (!(value instanceof List)) {
            return new EqualTo(Expressions.column(column), Expressions.literal(value));
        }

        List<?> candidates = (List<?>) value;
        return new In(
            Expressions.column(column),
            candidates.stream()
                      .map(Expressions::literal)
                      .toArray(Expression[]::new));
    }
}

Expression Visitor Pattern

/**
 * Type-dispatching visitor over expressions.
 *
 * @param <T> result type produced by each visit method
 */
public abstract class ExpressionVisitor<T> {
    /** Handles any expression type without a dedicated visit method. */
    protected abstract T visitDefault(Expression expr);

    /** Handles a named field or column reference. */
    protected abstract T visitNamedReference(NamedReference ref);

    /** Handles a literal value. */
    protected abstract T visitLiteral(Literal literal);

    /** Handles a logical AND. */
    protected abstract T visitAnd(And and);

    /** Handles a logical OR. */
    protected abstract T visitOr(Or or);

    /**
     * Routes {@code expr} to the first matching visit method, checking
     * NamedReference, Literal, And, then Or, and falling back to
     * {@link #visitDefault(Expression)}.
     *
     * @param expr expression to dispatch
     * @return whatever the selected visit method returns
     */
    public T visit(Expression expr) {
        if (expr instanceof NamedReference) {
            return visitNamedReference((NamedReference) expr);
        }
        if (expr instanceof Literal) {
            return visitLiteral((Literal) expr);
        }
        if (expr instanceof And) {
            return visitAnd((And) expr);
        }
        if (expr instanceof Or) {
            return visitOr((Or) expr);
        }
        // ... handle other expression types
        return visitDefault(expr);
    }
}

// Example: Extract all column references
/**
 * Visitor that collects the dotted names of every column reference reachable
 * from an expression.
 */
public class ColumnExtractor extends ExpressionVisitor<Set<String>> {
    /** @return a one-element set holding the reference's field names joined with "." */
    @Override
    protected Set<String> visitNamedReference(NamedReference ref) {
        return Set.of(String.join(".", ref.fieldNames()));
    }

    /**
     * A literal contributes no column names. ExpressionVisitor declares this
     * method abstract, so it must be implemented for the class to compile.
     *
     * @return an empty set
     */
    @Override
    protected Set<String> visitLiteral(Literal literal) {
        return Set.of();
    }

    /** @return the columns referenced by either side of the AND */
    @Override
    protected Set<String> visitAnd(And and) {
        return union(and.left(), and.right());
    }

    /** @return the columns referenced by either side of the OR */
    @Override
    protected Set<String> visitOr(Or or) {
        return union(or.left(), or.right());
    }

    /** @return the columns referenced by any child of {@code expr} */
    @Override
    protected Set<String> visitDefault(Expression expr) {
        Set<String> result = new HashSet<>();
        for (Expression child : expr.children()) {
            result.addAll(visit(child));
        }
        return result;
    }

    /** Visits both operands, left first, and merges their column names into one set. */
    private Set<String> union(Expression left, Expression right) {
        Set<String> result = new HashSet<>();
        result.addAll(visit(left));
        result.addAll(visit(right));
        return result;
    }
}

Performance Considerations

Expression Evaluation Optimization

/**
 * Example evaluator that evaluates constant expressions once at construction
 * time and reuses those values on every call to {@link #evaluate(InternalRow)}.
 */
public class OptimizedExpressionEvaluator {
    private final Expression[] expressions;
    private final boolean[] isConstant;
    private final Object[] constantValues;

    /**
     * @param expressions expressions to evaluate; the array is stored as given
     */
    public OptimizedExpressionEvaluator(Expression[] expressions) {
        final int count = expressions.length;
        this.expressions = expressions;
        this.isConstant = new boolean[count];
        this.constantValues = new Object[count];

        // Pre-evaluate constant expressions
        int slot = 0;
        for (Expression candidate : expressions) {
            if (isConstantExpression(candidate)) {
                isConstant[slot] = true;
                constantValues[slot] = candidate.eval(EmptyRow.INSTANCE);
            }
            slot++;
        }
    }

    /**
     * Evaluates every expression against {@code row}, substituting the cached
     * value for those flagged constant.
     *
     * @param row input row for the non-constant expressions
     * @return a new array with one result per expression, in expression order
     */
    public Object[] evaluate(InternalRow row) {
        final int count = expressions.length;
        Object[] output = new Object[count];
        for (int slot = 0; slot < count; slot++) {
            output[slot] = isConstant[slot]
                ? constantValues[slot]
                : expressions[slot].eval(row);
        }
        return output;
    }
}

The Expression APIs provide a comprehensive, extensible framework for building sophisticated data processing logic with high performance and type safety. They form the foundation for Spark's powerful SQL optimization and execution capabilities.