Apache Flink's Blink Table API planner module: an SQL and Table API query planning and execution engine with query optimization, code generation, and support for both streaming and batch workloads.
—
The function integration system supports user-defined functions (UDFs), including scalar functions, aggregate functions, and table functions. It handles function registration, validation, SQL integration, and code generation for efficient execution.
Core utilities for user-defined function handling, validation, and integration with the Flink Table API.
/**
 * Utilities for user-defined function (UDF) management and validation.
 *
 * NOTE(review): this is a signature listing; method bodies are omitted, so it is
 * not compilable Scala as written (an `object` cannot declare abstract members).
 */
object UserDefinedFunctionUtils {
  /**
   * Validates that a class can be instantiated for UDF usage.
   *
   * @param clazz class to check for instantiation capability
   * @throws ValidationException if the class cannot be instantiated
   */
  def checkForInstantiation(clazz: Class[_]): Unit

  /**
   * Checks that the given class is not a Scala singleton `object`.
   *
   * A singleton is one instance shared by every concurrent use of the function,
   * which is unsafe for TableFunction implementations.
   *
   * @param clazz class to check for the singleton pattern
   * @throws ValidationException if the class is a Scala object
   */
  def checkNotSingleton(clazz: Class[_]): Unit

  /**
   * Finds the `eval` method of a user-defined function that matches the expected
   * parameter types.
   *
   * NOTE(review): the behaviour when no `eval` overload matches is not shown here;
   * confirm whether this throws or returns null.
   *
   * @param function user-defined function instance
   * @param expectedTypes expected parameter types
   * @return the matching `eval` method
   */
  def getEvalMethodSignature(
      function: UserDefinedFunction,
      expectedTypes: Array[LogicalType]
  ): Method

  /**
   * Checks whether the function declares a method with the given name.
   *
   * @param method method name to look for
   * @param function user-defined function instance
   * @return true if such a method exists, false otherwise
   */
  def ifMethodExistInFunction(method: String, function: UserDefinedFunction): Boolean

  /**
   * Extracts the signatures of the methods named `methodName` from a UDF class,
   * for use in function type inference.
   *
   * @param clazz UDF class to analyze
   * @param methodName name of the method to extract (e.g. "eval", "accumulate")
   * @return the method signatures found
   */
  def getMethodSignatures(clazz: Class[_], methodName: String): Array[MethodSignature]

  /**
   * Determines the result type from a set of UDF method signatures and the actual
   * input argument types.
   *
   * NOTE(review): input types are `DataType` here but `LogicalType` in
   * getEvalMethodSignature; confirm which type system callers should use.
   *
   * @param signatures candidate method signatures
   * @param inputTypes input argument types
   * @return the inferred result type information
   */
  def getResultTypeFromSignatures(
      signatures: Array[MethodSignature],
      inputTypes: Array[DataType]
  ): TypeInformation[_]
}
Usage Example:
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.ScalarFunction

// Custom scalar function.
// SQL NULL arguments arrive as `null`, so the input is wrapped in Option before use.
// Locale.ROOT keeps the result independent of the default locale of the machine
// the function happens to run on.
class MyUpperFunction extends ScalarFunction {
  def eval(str: String): String =
    Option(str).map(_.toUpperCase(java.util.Locale.ROOT)).orNull
}

// Check the function class before registration: it must be instantiable and must
// not be a Scala singleton `object`.
val myFunction = new MyUpperFunction()
UserDefinedFunctionUtils.checkForInstantiation(myFunction.getClass)
UserDefinedFunctionUtils.checkNotSingleton(myFunction.getClass)

