# Expression System

The expression system handles SQL expressions and Table API expressions, providing conversion between different expression representations, type inference, validation, and integration with code generation for optimal execution performance.

## Capabilities

### Expression Conversion

Core functionality for converting between different expression representations used throughout the planning pipeline.

```scala { .api }
/**
 * Expression converter for transforming between expression representations
 */
trait ExpressionConverter {

  /**
   * Converts Table API expression to Calcite RexNode representation
   * @param expression Table API expression to convert
   * @return RexNode representation for Calcite optimization
   */
  def convertToRexNode(expression: Expression): RexNode

  /**
   * Converts Calcite RexNode back to Table API expression
   * @param rexNode Calcite RexNode to convert
   * @return Table API expression representation
   */
  def convertToExpression(rexNode: RexNode): Expression

  /**
   * Converts SQL expression string to RexNode
   * @param sqlExpression SQL expression as string
   * @param inputRowType Input row type for context
   * @return Parsed and validated RexNode
   */
  def convertSqlToRexNode(sqlExpression: String, inputRowType: RelDataType): RexNode

  /**
   * Converts expression with type coercion
   * @param expression Expression to convert
   * @param targetType Target type for coercion
   * @return Converted expression with type coercion applied
   */
  def convertWithCoercion(expression: Expression, targetType: DataType): RexNode
}
```
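
The round trip between the two representations can be sketched with toy types. This is a minimal sketch, not the planner's converter: `SketchExpression`, `SketchRexNode`, `SketchConverter` and their case classes are hypothetical stand-ins for `Expression` and `RexNode`, and the sketch assumes that the Table API side refers to fields by name while the Calcite side addresses them by position.

```scala
// Hypothetical stand-ins for the Table API side: fields are referenced by name.
sealed trait SketchExpression
final case class FieldRef(name: String) extends SketchExpression
final case class IntLiteral(value: Int) extends SketchExpression
final case class Call(function: String, args: List[SketchExpression]) extends SketchExpression

// Hypothetical stand-ins for the Calcite side: fields are referenced by position.
sealed trait SketchRexNode
final case class InputRef(index: Int) extends SketchRexNode
final case class RexLiteral(value: Int) extends SketchRexNode
final case class RexCall(operator: String, operands: List[SketchRexNode]) extends SketchRexNode

// Converter bound to one input row, described here as an ordered list of field names.
final class SketchConverter(fieldNames: Vector[String]) {

  // Resolves field names to positions and converts calls recursively.
  def convertToRexNode(expression: SketchExpression): SketchRexNode = expression match {
    case FieldRef(name) =>
      val index = fieldNames.indexOf(name)
      require(index >= 0, s"Unknown field: $name")
      InputRef(index)
    case IntLiteral(value) => RexLiteral(value)
    case Call(function, args) => RexCall(function, args.map(convertToRexNode))
  }

  // Inverse direction: positions are mapped back to field names.
  def convertToExpression(rexNode: SketchRexNode): SketchExpression = rexNode match {
    case InputRef(index) => FieldRef(fieldNames(index))
    case RexLiteral(value) => IntLiteral(value)
    case RexCall(operator, operands) => Call(operator, operands.map(convertToExpression))
  }
}
```

With the field list `Vector("id", "amount")`, the expression `Call("plus", List(FieldRef("amount"), IntLiteral(1)))` becomes `RexCall("plus", List(InputRef(1), RexLiteral(1)))`, and converting that result back yields the original expression.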

### Expression Validation

Validation system for ensuring expression correctness and type safety:

```scala { .api }
/**
 * Expression validation utilities
 */
object ExpressionValidator {

  /**
   * Validates expression syntax and semantics
   * @param expression Expression to validate
   * @param inputType Input data type context
   * @throws ValidationException if expression is invalid
   */
  def validateExpression(expression: Expression, inputType: RowType): Unit

  /**
   * Validates RexNode expression in Calcite context
   * @param rexNode RexNode to validate
   * @param rexBuilder RexBuilder for validation context
   * @throws CalciteException if RexNode is invalid
   */
  def validateRexNode(rexNode: RexNode, rexBuilder: RexBuilder): Unit

  /**
   * Validates expression type compatibility
   * @param sourceType Source expression type
   * @param targetType Target expected type
   * @return True if types are compatible, false otherwise
   */
  def isTypeCompatible(sourceType: DataType, targetType: DataType): Boolean

  /**
   * Validates function call expression
   * @param functionCall Function call expression
   * @param functionCatalog Function catalog for resolution
   * @throws ValidationException if function call is invalid
   */
  def validateFunctionCall(
    functionCall: CallExpression,
    functionCatalog: FunctionCatalog
  ): Unit
}
```
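
To make the validation contract concrete, here is a minimal sketch of a validator that rejects unknown field references and mismatched operand types by throwing. All names (`SketchValidator`, `SketchValidationException`, `ValType`, `ValExpr` and its case classes) are hypothetical and model only two of the checks a real validator would perform.

```scala
// Hypothetical exception standing in for the ValidationException named above.
final class SketchValidationException(message: String) extends RuntimeException(message)

// Hypothetical miniature type model.
sealed trait ValType
case object IntVal extends ValType
case object StringVal extends ValType
case object BooleanVal extends ValType

// Hypothetical miniature expression model.
sealed trait ValExpr
final case class Field(name: String) extends ValExpr
final case class IntLit(value: Int) extends ValExpr
final case class StrLit(value: String) extends ValExpr
final case class IsEqual(left: ValExpr, right: ValExpr) extends ValExpr

object SketchValidator {

  // Computes the expression type; throws on an unknown field or mismatched operands.
  def typeOf(expr: ValExpr, rowType: Map[String, ValType]): ValType = expr match {
    case Field(name) =>
      rowType.getOrElse(
        name,
        throw new SketchValidationException(s"Invalid field reference: $name"))
    case IntLit(_) => IntVal
    case StrLit(_) => StringVal
    case IsEqual(left, right) =>
      val leftType = typeOf(left, rowType)
      val rightType = typeOf(right, rowType)
      if (leftType != rightType) {
        throw new SketchValidationException(s"Type mismatch: $leftType vs $rightType")
      }
      BooleanVal
  }

  // Mirrors the Unit-returning contract: success is silent, failure throws.
  def validateExpression(expr: ValExpr, rowType: Map[String, ValType]): Unit = {
    typeOf(expr, rowType)
    ()
  }
}
```

In this sketch validation is a by-product of computing the expression's type, so one traversal covers both field resolution and type checking.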

### Type Inference System

Comprehensive type inference for expressions and operations:

```scala { .api }
/**
 * Type inference engine for expressions
 */
object TypeInferenceEngine {

  /**
   * Infers result type of binary operation
   * @param leftType Left operand type
   * @param rightType Right operand type
   * @param operator Binary operator
   * @return Inferred result type
   */
  def inferBinaryOperationType(
    leftType: DataType,
    rightType: DataType,
    operator: BinaryOperator
  ): DataType

  /**
   * Infers result type of function call
   * @param functionDefinition Function definition
   * @param argumentTypes Argument types
   * @return Inferred result type
   */
  def inferFunctionCallType(
    functionDefinition: FunctionDefinition,
    argumentTypes: List[DataType]
  ): DataType

  /**
   * Infers type with nullability consideration
   * @param operandTypes Types of operands
   * @param operation Operation being performed
   * @return Inferred type with proper nullability
   */
  def inferTypeWithNullability(
    operandTypes: List[DataType],
    operation: Operation
  ): DataType

  /**
   * Finds common type for multiple expressions (for CASE, UNION, etc.)
   * @param types List of types to find common type for
   * @return Some(common super type), or None if no common type exists
   */
  def findCommonType(types: List[DataType]): Option[DataType]
}
```
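
The idea behind `findCommonType` can be illustrated with a small model. The following is a sketch under stated assumptions rather than the engine's actual rules: `Kind`, `SketchDataType`, and `CommonTypeSketch` are hypothetical, the numeric widening order is assumed, and the result is treated as nullable whenever any input is nullable.

```scala
// Hypothetical miniature type model: a logical kind plus a nullability flag.
sealed trait Kind
case object IntKind extends Kind
case object BigIntKind extends Kind
case object DoubleKind extends Kind
case object StringKind extends Kind

final case class SketchDataType(kind: Kind, nullable: Boolean)

object CommonTypeSketch {

  // Assumed widening order for numeric kinds, narrowest first.
  private val numericOrder: List[Kind] = List(IntKind, BigIntKind, DoubleKind)

  // Two kinds unify if they are equal or both numeric; numeric kinds widen to the wider one.
  private def commonKind(a: Kind, b: Kind): Option[Kind] =
    if (a == b) Some(a)
    else {
      val indexA = numericOrder.indexOf(a)
      val indexB = numericOrder.indexOf(b)
      if (indexA >= 0 && indexB >= 0) Some(numericOrder(math.max(indexA, indexB))) else None
    }

  // Folds pairwise over the list; an empty list has no common type.
  def findCommonType(types: List[SketchDataType]): Option[SketchDataType] = types match {
    case Nil => None
    case head :: tail =>
      tail.foldLeft(Option(head)) { (acc, next) =>
        for {
          current <- acc
          kind <- commonKind(current.kind, next.kind)
        } yield SketchDataType(kind, current.nullable || next.nullable)
      }
  }
}
```

Under these assumptions, a non-nullable `IntKind` combined with a nullable `DoubleKind` yields a nullable `DoubleKind`, while `IntKind` combined with `StringKind` yields `None`.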

### Expression Utilities

Utility functions for working with expressions:

```scala { .api }
/**
 * Expression utility functions
 */
object ExpressionUtils {

  /**
   * Extracts referenced field names from expression
   * @param expression Expression to analyze
   * @return Set of field names referenced in expression
   */
  def extractFieldNames(expression: Expression): Set[String]

  /**
   * Checks if expression is deterministic (always returns same result for same input)
   * @param expression Expression to check
   * @return True if expression is deterministic
   */
  def isDeterministic(expression: Expression): Boolean

  /**
   * Simplifies expression by applying constant folding and other optimizations
   * @param expression Expression to simplify
   * @param rexBuilder RexBuilder for creating simplified expressions
   * @return Simplified expression
   */
  def simplifyExpression(expression: RexNode, rexBuilder: RexBuilder): RexNode

  /**
   * Converts expression to conjunctive normal form (CNF)
   * @param expression Boolean expression to convert
   * @return Expression in CNF
   */
  def toCNF(expression: RexNode): RexNode

  /**
   * Splits conjunctive expression into individual conjuncts
   * @param expression AND-connected expression
   * @return List of individual conjunct expressions
   */
  def splitConjunction(expression: RexNode): List[RexNode]
}
```
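
How `toCNF`, `splitConjunction`, and `extractFieldNames` fit together can be shown on a toy boolean expression tree. This is an illustrative sketch only: `BoolExpr`, `Pred`, `And`, `Or`, and `ConjunctionSketch` are hypothetical, and negation is left out to keep the CNF conversion short.

```scala
// Hypothetical miniature boolean expression model (no negation).
sealed trait BoolExpr
final case class Pred(field: String, text: String) extends BoolExpr
final case class And(left: BoolExpr, right: BoolExpr) extends BoolExpr
final case class Or(left: BoolExpr, right: BoolExpr) extends BoolExpr

object ConjunctionSketch {

  // Converts to CNF by distributing OR over AND.
  def toCNF(expr: BoolExpr): BoolExpr = expr match {
    case And(l, r) => And(toCNF(l), toCNF(r))
    case Or(l, r) => distribute(toCNF(l), toCNF(r))
    case pred => pred
  }

  // (a AND b) OR c  ==>  (a OR c) AND (b OR c), applied recursively on both sides.
  private def distribute(left: BoolExpr, right: BoolExpr): BoolExpr = (left, right) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case (a, b) => Or(a, b)
  }

  // Flattens nested ANDs left to right; anything else is a single conjunct.
  def splitConjunction(expr: BoolExpr): List[BoolExpr] = expr match {
    case And(l, r) => splitConjunction(l) ++ splitConjunction(r)
    case other => List(other)
  }

  // Collects every field name mentioned anywhere in the expression.
  def extractFieldNames(expr: BoolExpr): Set[String] = expr match {
    case Pred(field, _) => Set(field)
    case And(l, r) => extractFieldNames(l) ++ extractFieldNames(r)
    case Or(l, r) => extractFieldNames(l) ++ extractFieldNames(r)
  }
}
```

A typical use of this combination is filter analysis: after converting to CNF and splitting, each conjunct can be inspected on its own, for example to check which input fields it depends on.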

## RexNode Integration

Deep integration with Apache Calcite's RexNode expression system:

```java { .api }
/**
 * Calcite RexNode utilities for Flink integration
 */
public class FlinkRexUtil {

    /**
     * Converts Flink DataType to Calcite RelDataType
     * @param dataType Flink data type
     * @param typeFactory Calcite type factory
     * @return Calcite RelDataType representation
     */
    public static RelDataType toRelDataType(DataType dataType, RelDataTypeFactory typeFactory);

    /**
     * Converts Calcite RelDataType to Flink DataType
     * @param relDataType Calcite rel data type
     * @return Flink DataType representation
     */
    public static DataType toDataType(RelDataType relDataType);

    /**
     * Creates RexNode for field access
     * @param fieldIndex Index of field to access
     * @param inputRowType Input row type
     * @param rexBuilder RexBuilder for node creation
     * @return RexNode for field access
     */
    public static RexNode createFieldAccess(
        int fieldIndex,
        RelDataType inputRowType,
        RexBuilder rexBuilder
    );

    /**
     * Creates RexNode for literal value
     * @param value Literal value
     * @param dataType Type of the literal
     * @param rexBuilder RexBuilder for node creation
     * @return RexNode for literal value
     */
    public static RexNode createLiteral(Object value, DataType dataType, RexBuilder rexBuilder);
}
```

### Expression Code Generation Integration

Integration with the code generation system for optimized expression evaluation:

```scala { .api }
/**
 * Expression code generation utilities
 */
object ExpressionCodeGenerator {

  /**
   * Generates code for expression evaluation
   * @param expression Expression to generate code for
   * @param inputType Input row type
   * @param config Table configuration
   * @param classLoader Class loader for generated code
   * @return Generated expression evaluation code
   */
  def generateExpression(
    expression: RexNode,
    inputType: RowType,
    config: TableConfig,
    classLoader: ClassLoader
  ): GeneratedExpression

  /**
   * Generates code for conditional expression (CASE/IF)
   * @param condition Condition expression
   * @param trueExpr Expression for true case
   * @param falseExpr Expression for false case
   * @return Generated conditional expression code
   */
  def generateConditional(
    condition: GeneratedExpression,
    trueExpr: GeneratedExpression,
    falseExpr: GeneratedExpression
  ): GeneratedExpression

  /**
   * Generates code for null-safe expression evaluation
   * @param expression Expression that may produce nulls
   * @param nullDefault Default value for null results
   * @return Generated null-safe expression code
   */
  def generateNullSafeExpression(
    expression: GeneratedExpression,
    nullDefault: GeneratedExpression
  ): GeneratedExpression
}

/**
 * Generated expression representation
 */
case class GeneratedExpression(
  resultTerm: String,   // Generated code term for result
  resultType: DataType, // Result data type
  nullTerm: String,     // Generated code term for null check
  code: String          // Generated Java code
) {
  /**
   * Deep copy of generated expression with new terms
   * @param newResultTerm New result term
   * @param newNullTerm New null term
   * @return Copied expression with updated terms
   */
  def copy(newResultTerm: String = resultTerm, newNullTerm: String = nullTerm): GeneratedExpression
}
```
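
The four fields of `GeneratedExpression` suggest how generated fragments compose: each fragment carries its own code plus the names of the variables that hold its result and null flag. The sketch below shows one possible way a conditional could be assembled from three such fragments. `SketchGenerated` and `ConditionalCodegenSketch` are hypothetical, the result type is reduced to a Java type name, and the emitted Java is an assumption about shape, not the planner's actual output.

```scala
// Hypothetical simplified fragment: the result type is just a Java type name.
final case class SketchGenerated(
  resultTerm: String,
  javaType: String,
  nullTerm: String,
  code: String
)

final class ConditionalCodegenSketch {
  private var counter = 0

  // Produces a fresh variable name so that fragments never clash.
  private def newName(prefix: String): String = {
    counter += 1
    s"${prefix}_$counter"
  }

  // Emits the branch code inside each branch, so only the selected branch is evaluated.
  // A null condition is treated as false and falls through to the else branch.
  def generateConditional(
    condition: SketchGenerated,
    trueExpr: SketchGenerated,
    falseExpr: SketchGenerated
  ): SketchGenerated = {
    val result = newName("result")
    val isNull = newName("isNull")
    val code = Seq(
      condition.code,
      s"${trueExpr.javaType} $result;",
      s"boolean $isNull;",
      s"if (!${condition.nullTerm} && ${condition.resultTerm}) {",
      trueExpr.code,
      s"  $result = ${trueExpr.resultTerm};",
      s"  $isNull = ${trueExpr.nullTerm};",
      "} else {",
      falseExpr.code,
      s"  $result = ${falseExpr.resultTerm};",
      s"  $isNull = ${falseExpr.nullTerm};",
      "}"
    ).mkString("\n")
    SketchGenerated(result, trueExpr.javaType, isNull, code)
  }
}
```

Because the returned fragment exposes its own `resultTerm` and `nullTerm`, it can itself be passed as an operand to another generator call, which is what makes nested expressions composable in this style.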

## Built-in Expression Support

Support for standard SQL expressions and operators:

```scala { .api }
/**
 * Built-in expression definitions
 */
object BuiltInExpressions {

  // Arithmetic operators
  val PLUS: BinaryOperator
  val MINUS: BinaryOperator
  val MULTIPLY: BinaryOperator
  val DIVIDE: BinaryOperator
  val MOD: BinaryOperator

  // Comparison operators
  val EQUALS: BinaryOperator
  val NOT_EQUALS: BinaryOperator
  val LESS_THAN: BinaryOperator
  val LESS_THAN_OR_EQUAL: BinaryOperator
  val GREATER_THAN: BinaryOperator
  val GREATER_THAN_OR_EQUAL: BinaryOperator

  // Logical operators
  val AND: BinaryOperator
  val OR: BinaryOperator
  val NOT: UnaryOperator

  // String operators
  val LIKE: BinaryOperator
  val SIMILAR_TO: BinaryOperator
  val SUBSTRING: FunctionDefinition
  val TRIM: FunctionDefinition

  // Null handling
  val IS_NULL: UnaryOperator
  val IS_NOT_NULL: UnaryOperator
  val COALESCE: FunctionDefinition
  val NULLIF: FunctionDefinition

  // Case expression
  val CASE: FunctionDefinition

  // Type conversion
  val CAST: FunctionDefinition
  val TRY_CAST: FunctionDefinition
}
```

## Advanced Expression Features

### Window Function Expressions

Support for window function expressions in streaming and batch contexts:

```scala { .api }
/**
 * Window function expression utilities
 */
object WindowExpressionUtils {

  /**
   * Creates window function call expression
   * @param functionDefinition Window function definition
   * @param arguments Function arguments
   * @param partitionKeys Partition by keys
   * @param orderKeys Order by keys
   * @param windowFrame Window frame specification
   * @return Window function call expression
   */
  def createWindowFunctionCall(
    functionDefinition: FunctionDefinition,
    arguments: List[Expression],
    partitionKeys: List[Expression],
    orderKeys: List[OrderByExpression],
    windowFrame: WindowFrame
  ): CallExpression
}
```
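
The roles of the partition keys, order keys, and window frame can be illustrated by evaluating one window function by hand. The sketch below computes the equivalent of `SUM(value) OVER (PARTITION BY key ORDER BY position ROWS BETWEEN n PRECEDING AND CURRENT ROW)` on an in-memory collection. `SketchRow` and `WindowSketch` are hypothetical, and the sketch says nothing about how the planner evaluates windows.

```scala
// Hypothetical input row: one partition key, one ordering column, one value.
final case class SketchRow(key: String, position: Int, value: Long)

object WindowSketch {

  // For every row, sums the values of the current row and up to `preceding` rows before it,
  // where "before" is defined within the row's partition after sorting by position.
  // The order of partitions in the result is unspecified.
  def slidingSum(rows: Seq[SketchRow], preceding: Int): Seq[(SketchRow, Long)] =
    rows.groupBy(_.key).values.toSeq.flatMap { partition =>
      val sorted = partition.sortBy(_.position)
      sorted.indices.map { i =>
        val frame = sorted.slice(math.max(0, i - preceding), i + 1)
        (sorted(i), frame.map(_.value).sum)
      }
    }
}
```

The three steps map onto the parameters above: `groupBy` plays the part of `partitionKeys`, `sortBy` the part of `orderKeys`, and the `slice` bounds the part of `windowFrame`.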

### Time and Watermark Expressions

Special support for time-related expressions in streaming contexts:

```scala { .api }
/**
 * Time and watermark expression utilities
 */
object TimeExpressionUtils {

  /**
   * Creates watermark expression for event time processing
   * @param rowtimeExpression Rowtime field expression
   * @param delayExpression Watermark delay expression
   * @return Watermark generation expression
   */
  def createWatermarkExpression(
    rowtimeExpression: Expression,
    delayExpression: Expression
  ): Expression

  /**
   * Creates proctime attribute expression
   * @return Processing time attribute expression
   */
  def createProctimeExpression(): Expression

  /**
   * Validates time attribute expression
   * @param expression Time attribute expression
   * @return True if valid time attribute
   */
  def isValidTimeAttribute(expression: Expression): Boolean
}
```
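
A watermark built from a rowtime field and a delay corresponds to the familiar "rowtime minus a fixed interval" form. The following sketch evaluates such an expression over a sequence of timestamps. `WatermarkSketch` and its methods are hypothetical, timestamps are assumed to be epoch milliseconds, and the running maximum models the usual requirement that an emitted watermark never moves backwards.

```scala
object WatermarkSketch {

  // The watermark expression itself: rowtime minus a fixed delay.
  def watermarkFor(rowtimeMillis: Long, delayMillis: Long): Long =
    rowtimeMillis - delayMillis

  // Watermark after each event, in arrival order; it only ever advances.
  def runningWatermarks(rowtimes: Seq[Long], delayMillis: Long): Seq[Long] =
    rowtimes
      .scanLeft(Long.MinValue)((current, ts) => math.max(current, watermarkFor(ts, delayMillis)))
      .tail

  // An event is late if its rowtime is already behind the current watermark.
  def isLate(rowtimeMillis: Long, currentWatermark: Long): Boolean =
    rowtimeMillis < currentWatermark
}
```

For arrival order `1000, 3000, 2000` with a delay of `500`, the watermarks are `500, 2500, 2500`: the out-of-order event at `2000` does not pull the watermark back, and it counts as late because `2000 < 2500`.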

## Error Handling

Expression system error handling and debugging:

```scala
// Common expression validation errors
try {
  ExpressionValidator.validateExpression(expr, inputType)
} catch {
  case e: ValidationException =>
    // Handle validation errors:
    // - Type mismatch
    // - Unknown function reference
    // - Invalid field reference
    // - Null handling issues

  case e: CalciteException =>
    // Handle Calcite-specific errors:
    // - SQL parsing errors
    // - RexNode validation failures
    // - Type system inconsistencies
}

// Type inference failures: wrap the result in Option so both branches have the same type
val resultType: Option[DataType] = try {
  Some(TypeInferenceEngine.inferFunctionCallType(funcDef, argTypes))
} catch {
  case e: TypeInferenceException =>
    // Handle type inference failures:
    // - No matching function signature
    // - Ambiguous function overloads
    // - Incompatible argument types
    None
}
```
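
When the failure reason should be kept rather than dropped, the same pattern can return an `Either` instead of an `Option`. This is a self-contained sketch with a toy inference function: `SketchTypeInferenceException`, `InferenceErrorSketch`, and the string type names are hypothetical and stand in for the types used above.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical exception standing in for TypeInferenceException.
final class SketchTypeInferenceException(message: String) extends RuntimeException(message)

object InferenceErrorSketch {

  private val numericTypes = Set("INT", "BIGINT", "DOUBLE")

  // Toy inference: ABS accepts exactly one numeric argument and returns that argument's type.
  def inferAbs(argumentTypes: List[String]): String = argumentTypes match {
    case List(t) if numericTypes.contains(t) => t
    case other =>
      throw new SketchTypeInferenceException(
        s"No matching signature for ABS(${other.mkString(", ")})")
  }

  // Turns the expected failure into a Left; any other exception is rethrown unchanged.
  def inferAbsSafely(argumentTypes: List[String]): Either[String, String] =
    Try(inferAbs(argumentTypes)) match {
      case Success(resultType) => Right(resultType)
      case Failure(e: SketchTypeInferenceException) => Left(e.getMessage)
      case Failure(other) => throw other
    }
}
```

Callers can then pattern match on `Right(resultType)` and `Left(reason)` and report the reason, for example in a validation error message.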