tessl/maven-org-apache-flink--flink-table-planner-blink-2-12

Apache Flink's Blink Table API planner module: the SQL and Table API query planning and execution engine, with query optimization, code generation, and support for both streaming and batch workloads.

Function Integration

The function integration system supports user-defined functions (UDFs): scalar functions, aggregate functions, and table functions. It covers function registration, validation, translation into SQL operators that the planner can resolve and type-check, and code generation for the function calls.

Capabilities

UserDefinedFunctionUtils - UDF Utilities

Core utilities for user-defined function handling, validation, and integration with the Flink Table API.

/**
 * Utilities for user-defined function management and validation
 */
object UserDefinedFunctionUtils {
  
  /**
   * Validates that a class can be instantiated for UDF usage
   * @param clazz Class to check for instantiation capability
   * @throws ValidationException if class cannot be instantiated
   */
  def checkForInstantiation(clazz: Class[_]): Unit
  
  /**
   * Checks whether the given class is not a Scala singleton object
   * A Scala object is one instance shared by all parallel tasks, which is unsafe
   * for stateful functions such as TableFunction (e.g. its shared collector)
   * @param clazz Class to check for singleton pattern
   * @throws ValidationException if class is a Scala object
   */
  def checkNotSingleton(clazz: Class[_]): Unit
  
  /**
   * Gets the eval method signature for a user-defined function
   * @param function User-defined function instance
   * @param expectedTypes Expected parameter types
   * @return Method signature for eval method
   */
  def getEvalMethodSignature(
    function: UserDefinedFunction,
    expectedTypes: Array[LogicalType]
  ): Method
  
  /**
   * Checks if a specified method exists in the function
   * @param method Method name to check
   * @param function User-defined function instance
   * @return true if method exists, false otherwise
   */
  def ifMethodExistInFunction(method: String, function: UserDefinedFunction): Boolean
  
  /**
   * Extracts method signatures from UDF class for function inference
   * @param clazz UDF class to analyze
   * @param methodName Name of the method to extract (e.g., "eval", "accumulate")
   * @return Array of method signatures found
   */
  def getMethodSignatures(clazz: Class[_], methodName: String): Array[MethodSignature]
  
  /**
   * Determines result type information from UDF method signatures
   * @param signatures Array of method signatures
   * @param inputTypes Input argument types
   * @return Inferred result type information
   */
  def getResultTypeFromSignatures(
    signatures: Array[MethodSignature], 
    inputTypes: Array[DataType]
  ): TypeInformation[_]
}
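The checks that checkForInstantiation performs can be approximated in plain Java with reflection. The sketch below assumes three conditions:

- the class is public;
- it is a concrete class, not an abstract class, interface, or primitive;
- it is not a non-static inner class.

The class name InstantiationCheck is hypothetical. It throws IllegalArgumentException in place of Flink's ValidationException so that it runs without Flink on the classpath.

```java
import java.lang.reflect.Modifier;

// Hypothetical plain-Java approximation of the checks behind checkForInstantiation.
// Throws IllegalArgumentException instead of Flink's ValidationException so that
// it runs without Flink on the classpath.
final class InstantiationCheck {

    private InstantiationCheck() {}

    static void checkForInstantiation(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // 1. The class must be public so the runtime can access it.
        if (!Modifier.isPublic(mods)) {
            throw new IllegalArgumentException(
                "Function class " + clazz.getName() + " is not public.");
        }
        // 2. It must be a concrete class: not an interface, primitive, or abstract class.
        if (clazz.isInterface() || clazz.isPrimitive() || Modifier.isAbstract(mods)) {
            throw new IllegalArgumentException("Function class " + clazz.getName()
                + " is not a proper class: it is abstract, an interface, or a primitive type.");
        }
        // 3. A nested class must be static; a non-static inner class needs an enclosing instance.
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            throw new IllegalArgumentException("Function class " + clazz.getName()
                + " is an inner class but not statically accessible.");
        }
    }
}
```

Public concrete classes pass these checks, including public static nested classes such as java.util.AbstractMap.SimpleEntry. Abstract classes, interfaces, primitives, and package-private classes are rejected.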

Usage Example:

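import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils

// Custom scalar function; the class must be public and instantiable
class MyUpperFunction extends ScalarFunction {
  def eval(str: String): String = if (str == null) null else str.toUpperCase
}

// Validate the function before registration (the checks throw ValidationException on failure)
val myFunction = new MyUpperFunction()
UserDefinedFunctionUtils.checkForInstantiation(myFunction.getClass)
UserDefinedFunctionUtils.checkNotSingleton(myFunction.getClass)
require(UserDefinedFunctionUtils.ifMethodExistInFunction("eval", myFunction), "eval() method missing")

// Register the function with a Blink-planner table environment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
tableEnv.createTemporarySystemFunction("MY_UPPER", myFunction)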

Scalar Function SQL Integration

SQL integration layer for scalar functions. It wraps a user-defined ScalarFunction as a Calcite SqlFunction. The SQL validator can then resolve calls to it, check operand types, and infer the return type.

/**
 * SQL integration for scalar functions
 */
class ScalarSqlFunction(
  identifier: String,
  displayName: String,
  scalarFunction: ScalarFunction,
  typeFactory: FlinkTypeFactory
) extends SqlFunction {
  
  /**
   * Returns function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for scalar functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION
  
  /**
   * Returns SQL identifier for this function
   * @return Function identifier used in SQL
   */
  def getName: String = identifier
  
  /**
   * Infers return type based on operand types
   * @param opBinding Operand binding with argument types
   * @return Inferred return type for SQL query
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType
  
  /**
   * Validates function call with given operands
   * @param callBinding Call binding with argument information
   * @return True if call is valid, false otherwise
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
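The sketch below shows in plain Java, without Calcite or Flink types, how a return type can be inferred from eval overloads. EvalResolver and ConcatOrLength are hypothetical names.

The sketch matches operand classes against each public eval method with isAssignableFrom and returns the declared return type of the first match. It ignores primitive/boxed conversions, varargs, and selection of the most specific overload, all of which a real planner has to handle.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Optional;

// Hypothetical sketch: infer a function's return type from its eval(...) overloads.
final class EvalResolver {

    private EvalResolver() {}

    // Finds a public, non-static eval(...) whose parameter types accept the operand classes.
    static Optional<Method> findEval(Class<?> udfClass, Class<?>... operandClasses) {
        return Arrays.stream(udfClass.getMethods())
            .filter(m -> m.getName().equals("eval"))
            .filter(m -> !Modifier.isStatic(m.getModifiers()))
            .filter(m -> accepts(m.getParameterTypes(), operandClasses))
            .findFirst();
    }

    // The inferred return type is the matching eval method's declared return type.
    static Optional<Class<?>> inferReturnType(Class<?> udfClass, Class<?>... operandClasses) {
        return findEval(udfClass, operandClasses).map(Method::getReturnType);
    }

    // Parameter count must match, and each declared type must accept the operand class.
    private static boolean accepts(Class<?>[] params, Class<?>[] operands) {
        if (params.length != operands.length) {
            return false;
        }
        for (int i = 0; i < params.length; i++) {
            if (!params[i].isAssignableFrom(operands[i])) {
                return false;
            }
        }
        return true;
    }
}

// Hypothetical UDF-like class with two eval overloads (it does not extend Flink's ScalarFunction).
class ConcatOrLength {
    public String eval(String a, String b) { return a + b; }
    public Integer eval(String a) { return a.length(); }
}
```

For ConcatOrLength, operands (String, String) resolve to String, and a single String operand resolves to Integer. An Integer operand matches no overload.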

Aggregate Function SQL Integration

SQL integration for aggregate functions. It wraps an AggregateFunction as a Calcite SqlAggFunction and exposes both the aggregation's result type and its accumulator type to the planner.

/**
 * SQL integration for aggregate functions
 */
class AggSqlFunction(
  identifier: String,
  displayName: String,
  aggregateFunction: AggregateFunction[_, _],
  resultType: RelDataType,
  accType: RelDataType,
  typeFactory: FlinkTypeFactory
) extends SqlAggFunction {
  
  /**
   * Returns function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for aggregate functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION
  
  /**
   * Returns aggregate function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier
  
  /**
   * Infers return type for aggregate result
   * @param opBinding Operand binding information
   * @return Return type of aggregation result
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType
  
  /**
   * Returns accumulator type information
   * @param typeFactory Type factory for type creation
   * @return Accumulator type information
   */
  def getAccumulatorType(typeFactory: RelDataTypeFactory): RelDataType = accType
  
  /**
   * Validates aggregate function call
   * @param callBinding Call binding with operand information
   * @return True if call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
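The accumulator contract behind AggregateFunction (createAccumulator, accumulate, getValue) can be illustrated with a self-contained model. SimpleAggregate, AvgAcc, LongAvg, and AggDriver are hypothetical.

In Flink, createAccumulator and getValue are methods of AggregateFunction, while accumulate is discovered by reflection. The interface below is therefore a simplification, not Flink's API.

```java
import java.util.List;

// Hypothetical stand-in for the accumulator contract: create an accumulator,
// fold each input value into it, then read the result from it.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    void accumulate(ACC acc, IN value);
    OUT getValue(ACC acc);
}

// Mutable accumulator holding the running state of an average.
final class AvgAcc {
    long sum;
    long count;
}

final class LongAvg implements SimpleAggregate<Long, AvgAcc, Double> {
    @Override
    public AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    @Override
    public void accumulate(AvgAcc acc, Long value) {
        if (value == null) {
            return; // SQL aggregates such as AVG skip NULL inputs
        }
        acc.sum += value;
        acc.count++;
    }

    @Override
    public Double getValue(AvgAcc acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}

final class AggDriver {
    // Runs the lifecycle over the rows of one group (greatly simplified).
    static <IN, ACC, OUT> OUT run(SimpleAggregate<IN, ACC, OUT> agg, List<IN> rows) {
        ACC acc = agg.createAccumulator();
        for (IN row : rows) {
            agg.accumulate(acc, row);
        }
        return agg.getValue(acc);
    }
}
```

Averaging 1, 2, NULL, and 6 yields 3.0 because the NULL input is skipped. An empty group yields null.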

Table Function SQL Integration

SQL integration for table functions (UDTFs), enabling table-valued function calls in SQL queries.

/**
 * SQL integration for table functions (UDTFs)
 */
class TableSqlFunction(
  identifier: String,
  displayName: String, 
  tableFunction: TableFunction[_],
  resultType: RelDataType,
  typeFactory: FlinkTypeFactory
) extends SqlFunction {
  
  /**
   * Returns function kind for table functions
   * @return SqlKind.OTHER_FUNCTION for table functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION
  
  /**
   * Returns table function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier
  
  /**
   * Infers return type for table function
   * @param opBinding Operand binding information
   * @return Row type returned by table function
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType
  
  /**
   * Validates table function call
   * @param callBinding Call binding with argument types
   * @return True if call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
  
  /**
   * Returns table function definition for query planning
   * @return TableFunction instance
   */
  def getTableFunction: TableFunction[_] = tableFunction
}
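A table function differs from a scalar function: eval emits zero or more rows through a collector instead of returning a value. SimpleTableFunction, SplitFunction, and LateralJoinDemo below are hypothetical. Flink's TableFunction exposes a protected collect method in a similar role.

The model pairs each input row with every row its eval call emits, roughly like an inner LATERAL TABLE join. The collector is per-instance state, which is why sharing one function instance across parallel tasks is unsafe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a table function: eval(...) emits rows through a collector.
abstract class SimpleTableFunction<T> {
    private Consumer<T> collector;

    void setCollector(Consumer<T> collector) {
        this.collector = collector;
    }

    protected void collect(T row) {
        collector.accept(row);
    }
}

// Splits a comma-separated string into one row per non-empty, trimmed token.
final class SplitFunction extends SimpleTableFunction<String> {
    public void eval(String csv) {
        if (csv == null) {
            return;
        }
        for (String token : csv.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                collect(trimmed);
            }
        }
    }
}

final class LateralJoinDemo {
    // For each input, runs eval and pairs the input with every emitted row.
    // Inputs that emit nothing produce no output rows (inner-join semantics).
    static List<String[]> join(List<String> inputs, SplitFunction fn) {
        List<String[]> out = new ArrayList<>();
        for (String in : inputs) {
            fn.setCollector(row -> out.add(new String[] {in, row}));
            fn.eval(in);
        }
        return out;
    }
}
```

Joining the inputs "a, b", "", and "c" produces three rows: ("a, b", "a"), ("a, b", "b"), and ("c", "c"). The empty string emits nothing and is dropped.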

Function Call Code Generation

Integration with the code generation system, which emits Java code that invokes function methods directly instead of through reflection at runtime:

/**
 * Code generation for function calls
 */
object FunctionCallCodeGenerator {
  
  /**
   * Generates code for scalar function calls
   * @param scalarFunction Scalar function to generate code for
   * @param operands Operand expressions
   * @param resultType Expected result type
   * @param config Table configuration
   * @return Generated code expression
   */
  def generateScalarFunctionCall(
    scalarFunction: ScalarFunction,
    operands: Seq[GeneratedExpression],
    resultType: DataType,
    config: TableConfig
  ): GeneratedExpression
  
  /**
   * Generates code for aggregate function calls
   * @param aggregateFunction Aggregate function
   * @param operands Input operand expressions
   * @param accType Accumulator type
   * @param resultType Result type
   * @return Generated aggregate handler code
   */
  def generateAggregateFunctionCall(
    aggregateFunction: AggregateFunction[_, _],
    operands: Seq[GeneratedExpression],
    accType: DataType,
    resultType: DataType
  ): GeneratedAggregateHandler
}
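The hypothetical generator below makes code generation concrete by building Java source text for a scalar call. Operand code runs first, then the function instance's eval method is called directly, and the null flag is derived from the result.

The names ScalarCallCodeGen and GeneratedExpr, and the emitted variable naming, are illustrative assumptions, not the source Flink actually emits. Flink compiles its generated source at runtime with Janino.

```java
import java.util.StringJoiner;

// Hypothetical sketch of a scalar-call code generator that emits Java source text.
final class ScalarCallCodeGen {

    // A generated expression: the source that computes it, plus the names of the
    // variables holding its value and its null flag.
    static final class GeneratedExpr {
        final String code;
        final String resultTerm;
        final String nullTerm;

        GeneratedExpr(String code, String resultTerm, String nullTerm) {
            this.code = code;
            this.resultTerm = resultTerm;
            this.nullTerm = nullTerm;
        }
    }

    private int counter = 0;

    // Emits operand code first, then a direct call to eval() on the function
    // instance, so no reflection is needed when the generated class runs.
    GeneratedExpr generateScalarCall(String functionTerm, String resultJavaType,
                                     GeneratedExpr... operands) {
        int id = counter++;
        String resultTerm = "result$" + id;
        String nullTerm = "isNull$" + id;
        StringBuilder code = new StringBuilder();
        StringJoiner args = new StringJoiner(", ");
        for (GeneratedExpr operand : operands) {
            code.append(operand.code);
            args.add(operand.resultTerm);
        }
        code.append(resultJavaType).append(' ').append(resultTerm)
            .append(" = ").append(functionTerm).append(".eval(").append(args).append(");\n");
        // The UDF may return null, so nullness is taken from the result value.
        code.append("boolean ").append(nullTerm).append(" = ")
            .append(resultTerm).append(" == null;\n");
        return new GeneratedExpr(code.toString(), resultTerm, nullTerm);
    }
}
```

Given one String operand held in field$0, the generator emits this line:

String result$0 = function_0.eval(field$0);

followed by:

boolean isNull$0 = result$0 == null;

Deriving the null flag from the result only works for reference result types. A primitive result type would need a separate null check.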

Built-in Function Integration

Support for built-in SQL functions and their integration with the planning system:

/**
 * Built-in function definitions and utilities
 */
object BuiltInFunctionDefinitions {
  
  // String functions
  val UPPER: FunctionDefinition
  val LOWER: FunctionDefinition
  val SUBSTRING: FunctionDefinition
  val TRIM: FunctionDefinition
  val CONCAT: FunctionDefinition
  
  // Mathematical functions
  val ABS: FunctionDefinition
  val CEIL: FunctionDefinition
  val FLOOR: FunctionDefinition
  val ROUND: FunctionDefinition
  val SIN: FunctionDefinition
  val COS: FunctionDefinition
  
  // Date/time functions
  val CURRENT_TIMESTAMP: FunctionDefinition
  val DATE_FORMAT: FunctionDefinition
  val EXTRACT: FunctionDefinition
  
  // Aggregate functions
  val COUNT: FunctionDefinition
  val SUM: FunctionDefinition
  val AVG: FunctionDefinition
  val MIN: FunctionDefinition
  val MAX: FunctionDefinition
  
  // Window functions
  val ROW_NUMBER: FunctionDefinition
  val RANK: FunctionDefinition
  val DENSE_RANK: FunctionDefinition
  val LAG: FunctionDefinition
  val LEAD: FunctionDefinition
}

Function Catalog Integration

Integration with Flink's function catalog system for function discovery and resolution:

/**
 * Simplified view of function catalog integration (FunctionCatalog, from flink-table-api-java).
 * Functions are registered with and resolved through the FunctionCatalog.
 */
public interface FunctionCatalog {
  /**
   * Registers a temporary system function
   * @param name Function name
   * @param functionDefinition Function definition
   */
  void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);
  
  /**
   * Registers a temporary catalog function
   * @param objectIdentifier Function identifier with catalog/database/name
   * @param functionDefinition Function definition
   * @param ignoreIfExists Whether to ignore if function already exists
   */
  void registerTemporaryCatalogFunction(
    ObjectIdentifier objectIdentifier,
    FunctionDefinition functionDefinition, 
    boolean ignoreIfExists
  );
  
  /**
   * Looks up function by identifier
   * @param objectIdentifier Function identifier
   * @return Optional function lookup result
   */
  Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);
}
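Flink's documentation gives this lookup order for a function referenced by an unqualified name:

1. temporary system functions;
2. built-in system functions;
3. temporary catalog functions;
4. catalog functions in the current catalog and database.

The hypothetical MiniFunctionCatalog below models that order with one map per scope and stores definitions as plain strings. Lower-casing names for case-insensitive matching is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified function catalog illustrating resolution order
// for unqualified function names. Definitions are modelled as strings.
final class MiniFunctionCatalog {
    private final Map<String, String> tempSystem = new HashMap<>();
    private final Map<String, String> builtIn = new HashMap<>();
    private final Map<String, String> tempCatalog = new HashMap<>();
    private final Map<String, String> catalog = new HashMap<>();

    // Assumption: names are matched case-insensitively by normalizing to lower case.
    private static String norm(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    void registerBuiltIn(String name, String def) { builtIn.put(norm(name), def); }

    void registerCatalogFunction(String name, String def) { catalog.put(norm(name), def); }

    void registerTemporarySystemFunction(String name, String def) { tempSystem.put(norm(name), def); }

    void registerTemporaryCatalogFunction(String name, String def, boolean ignoreIfExists) {
        String key = norm(name);
        if (tempCatalog.containsKey(key)) {
            if (ignoreIfExists) {
                return; // keep the existing definition
            }
            throw new IllegalStateException("Temporary catalog function " + name + " already exists");
        }
        tempCatalog.put(key, def);
    }

    // Temporary system, built-in system, temporary catalog, then catalog functions.
    Optional<String> lookupFunction(String name) {
        String key = norm(name);
        for (Map<String, String> scope : Arrays.asList(tempSystem, builtIn, tempCatalog, catalog)) {
            String def = scope.get(key);
            if (def != null) {
                return Optional.of(def);
            }
        }
        return Optional.empty();
    }
}
```

In this model, registering a temporary system function named Upper shadows the built-in UPPER. A temporary catalog function likewise shadows a permanent catalog function of the same name.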

Error Handling and Validation

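Validation rejects malformed UDFs before planning. Failures are reported as ValidationException:

import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils

// Signature validation. Assumes in scope: myFunction (the UDF instance) and
// expectedTypes (Array[LogicalType], the argument types at the call site)
try {
  UserDefinedFunctionUtils.getEvalMethodSignature(myFunction, expectedTypes)
} catch {
  case e: ValidationException =>
    // Typical validation failures:
    // - Missing eval() method
    // - No eval() overload matches the argument types
    // - Unsupported parameter types
    // - Missing result type information
    println(s"Invalid function ${myFunction.getClass.getName}: ${e.getMessage}")
}

// Instantiation validation
val functionClass: Class[_] = myFunction.getClass
try {
  UserDefinedFunctionUtils.checkForInstantiation(functionClass)
} catch {
  case e: ValidationException =>
    // Typical instantiation failures:
    // - Class is not public
    // - Class is abstract, an interface, or a primitive type
    // - Class is a non-static inner class
    println(s"Cannot instantiate ${functionClass.getName}: ${e.getMessage}")
}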

Advanced Function Features

Generic Function Support

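Generic function classes can be registered, but their type parameters are erased at runtime. Reflection sees eval(Object): Object, so the concrete type argument cannot be recovered from the class alone. Depending on the Flink version, argument and result types may therefore need to be declared explicitly instead of being inferred.

// Generic function: after erasure, eval's signature is eval(Object): Object
class GenericFunction[T] extends ScalarFunction {
  def eval(input: T): T = input
}

// Instantiation checks pass, but the String type argument is not visible to them
val genericFunc = new GenericFunction[String]()
UserDefinedFunctionUtils.checkForInstantiation(genericFunc.getClass)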
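Type erasure can be observed directly with reflection. GenericIdentity and ErasureDemo in the self-contained Java example below are illustrative names, not Flink classes. The eval method of a GenericIdentity<String> instance reports Object as both its parameter type and its return type.

```java
import java.lang.reflect.Method;

// A generic eval(T) compiles to eval(Object) because of type erasure, so
// reflection-based signature extraction cannot recover T from the class alone.
class GenericIdentity<T> {
    public T eval(T input) {
        return input;
    }
}

final class ErasureDemo {
    // Returns the first public method named "eval" found on the class.
    static Method evalMethod(Class<?> clazz) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval")) {
                return m;
            }
        }
        throw new IllegalArgumentException("no eval method on " + clazz.getName());
    }
}
```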

Deterministic Function Optimization

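Deterministic functions can be optimized during planning, for example by constant folding a call whose arguments are all literals. isDeterministic() returns true by default. It only needs to be overridden to return false for functions whose result can change between calls with the same arguments (random values, current time):

import org.apache.flink.table.functions.ScalarFunction;

public class MyDeterministicFunction extends ScalarFunction {
  @Override
  public boolean isDeterministic() {
    return true;  // Already the default; allows constant folding and other optimizations
  }
  
  public String eval(String input) {
    return input == null ? null : input.toUpperCase();
  }
}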
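The effect of isDeterministic() on constant folding can be sketched with a tiny expression model. Everything here (Expr, Literal, FieldRef, Call, ConstantFolder) is hypothetical and much simpler than the planner's actual expression reduction. A call is replaced by its computed value only when it is marked deterministic and all of its (already folded) arguments are literals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical mini expression model for illustrating constant folding.
interface Expr {}

final class Literal implements Expr {
    final Object value;
    Literal(Object value) { this.value = value; }
}

final class FieldRef implements Expr {
    final String name;
    FieldRef(String name) { this.name = name; }
}

final class Call implements Expr {
    final String name;
    final boolean deterministic;
    final Function<Object[], Object> impl;
    final List<Expr> args;

    Call(String name, boolean deterministic, Function<Object[], Object> impl, Expr... args) {
        this.name = name;
        this.deterministic = deterministic;
        this.impl = impl;
        this.args = Arrays.asList(args);
    }
}

final class ConstantFolder {
    // Folds bottom-up: arguments first, then the call itself if it qualifies.
    static Expr fold(Expr e) {
        if (!(e instanceof Call)) {
            return e;
        }
        Call call = (Call) e;
        Expr[] folded = call.args.stream().map(ConstantFolder::fold).toArray(Expr[]::new);
        boolean allLiterals = Arrays.stream(folded).allMatch(a -> a instanceof Literal);
        if (call.deterministic && allLiterals) {
            Object[] values = Arrays.stream(folded).map(a -> ((Literal) a).value).toArray();
            return new Literal(call.impl.apply(values)); // evaluated once, at planning time
        }
        return new Call(call.name, call.deterministic, call.impl, folded);
    }
}
```

Folding MY_UPPER('flink') yields the literal FLINK. A non-deterministic MY_RAND() call and a call over a field reference are left unchanged.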

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-planner-blink-2-12