// The function can now be registered (assumes a `tableEnv: TableEnvironment` in scope).
tableEnv.createTemporarySystemFunction("MY_UPPER", myFunction)
SQL integration layer for scalar functions, connecting user-defined scalar functions to SQL queries.
/**
 * Exposes a user-defined [[ScalarFunction]] to SQL as a Calcite `SqlFunction`.
 *
 * NOTE(review): this is a signature sketch. `inferReturnType` and `checkOperandTypes`
 * have no bodies, and the `SqlFunction` constructor arguments are omitted.
 * Compilable Scala would also need `override` on members inherited from Calcite's
 * `SqlOperator`. Calcite's `checkOperandTypes` also takes an extra
 * `throwOnFailure: Boolean` argument. Confirm against the Calcite version in use.
 *
 * @param identifier name used to reference the function in SQL
 * @param displayName human-readable function name (presumably for plan/explain output; confirm)
 * @param scalarFunction the wrapped user-defined scalar function
 * @param typeFactory Flink type factory (presumably used to derive SQL types; confirm)
 */
class ScalarSqlFunction(
    identifier: String,
    displayName: String,
    scalarFunction: ScalarFunction,
    typeFactory: FlinkTypeFactory
) extends SqlFunction {
  /**
   * Returns the SQL kind of this operator.
   *
   * @return always `SqlKind.OTHER_FUNCTION`
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the name this function is called by in SQL.
   *
   * @return the `identifier` constructor argument (not `displayName`)
   */
  def getName: String = identifier

  /**
   * Infers the SQL return type of a call from its operand types.
   *
   * @param opBinding operand binding carrying the argument types
   * @return the inferred return type
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType

  /**
   * Validates the operands of a call to this function.
   *
   * @param callBinding call binding with argument information
   * @return true if the call is valid, false otherwise
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
SQL integration for aggregate functions, handling accumulator management and result computation.
/**
 * Exposes a user-defined [[AggregateFunction]] to SQL as a Calcite `SqlAggFunction`.
 *
 * The result and accumulator types are fixed at construction time and returned
 * as-is by `inferReturnType` and `getAccumulatorType`.
 *
 * NOTE(review): this is a signature sketch. `checkOperandTypes` has no body, and the
 * `SqlAggFunction` constructor arguments are omitted. Compilable Scala would also
 * need `override` on inherited Calcite members. Confirm against the Calcite version
 * in use.
 *
 * @param identifier name used to reference the function in SQL
 * @param displayName human-readable function name (presumably for plan/explain output; confirm)
 * @param aggregateFunction the wrapped user-defined aggregate function
 * @param resultType SQL type of the aggregation result
 * @param accType SQL type of the accumulator
 * @param typeFactory Flink type factory
 */
class AggSqlFunction(
    identifier: String,
    displayName: String,
    aggregateFunction: AggregateFunction[_, _],
    resultType: RelDataType,
    accType: RelDataType,
    typeFactory: FlinkTypeFactory
) extends SqlAggFunction {
  /**
   * Returns the SQL kind of this operator.
   *
   * @return always `SqlKind.OTHER_FUNCTION`
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the name this aggregate is called by in SQL.
   *
   * @return the `identifier` constructor argument
   */
  def getName: String = identifier

  /**
   * Returns the type of the aggregation result.
   *
   * @param opBinding operand binding information (ignored; the type is fixed)
   * @return the `resultType` supplied at construction
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Returns the accumulator type.
   *
   * @param typeFactory type factory (ignored; note it shadows the constructor
   *                    parameter of the same name)
   * @return the `accType` supplied at construction
   */
  def getAccumulatorType(typeFactory: RelDataTypeFactory): RelDataType = accType

  /**
   * Validates the operands of a call to this aggregate.
   *
   * @param callBinding call binding with operand information
   * @return true if the call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
SQL integration for table functions (UDTFs), enabling table-valued function calls in SQL queries.
/**
 * Exposes a user-defined [[TableFunction]] (UDTF) to SQL as a Calcite `SqlFunction`.
 *
 * The row type returned by the function is fixed at construction time.
 *
 * NOTE(review): this is a signature sketch. `checkOperandTypes` has no body, and the
 * `SqlFunction` constructor arguments are omitted. Compilable Scala would also need
 * `override` on inherited Calcite members. Confirm against the Calcite version in use.
 *
 * @param identifier name used to reference the function in SQL
 * @param displayName human-readable function name (presumably for plan/explain output; confirm)
 * @param tableFunction the wrapped user-defined table function
 * @param resultType row type produced by the table function
 * @param typeFactory Flink type factory
 */
class TableSqlFunction(
    identifier: String,
    displayName: String,
    tableFunction: TableFunction[_],
    resultType: RelDataType,
    typeFactory: FlinkTypeFactory
) extends SqlFunction {
  /**
   * Returns the SQL kind of this operator.
   *
   * @return always `SqlKind.OTHER_FUNCTION`
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the name this table function is called by in SQL.
   *
   * @return the `identifier` constructor argument
   */
  def getName: String = identifier

  /**
   * Returns the row type produced by the table function.
   *
   * @param opBinding operand binding information (ignored; the type is fixed)
   * @return the `resultType` supplied at construction
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Validates the operands of a call to this table function.
   *
   * @param callBinding call binding with argument types
   * @return true if the call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean

  /**
   * Returns the wrapped table function, for use during query planning.
   *
   * @return the `tableFunction` supplied at construction
   */
  def getTableFunction: TableFunction[_] = tableFunction
}
Integration with the code generation system for optimized function execution:
/**
 * Generates code for calls to user-defined functions.
 *
 * NOTE(review): this is a signature listing; method bodies are omitted.
 */
object FunctionCallCodeGenerator {
  /**
   * Generates code that invokes a scalar function.
   *
   * @param scalarFunction scalar function to generate code for
   * @param operands generated expressions for the call's operands
   * @param resultType expected result type
   * @param config table configuration
   * @return the generated expression for the call
   */
  def generateScalarFunctionCall(
      scalarFunction: ScalarFunction,
      operands: Seq[GeneratedExpression],
      resultType: DataType,
      config: TableConfig
  ): GeneratedExpression

  /**
   * Generates an aggregate handler for an aggregate function.
   *
   * NOTE(review): unlike generateScalarFunctionCall, this takes no TableConfig;
   * confirm whether configuration is supplied some other way.
   *
   * @param aggregateFunction aggregate function
   * @param operands generated expressions for the input operands
   * @param accType accumulator type
   * @param resultType result type
   * @return the generated aggregate handler code
   */
  def generateAggregateFunctionCall(
      aggregateFunction: AggregateFunction[_, _],
      operands: Seq[GeneratedExpression],
      accType: DataType,
      resultType: DataType
  ): GeneratedAggregateHandler
}
Support for built-in SQL functions and their integration with the planning system:
/**
 * Definitions of built-in SQL functions, grouped by category.
 *
 * NOTE(review): declarations only (values are not shown), and the list appears to be
 * a selection rather than the complete set. In Flink this is likely the Java class
 * `org.apache.flink.table.functions.BuiltInFunctionDefinitions`; confirm the exact
 * location and members for your version.
 */
object BuiltInFunctionDefinitions {
  // String functions
  val UPPER: FunctionDefinition
  val LOWER: FunctionDefinition
  val SUBSTRING: FunctionDefinition
  val TRIM: FunctionDefinition
  val CONCAT: FunctionDefinition

  // Mathematical functions
  val ABS: FunctionDefinition
  val CEIL: FunctionDefinition
  val FLOOR: FunctionDefinition
  val ROUND: FunctionDefinition
  val SIN: FunctionDefinition
  val COS: FunctionDefinition

  // Date/time functions
  val CURRENT_TIMESTAMP: FunctionDefinition
  val DATE_FORMAT: FunctionDefinition
  val EXTRACT: FunctionDefinition

  // Aggregate functions
  val COUNT: FunctionDefinition
  val SUM: FunctionDefinition
  val AVG: FunctionDefinition
  val MIN: FunctionDefinition
  val MAX: FunctionDefinition

  // Window (ranking/offset) functions
  val ROW_NUMBER: FunctionDefinition
  val RANK: FunctionDefinition
  val DENSE_RANK: FunctionDefinition
  val LAG: FunctionDefinition
  val LEAD: FunctionDefinition
}
Integration with Flink's function catalog system for function discovery and resolution:
/**
 * Function catalog integration (from flink-table-api).
 * Functions are registered and resolved through FunctionCatalog.
 *
 * <p>NOTE(review): this is a simplified sketch. Check the actual FunctionCatalog type
 * (interface vs. class) and the method parameter lists against the Flink version in use.
 */
public interface FunctionCatalog {
    /**
     * Registers a temporary system function.
     *
     * <p>Unlike {@code registerTemporaryCatalogFunction}, this takes no
     * {@code ignoreIfExists} flag. Behaviour on a duplicate name is not specified here.
     *
     * @param name function name
     * @param functionDefinition function definition
     */
    void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);

    /**
     * Registers a temporary catalog function.
     *
     * @param objectIdentifier function identifier (catalog, database, and name)
     * @param functionDefinition function definition
     * @param ignoreIfExists whether to ignore the call if the function already exists
     */
    void registerTemporaryCatalogFunction(
        ObjectIdentifier objectIdentifier,
        FunctionDefinition functionDefinition,
        boolean ignoreIfExists
    );

    /**
     * Looks up a function by identifier.
     *
     * @param objectIdentifier function identifier
     * @return the lookup result, presumably empty if no function is found (confirm)
     */
    Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);
}
Function validation ensures proper UDF implementation:
// Common validation scenarios (assumes `tableEnv` and `myFunction` are in scope).
// NOTE(review): validation presumably happens when the function is registered, and
// type-inference problems may only surface when it is first used in a query.
// Confirm the exact validation point for your Flink version.
try {
  tableEnv.createTemporarySystemFunction("MY_UPPER", myFunction)
} catch {
  case _: ValidationException =>
    // Handle validation errors, e.g.:
    // - Missing eval() method
    // - Invalid method signatures
    // - Unsupported parameter types
    // - Missing result type information
}

