Apache Flink's Table API Planner Blink module providing sophisticated SQL and Table API query planning and execution engine with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.
—
The expression system handles SQL expressions and Table API expressions, providing conversion between different expression representations, type inference, validation, and integration with code generation for optimal execution performance.
Core functionality for converting between different expression representations used throughout the planning pipeline.
/**
* Expression converter for transforming between expression representations
*/
trait ExpressionConverter {
/**
* Converts Table API expression to Calcite RexNode representation
* @param expression Table API expression to convert
* @return RexNode representation for Calcite optimization
*/
def convertToRexNode(expression: Expression): RexNode
/**
* Converts Calcite RexNode back to Table API expression
* @param rexNode Calcite RexNode to convert
* @return Table API expression representation
*/
def convertToExpression(rexNode: RexNode): Expression
/**
* Converts SQL expression string to RexNode
* @param sqlExpression SQL expression as string
* @param inputRowType Input row type for context
* @return Parsed and validated RexNode
*/
def convertSqlToRexNode(sqlExpression: String, inputRowType: RelDataType): RexNode
/**
* Converts expression with type coercion
* @param expression Expression to convert
* @param targetType Target type for coercion
* @return Converted expression with type coercion applied
*/
def convertWithCoercion(expression: Expression, targetType: DataType): RexNode
}Validation system for ensuring expression correctness and type safety:
/**
* Expression validation utilities
*/
object ExpressionValidator {
/**
* Validates expression syntax and semantics
* @param expression Expression to validate
* @param inputType Input data type context
* @throws ValidationException if expression is invalid
*/
def validateExpression(expression: Expression, inputType: RowType): Unit
/**
* Validates RexNode expression in Calcite context
* @param rexNode RexNode to validate
* @param rexBuilder RexBuilder for validation context
* @throws CalciteException if RexNode is invalid
*/
def validateRexNode(rexNode: RexNode, rexBuilder: RexBuilder): Unit
/**
* Validates expression type compatibility
* @param sourceType Source expression type
* @param targetType Target expected type
* @return True if types are compatible, false otherwise
*/
def isTypeCompatible(sourceType: DataType, targetType: DataType): Boolean
/**
* Validates function call expression
* @param functionCall Function call expression
* @param functionCatalog Function catalog for resolution
* @throws ValidationException if function call is invalid
*/
def validateFunctionCall(
functionCall: CallExpression,
functionCatalog: FunctionCatalog
): Unit
}Comprehensive type inference for expressions and operations:
/**
* Type inference engine for expressions
*/
object TypeInferenceEngine {
/**
* Infers result type of binary operation
* @param leftType Left operand type
* @param rightType Right operand type
* @param operator Binary operator
* @return Inferred result type
*/
def inferBinaryOperationType(
leftType: DataType,
rightType: DataType,
operator: BinaryOperator
): DataType
/**
* Infers result type of function call
* @param functionDefinition Function definition
* @param argumentTypes Argument types
* @return Inferred result type
*/
def inferFunctionCallType(
functionDefinition: FunctionDefinition,
argumentTypes: List[DataType]
): DataType
/**
* Infers type with nullability consideration
* @param operandTypes Types of operands
* @param operation Operation being performed
* @return Inferred type with proper nullability
*/
def inferTypeWithNullability(
operandTypes: List[DataType],
operation: Operation
): DataType
/**
* Finds common type for multiple expressions (for CASE, UNION, etc.)
* @param types List of types to find common type for
* @return Common super type, or failure if no common type exists
*/
def findCommonType(types: List[DataType]): Option[DataType]
}Utility functions for working with expressions:
/**
* Expression utility functions
*/
object ExpressionUtils {
/**
* Extracts referenced field names from expression
* @param expression Expression to analyze
* @return Set of field names referenced in expression
*/
def extractFieldNames(expression: Expression): Set[String]
/**
* Checks if expression is deterministic (always returns same result for same input)
* @param expression Expression to check
* @return True if expression is deterministic
*/
def isDeterministic(expression: Expression): Boolean
/**
* Simplifies expression by applying constant folding and other optimizations
* @param expression Expression to simplify
* @param rexBuilder RexBuilder for creating simplified expressions
* @return Simplified expression
*/
def simplifyExpression(expression: RexNode, rexBuilder: RexBuilder): RexNode
/**
* Converts expression to conjunctive normal form (CNF)
* @param expression Boolean expression to convert
* @return Expression in CNF
*/
def toCNF(expression: RexNode): RexNode
/**
* Splits conjunctive expression into individual conjuncts
* @param expression AND-connected expression
* @return List of individual conjunct expressions
*/
def splitConjunction(expression: RexNode): List[RexNode]
}Deep integration with Apache Calcite's RexNode expression system:
/**
* Calcite RexNode utilities for Flink integration
*/
public class FlinkRexUtil {
/**
* Converts Flink DataType to Calcite RelDataType
* @param dataType Flink data type
* @param typeFactory Calcite type factory
* @return Calcite RelDataType representation
*/
public static RelDataType toRelDataType(DataType dataType, RelDataTypeFactory typeFactory);
/**
* Converts Calcite RelDataType to Flink DataType
* @param relDataType Calcite rel data type
* @return Flink DataType representation
*/
public static DataType toDataType(RelDataType relDataType);
/**
* Creates RexNode for field access
* @param fieldIndex Index of field to access
* @param inputRowType Input row type
* @param rexBuilder RexBuilder for node creation
* @return RexNode for field access
*/
public static RexNode createFieldAccess(
int fieldIndex,
RelDataType inputRowType,
RexBuilder rexBuilder
);
/**
* Creates RexNode for literal value
* @param value Literal value
* @param dataType Type of the literal
* @param rexBuilder RexBuilder for node creation
* @return RexNode for literal value
*/
public static RexNode createLiteral(Object value, DataType dataType, RexBuilder rexBuilder);
}Integration with the code generation system for optimized expression evaluation:
/**
* Expression code generation utilities
*/
object ExpressionCodeGenerator {
/**
* Generates code for expression evaluation
* @param expression Expression to generate code for
* @param inputType Input row type
* @param config Table configuration
* @param classLoader Class loader for generated code
* @return Generated expression evaluation code
*/
def generateExpression(
expression: RexNode,
inputType: RowType,
config: TableConfig,
classLoader: ClassLoader
): GeneratedExpression
/**
* Generates code for conditional expression (CASE/IF)
* @param condition Condition expression
* @param trueExpr Expression for true case
* @param falseExpr Expression for false case
* @return Generated conditional expression code
*/
def generateConditional(
condition: GeneratedExpression,
trueExpr: GeneratedExpression,
falseExpr: GeneratedExpression
): GeneratedExpression
/**
* Generates code for null-safe expression evaluation
* @param expression Expression that may produce nulls
* @param nullDefault Default value for null results
* @return Generated null-safe expression code
*/
def generateNullSafeExpression(
expression: GeneratedExpression,
nullDefault: GeneratedExpression
): GeneratedExpression
}
/**
* Generated expression representation
*/
case class GeneratedExpression(
resultTerm: String, // Generated code term for result
resultType: DataType, // Result data type
nullTerm: String, // Generated code term for null check
code: String // Generated Java code
) {
/**
* Deep copy of generated expression with new terms
* @param newResultTerm New result term
* @param newNullTerm New null term
* @return Copied expression with updated terms
*/
def copy(newResultTerm: String = resultTerm, newNullTerm: String = nullTerm): GeneratedExpression
}Support for standard SQL expressions and operators:
/**
* Built-in expression definitions
*/
object BuiltInExpressions {
// Arithmetic operators
val PLUS: BinaryOperator
val MINUS: BinaryOperator
val MULTIPLY: BinaryOperator
val DIVIDE: BinaryOperator
val MOD: BinaryOperator
// Comparison operators
val EQUALS: BinaryOperator
val NOT_EQUALS: BinaryOperator
val LESS_THAN: BinaryOperator
val LESS_THAN_OR_EQUAL: BinaryOperator
val GREATER_THAN: BinaryOperator
val GREATER_THAN_OR_EQUAL: BinaryOperator
// Logical operators
val AND: BinaryOperator
val OR: BinaryOperator
val NOT: UnaryOperator
// String operators
val LIKE: BinaryOperator
val SIMILAR_TO: BinaryOperator
val SUBSTRING: FunctionDefinition
val TRIM: FunctionDefinition
// Null handling
val IS_NULL: UnaryOperator
val IS_NOT_NULL: UnaryOperator
val COALESCE: FunctionDefinition
val NULLIF: FunctionDefinition
// Case expression
val CASE: FunctionDefinition
// Type conversion
val CAST: FunctionDefinition
val TRY_CAST: FunctionDefinition
}Support for window function expressions in streaming and batch contexts:
/**
* Window function expression utilities
*/
object WindowExpressionUtils {
/**
* Creates window function call expression
* @param functionDefinition Window function definition
* @param arguments Function arguments
* @param partitionKeys Partition by keys
* @param orderKeys Order by keys
* @param windowFrame Window frame specification
* @return Window function call expression
*/
def createWindowFunctionCall(
functionDefinition: FunctionDefinition,
arguments: List[Expression],
partitionKeys: List[Expression],
orderKeys: List[OrderByExpression],
windowFrame: WindowFrame
): CallExpression
}Special support for time-related expressions in streaming contexts:
/**
* Time and watermark expression utilities
*/
object TimeExpressionUtils {
/**
* Creates watermark expression for event time processing
* @param rowtimeExpression Rowtime field expression
* @param delayExpression Watermark delay expression
* @return Watermark generation expression
*/
def createWatermarkExpression(
rowtimeExpression: Expression,
delayExpression: Expression
): Expression
/**
* Creates proctime attribute expression
* @return Processing time attribute expression
*/
def createProctimeExpression(): Expression
/**
* Validates time attribute expression
* @param expression Time attribute expression
* @return True if valid time attribute
*/
def isValidTimeAttribute(expression: Expression): Boolean
}Expression system error handling and debugging:
// Common expression validation errors
try {
ExpressionValidator.validateExpression(expr, inputType)
} catch {
case e: ValidationException =>
// Handle validation errors:
// - Type mismatch
// - Unknown function reference
// - Invalid field reference
// - Null handling issues
case e: CalciteException =>
// Handle Calcite-specific errors:
// - SQL parsing errors
// - RexNode validation failures
// - Type system inconsistencies
}
// Type inference failures
val resultType = try {
TypeInferenceEngine.inferFunctionCallType(funcDef, argTypes)
} catch {
case e: TypeInferenceException =>
// Handle type inference failures:
// - No matching function signature
// - Ambiguous function overloads
// - Incompatible argument types
None
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-planner-blink-2-12