# Expression APIs

The Expression APIs provide a framework for creating and manipulating expressions in Apache Spark. They cover both the public connector (Data Source V2) expression API and Catalyst's internal expression classes, ranging from simple literals and column references to partition transforms, aggregations, filter predicates, and custom expressions.

## Core Expression Interfaces

### Expression

Base interface for all expressions in the public connector (Data Source V2) API:

```java { .api }
11
package org.apache.spark.sql.connector.expressions;
12
13
public interface Expression {
14
Expression[] EMPTY_EXPRESSION = new Expression[0];
15
NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
16
17
/**
18
* Human-readable description of this expression
19
*/
20
String describe();
21
22
/**
23
* Child expressions of this expression
24
*/
25
Expression[] children();
26
27
/**
28
* Named references used by this expression
29
*/
30
NamedReference[] references();
31
}
32
```

### NamedReference

Reference to a named column or nested field:

```java { .api }
39
public interface NamedReference extends Expression {
40
/**
41
* Field name path (supporting nested fields)
42
*/
43
String[] fieldNames();
44
}
45
```

### Transform

Represents a named transform function applied to columns, such as the partition transforms `identity`, `bucket`, and `years`:

```java { .api }
52
public interface Transform extends Expression {
53
/**
54
* Arguments to this transformation
55
*/
56
NamedReference[] arguments();
57
}
58
```

## Expression Factory

### Expressions Class

Central factory for creating common expressions:

```java { .api }
67
public class Expressions {
68
// Literal values
69
public static Literal literal(Object value);
70
71
// Column references
72
public static NamedReference column(String name);
73
74
// Partitioning transforms
75
public static Transform identity(String column);
76
public static Transform bucket(int numBuckets, String... columns);
77
public static Transform years(String column);
78
public static Transform months(String column);
79
public static Transform days(String column);
80
public static Transform hours(String column);
81
}
82
```

**Basic Expression Usage:**

```java
87
// Create literal expressions
88
Literal intLiteral = Expressions.literal(42);
89
Literal stringLiteral = Expressions.literal("hello");
90
Literal boolLiteral = Expressions.literal(true);
91
Literal nullLiteral = Expressions.literal(null);
92
93
// Create column references
94
NamedReference userIdCol = Expressions.column("user_id");
95
NamedReference nameCol = Expressions.column("name");
96
97
// Create nested column references (for struct fields)
98
NamedReference nestedField = Expressions.column("address.street");
99
100
// Create transformation expressions
101
Transform identityTransform = Expressions.identity("partition_key");
102
Transform bucketTransform = Expressions.bucket(10, "user_id");
103
Transform yearTransform = Expressions.years("created_at");
104
Transform monthTransform = Expressions.months("created_at");
105
Transform dayTransform = Expressions.days("created_at");
106
Transform hourTransform = Expressions.hours("created_at");
107
```

## Aggregate Expressions

### Aggregation

Container for an aggregate operation, holding the aggregate functions to compute and the expressions to group by:

```java { .api }
116
package org.apache.spark.sql.connector.expressions.aggregate;
117
118
public class Aggregation {
119
/**
120
* Aggregate functions to apply
121
*/
122
public AggregateFunc[] aggregateExpressions();
123
124
/**
125
* Expressions to group by
126
*/
127
public Expression[] groupByExpressions();
128
}
129
```

### AggregateFunc

Base interface for aggregate functions:

```java { .api }
136
public interface AggregateFunc extends Expression {
137
// Marker interface for aggregate expressions
138
}
139
```

### Built-in Aggregate Functions

#### Count

```java { .api }
145
public class Count implements AggregateFunc {
146
public Expression column();
147
public boolean isDistinct();
148
}
149
```

#### CountStar

```java { .api }
153
public class CountStar implements AggregateFunc {
154
// Count all rows (COUNT(*))
155
}
156
```

#### Sum

```java { .api }
160
public class Sum implements AggregateFunc {
161
public Expression column();
162
public boolean isDistinct();
163
}
164
```

#### Avg

```java { .api }
168
public class Avg implements AggregateFunc {
169
public Expression column();
170
public boolean isDistinct();
171
}
172
```

#### Max

```java { .api }
176
public class Max implements AggregateFunc {
177
public Expression column();
178
}
179
```

#### Min

```java { .api }
183
public class Min implements AggregateFunc {
184
public Expression column();
185
}
186
```

**Aggregate Usage Examples:**

```java
191
// Create aggregate expressions
192
Count countUsers = new Count(Expressions.column("user_id"), false);
193
CountStar countAll = new CountStar();
194
Sum totalRevenue = new Sum(Expressions.column("revenue"), false);
195
Avg avgAge = new Avg(Expressions.column("age"), false);
196
Max maxSalary = new Max(Expressions.column("salary"));
197
Min minDate = new Min(Expressions.column("start_date"));
198
199
// Create aggregation with grouping
200
Expression[] groupBy = new Expression[] {
201
Expressions.column("department"),
202
Expressions.years("hire_date")
203
};
204
205
AggregateFunc[] aggregates = new AggregateFunc[] {
206
new Count(Expressions.column("employee_id"), false),
207
new Avg(Expressions.column("salary"), false),
208
new Max(Expressions.column("salary")),
209
new Min(Expressions.column("salary"))
210
};
211
212
Aggregation aggregation = new Aggregation(aggregates, groupBy);
213
```

## Filter Predicates

### Predicate

Base type for filter predicates that can be pushed down to data sources:

```java { .api }
222
package org.apache.spark.sql.connector.expressions.filter;
223
224
public interface Predicate extends Expression {
225
// Base interface for filter expressions
226
}
227
```

### Basic Predicates

#### AlwaysTrue and AlwaysFalse

```java { .api }
233
public class AlwaysTrue implements Predicate {
234
// Predicate that always evaluates to true
235
}
236
237
public class AlwaysFalse implements Predicate {
238
// Predicate that always evaluates to false
239
}
240
```

### Logical Predicates

#### And

```java { .api }
246
public class And implements Predicate {
247
public Predicate left();
248
public Predicate right();
249
}
250
```

#### Or

```java { .api }
254
public class Or implements Predicate {
255
public Predicate left();
256
public Predicate right();
257
}
258
```

#### Not

```java { .api }
262
public class Not implements Predicate {
263
public Predicate child();
264
}
265
```

**Complex Predicate Examples:**

```java
270
// Create basic predicates
271
Predicate activeUsers = new EqualTo(
272
Expressions.column("status"),
273
Expressions.literal("active")
274
);
275
276
Predicate adultUsers = new GreaterThan(
277
Expressions.column("age"),
278
Expressions.literal(18)
279
);
280
281
Predicate seniorUsers = new LessThan(
282
Expressions.column("age"),
283
Expressions.literal(65)
284
);
285
286
// Combine with logical operators
287
Predicate workingAge = new And(adultUsers, seniorUsers);
288
Predicate activeWorkingAge = new And(activeUsers, workingAge);
289
290
// Complex logical combinations
291
Predicate vipUsers = new EqualTo(
292
Expressions.column("tier"),
293
Expressions.literal("VIP")
294
);
295
296
Predicate eligibleUsers = new Or(activeWorkingAge, vipUsers);
297
298
// Negation
299
Predicate ineligibleUsers = new Not(eligibleUsers);
300
```

## Custom Expression Implementation

### Creating Custom Expressions

Implement the `Expression` interface for custom logic:

```java
309
public class CustomStringLength implements Expression {
310
private final NamedReference column;
311
312
public CustomStringLength(NamedReference column) {
313
this.column = column;
314
}
315
316
@Override
317
public String describe() {
318
return String.format("string_length(%s)", column.describe());
319
}
320
321
@Override
322
public Expression[] children() {
323
return new Expression[] { column };
324
}
325
326
@Override
327
public NamedReference[] references() {
328
return new NamedReference[] { column };
329
}
330
331
public NamedReference getColumn() {
332
return column;
333
}
334
}
335
```

### Custom Aggregate Function

```java
340
public class CustomMedian implements AggregateFunc {
341
private final NamedReference column;
342
343
public CustomMedian(NamedReference column) {
344
this.column = column;
345
}
346
347
@Override
348
public String describe() {
349
return String.format("median(%s)", column.describe());
350
}
351
352
@Override
353
public Expression[] children() {
354
return new Expression[] { column };
355
}
356
357
@Override
358
public NamedReference[] references() {
359
return new NamedReference[] { column };
360
}
361
362
public NamedReference getColumn() {
363
return column;
364
}
365
}
366
```

### Custom Transform Function

```java
371
public class CustomHashTransform implements Transform {
372
private final NamedReference[] columns;
373
private final int seed;
374
375
public CustomHashTransform(int seed, NamedReference... columns) {
376
this.seed = seed;
377
this.columns = columns;
378
}
379
380
@Override
381
public String describe() {
382
return String.format("custom_hash(%d, %s)", seed,
383
Arrays.stream(columns)
384
.map(NamedReference::describe)
385
.collect(Collectors.joining(", ")));
386
}
387
388
@Override
389
public NamedReference[] arguments() {
390
return columns.clone();
391
}
392
393
@Override
394
public Expression[] children() {
395
return columns.clone();
396
}
397
398
@Override
399
public NamedReference[] references() {
400
return columns.clone();
401
}
402
}
403
```
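
## Working with Catalyst Internal Expressions (Scala)

### Expression Base Classes

For advanced custom expressions, you can extend Catalyst's internal expression hierarchy. These classes are internal to Spark and are not covered by its public API compatibility guarantees, so code that extends them may need changes between Spark releases:
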
```scala { .api }
// Simplified view of Catalyst's internal expression classes; the real
// definitions have many more members.
package org.apache.spark.sql.catalyst.expressions

abstract class Expression extends TreeNode[Expression] {
  /** The type of the value this expression produces. */
  def dataType: DataType

  /** Whether `eval` may return null. */
  def nullable: Boolean

  /** True if the expression can be evaluated before execution (e.g. a literal). */
  def foldable: Boolean = false

  /** Interpreted evaluation against one input row. */
  def eval(input: InternalRow = null): Any

  def children: Seq[Expression]
}

abstract class LeafExpression extends Expression {
  override final def children: Seq[Expression] = Nil
}

abstract class UnaryExpression extends Expression {
  def child: Expression
  override final def children: Seq[Expression] = child :: Nil

  /** Returns null when the child is null; otherwise delegates to `nullSafeEval`. */
  override def eval(input: InternalRow): Any = {
    val value = child.eval(input)
    if (value == null) null else nullSafeEval(value)
  }

  /** Computes the result for a non-null child value. */
  protected def nullSafeEval(input: Any): Any
}

abstract class BinaryExpression extends Expression {
  def left: Expression
  def right: Expression
  override final def children: Seq[Expression] = left :: right :: Nil

  /** Returns null when either child is null; otherwise delegates to `nullSafeEval`. */
  override def eval(input: InternalRow): Any = {
    val value1 = left.eval(input)
    if (value1 == null) {
      null
    } else {
      val value2 = right.eval(input)
      if (value2 == null) null else nullSafeEval(value1, value2)
    }
  }

  /** Computes the result for non-null child values. */
  protected def nullSafeEval(input1: Any, input2: Any): Any
}
```

### Custom Catalyst Expression Example

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

/** Upper-cases a string expression; a null input yields null. */
case class CustomUpper(child: Expression) extends UnaryExpression {
  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable

  // Catalyst represents StringType values as UTF8String, not java.lang.String.
  // Returning a java String here would break operators that consume the result.
  // UnaryExpression.eval already returns null for a null child.
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[UTF8String].toUpperCase

  // Code generation for performance: defineCodeGen emits the child's code and
  // the null check; we supply only the Java expression for the non-null case.
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"($c).toUpperCase()")

  // Required (since Spark 3.2) so tree transformations can rebuild this node.
  override protected def withNewChildInternal(newChild: Expression): CustomUpper =
    copy(child = newChild)
}
```

### Literal Expressions

```scala { .api }
/** A constant value with an explicit Catalyst data type. */
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def nullable: Boolean = value == null
  // The value is fixed, so the input row is ignored.
  override def eval(input: InternalRow): Any = value
}

object Literal {
  val TrueLiteral: Literal = Literal(true, BooleanType)
  val FalseLiteral: Literal = Literal(false, BooleanType)

  /** Infers the Catalyst type from the Scala value. */
  def apply(v: Any): Literal = v match {
    case null => Literal(null, NullType)
    case b: Boolean => Literal(b, BooleanType)
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    // Catalyst's internal string representation is UTF8String.
    case s: String => Literal(UTF8String.fromString(s), StringType)
    // ... other types
  }
}
```

## Expression Utilities

### AttributeMap

Map keyed by attributes, where keys are matched by expression ID (`ExprId`) rather than by name:

```scala { .api }
/**
 * Map keyed by Attribute. Entries are stored by the attribute's ExprId, so two
 * attributes with the same name but different ids are distinct keys.
 */
class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
  extends Map[Attribute, A] {

  def get(k: Attribute): Option[A]
  def contains(k: Attribute): Boolean
  def +(kv: (Attribute, A)): AttributeMap[A]
  def ++(other: AttributeMap[A]): AttributeMap[A]
}
```

### ExpressionSet

Set of expressions in which semantically equivalent expressions are treated as equal:

```scala { .api }
/**
 * A collection of expressions with set semantics based on semantic equality:
 * membership is decided on each expression's canonicalized form, so, for example,
 * `a + b` and `b + a` are the same element. The original expressions are kept
 * for iteration.
 *
 * In Spark 3.x this class extends Iterable rather than Set; create instances
 * with the companion `ExpressionSet(expressions)`.
 */
class ExpressionSet protected(
    private val baseSet: mutable.Set[Expression],
    private var originals: mutable.Buffer[Expression])
  extends Iterable[Expression] {

  def contains(elem: Expression): Boolean
  def +(elem: Expression): ExpressionSet
  def -(elem: Expression): ExpressionSet
}
```

### UnsafeRow

Row implementation that stores its fields in a compact binary format, used for high-performance execution:

```java { .api }
524
public final class UnsafeRow extends InternalRow {
525
public static final int WORD_SIZE = 8;
526
527
public static int calculateBitSetWidthInBytes(int numFields);
528
public static boolean isFixedLength(DataType dt);
529
public static boolean isMutable(DataType dt);
530
531
// Efficient field access methods
532
public boolean isNullAt(int ordinal);
533
public int getInt(int ordinal);
534
public long getLong(int ordinal);
535
public UTF8String getUTF8String(int ordinal);
536
// ... other typed getters
537
}
538
```

## Code Generation Support

### BufferHolder and UnsafeRowWriter

Low-level helpers for building `UnsafeRow` instances, as used by Spark's generated code:

```java { .api }
547
public final class BufferHolder {
548
public BufferHolder(UnsafeRow row, int initialSize);
549
public void grow(int neededSize);
550
public byte[] getBuffer();
551
}
552
553
public final class UnsafeRowWriter {
554
public UnsafeRowWriter(BufferHolder holder, int numFields);
555
public void initialize();
556
public void write(int ordinal, boolean value);
557
public void write(int ordinal, int value);
558
public void write(int ordinal, long value);
559
public void write(int ordinal, UTF8String value);
560
// ... other write methods
561
}
562
```

**Example: Writing Evaluation Results into an UnsafeRow:**

```java
567
public class OptimizedExpressionEvaluator {
568
public static InternalRow evaluateRow(Expression[] expressions, InternalRow input) {
569
UnsafeRow result = new UnsafeRow(expressions.length);
570
BufferHolder bufferHolder = new BufferHolder(result, 64);
571
UnsafeRowWriter writer = new UnsafeRowWriter(bufferHolder, expressions.length);
572
writer.initialize();
573
574
for (int i = 0; i < expressions.length; i++) {
575
Object value = expressions[i].eval(input);
576
if (value == null) {
577
writer.setNullAt(i);
578
} else {
579
// Write typed value based on expression data type
580
DataType dataType = expressions[i].dataType();
581
writeTypedValue(writer, i, value, dataType);
582
}
583
}
584
585
result.pointTo(bufferHolder.getBuffer(), bufferHolder.totalSize());
586
return result;
587
}
588
}
589
```

## Advanced Expression Patterns

### Expression Trees

Build complex expression trees:

```java
598
public class ExpressionTreeBuilder {
599
public static Expression buildComplexFilter(Map<String, Object> criteria) {
600
List<Predicate> predicates = new ArrayList<>();
601
602
for (Map.Entry<String, Object> entry : criteria.entrySet()) {
603
String column = entry.getKey();
604
Object value = entry.getValue();
605
606
if (value instanceof List) {
607
// IN predicate for multiple values
608
List<?> values = (List<?>) value;
609
predicates.add(new In(
610
Expressions.column(column),
611
values.stream()
612
.map(Expressions::literal)
613
.toArray(Expression[]::new)
614
));
615
} else {
616
// Equality predicate
617
predicates.add(new EqualTo(
618
Expressions.column(column),
619
Expressions.literal(value)
620
));
621
}
622
}
623
624
// Combine all predicates with AND
625
return predicates.stream()
626
.reduce((p1, p2) -> new And(p1, p2))
627
.orElse(new AlwaysTrue());
628
}
629
}
630
```

### Expression Visitor Pattern

```java
635
public abstract class ExpressionVisitor<T> {
636
public T visit(Expression expr) {
637
if (expr instanceof NamedReference) {
638
return visitNamedReference((NamedReference) expr);
639
} else if (expr instanceof Literal) {
640
return visitLiteral((Literal) expr);
641
} else if (expr instanceof And) {
642
return visitAnd((And) expr);
643
} else if (expr instanceof Or) {
644
return visitOr((Or) expr);
645
}
646
// ... handle other expression types
647
return visitDefault(expr);
648
}
649
650
protected abstract T visitNamedReference(NamedReference ref);
651
protected abstract T visitLiteral(Literal literal);
652
protected abstract T visitAnd(And and);
653
protected abstract T visitOr(Or or);
654
protected abstract T visitDefault(Expression expr);
655
}
656
657
// Example: Extract all column references
658
public class ColumnExtractor extends ExpressionVisitor<Set<String>> {
659
@Override
660
protected Set<String> visitNamedReference(NamedReference ref) {
661
return Set.of(String.join(".", ref.fieldNames()));
662
}
663
664
@Override
665
protected Set<String> visitAnd(And and) {
666
Set<String> result = new HashSet<>();
667
result.addAll(visit(and.left()));
668
result.addAll(visit(and.right()));
669
return result;
670
}
671
672
@Override
673
protected Set<String> visitOr(Or or) {
674
Set<String> result = new HashSet<>();
675
result.addAll(visit(or.left()));
676
result.addAll(visit(or.right()));
677
return result;
678
}
679
680
@Override
681
protected Set<String> visitDefault(Expression expr) {
682
Set<String> result = new HashSet<>();
683
for (Expression child : expr.children()) {
684
result.addAll(visit(child));
685
}
686
return result;
687
}
688
}
689
```

## Performance Considerations

### Expression Evaluation Optimization

```java
696
public class OptimizedExpressionEvaluator {
697
private final Expression[] expressions;
698
private final boolean[] isConstant;
699
private final Object[] constantValues;
700
701
public OptimizedExpressionEvaluator(Expression[] expressions) {
702
this.expressions = expressions;
703
this.isConstant = new boolean[expressions.length];
704
this.constantValues = new Object[expressions.length];
705
706
// Pre-evaluate constant expressions
707
for (int i = 0; i < expressions.length; i++) {
708
if (isConstantExpression(expressions[i])) {
709
isConstant[i] = true;
710
constantValues[i] = expressions[i].eval(EmptyRow.INSTANCE);
711
}
712
}
713
}
714
715
public Object[] evaluate(InternalRow row) {
716
Object[] result = new Object[expressions.length];
717
for (int i = 0; i < expressions.length; i++) {
718
if (isConstant[i]) {
719
result[i] = constantValues[i];
720
} else {
721
result[i] = expressions[i].eval(row);
722
}
723
}
724
return result;
725
}
726
}
727
```

The Expression APIs provide an extensible foundation for data processing logic in Spark. The connector API lets data sources receive pushed-down filters, aggregates, and transforms, while Catalyst's internal expressions underpin Spark's SQL optimization and execution.