// Function instantiation validation (assumes `functionClass: Class[_]` is in scope).
try {
  UserDefinedFunctionUtils.checkForInstantiation(functionClass)
} catch {
  case _: ValidationException =>
    // Handle instantiation errors:
    // - No default constructor
    // - Constructor throws exceptions
    // - Class is abstract or interface
}
Support for generic (type-parameterized) functions:
// Generic function example.
// NOTE(review): because of JVM type erasure, `eval(input: T): T` compiles to
// `eval(Object): Object`, so reflection cannot see that T = String for
// `new GenericFunction[String]()`. Expect to supply explicit type information
// (e.g. type hints or an explicit type-inference override) rather than relying on
// automatic resolution. Confirm for your Flink version.
class GenericFunction[T] extends ScalarFunction {
  def eval(input: T): T = input
}

val genericFunc = new GenericFunction[String]()
UserDefinedFunctionUtils.checkForInstantiation(genericFunc.getClass)
Functions marked as deterministic can be optimized during planning:
public class MyDeterministicFunction extends ScalarFunction {
    /**
     * Declares that the same arguments always produce the same result. This lets the
     * planner apply optimizations such as constant folding.
     *
     * <p>NOTE(review): {@code true} is believed to be the default for user-defined
     * functions, so an override is only strictly needed to return {@code false}.
     * Confirm for your Flink version.
     */
    @Override
    public boolean isDeterministic() {
        return true;
    }

    /**
     * Upper-cases the input independently of the JVM default locale.
     * SQL NULL (Java {@code null}) is passed through unchanged.
     */
    public String eval(String input) {
        return input == null ? null : input.toUpperCase(java.util.Locale.ROOT);
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-planner-blink-2-12