tessl/maven-org-apache-flink--flink-table-planner-blink-2-12

Apache Flink's Table API Planner Blink module providing sophisticated SQL and Table API query planning and execution engine with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.

Expression System

The expression system handles both SQL expressions and Table API expressions. It converts between the expression representations used during planning, infers and validates types, and hands validated expressions to code generation for execution.

Capabilities

Expression Conversion

Core functionality for converting between different expression representations used throughout the planning pipeline.

/**
 * Expression converter for transforming between expression representations
 */
trait ExpressionConverter {
  
  /**
   * Converts Table API expression to Calcite RexNode representation
   * @param expression Table API expression to convert
   * @return RexNode representation for Calcite optimization
   */
  def convertToRexNode(expression: Expression): RexNode
  
  /**
   * Converts Calcite RexNode back to Table API expression
   * @param rexNode Calcite RexNode to convert
   * @return Table API expression representation
   */
  def convertToExpression(rexNode: RexNode): Expression
  
  /**
   * Converts SQL expression string to RexNode
   * @param sqlExpression SQL expression as string
   * @param inputRowType Input row type for context
   * @return Parsed and validated RexNode
   */
  def convertSqlToRexNode(sqlExpression: String, inputRowType: RelDataType): RexNode
  
  /**
   * Converts expression with type coercion
   * @param expression Expression to convert
   * @param targetType Target type for coercion
   * @return Converted expression with type coercion applied
   */
  def convertWithCoercion(expression: Expression, targetType: DataType): RexNode
}
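
To make the conversion idea concrete, here is a minimal, self-contained sketch that round-trips between two toy representations: a name-based tree (loosely analogous to a Table API expression) and an index-based tree (loosely analogous to a Calcite RexNode, which refers to input fields by position). Every type and function in it is hypothetical and defined only for illustration; it does not use the Flink or Calcite APIs.

```scala
object ConversionSketch {
  // Name-based representation, loosely analogous to a Table API expression.
  sealed trait Expr
  case class FieldRef(name: String) extends Expr
  case class Literal(value: Any) extends Expr
  case class Call(op: String, args: List[Expr]) extends Expr

  // Index-based representation, loosely analogous to a Calcite RexNode.
  sealed trait Rex
  case class InputRef(index: Int) extends Rex
  case class RexLiteral(value: Any) extends Rex
  case class RexCall(op: String, operands: List[Rex]) extends Rex

  // Resolves field names to positions in the input row.
  def toRex(expr: Expr, fieldNames: List[String]): Rex = expr match {
    case FieldRef(name) =>
      val idx = fieldNames.indexOf(name)
      require(idx >= 0, s"Unknown field: $name")
      InputRef(idx)
    case Literal(v)     => RexLiteral(v)
    case Call(op, args) => RexCall(op, args.map(toRex(_, fieldNames)))
  }

  // Maps positions back to field names.
  def toExpr(rex: Rex, fieldNames: List[String]): Expr = rex match {
    case InputRef(i)       => FieldRef(fieldNames(i))
    case RexLiteral(v)     => Literal(v)
    case RexCall(op, args) => Call(op, args.map(toExpr(_, fieldNames)))
  }
}
```

The input row's field list plays the role that `inputRowType` plays in the signatures above: without it, a name cannot be resolved to a position or back.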

Expression Validation

Validation system for ensuring expression correctness and type safety:

/**
 * Expression validation utilities
 */
object ExpressionValidator {
  
  /**
   * Validates expression syntax and semantics
   * @param expression Expression to validate
   * @param inputType Input data type context
   * @throws ValidationException if expression is invalid
   */
  def validateExpression(expression: Expression, inputType: RowType): Unit
  
  /**
   * Validates RexNode expression in Calcite context
   * @param rexNode RexNode to validate
   * @param rexBuilder RexBuilder for validation context
   * @throws CalciteException if RexNode is invalid
   */
  def validateRexNode(rexNode: RexNode, rexBuilder: RexBuilder): Unit
  
  /**
   * Validates expression type compatibility
   * @param sourceType Source expression type
   * @param targetType Target expected type
   * @return True if types are compatible, false otherwise
   */
  def isTypeCompatible(sourceType: DataType, targetType: DataType): Boolean
  
  /**
   * Validates function call expression
   * @param functionCall Function call expression
   * @param functionCatalog Function catalog for resolution
   * @throws ValidationException if function call is invalid
   */
  def validateFunctionCall(
    functionCall: CallExpression, 
    functionCatalog: FunctionCatalog
  ): Unit
}
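
The sketch below illustrates the kind of checks a validator performs, namely resolving field references against the input row and rejecting mismatched operand types. It is a toy model under stated assumptions: the `Type` and `Expr` hierarchies are hypothetical, and errors are returned as `Either` values rather than thrown as `ValidationException`.

```scala
object ValidationSketch {
  sealed trait Type
  case object IntType extends Type
  case object StringType extends Type
  case object BoolType extends Type

  sealed trait Expr
  case class FieldRef(name: String) extends Expr
  case class IntLit(value: Int) extends Expr
  case class StrLit(value: String) extends Expr
  case class GreaterThan(left: Expr, right: Expr) extends Expr

  // Returns the expression type, or a message describing the first problem found.
  def validate(expr: Expr, row: Map[String, Type]): Either[String, Type] = expr match {
    case FieldRef(n) => row.get(n).toRight(s"Invalid field reference: $n")
    case IntLit(_)   => Right(IntType)
    case StrLit(_)   => Right(StringType)
    case GreaterThan(l, r) =>
      for {
        lt <- validate(l, row)
        rt <- validate(r, row)
        _  <- if (lt == rt) Right(()) else Left(s"Type mismatch: $lt vs $rt")
      } yield BoolType
  }
}
```

Validation and type inference happen in the same traversal here, because a comparison can only be checked once the types of both operands are known.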

Type Inference System

Comprehensive type inference for expressions and operations:

/**
 * Type inference engine for expressions
 */
object TypeInferenceEngine {
  
  /**
   * Infers result type of binary operation
   * @param leftType Left operand type
   * @param rightType Right operand type  
   * @param operator Binary operator
   * @return Inferred result type
   */
  def inferBinaryOperationType(
    leftType: DataType,
    rightType: DataType,
    operator: BinaryOperator
  ): DataType
  
  /**
   * Infers result type of function call
   * @param functionDefinition Function definition
   * @param argumentTypes Argument types
   * @return Inferred result type
   */
  def inferFunctionCallType(
    functionDefinition: FunctionDefinition,
    argumentTypes: List[DataType]
  ): DataType
  
  /**
   * Infers type with nullability consideration
   * @param operandTypes Types of operands
   * @param operation Operation being performed
   * @return Inferred type with proper nullability
   */
  def inferTypeWithNullability(
    operandTypes: List[DataType],
    operation: Operation
  ): DataType
  
  /**
   * Finds common type for multiple expressions (for CASE, UNION, etc.)
   * @param types List of types to find common type for
   * @return Common super type, or None if no common type exists
   */
  def findCommonType(types: List[DataType]): Option[DataType]
}
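
As an illustration of how a common type can be found for a list of operand types, the following sketch folds a pairwise rule over the list. It assumes a deliberately tiny type lattice (three numeric types that widen in a fixed order, plus a string type); the real coercion rules are considerably richer, and all names here are hypothetical.

```scala
object CommonTypeSketch {
  sealed trait Type
  case object IntType extends Type
  case object BigIntType extends Type
  case object DoubleType extends Type
  case object StringType extends Type

  // Widening order for the numeric types in this toy model.
  private val numericRank: Map[Type, Int] =
    Map[Type, Int](IntType -> 0, BigIntType -> 1, DoubleType -> 2)

  // Identical types unify, numeric types widen, anything else has no common type.
  def commonOf(a: Type, b: Type): Option[Type] =
    if (a == b) Some(a)
    else for {
      ra <- numericRank.get(a)
      rb <- numericRank.get(b)
    } yield if (ra >= rb) a else b

  // Folds pairwise; an empty list or any incompatible pair yields None.
  def findCommonType(types: List[Type]): Option[Type] = types match {
    case Nil => None
    case head :: tail =>
      tail.foldLeft(Option(head)) { (acc, t) => acc.flatMap(commonOf(_, t)) }
  }
}
```

Returning `Option` keeps the "no common type" case explicit, so a caller such as a CASE or UNION planner can report a type error instead of catching an exception.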

Expression Utilities

Utility functions for working with expressions:

/**
 * Expression utility functions
 */
object ExpressionUtils {
  
  /**
   * Extracts referenced field names from expression
   * @param expression Expression to analyze
   * @return Set of field names referenced in expression
   */
  def extractFieldNames(expression: Expression): Set[String]
  
  /**
   * Checks if expression is deterministic (always returns same result for same input)
   * @param expression Expression to check
   * @return True if expression is deterministic
   */
  def isDeterministic(expression: Expression): Boolean
  
  /**
   * Simplifies expression by applying constant folding and other optimizations
   * @param expression Expression to simplify
   * @param rexBuilder RexBuilder for creating simplified expressions
   * @return Simplified expression
   */
  def simplifyExpression(expression: RexNode, rexBuilder: RexBuilder): RexNode
  
  /**
   * Converts expression to conjunctive normal form (CNF)
   * @param expression Boolean expression to convert
   * @return Expression in CNF
   */
  def toCNF(expression: RexNode): RexNode
  
  /**
   * Splits conjunctive expression into individual conjuncts
   * @param expression AND-connected expression
   * @return List of individual conjunct expressions
   */
  def splitConjunction(expression: RexNode): List[RexNode]
}
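
The utilities above are all tree traversals. The sketch below shows three of them (splitting a conjunction, collecting field names, and constant folding) on a hypothetical boolean expression tree. It is a simplified model and does not operate on Calcite's `RexNode`.

```scala
object ConjunctionSketch {
  sealed trait Expr
  case class Field(name: String) extends Expr
  case class Lit(value: Boolean) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr
  case class Not(child: Expr) extends Expr

  // Flattens nested ANDs into a list of conjuncts, left to right.
  def splitConjunction(e: Expr): List[Expr] = e match {
    case And(l, r) => splitConjunction(l) ++ splitConjunction(r)
    case other     => List(other)
  }

  // Collects every field name referenced anywhere in the tree.
  def extractFieldNames(e: Expr): Set[String] = e match {
    case Field(n)  => Set(n)
    case Lit(_)    => Set.empty
    case And(l, r) => extractFieldNames(l) ++ extractFieldNames(r)
    case Or(l, r)  => extractFieldNames(l) ++ extractFieldNames(r)
    case Not(c)    => extractFieldNames(c)
  }

  // Bottom-up constant folding for boolean literals.
  def simplify(e: Expr): Expr = e match {
    case And(l, r) => (simplify(l), simplify(r)) match {
      case (Lit(false), _) | (_, Lit(false)) => Lit(false)
      case (Lit(true), x)                    => x
      case (x, Lit(true))                    => x
      case (x, y)                            => And(x, y)
    }
    case Or(l, r) => (simplify(l), simplify(r)) match {
      case (Lit(true), _) | (_, Lit(true)) => Lit(true)
      case (Lit(false), x)                 => x
      case (x, Lit(false))                 => x
      case (x, y)                          => Or(x, y)
    }
    case Not(c) => simplify(c) match {
      case Lit(b) => Lit(!b)
      case x      => Not(x)
    }
    case other => other
  }
}
```

Splitting a predicate into conjuncts is what makes filter pushdown practical: each conjunct can be examined on its own, and `extractFieldNames` tells the planner which inputs that conjunct depends on.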

RexNode Integration

Deep integration with Apache Calcite's RexNode expression system:

/**
 * Calcite RexNode utilities for Flink integration
 */
public class FlinkRexUtil {
  
  /**
   * Converts Flink DataType to Calcite RelDataType
   * @param dataType Flink data type
   * @param typeFactory Calcite type factory
   * @return Calcite RelDataType representation
   */
  public static RelDataType toRelDataType(DataType dataType, RelDataTypeFactory typeFactory);
  
  /**
   * Converts Calcite RelDataType to Flink DataType
   * @param relDataType Calcite rel data type
   * @return Flink DataType representation
   */
  public static DataType toDataType(RelDataType relDataType);
  
  /**
   * Creates RexNode for field access
   * @param fieldIndex Index of field to access
   * @param inputRowType Input row type
   * @param rexBuilder RexBuilder for node creation
   * @return RexNode for field access
   */
  public static RexNode createFieldAccess(
    int fieldIndex, 
    RelDataType inputRowType,
    RexBuilder rexBuilder
  );
  
  /**
   * Creates RexNode for literal value
   * @param value Literal value
   * @param dataType Type of the literal
   * @param rexBuilder RexBuilder for node creation
   * @return RexNode for literal value
   */
  public static RexNode createLiteral(Object value, DataType dataType, RexBuilder rexBuilder);
}

Expression Code Generation Integration

Integration with the code generation system for optimized expression evaluation:

/**
 * Expression code generation utilities
 */
object ExpressionCodeGenerator {
  
  /**
   * Generates code for expression evaluation
   * @param expression Expression to generate code for
   * @param inputType Input row type
   * @param config Table configuration
   * @param classLoader Class loader for generated code
   * @return Generated expression evaluation code
   */
  def generateExpression(
    expression: RexNode,
    inputType: RowType,
    config: TableConfig,
    classLoader: ClassLoader
  ): GeneratedExpression
  
  /**
   * Generates code for conditional expression (CASE/IF)
   * @param condition Condition expression
   * @param trueExpr Expression for true case
   * @param falseExpr Expression for false case
   * @return Generated conditional expression code
   */
  def generateConditional(
    condition: GeneratedExpression,
    trueExpr: GeneratedExpression,
    falseExpr: GeneratedExpression
  ): GeneratedExpression
  
  /**
   * Generates code for null-safe expression evaluation
   * @param expression Expression that may produce nulls
   * @param nullDefault Default value for null results
   * @return Generated null-safe expression code
   */
  def generateNullSafeExpression(
    expression: GeneratedExpression,
    nullDefault: GeneratedExpression
  ): GeneratedExpression
}

/**
 * Generated expression representation
 */
case class GeneratedExpression(
  resultTerm: String,    // Generated code term for result
  resultType: DataType,  // Result data type
  nullTerm: String,      // Generated code term for null check
  code: String           // Generated Java code
) {
  /**
   * Returns a copy of this generated expression with new result and null terms
   * @param newResultTerm New result term
   * @param newNullTerm New null term
   * @return Copied expression with updated terms
   */
  def copy(newResultTerm: String = resultTerm, newNullTerm: String = nullTerm): GeneratedExpression
}
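
To show how the result term, null term, and code fragments compose, the sketch below assembles Java source for a conditional from three already-generated sub-expressions. `Gen` is a hypothetical, simplified stand-in for the `GeneratedExpression` shown above (its `resultType` is a plain Java type name), and the emitted code shape is an assumption made for illustration, not the planner's actual output.

```scala
object CodeGenSketch {
  // Simplified stand-in for GeneratedExpression; resultType is a Java type name.
  case class Gen(resultTerm: String, resultType: String, nullTerm: String, code: String)

  private var counter = 0

  // Produces unique variable names for the generated Java code.
  private def fresh(prefix: String): String = {
    counter += 1
    s"${prefix}_$counter"
  }

  // Emits Java that evaluates only the branch selected by the condition.
  // A null condition selects the else branch, as in SQL CASE.
  def generateConditional(cond: Gen, whenTrue: Gen, whenFalse: Gen): Gen = {
    val result = fresh("result")
    val isNull = fresh("isNull")
    val code = Seq(
      cond.code,
      s"${whenTrue.resultType} $result;",
      s"boolean $isNull;",
      s"if (!${cond.nullTerm} && ${cond.resultTerm}) {",
      whenTrue.code,
      s"  $result = ${whenTrue.resultTerm};",
      s"  $isNull = ${whenTrue.nullTerm};",
      "} else {",
      whenFalse.code,
      s"  $result = ${whenFalse.resultTerm};",
      s"  $isNull = ${whenFalse.nullTerm};",
      "}"
    ).filter(_.nonEmpty).mkString("\n")
    Gen(result, whenTrue.resultType, isNull, code)
  }
}
```

Because each branch's code is placed inside its own block, the branch that is not selected is never evaluated. The returned `Gen` exposes fresh terms, so it can in turn be used as an operand of a larger expression.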

Built-in Expression Support

Support for standard SQL expressions and operators:

/**
 * Built-in expression definitions
 */
object BuiltInExpressions {
  
  // Arithmetic operators
  val PLUS: BinaryOperator
  val MINUS: BinaryOperator  
  val MULTIPLY: BinaryOperator
  val DIVIDE: BinaryOperator
  val MOD: BinaryOperator
  
  // Comparison operators
  val EQUALS: BinaryOperator
  val NOT_EQUALS: BinaryOperator
  val LESS_THAN: BinaryOperator
  val LESS_THAN_OR_EQUAL: BinaryOperator
  val GREATER_THAN: BinaryOperator
  val GREATER_THAN_OR_EQUAL: BinaryOperator
  
  // Logical operators
  val AND: BinaryOperator
  val OR: BinaryOperator
  val NOT: UnaryOperator
  
  // String operators
  val LIKE: BinaryOperator
  val SIMILAR_TO: BinaryOperator
  val SUBSTRING: FunctionDefinition
  val TRIM: FunctionDefinition
  
  // Null handling
  val IS_NULL: UnaryOperator
  val IS_NOT_NULL: UnaryOperator
  val COALESCE: FunctionDefinition
  val NULLIF: FunctionDefinition
  
  // Case expression
  val CASE: FunctionDefinition
  
  // Type conversion
  val CAST: FunctionDefinition
  val TRY_CAST: FunctionDefinition
}
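
The logical and null-handling operators listed above follow standard SQL three-valued logic. The sketch below models those semantics with `Option`, where `None` stands for SQL NULL. The object and function names are hypothetical; only the truth tables and the COALESCE and NULLIF rules are taken from standard SQL.

```scala
object NullSemanticsSketch {
  // SQL NULL is modelled as None.
  type SqlBool = Option[Boolean]

  // Three-valued AND: FALSE dominates, otherwise any NULL makes the result NULL.
  def and(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // Three-valued OR: TRUE dominates, otherwise any NULL makes the result NULL.
  def or(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // COALESCE: first non-null argument, or NULL if every argument is NULL.
  def coalesce[A](args: Option[A]*): Option[A] =
    args.collectFirst { case Some(v) => v }

  // NULLIF(a, b): NULL when a equals b, otherwise a.
  def nullIf[A](a: Option[A], b: Option[A]): Option[A] =
    if (a.isDefined && a == b) None else a
}
```

These rules are also why simplifications such as `x AND FALSE = FALSE` remain valid for nullable operands, while `x AND TRUE` must keep `x` rather than collapse to `TRUE`.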

Advanced Expression Features

Window Function Expressions

Support for window function expressions in streaming and batch contexts:

/**
 * Window function expression utilities
 */
object WindowExpressionUtils {
  
  /**
   * Creates window function call expression
   * @param functionDefinition Window function definition
   * @param arguments Function arguments
   * @param partitionKeys Partition by keys
   * @param orderKeys Order by keys
   * @param windowFrame Window frame specification
   * @return Window function call expression
   */
  def createWindowFunctionCall(
    functionDefinition: FunctionDefinition,
    arguments: List[Expression],
    partitionKeys: List[Expression],
    orderKeys: List[OrderByExpression],
    windowFrame: WindowFrame
  ): CallExpression
}
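
The roles of partition keys, order keys, and the window frame can be seen in a small in-memory emulation. The sketch below computes what `SUM(amount) OVER (PARTITION BY customer ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` would return. The `Order` record and `runningSum` function are hypothetical and unrelated to the planner's actual execution strategy.

```scala
object WindowSketch {
  case class Order(customer: String, ts: Long, amount: Int)

  // Pairs each row with the running total of its partition up to that row.
  def runningSum(rows: List[Order]): List[(Order, Int)] =
    rows
      .groupBy(_.customer) // PARTITION BY customer
      .values
      .flatMap { partition =>
        val sorted = partition.sortBy(_.ts)              // ORDER BY ts
        val sums = sorted.scanLeft(0)(_ + _.amount).tail // frame: start of partition to current row
        sorted.zip(sums)
      }
      .toList
}
```

The order of partitions in the output is unspecified, which mirrors SQL: a window function fixes the order within each partition's computation, not the order of the result set.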

Time and Watermark Expressions

Special support for time-related expressions in streaming contexts:

/**
 * Time and watermark expression utilities
 */
object TimeExpressionUtils {
  
  /**
   * Creates watermark expression for event time processing
   * @param rowtimeExpression Rowtime field expression
   * @param delayExpression Watermark delay expression
   * @return Watermark generation expression
   */
  def createWatermarkExpression(
    rowtimeExpression: Expression,
    delayExpression: Expression
  ): Expression
  
  /**
   * Creates proctime attribute expression
   * @return Processing time attribute expression
   */
  def createProctimeExpression(): Expression
  
  /**
   * Validates time attribute expression
   * @param expression Time attribute expression
   * @return True if valid time attribute
   */
  def isValidTimeAttribute(expression: Expression): Boolean
}
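
A watermark expression of the form "rowtime minus delay" can be understood through the simplified model below: the watermark trails the highest event time seen so far by a fixed delay and never moves backwards. The class and its methods are hypothetical, and the sketch ignores details of Flink's runtime such as periodic emission and idle sources.

```scala
object WatermarkSketch {
  // Tracks the highest event time seen and derives a watermark trailing it by a fixed delay.
  final class BoundedDelayWatermark(delayMs: Long) {
    private var maxTimestamp = Long.MinValue

    def onEvent(eventTimeMs: Long): Unit =
      maxTimestamp = math.max(maxTimestamp, eventTimeMs)

    // None until the first event arrives; never decreases afterwards.
    def current: Option[Long] =
      if (maxTimestamp == Long.MinValue) None else Some(maxTimestamp - delayMs)

    // An event is late if its timestamp is at or below the current watermark.
    def isLate(eventTimeMs: Long): Boolean = current.exists(eventTimeMs <= _)
  }
}
```

A larger delay tolerates more out-of-order data at the cost of later results, since event-time operators only fire once the watermark passes the end of their window.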

Error Handling

Expression system error handling and debugging:

// Common expression validation errors
try {
  ExpressionValidator.validateExpression(expr, inputType)
} catch {
  case e: ValidationException =>
    // Handle validation errors:
    // - Type mismatch
    // - Unknown function reference
    // - Invalid field reference
    // - Null handling issues
    
  case e: CalciteException =>
    // Handle Calcite-specific errors:
    // - SQL parsing errors
    // - RexNode validation failures
    // - Type system inconsistencies
}

// Type inference failures
val resultType: Option[DataType] = try {
  Some(TypeInferenceEngine.inferFunctionCallType(funcDef, argTypes))
} catch {
  case e: TypeInferenceException =>
    // Handle type inference failures:
    // - No matching function signature
    // - Ambiguous function overloads
    // - Incompatible argument types
    None
}
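
An alternative to try/catch at every call site is to convert the failure into a value once and let callers pattern match on it. The sketch below does this with `scala.util.Try` and `Either`. The `InferenceError` exception and the `inferType` routine are hypothetical stand-ins, defined here only so the example is self-contained.

```scala
import scala.util.{Failure, Success, Try}

object ErrorHandlingSketch {
  // Hypothetical exception type standing in for a type inference failure.
  final class InferenceError(msg: String) extends RuntimeException(msg)

  // Hypothetical inference routine: only knows ABS over a single numeric argument.
  def inferType(function: String, argTypes: List[String]): String =
    (function, argTypes) match {
      case ("ABS", List(t)) if Set("INT", "DOUBLE").contains(t) => t
      case _ =>
        throw new InferenceError(
          s"No matching signature for $function(${argTypes.mkString(", ")})")
    }

  // Converts the expected failure into a value; unexpected exceptions still propagate.
  def safeInfer(function: String, argTypes: List[String]): Either[String, String] =
    Try(inferType(function, argTypes)) match {
      case Success(t)                 => Right(t)
      case Failure(e: InferenceError) => Left(e.getMessage)
      case Failure(other)             => throw other
    }
}
```

Only the anticipated exception type is turned into a `Left`; rethrowing anything else avoids hiding unrelated bugs behind a generic "inference failed" message.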

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-planner-blink-2-12
