tessl/maven-org-apache-flink--flink-table-planner-blink-2-12

Apache Flink's Table API Planner Blink module providing sophisticated SQL and Table API query planning and execution engine with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.

Code Generation

The code generation system produces Java source code at planning time for high-performance query execution. It generates specialized code for calculations, aggregations, projections, and other operations, and compiles that code with the Janino compiler so that operators run compiled code instead of interpreting expressions record by record.
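To make the difference between interpreting an expression and generating code for it concrete, here is a minimal sketch. It is a toy model, not the planner's implementation: `CodegenSketch`, `Expr`, `Field`, `Lit`, `Add`, and `generateFilterClass` are hypothetical names invented for illustration, and the emitted Java string is only printed or inspected here, not compiled.

```scala
object CodegenSketch {
  // Toy expression tree; illustrative names, not Flink or Calcite classes.
  sealed trait Expr
  final case class Field(index: Int) extends Expr
  final case class Lit(value: Long) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Interpreted path: the tree is walked again for every row.
  def interpret(e: Expr, row: Array[Long]): Long = e match {
    case Field(i)  => row(i)
    case Lit(v)    => v
    case Add(l, r) => interpret(l, row) + interpret(r, row)
  }

  // Generated path: the tree is walked once to emit a flat Java expression.
  def toJava(e: Expr): String = e match {
    case Field(i)  => s"row[$i]"
    case Lit(v)    => s"${v}L"
    case Add(l, r) => s"(${toJava(l)} + ${toJava(r)})"
  }

  // Wraps "left > right" in a class body that a runtime compiler could compile.
  def generateFilterClass(name: String, left: Expr, right: Expr): String =
    s"""public class $name {
       |  public boolean apply(long[] row) {
       |    return ${toJava(left)} > ${toJava(right)};
       |  }
       |}""".stripMargin
}
```

Once such a class is compiled, evaluating the filter for a row is a single method call over straight-line code, whereas `interpret` pays for a pattern match at every node of the tree on every row.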

Capabilities

CalcCodeGenerator - Calculation Code Generation

Generates optimized code for general calculations including expressions, filters, and projections.

/**
 * Code generator for calculation operations and expressions
 */
object CalcCodeGenerator {
  
  /**
   * Generates a calc operator for executing calculations and projections
   * @param ctx Code generation context
   * @param inputTransform Input transformation providing the data
   * @param outputType Output row type information
   * @param projection Sequence of RexNodes for projection
   * @param condition Optional condition for filtering
   * @param retainHeader Whether to retain row header information
   * @param opName Name for the generated operator
   * @return Generated operator factory for calculation operations
   */
  def generateCalcOperator(
    ctx: CodeGeneratorContext,
    inputTransform: Transformation[RowData],
    outputType: RowType,
    projection: Seq[RexNode],
    condition: Option[RexNode],
    retainHeader: Boolean = false,
    opName: String
  ): CodeGenOperatorFactory[RowData]
  
  /**
   * Generates internal function for calculation with custom parameters (private API)
   * @param inputType Input row type
   * @param name Function name
   * @param returnType Return row type
   * @param outRowClass Output row data class
   * @param calcProjection Calculation projection nodes
   * @param calcCondition Optional calculation condition
   * @param config Table configuration
   * @return Generated function for flat map operations
   */
  private[flink] def generateFunction[T <: Function](
    inputType: RowType,
    name: String,
    returnType: RowType,
    outRowClass: Class[_ <: RowData],
    calcProjection: Seq[RexNode],
    calcCondition: Option[RexNode],
    config: TableConfig
  ): GeneratedFunction[FlatMapFunction[RowData, RowData]]
}

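Usage Example:

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.planner.codegen.CalcCodeGenerator
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.util.Collector
import org.apache.calcite.rex.RexNode

// generateFunction is private[flink]: the planner calls it internally, and this
// snippet only compiles from code inside the org.apache.flink package.
// The ??? placeholders stand for values the planner supplies.
val inputType: RowType = ???            // input schema
val outputType: RowType = ???           // output schema
val projection: Seq[RexNode] = ???      // created during optimization
val condition: Option[RexNode] = ???    // optional filter, created during optimization
val tableConfig: TableConfig = ???
val classLoader: ClassLoader = Thread.currentThread().getContextClassLoader

// Arguments follow the generateFunction signature above; "CalcFunction" is an illustrative name
val generatedFunction = CalcCodeGenerator.generateFunction(
  inputType, "CalcFunction", outputType, classOf[BinaryRowData],
  projection, condition, tableConfig)

// Instantiate the compiled function, then pass rows through flatMap
val calc: FlatMapFunction[RowData, RowData] = generatedFunction.newInstance(classLoader)
def process(row: RowData, out: Collector[RowData]): Unit = calc.flatMap(row, out)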
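The shape of a calc (an optional filter condition followed by a list of projections, returned as a flat-map style function) can be sketched without any Flink dependency. This is an illustrative model under stated assumptions: `CalcSketch`, `Row`, and `calc` are hypothetical names, rows are plain `Vector[Any]` values, and the projection and condition are ordinary Scala functions rather than code derived from `RexNode`s.

```scala
object CalcSketch {
  type Row = Vector[Any]

  // Mirrors the shape of a calc: an optional condition plus a list of projections.
  // Returning a Seq models flatMap semantics: zero rows when filtered out, one otherwise.
  def calc(
      projection: Seq[Row => Any],
      condition: Option[Row => Boolean]
  ): Row => Seq[Row] = { row =>
    val keep = condition.forall(p => p(row)) // no condition means every row is kept
    if (keep) Seq(projection.map(f => f(row)).toVector) else Seq.empty
  }
}
```

For example, projecting `(field 1, field 0 * 2)` with the condition `field 0 > 10` maps `Vector(20, "a")` to `Vector("a", 40)` and drops `Vector(5, "b")`.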

ProjectionCodeGenerator - Projection Code Generation

Generates specialized code for column projections and field access operations.

/**
 * Code generator for projection operations
 */
object ProjectionCodeGenerator {
  
  /**
   * Generates projection code for accessing specific fields
   * @param ctx Code generation context
   * @param name Name for the generated projection
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated projection function
   */
  def generateProjection(
    ctx: CodeGeneratorContext,
    name: String,
    inType: RowType,
    outType: RowType,
    inputMapping: Array[Int],
    outClass: Class[_ <: RowData] = classOf[BinaryRowData],
    inputTerm: String = DEFAULT_INPUT1_TERM,
    outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
    outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
    reusedOutRecord: Boolean = true
  ): GeneratedProjection
  
  /**
   * Generates projection expression for field transformation
   * @param ctx Code generation context
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated expression for projection
   */
  def generateProjectionExpression(
    ctx: CodeGeneratorContext,
    inType: RowType,
    outType: RowType,
    inputMapping: Array[Int],
    outClass: Class[_ <: RowData] = classOf[BinaryRowData],
    inputTerm: String = DEFAULT_INPUT1_TERM,
    outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
    outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
    reusedOutRecord: Boolean = true
  ): GeneratedExpression
}

/**
 * Generated projection function interface
 */
trait GeneratedProjection {
  /**
   * Applies projection to input row data
   * @param input Input row data
   * @return Projected row data
   */
  def apply(input: RowData): RowData
}
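The roles of `inputMapping` and `reusedOutRecord` can be illustrated with a small stand-alone sketch. It assumes rows are plain `Array[Any]` values; `ProjectionSketch` and `projection` are hypothetical names, and the real generator emits Java code that writes into a row data object rather than an array.

```scala
object ProjectionSketch {
  // Builds a projection that copies the fields listed in inputMapping, in order.
  // With reuseOutRecord = true the same output buffer is returned on every call,
  // so callers must not hold on to a previous result.
  def projection(
      inputMapping: Array[Int],
      reuseOutRecord: Boolean = true
  ): Array[Any] => Array[Any] = {
    val shared = new Array[Any](inputMapping.length)
    input => {
      val out = if (reuseOutRecord) shared else new Array[Any](inputMapping.length)
      var i = 0
      while (i < inputMapping.length) {
        out(i) = input(inputMapping(i))
        i += 1
      }
      out
    }
  }
}
```

Reusing the output record avoids one allocation per input row, at the cost of the aliasing hazard noted in the comment.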

EqualiserCodeGenerator - Equality Comparison Code Generation

Generates efficient code for equality comparisons and key extraction operations.

/**
 * Code generator for equality comparisons and key operations
 */
object EqualiserCodeGenerator {
  
  /**
   * Generates equaliser for comparing row data instances
   * @param fieldTypes Types of fields to compare
   * @param config Table configuration
   * @param classLoader Class loader for generated code
   * @return Generated equaliser function
   */
  def generateRecordEqualiser(
    fieldTypes: Array[DataType],
    config: TableConfig,
    classLoader: ClassLoader
  ): GeneratedRecordEqualiser
  
  /**
   * Generates key equaliser for join and aggregation operations
   * @param keyTypes Types of key fields
   * @param nullsAreEqual Whether null values should be considered equal
   * @return Generated key equaliser
   */
  def generateKeyEqualiser(
    keyTypes: Array[DataType],
    nullsAreEqual: Boolean
  ): GeneratedEqualiser
}

/**
 * Generated equaliser interface for row comparisons
 */
trait GeneratedRecordEqualiser {
  /**
   * Tests equality between two row data instances
   * @param left First row to compare
   * @param right Second row to compare
   * @return True if rows are equal, false otherwise
   */
  def equals(left: RowData, right: RowData): Boolean
}
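A record equaliser is essentially a field-by-field comparison specialised to a known schema. The sketch below models that idea on `Array[Any]` rows. `EqualiserSketch` and `recordEqualiser` are hypothetical names, and treating two nulls in the same position as equal is an assumption of this sketch, not a statement about the generated code.

```scala
object EqualiserSketch {
  // Builds a comparison specialised to a fixed number of fields.
  // Assumption: two nulls in the same position count as equal.
  def recordEqualiser(fieldCount: Int): (Array[Any], Array[Any]) => Boolean =
    (left, right) => {
      var i = 0
      var same = left.length == fieldCount && right.length == fieldCount
      while (same && i < fieldCount) {
        val l = left(i)
        val r = right(i)
        same = if (l == null || r == null) l == null && r == null else l == r
        i += 1
      }
      same
    }
}
```

The loop stops at the first differing field, which is the main saving over building and comparing intermediate objects.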

Aggregation Code Generation

Specialized code generation for aggregation operations including hash-based and sort-based aggregations.

/**
 * Code generator for aggregation handlers
 */
object AggsHandlerCodeGenerator {
  
  /**
   * Generates aggregation handler for processing aggregation functions
   * @param aggInfos Information about aggregation functions
   * @param inputType Input row type
   * @param grouping Grouping specification
   * @param config Table configuration
   * @return Generated aggregation handler
   */
  def generateAggsHandler(
    aggInfos: Array[AggregateInfo],
    inputType: RowType,
    grouping: Array[Int],
    config: TableConfig
  ): GeneratedAggsHandler
}

/**
 * Hash-based aggregation code generator
 */
object HashAggCodeGenerator {
  
  /**
   * Generates hash aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping key specification
   * @return Generated hash aggregation operator
   */
  def generate(
    aggInfos: Array[AggregateInfo],
    inputType: RowType,
    outputType: RowType,
    grouping: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}

/**
 * Sort-based aggregation code generator
 */
object SortAggCodeGenerator {
  
  /**
   * Generates sort aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping specification
   * @param orderKeys Sort order specification
   * @return Generated sort aggregation operator
   */
  def generate(
    aggInfos: Array[AggregateInfo],
    inputType: RowType,
    outputType: RowType,
    grouping: Array[Int],
    orderKeys: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}
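The difference between the hash-based and sort-based strategies can be shown with a toy grouped sum. This is a sketch under simplifying assumptions: rows are `(key, value)` pairs, the only aggregate is a sum, and `AggSketch`, `hashSum`, and `sortSum` are hypothetical names unrelated to the generated operators.

```scala
import scala.collection.mutable

object AggSketch {
  // Hash-based: one accumulator per key in a hash map; input order does not matter.
  def hashSum(rows: Iterator[(String, Long)]): Map[String, Long] = {
    val acc = mutable.HashMap.empty[String, Long]
    rows.foreach { case (k, v) => acc.update(k, acc.getOrElse(k, 0L) + v) }
    acc.toMap
  }

  // Sort-based: input must already be ordered by key, so a single accumulator
  // is enough and a result is emitted whenever the key changes.
  def sortSum(sortedRows: Iterator[(String, Long)]): Vector[(String, Long)] = {
    val out = Vector.newBuilder[(String, Long)]
    var current: Option[(String, Long)] = None
    sortedRows.foreach { case (k, v) =>
      current = current match {
        case Some((ck, sum)) if ck == k => Some((ck, sum + v))
        case Some(done)                 => out += done; Some((k, v))
        case None                       => Some((k, v))
      }
    }
    current.foreach(out += _)
    out.result()
  }
}
```

The hash variant holds state for every distinct key at once, while the sort variant holds state for one key at a time but depends on sorted input.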

WatermarkGeneratorCodeGen - Watermark Code Generation

Generates code for watermark generation in streaming scenarios.

/**
 * Code generator for watermark generation
 */
object WatermarkGeneratorCodeGen {
  
  /**
   * Generates watermark generator for streaming sources
   * @param rowtimeFieldIndex Index of rowtime field
   * @param watermarkExpr Watermark generation expression
   * @param inputType Input row type
   * @param config Table configuration
   * @return Generated watermark generator
   */
  def generateWatermarkGenerator(
    rowtimeFieldIndex: Int,
    watermarkExpr: RexNode,
    inputType: RowType,
    config: TableConfig
  ): GeneratedWatermarkGenerator
}

/**
 * Generated watermark generator interface
 */
trait GeneratedWatermarkGenerator {
  /**
   * Generates watermark for given row data
   * @param row Input row containing timestamp
   * @return Generated watermark timestamp
   */
  def currentWatermark(row: RowData): Long
}
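As an illustration of what a watermark expression computes, the sketch below evaluates "rowtime minus a fixed delay" for each row. It is a hedged model, not the generated code: `WatermarkSketch`, `boundedOutOfOrder`, and `Tracker` are hypothetical names, rows are `Array[Any]`, and the choice to skip null rowtimes and to keep watermarks non-decreasing is an assumption of this sketch.

```scala
object WatermarkSketch {
  type Row = Array[Any]

  // Evaluates "rowtime - delay" for one row; a null rowtime yields no watermark.
  def boundedOutOfOrder(rowtimeFieldIndex: Int, delayMillis: Long): Row => Option[Long] =
    row => Option(row(rowtimeFieldIndex)).map(ts => ts.asInstanceOf[Long] - delayMillis)

  // Keeps the largest watermark seen so far so the reported value never moves backwards.
  final class Tracker(gen: Row => Option[Long]) {
    private var current = Long.MinValue
    def onRow(row: Row): Long = {
      gen(row).foreach(w => current = math.max(current, w))
      current
    }
  }
}
```

With a 5000 ms delay, a row with rowtime 20000 yields a watermark of 15000, and a later row with rowtime 10000 leaves the tracked watermark at 15000.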

HashCodeGenerator - Hash Code Generation

Generates efficient hash code computation for keys and records.

/**
 * Code generator for hash code computation
 */
object HashCodeGenerator {
  
  /**
   * Generates hash code computer for row data
   * @param fieldTypes Types of fields to hash
   * @param config Table configuration
   * @return Generated hash code computer
   */
  def generateRowHash(
    fieldTypes: Array[DataType],
    config: TableConfig
  ): GeneratedHashFunction
  
  /**
   * Generates hash code computer for key fields
   * @param keyTypes Types of key fields
   * @param keyIndices Indices of key fields in the row
   * @return Generated key hash function
   */
  def generateKeyHash(
    keyTypes: Array[DataType],
    keyIndices: Array[Int]
  ): GeneratedHashFunction
}

/**
 * Generated hash function interface
 */
trait GeneratedHashFunction {
  /**
   * Computes hash code for row data
   * @param row Row data to hash
   * @return Hash code value
   */
  def hashCode(row: RowData): Int
}
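Hashing a key means combining the hashes of the selected fields only. The sketch below shows one common way to do that on `Array[Any]` rows; `HashSketch` and `keyHash` are hypothetical names, and the 31-multiplier combination and the "null hashes to 0" rule are assumptions chosen for illustration rather than the scheme the generator emits.

```scala
object HashSketch {
  // Combines per-field hashes of the key columns with a 31-multiplier scheme.
  // Non-key columns are never read, so they cannot affect the result.
  def keyHash(keyIndices: Array[Int]): Array[Any] => Int = row => {
    var h = 0
    var i = 0
    while (i < keyIndices.length) {
      val field = row(keyIndices(i))
      h = 31 * h + (if (field == null) 0 else field.hashCode)
      i += 1
    }
    h
  }
}
```

Two rows that agree on the key columns always hash to the same value, which is the property that hash joins and hash aggregations rely on.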

Over Window Code Generation

Specialized code generation for over window operations including range and row-based windows.

/**
 * Code generator for range-based window comparisons
 */
object RangeBoundComparatorCodeGenerator {
  
  /**
   * Generates range bound comparator for window operations
   * @param boundType Type of the bound field
   * @param isLowerBound Whether this is a lower bound comparator
   * @param config Table configuration
   * @return Generated range bound comparator
   */
  def generate(
    boundType: DataType,
    isLowerBound: Boolean,
    config: TableConfig
  ): GeneratedRecordComparator
}

/**
 * Multi-field range bound comparator generator
 */
object MultiFieldRangeBoundComparatorCodeGenerator {
  
  /**
   * Generates comparator for multi-field range bounds
   * @param orderKeys Array of order key information
   * @param boundTypes Types of bound fields
   * @param orders Sort orders for each field
   * @return Generated multi-field comparator
   */
  def generate(
    orderKeys: Array[Int],
    boundTypes: Array[DataType], 
    orders: Array[SortDirection]
  ): GeneratedRecordComparator
}
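For a range-based window, a bound comparator decides whether a candidate row's order key falls on the inside of a frame boundary relative to the current row. The sketch below models this for a single ascending `Long` order key. `RangeBoundSketch`, `comparator`, and `inFrame` are hypothetical names, and the sign convention (non-negative for a satisfied lower bound, non-positive for a satisfied upper bound) is an assumption of this sketch.

```scala
object RangeBoundSketch {
  // Compares a candidate key against the bound derived from the current row's key.
  // Lower bound: current - offset. Upper bound: current + offset.
  def comparator(offset: Long, isLowerBound: Boolean): (Long, Long) => Int = {
    (candidate, current) =>
      val bound = if (isLowerBound) current - offset else current + offset
      java.lang.Long.compare(candidate, bound)
  }

  // A key is inside "RANGE BETWEEN preceding PRECEDING AND following FOLLOWING"
  // when it is at or above the lower bound and at or below the upper bound.
  def inFrame(key: Long, current: Long, preceding: Long, following: Long): Boolean =
    comparator(preceding, isLowerBound = true)(key, current) >= 0 &&
      comparator(following, isLowerBound = false)(key, current) <= 0
}
```

With a current key of 10 and a frame of 2 preceding to 0 following, keys 8 through 10 are inside the frame and keys 7 and 11 are outside.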

Generated Code Interface

GeneratedFunction

Base interface for all generated functions:

/**
 * Base interface for generated functions
 */
trait GeneratedFunction[F <: Function] {
  /**
   * Creates new instance of the generated function
   * @param classLoader Class loader for instantiation
   * @return New function instance
   */
  def newInstance(classLoader: ClassLoader): F
  
  /**
   * Returns generated code as string (for debugging)
   * @return Generated Java code
   */
  def getCode: String
  
  /**
   * Returns class name of generated function
   * @return Generated class name
   */
  def getClassName: String
}
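The idea behind such a holder (carry the class name and source text, compile on demand, hand out instances) can be sketched as follows. This is a simplified model: `GeneratedSketch` is a hypothetical name, and the compile step is injected as a plain function so the sketch runs without a Java compiler on the classpath. Compiling at most once per holder is a design choice of this sketch, not a claim about Flink's caching.

```scala
// Stand-in for a generated-class holder; not Flink's GeneratedFunction.
final class GeneratedSketch[F](
    val className: String,
    val code: String,
    compile: String => () => F) {

  // The source is compiled lazily on first use, then the factory is reused.
  private lazy val factory: () => F = compile(code)

  // Each call returns whatever the factory produces, without recompiling.
  def newInstance(): F = factory()
}
```

Keeping the source string on the holder is what makes an accessor like `getCode` possible for debugging, and it lets the holder be shipped to workers and compiled there.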

GeneratedOperator

Interface for generated Flink operators:

/**
 * Generated Flink operator interface
 */  
trait GeneratedOperator[T <: StreamOperator[_]] {
  /**
   * Creates new instance of generated operator
   * @param parameters Operator parameters
   * @return New operator instance
   */
  def newInstance(parameters: Map[String, Any]): T
  
  /**
   * Returns generated operator code
   * @return Generated Java code
   */
  def getCode: String
}

Code Generation Configuration

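Key configuration options for code generation. Option keys differ between Flink releases, so check the keys below against the configuration reference for your version; for example, Flink's TableConfigOptions defines the generated-code length limit under the key table.generated-code.max-length: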

// Enable/disable code generation
tableConfig.getConfiguration().setString("table.exec.codegen.enabled", "true");

// Set maximum generated code length
tableConfig.getConfiguration().setString("table.exec.codegen.length.max", "64000");

// Enable null check elimination optimization
tableConfig.getConfiguration().setString("table.exec.codegen.null-check", "true");

// Configure string concatenation method
tableConfig.getConfiguration().setString("table.exec.codegen.string.concat", "true");

The generated code is compiled at runtime with the Janino compiler. Compared with interpreted execution, it removes most virtual method calls from the per-record path and gives the JIT compiler simpler code to optimize.
