
tessl/maven-org-apache-spark--spark-catalyst-2-12

Catalyst is a library for manipulating relational query plans within Apache Spark SQL


Expression APIs

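The Expression APIs cover two layers. The public connector API (org.apache.spark.sql.connector.expressions, Java) describes the literals, column references, partition transforms, aggregates, and filter predicates that Spark exchanges with data sources. The internal Catalyst API (org.apache.spark.sql.catalyst.expressions, Scala) defines the evaluable expression trees that the optimizer and code generator operate on.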

Core Expression Interfaces

Expression

Base interface for all expressions in the public connector API:

package org.apache.spark.sql.connector.expressions;

public interface Expression {
    Expression[] EMPTY_EXPRESSION = new Expression[0];
    NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
    
    /**
     * Human-readable description of this expression
     */
    String describe();
    
    /**
     * Child expressions of this expression
     */
    Expression[] children();
    
    /**
     * Named references used by this expression
     */
    NamedReference[] references();
}
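As a rough sketch of how describe(), children(), and references relate, the following walks a tree through children() and gathers the column references it finds. To stay runnable without Spark on the classpath, it declares local stand-in types (Expr, Ref, ColumnRef, FuncCall) that mirror only the signatures shown above; these names and the ReferenceCollector helper are assumptions for illustration, not Spark classes.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the connector signatures above (not Spark classes).
interface Expr {
    String describe();
    Expr[] children();
}

interface Ref extends Expr {
    String[] fieldNames();
}

// Leaf node: a (possibly nested) column reference.
final class ColumnRef implements Ref {
    private final String[] parts;
    ColumnRef(String... parts) { this.parts = parts.clone(); }
    public String[] fieldNames() { return parts.clone(); }
    public String describe() { return String.join(".", parts); }
    public Expr[] children() { return new Expr[0]; }
}

// Inner node: a named function applied to child expressions.
final class FuncCall implements Expr {
    private final String name;
    private final Expr[] args;
    FuncCall(String name, Expr... args) { this.name = name; this.args = args.clone(); }
    public Expr[] children() { return args.clone(); }
    public String describe() {
        List<String> rendered = new ArrayList<>();
        for (Expr arg : args) rendered.add(arg.describe());
        return name + "(" + String.join(", ", rendered) + ")";
    }
}

final class ReferenceCollector {
    // Depth-first walk: a reference contributes itself, any other node contributes its children.
    static List<Ref> collect(Expr expr) {
        List<Ref> out = new ArrayList<>();
        walk(expr, out);
        return out;
    }

    private static void walk(Expr expr, List<Ref> out) {
        if (expr instanceof Ref) {
            out.add((Ref) expr);
            return;
        }
        for (Expr child : expr.children()) walk(child, out);
    }
}
```

Collecting from concat(first, upper(address.city)) yields the two references in left-to-right order, which is the information references() is meant to expose for a whole tree.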

NamedReference

Reference to a named field or column:

public interface NamedReference extends Expression {
    /**
     * Field name path (supporting nested fields)
     */
    String[] fieldNames();
}
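Because fieldNames() returns the path as separate parts, a field whose name itself contains a dot stays distinguishable from a nested path. The sketch below renders a path for display and quotes any part that is not a plain identifier. The FieldPath helper and its backtick convention are assumptions modeled on SQL identifier quoting, not part of the connector API.

```java
import java.util.StringJoiner;

final class FieldPath {
    // Joins name parts with dots, quoting any part that is not a plain identifier.
    // Backticks inside a quoted part are doubled (assumed convention).
    static String quoted(String... fieldNames) {
        StringJoiner joined = new StringJoiner(".");
        for (String part : fieldNames) {
            boolean plain = part.matches("[A-Za-z_][A-Za-z0-9_]*");
            joined.add(plain ? part : "`" + part.replace("`", "``") + "`");
        }
        return joined.toString();
    }
}
```

With this rendering, the two-part path {"a.b", "c"} and the three-part path {"a", "b", "c"} produce different strings, whereas a plain String.join(".", ...) would make them identical.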

Transform

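Represents a partition transform: a named function applied to one or more columns or literals:

public interface Transform extends Expression {
    /**
     * Name of the transform function, for example "bucket" or "years"
     */
    String name();

    /**
     * Arguments passed to the transform function
     */
    Expression[] arguments();
}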

Expression Factory

Expressions Class

Central factory for creating common expressions:

public class Expressions {
    // Literal values (the type parameter carries the Java type of the value)
    public static <T> Literal<T> literal(T value);
    
    // Column references
    public static NamedReference column(String name);
    
    // Partitioning transforms
    public static Transform identity(String column);
    public static Transform bucket(int numBuckets, String... columns);
    public static Transform years(String column);
    public static Transform months(String column);
    public static Transform days(String column);
    public static Transform hours(String column);
}

Basic Expression Usage:

// Create literal expressions
Literal intLiteral = Expressions.literal(42);
Literal stringLiteral = Expressions.literal("hello");
Literal boolLiteral = Expressions.literal(true);
Literal nullLiteral = Expressions.literal(null);

// Create column references
NamedReference userIdCol = Expressions.column("user_id");
NamedReference nameCol = Expressions.column("name");

// Create nested column references (for struct fields)
NamedReference nestedField = Expressions.column("address.street");

// Create transformation expressions
Transform identityTransform = Expressions.identity("partition_key");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");
Transform monthTransform = Expressions.months("created_at");
Transform dayTransform = Expressions.days("created_at");
Transform hourTransform = Expressions.hours("created_at");

Aggregate Expressions

Aggregation

Container for aggregate operations:

package org.apache.spark.sql.connector.expressions.aggregate;

public class Aggregation {
    /**
     * Aggregate functions to apply
     */
    public AggregateFunc[] aggregateExpressions();
    
    /**
     * Expressions to group by
     */
    public Expression[] groupByExpressions();
}

AggregateFunc

Base interface for aggregate functions:

public interface AggregateFunc extends Expression {
    // Marker interface for aggregate expressions
}

Built-in Aggregate Functions

Count

public class Count implements AggregateFunc {
    public Expression column();
    public boolean isDistinct();
}

CountStar

public class CountStar implements AggregateFunc {
    // Count all rows (COUNT(*))
}

Sum

public class Sum implements AggregateFunc {
    public Expression column();
    public boolean isDistinct();
}

Avg

public class Avg implements AggregateFunc {
    public Expression column();
    public boolean isDistinct();
}

Max

public class Max implements AggregateFunc {
    public Expression column();
}

Min

public class Min implements AggregateFunc {
    public Expression column();
}

Aggregate Usage Examples:

// Create aggregate expressions
Count countUsers = new Count(Expressions.column("user_id"), false);
CountStar countAll = new CountStar();
Sum totalRevenue = new Sum(Expressions.column("revenue"), false);
Avg avgAge = new Avg(Expressions.column("age"), false);
Max maxSalary = new Max(Expressions.column("salary"));
Min minDate = new Min(Expressions.column("start_date"));

// Create aggregation with grouping
Expression[] groupBy = new Expression[] {
    Expressions.column("department"),
    Expressions.years("hire_date")
};

AggregateFunc[] aggregates = new AggregateFunc[] {
    new Count(Expressions.column("employee_id"), false),
    new Avg(Expressions.column("salary"), false),
    new Max(Expressions.column("salary")),
    new Min(Expressions.column("salary"))
};

Aggregation aggregation = new Aggregation(aggregates, groupBy);
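To make the container concrete: an Aggregation pairs a list of aggregate functions with a list of grouping expressions, which corresponds roughly to a grouped SELECT. The sketch below assembles that text from plain strings. The input strings stand in for what describe() might return, and the AggregationSql helper and the table name are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

final class AggregationSql {
    // Renders "SELECT <groups>, <aggregates> FROM <table> [GROUP BY <groups>]".
    static String render(String table, String[] aggregates, String[] groupBy) {
        List<String> select = new ArrayList<>();
        for (String group : groupBy) select.add(group);
        for (String aggregate : aggregates) select.add(aggregate);

        StringBuilder sql = new StringBuilder("SELECT ")
            .append(String.join(", ", select))
            .append(" FROM ")
            .append(table);
        // A global aggregate (no grouping expressions) has no GROUP BY clause.
        if (groupBy.length > 0) {
            sql.append(" GROUP BY ").append(String.join(", ", groupBy));
        }
        return sql.toString();
    }
}
```

A data source that accepts a pushed-down aggregation would typically perform a translation of this kind against its own query language.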

Filter Predicates

Predicate

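Base class for filter predicates. In the Spark 3.3+ connector API, Predicate is a general scalar expression: an operator name (for example "=", ">", "<", or "IN") plus child expressions. Comparisons are built directly from Predicate; only the constant and logical predicates have dedicated subclasses:

package org.apache.spark.sql.connector.expressions.filter;

public class Predicate extends GeneralScalarExpression {
    public Predicate(String name, Expression[] children);
}

Basic Predicates

AlwaysTrue and AlwaysFalse

public final class AlwaysTrue extends Predicate implements Literal<Boolean> {
    // Predicate that always evaluates to true
}

public final class AlwaysFalse extends Predicate implements Literal<Boolean> {
    // Predicate that always evaluates to false
}

Logical Predicates

And

public final class And extends Predicate {
    public And(Predicate left, Predicate right);
    public Predicate left();
    public Predicate right();
}

Or

public final class Or extends Predicate {
    public Or(Predicate left, Predicate right);
    public Predicate left();
    public Predicate right();
}

Not

public final class Not extends Predicate {
    public Not(Predicate child);
    public Predicate child();
}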
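The left(), right(), and child() accessors are what a consumer uses to interpret a logical predicate tree. As a minimal sketch, the following evaluates such a tree against a row held in a Map. The node classes are local stand-ins that mirror those accessors (they are not Spark classes), the equality leaf is a hypothetical addition, and the evaluation uses plain two-valued logic rather than SQL's NULL semantics.

```java
import java.util.Map;
import java.util.Objects;

// Stand-in node types mirroring the accessors above; these are not Spark classes.
abstract class Node {}

final class EqualsNode extends Node {
    final String column;
    final Object value;
    EqualsNode(String column, Object value) { this.column = column; this.value = value; }
}

final class AndNode extends Node {
    private final Node left;
    private final Node right;
    AndNode(Node left, Node right) { this.left = left; this.right = right; }
    Node left() { return left; }
    Node right() { return right; }
}

final class OrNode extends Node {
    private final Node left;
    private final Node right;
    OrNode(Node left, Node right) { this.left = left; this.right = right; }
    Node left() { return left; }
    Node right() { return right; }
}

final class NotNode extends Node {
    private final Node child;
    NotNode(Node child) { this.child = child; }
    Node child() { return child; }
}

final class PredicateInterpreter {
    // Recursive, two-valued evaluation; SQL's three-valued NULL logic is deliberately left out.
    static boolean matches(Node node, Map<String, Object> row) {
        if (node instanceof AndNode) {
            AndNode and = (AndNode) node;
            return matches(and.left(), row) && matches(and.right(), row);
        }
        if (node instanceof OrNode) {
            OrNode or = (OrNode) node;
            return matches(or.left(), row) || matches(or.right(), row);
        }
        if (node instanceof NotNode) {
            return !matches(((NotNode) node).child(), row);
        }
        if (node instanceof EqualsNode) {
            EqualsNode eq = (EqualsNode) node;
            return Objects.equals(row.get(eq.column), eq.value);
        }
        throw new IllegalArgumentException("Unsupported predicate: " + node);
    }
}
```

A real data source would more often translate the tree into its own filter language than evaluate it row by row, but the recursive structure is the same.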

Complex Predicate Examples:

// Create basic predicates. Comparisons use the general Predicate class:
// an operator name plus an array of operands.
Predicate activeUsers = new Predicate("=", new Expression[] {
    Expressions.column("status"),
    Expressions.literal("active")
});

Predicate adultUsers = new Predicate(">", new Expression[] {
    Expressions.column("age"),
    Expressions.literal(18)
});

Predicate seniorUsers = new Predicate("<", new Expression[] {
    Expressions.column("age"),
    Expressions.literal(65)
});

// Combine with logical operators
Predicate workingAge = new And(adultUsers, seniorUsers);
Predicate activeWorkingAge = new And(activeUsers, workingAge);

// Complex logical combinations
Predicate vipUsers = new Predicate("=", new Expression[] {
    Expressions.column("tier"),
    Expressions.literal("VIP")
});

Predicate eligibleUsers = new Or(activeWorkingAge, vipUsers);

// Negation
Predicate ineligibleUsers = new Not(eligibleUsers);

Custom Expression Implementation

Creating Custom Expressions

Implement the Expression interface for custom logic:

public class CustomStringLength implements Expression {
    private final NamedReference column;
    
    public CustomStringLength(NamedReference column) {
        this.column = column;
    }
    
    @Override
    public String describe() {
        return String.format("string_length(%s)", column.describe());
    }
    
    @Override
    public Expression[] children() {
        return new Expression[] { column };
    }
    
    @Override
    public NamedReference[] references() {
        return new NamedReference[] { column };
    }
    
    public NamedReference getColumn() {
        return column;
    }
}

Custom Aggregate Function

public class CustomMedian implements AggregateFunc {
    private final NamedReference column;
    
    public CustomMedian(NamedReference column) {
        this.column = column;
    }
    
    @Override
    public String describe() {
        return String.format("median(%s)", column.describe());
    }
    
    @Override
    public Expression[] children() {
        return new Expression[] { column };
    }
    
    @Override
    public NamedReference[] references() {
        return new NamedReference[] { column };
    }
    
    public NamedReference getColumn() {
        return column;
    }
}

Custom Transform Function

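import java.util.Arrays;
import java.util.stream.Collectors;

public class CustomHashTransform implements Transform {
    private final NamedReference[] columns;
    private final int seed;
    
    public CustomHashTransform(int seed, NamedReference... columns) {
        this.seed = seed;
        this.columns = columns.clone(); // defensive copy of the varargs array
    }
    
    // Transform requires a function name in addition to describe()
    @Override
    public String name() {
        return "custom_hash";
    }
    
    @Override
    public String describe() {
        return String.format("custom_hash(%d, %s)", seed, 
            Arrays.stream(columns)
                  .map(NamedReference::describe)
                  .collect(Collectors.joining(", ")));
    }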
    
    @Override
    public NamedReference[] arguments() {
        return columns.clone();
    }
    
    @Override
    public Expression[] children() {
        return columns.clone();
    }
    
    @Override
    public NamedReference[] references() {
        return columns.clone();
    }
}

Working with Catalyst Internal Expressions (Scala)

Expression Base Classes

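For advanced custom expressions, you can extend Catalyst's internal expression hierarchy. These classes are internal to Spark and carry no compatibility guarantee across releases. A simplified view of the core contract: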

// Scala internal expression interfaces
package org.apache.spark.sql.catalyst.expressions

abstract class Expression {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def children: Seq[Expression]
}

abstract class LeafExpression extends Expression {
  override final def children: Seq[Expression] = Nil
}

abstract class UnaryExpression extends Expression {
  def child: Expression
  override final def children: Seq[Expression] = child :: Nil
}

abstract class BinaryExpression extends Expression {
  def left: Expression
  def right: Expression
  override final def children: Seq[Expression] = left :: right :: Nil
}

Custom Catalyst Expression Example

import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class CustomUpper(child: Expression) extends UnaryExpression {
  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable

  // UnaryExpression.eval returns null for null input and otherwise calls nullSafeEval.
  // StringType values are UTF8String internally, not java.lang.String.
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[UTF8String].toUpperCase

  // Code generation for performance: defineCodeGen emits the null check
  // and applies the given Java snippet to the child's value
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"($c).toUpperCase()")

  // Required by the tree-rewriting API since Spark 3.2
  override protected def withNewChildInternal(newChild: Expression): CustomUpper =
    copy(child = newChild)
}

Literal Expressions

case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def nullable: Boolean = value == null
  override def eval(input: InternalRow): Any = value
}

object Literal {
  val TrueLiteral: Literal = Literal(true, BooleanType)
  val FalseLiteral: Literal = Literal(false, BooleanType)
  
  def apply(v: Any): Literal = v match {
    case null => Literal(null, NullType)
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case s: String => Literal(UTF8String.fromString(s), StringType)
    // ... other types
  }
}

Expression Utilities

AttributeMap

Map from attributes to values that looks entries up by the attribute's expression ID (exprId) rather than by its name:

class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) 
    extends Map[Attribute, A] {
  
  def get(k: Attribute): Option[A]
  def contains(k: Attribute): Boolean
  def +(kv: (Attribute, A)): AttributeMap[A]
  def ++(other: AttributeMap[A]): AttributeMap[A]
}
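The point of keying by expression ID is that two attributes with the same name stay distinct, while a renamed attribute that keeps its ID still resolves. A minimal Java sketch of that idea follows; Attr and AttrMap are hypothetical stand-ins written for this illustration, not the Catalyst classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for a Catalyst attribute: a display name plus a unique expression ID.
final class Attr {
    final String name;
    final long exprId;
    Attr(String name, long exprId) { this.name = name; this.exprId = exprId; }
}

// Map keyed by exprId only; the attribute name plays no part in lookups.
final class AttrMap<V> {
    private final Map<Long, V> byId = new HashMap<>();

    void put(Attr attribute, V value) {
        byId.put(attribute.exprId, value);
    }

    Optional<V> get(Attr attribute) {
        return Optional.ofNullable(byId.get(attribute.exprId));
    }

    boolean contains(Attr attribute) {
        return byId.containsKey(attribute.exprId);
    }
}
```

Here an attribute named "id" with ID 1 and a renamed copy "user_id" with ID 1 hit the same entry, while another "id" with ID 2 (for example from the other side of a join) does not.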

ExpressionSet

Set collection with semantic equality for expressions:

class ExpressionSet(val baseSet: Set[Expression]) extends Set[Expression] {
  // Provides set operations with expression semantic equality
}
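Semantic equality means that cosmetically different but equivalent expressions count as one element, for example a + 1 and 1 + a. Catalyst achieves this by comparing canonicalized expression trees. The toy sketch below imitates the idea on strings and handles only sums; SemanticSet and its canonical form are assumptions for illustration and say nothing about Catalyst's actual canonicalization rules.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class SemanticSet {
    // Canonical form -> first expression seen with that form.
    private final Map<String, String> byCanonical = new LinkedHashMap<>();

    // Canonical form of "x + y + ...": operands trimmed and sorted, so operand order is ignored.
    static String canonical(String sum) {
        String[] operands = sum.split("\\+");
        for (int i = 0; i < operands.length; i++) {
            operands[i] = operands[i].trim();
        }
        Arrays.sort(operands);
        return String.join(" + ", operands);
    }

    // Returns true if added, false when a semantically equal expression is already present.
    boolean add(String sum) {
        return byCanonical.putIfAbsent(canonical(sum), sum) == null;
    }

    boolean contains(String sum) {
        return byCanonical.containsKey(canonical(sum));
    }

    int size() {
        return byCanonical.size();
    }
}
```

Adding "a + 1" and then "1 + a" leaves a single element, because both reduce to the same canonical form.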

UnsafeRow

High-performance row implementation:

public final class UnsafeRow extends InternalRow {
    public static final int WORD_SIZE = 8;
    
    public static int calculateBitSetWidthInBytes(int numFields);
    public static boolean isFixedLength(DataType dt);
    public static boolean isMutable(DataType dt);
    
    // Efficient field access methods
    public boolean isNullAt(int ordinal);
    public int getInt(int ordinal);
    public long getLong(int ordinal);
    public UTF8String getUTF8String(int ordinal);
    // ... other typed getters
}
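The static helpers follow from the row layout: a null-tracking bitset with one bit per field, rounded up to whole 8-byte words, followed by one 8-byte slot per field (variable-length data is appended after that region). The arithmetic can be sketched as below. UnsafeRowLayout is a hypothetical helper for illustration; only the bitset-width formula is intended to mirror calculateBitSetWidthInBytes.

```java
final class UnsafeRowLayout {
    static final int WORD_SIZE = 8;

    // One null bit per field, rounded up to a multiple of 64 bits (8 bytes).
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * WORD_SIZE;
    }

    // Fixed-length region: null bitset followed by one 8-byte slot per field.
    static int fixedSizeInBytes(int numFields) {
        return bitSetWidthInBytes(numFields) + numFields * WORD_SIZE;
    }

    // Byte offset of a field's slot, measured from the start of the row.
    static int fieldOffset(int numFields, int ordinal) {
        if (ordinal < 0 || ordinal >= numFields) {
            throw new IndexOutOfBoundsException("ordinal " + ordinal);
        }
        return bitSetWidthInBytes(numFields) + ordinal * WORD_SIZE;
    }
}
```

For a 3-field row this gives an 8-byte bitset and a 32-byte fixed region, with the third field's slot starting at byte 24. A 65th field is what pushes the bitset to 16 bytes.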

Code Generation Support

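UnsafeRowWriter

For high-performance row construction. In Spark 3.x, UnsafeRowWriter allocates and grows its own buffer; BufferHolder is an internal detail and is no longer passed in by callers:

public final class UnsafeRowWriter extends UnsafeWriter {
    public UnsafeRowWriter(int numFields);
    public UnsafeRowWriter(int numFields, int initialBufferSize);
    public void reset();
    public void zeroOutNullBytes();
    public void setNullAt(int ordinal);
    public void write(int ordinal, boolean value);
    public void write(int ordinal, int value);
    public void write(int ordinal, long value);
    public void write(int ordinal, UTF8String value);
    // ... other write methods
    public UnsafeRow getRow();
}

Row Writing Example:

import org.apache.spark.sql.catalyst.InternalRow;
// Catalyst's internal Expression (it has eval and dataType), not the connector interface
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

public class UnsafeRowEvaluator {
    public static UnsafeRow evaluateRow(Expression[] expressions, InternalRow input) {
        UnsafeRowWriter writer = new UnsafeRowWriter(expressions.length, 64);
        writer.reset();            // rewind the buffer cursor
        writer.zeroOutNullBytes(); // clear the null bitset
        
        for (int i = 0; i < expressions.length; i++) {
            Object value = expressions[i].eval(input);
            if (value == null) {
                writer.setNullAt(i);
            } else {
                // Write typed value based on expression data type
                writeTypedValue(writer, i, value, expressions[i].dataType());
            }
        }
        return writer.getRow();
    }
    
    // Covers a few common types only; extend with the types your expressions produce
    private static void writeTypedValue(
            UnsafeRowWriter writer, int ordinal, Object value, DataType dataType) {
        if (dataType.equals(DataTypes.BooleanType)) {
            writer.write(ordinal, (boolean) value);
        } else if (dataType.equals(DataTypes.IntegerType)) {
            writer.write(ordinal, (int) value);
        } else if (dataType.equals(DataTypes.LongType)) {
            writer.write(ordinal, (long) value);
        } else if (dataType.equals(DataTypes.StringType)) {
            writer.write(ordinal, (UTF8String) value);
        } else {
            throw new UnsupportedOperationException("Unsupported type: " + dataType);
        }
    }
}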

Advanced Expression Patterns

Expression Trees

Build complex expression trees:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExpressionTreeBuilder {
    public static Expression buildComplexFilter(Map<String, Object> criteria) {
        List<Predicate> predicates = new ArrayList<>();
        
        for (Map.Entry<String, Object> entry : criteria.entrySet()) {
            Expression column = Expressions.column(entry.getKey());
            Object value = entry.getValue();
            
            if (value instanceof List) {
                // IN predicate: the first child is the tested expression,
                // the remaining children are the candidate values
                List<Expression> children = new ArrayList<>();
                children.add(column);
                for (Object candidate : (List<?>) value) {
                    children.add(Expressions.literal(candidate));
                }
                predicates.add(new Predicate("IN", children.toArray(new Expression[0])));
            } else {
                // Equality predicate
                predicates.add(new Predicate("=", new Expression[] {
                    column,
                    Expressions.literal(value)
                }));
            }
        }
        
        // Combine all predicates with AND; an empty criteria map matches every row
        return predicates.stream()
                       .reduce((p1, p2) -> new And(p1, p2))
                       .orElse(new AlwaysTrue());
    }
}
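The final reduce step folds the list from left to right, so three predicates become a left-deep tree and an empty list falls back to the always-true predicate. The shape is easier to see on strings, as in this sketch (AndFold is a hypothetical helper; the strings stand in for predicates):

```java
import java.util.List;

final class AndFold {
    // Left fold with AND; mirrors reduce((p1, p2) -> new And(p1, p2)).orElse(new AlwaysTrue()).
    static String fold(List<String> predicates) {
        return predicates.stream()
            .reduce((left, right) -> "(" + left + " AND " + right + ")")
            .orElse("TRUE");
    }
}
```

Folding a, b, c yields ((a AND b) AND c). A single predicate is returned unchanged, without a wrapping And.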

Expression Visitor Pattern

public abstract class ExpressionVisitor<T> {
    public T visit(Expression expr) {
        if (expr instanceof NamedReference) {
            return visitNamedReference((NamedReference) expr);
        } else if (expr instanceof Literal) {
            return visitLiteral((Literal) expr);
        } else if (expr instanceof And) {
            return visitAnd((And) expr);
        } else if (expr instanceof Or) {
            return visitOr((Or) expr);
        }
        // ... handle other expression types
        return visitDefault(expr);
    }
    
    protected abstract T visitNamedReference(NamedReference ref);
    protected abstract T visitLiteral(Literal literal);
    protected abstract T visitAnd(And and);
    protected abstract T visitOr(Or or);
    protected abstract T visitDefault(Expression expr);
}

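// Example: Extract all column references
public class ColumnExtractor extends ExpressionVisitor<Set<String>> {
    @Override
    protected Set<String> visitNamedReference(NamedReference ref) {
        return Set.of(String.join(".", ref.fieldNames()));
    }
    
    // Literals reference no columns; the override is required because visitLiteral is abstract
    @Override
    protected Set<String> visitLiteral(Literal literal) {
        return Set.of();
    }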
    
    @Override
    protected Set<String> visitAnd(And and) {
        Set<String> result = new HashSet<>();
        result.addAll(visit(and.left()));
        result.addAll(visit(and.right()));
        return result;
    }
    
    @Override
    protected Set<String> visitOr(Or or) {
        Set<String> result = new HashSet<>();
        result.addAll(visit(or.left()));
        result.addAll(visit(or.right()));
        return result;
    }
    
    @Override
    protected Set<String> visitDefault(Expression expr) {
        Set<String> result = new HashSet<>();
        for (Expression child : expr.children()) {
            result.addAll(visit(child));
        }
        return result;
    }
}

Performance Considerations

Expression Evaluation Optimization

import org.apache.spark.sql.catalyst.InternalRow;
// Catalyst's internal Expression, which exposes eval() and foldable()
import org.apache.spark.sql.catalyst.expressions.Expression;

public class OptimizedExpressionEvaluator {
    private final Expression[] expressions;
    private final boolean[] isConstant;
    private final Object[] constantValues;
    
    public OptimizedExpressionEvaluator(Expression[] expressions) {
        this.expressions = expressions;
        this.isConstant = new boolean[expressions.length];
        this.constantValues = new Object[expressions.length];
        
        // Pre-evaluate foldable (constant) expressions once.
        // They do not read the input row, so a null row is sufficient.
        for (int i = 0; i < expressions.length; i++) {
            if (expressions[i].foldable()) {
                isConstant[i] = true;
                constantValues[i] = expressions[i].eval(null);
            }
        }
    }
    
    public Object[] evaluate(InternalRow row) {
        Object[] result = new Object[expressions.length];
        for (int i = 0; i < expressions.length; i++) {
            if (isConstant[i]) {
                result[i] = constantValues[i];
            } else {
                result[i] = expressions[i].eval(row);
            }
        }
        return result;
    }
}

Together, the connector expression API and Catalyst's internal expression classes let data sources describe filters, aggregates, and partition transforms to Spark, and let extensions add evaluable expressions that take part in Spark SQL optimization and code generation.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-12
