# Function Integration

The function integration system supports three kinds of user-defined functions (UDFs): scalar functions, aggregate functions, and table functions. It handles function registration, validation, and SQL integration. It also handles code generation, which compiles function calls directly into the generated operator code.

## Capabilities

### UserDefinedFunctionUtils - UDF Utilities

Core utilities for handling and validating user-defined functions and integrating them with the Flink Table API.

```scala { .api }
/**
 * Utilities for user-defined function management and validation
 */
object UserDefinedFunctionUtils {

  /**
   * Validates that a class can be instantiated for UDF usage
   * @param clazz Class to check for instantiation capability
   * @throws ValidationException if the class cannot be instantiated
   */
  def checkForInstantiation(clazz: Class[_]): Unit

  /**
   * Checks that the given class is not a Scala singleton object.
   * A Scala `object` is a single shared instance, so using one as a
   * TableFunction is rejected because it can lead to concurrency issues.
   * @param clazz Class to check for the singleton pattern
   * @throws ValidationException if the class is a Scala object
   */
  def checkNotSingleton(clazz: Class[_]): Unit

  /**
   * Gets the eval method signature for a user-defined function
   * @param function User-defined function instance
   * @param expectedTypes Expected parameter types
   * @return Method signature of the eval method
   */
  def getEvalMethodSignature(
      function: UserDefinedFunction,
      expectedTypes: Array[LogicalType]
  ): Method

  /**
   * Checks whether a method with the given name exists in the function
   * @param method Method name to check
   * @param function User-defined function instance
   * @return true if the method exists, false otherwise
   */
  def ifMethodExistInFunction(method: String, function: UserDefinedFunction): Boolean

  /**
   * Extracts method signatures from a UDF class for function inference
   * @param clazz UDF class to analyze
   * @param methodName Name of the method to extract (e.g., "eval", "accumulate")
   * @return Array of method signatures found
   */
  def getMethodSignatures(clazz: Class[_], methodName: String): Array[MethodSignature]

  /**
   * Determines result type information from UDF method signatures
   * @param signatures Array of method signatures
   * @param inputTypes Input argument types
   * @return Inferred result type information
   */
  def getResultTypeFromSignatures(
      signatures: Array[MethodSignature],
      inputTypes: Array[DataType]
  ): TypeInformation[_]
}
```
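The two class-level checks above can be approximated with plain Java reflection. The sketch below is not Flink's implementation. `UdfClassChecks`, `instantiationError`, and `isScalaSingleton` are hypothetical names. It assumes that a usable UDF class must be public, concrete, and have a public no-argument constructor. It also assumes that a Scala `object` can be recognized by its `$`-suffixed class name and static `MODULE$` field, which is the encoding Scala uses on the JVM.

```scala
import java.lang.reflect.Modifier

// Hypothetical helpers approximating checkForInstantiation / checkNotSingleton
// with plain reflection. A sketch under stated assumptions, not Flink's code.
object UdfClassChecks {

  /** None if the class looks instantiable through a public no-arg constructor, else a reason. */
  def instantiationError(clazz: Class[_]): Option[String] = {
    val mods = clazz.getModifiers
    if (clazz.isInterface || Modifier.isAbstract(mods))
      Some(s"${clazz.getName} is abstract or an interface")
    else if (!Modifier.isPublic(mods))
      Some(s"${clazz.getName} is not public")
    else if (!clazz.getConstructors.exists(_.getParameterCount == 0))
      Some(s"${clazz.getName} has no public no-argument constructor")
    else None
  }

  /** A Scala `object Foo` compiles to a class `Foo$` whose single instance lives in a static MODULE$ field. */
  def isScalaSingleton(clazz: Class[_]): Boolean =
    clazz.getName.endsWith("$") &&
      clazz.getDeclaredFields.exists { f =>
        f.getName == "MODULE$" && Modifier.isStatic(f.getModifiers)
      }
}
```

For example, `instantiationError(classOf[java.lang.Integer])` reports a missing no-argument constructor, and `isScalaSingleton(scala.None.getClass)` is `true`.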
**Usage Example:**

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils

// Custom scalar function; returns NULL for NULL input
class MyUpperFunction extends ScalarFunction {
  def eval(str: String): String = if (str == null) null else str.toUpperCase
}

// Validate the function before registration
val myFunction = new MyUpperFunction()
UserDefinedFunctionUtils.checkForInstantiation(myFunction.getClass)
UserDefinedFunctionUtils.checkNotSingleton(myFunction.getClass)
require(UserDefinedFunctionUtils.ifMethodExistInFunction("eval", myFunction), "eval() is missing")

// The function can now be registered with a table environment
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
tableEnv.createTemporarySystemFunction("MY_UPPER", myFunction)
```
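`ifMethodExistInFunction` and `getMethodSignatures` work by reflecting over the UDF class. The following is a simplified, hypothetical sketch of that kind of lookup. `EvalLookup` and its methods are illustrative names, not Flink APIs. Real matching also has to handle details such as boxing and varargs, which this sketch ignores.

```scala
import java.lang.reflect.{Method, Modifier}

// Hypothetical sketch of reflective eval() lookup; names are illustrative only.
// Matching here is by parameter count plus Class.isAssignableFrom, nothing more.
object EvalLookup {

  /** Public instance methods with the given name, including inherited ones. */
  def methodsNamed(clazz: Class[_], name: String): Seq[Method] =
    clazz.getMethods.toSeq.filter(m => m.getName == name && !Modifier.isStatic(m.getModifiers))

  def methodExists(clazz: Class[_], name: String): Boolean =
    methodsNamed(clazz, name).nonEmpty

  /** First overload whose arity matches and whose parameters accept the argument classes. */
  def findMatching(clazz: Class[_], name: String, argTypes: Seq[Class[_]]): Option[Method] =
    methodsNamed(clazz, name).find { m =>
      val params = m.getParameterTypes.toSeq
      params.length == argTypes.length &&
        params.zip(argTypes).forall { case (param, arg) => param.isAssignableFrom(arg) }
    }
}
```

For a class declaring `eval(String)` and `eval(Int, Int)`, `findMatching(clazz, "eval", Seq(classOf[Int], classOf[Int]))` selects the second overload.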
### Scalar Function SQL Integration

SQL integration layer for scalar functions. It wraps a user-defined `ScalarFunction` as a Calcite `SqlFunction` so that SQL queries can validate and call it like a built-in function.

```scala { .api }
/**
 * SQL integration for scalar functions
 */
class ScalarSqlFunction(
    identifier: String,
    displayName: String,
    scalarFunction: ScalarFunction,
    typeFactory: FlinkTypeFactory
) extends SqlFunction {

  /**
   * Returns the function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for scalar functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the SQL identifier of this function
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers the return type from the operand types
   * @param opBinding Operand binding with argument types
   * @return Inferred return type for the SQL query
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType

  /**
   * Validates a function call with the given operands
   * @param callBinding Call binding with argument information
   * @return true if the call is valid, false otherwise
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
```
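To make the roles of `checkOperandTypes` and `inferReturnType` concrete, here is a simplified model that works on SQL type names instead of Calcite `RelDataType`s. The four-entry type mapping, the exact-match rule (no implicit casts), and all names are assumptions made for illustration. Flink's actual type inference is considerably richer.

```scala
import java.lang.reflect.{Method, Modifier}

// Hypothetical model of operand checking and return-type inference for a scalar UDF.
// Assumptions: SQL types are plain names, only four types are mapped, and an eval()
// overload must match the operand types exactly.
object ScalarTypeInferenceSketch {
  private val sqlToJvm: Map[String, Class[_]] = Map(
    "VARCHAR" -> classOf[String],
    "INTEGER" -> classOf[Int],
    "BIGINT" -> classOf[Long],
    "DOUBLE" -> classOf[Double]
  )

  private def sqlTypeOf(jvm: Class[_]): Option[String] =
    sqlToJvm.collectFirst { case (sql, c) if c == jvm => sql }

  private def evalFor(udf: Class[_], operands: Seq[String]): Option[Method] = {
    val wanted = operands.map(sqlToJvm.get)
    if (wanted.exists(_.isEmpty)) None // an operand has an unmapped SQL type
    else udf.getMethods.find { m =>
      m.getName == "eval" && !Modifier.isStatic(m.getModifiers) &&
        m.getParameterTypes.toSeq == wanted.flatten
    }
  }

  /** Analogue of checkOperandTypes: does some eval() overload accept these operand types? */
  def checkOperandTypes(udf: Class[_], operands: Seq[String]): Boolean =
    evalFor(udf, operands).isDefined

  /** Analogue of inferReturnType: the SQL type of the matching eval()'s return value. */
  def inferReturnType(udf: Class[_], operands: Seq[String]): Option[String] =
    evalFor(udf, operands).flatMap(m => sqlTypeOf(m.getReturnType))
}
```

Overloading `eval` for `(Int, Int)` and `(Long, Long)`, for instance, makes the inferred type follow the operands: `INTEGER` in, `INTEGER` out; `BIGINT` in, `BIGINT` out.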
### Aggregate Function SQL Integration

SQL integration for aggregate functions. It wraps an `AggregateFunction` as a Calcite `SqlAggFunction` and exposes both the aggregation result type and the accumulator type to the planner.

```scala { .api }
/**
 * SQL integration for aggregate functions
 */
class AggSqlFunction(
    identifier: String,
    displayName: String,
    aggregateFunction: AggregateFunction[_, _],
    resultType: RelDataType,
    accType: RelDataType,
    typeFactory: FlinkTypeFactory
) extends SqlAggFunction {

  /**
   * Returns the function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for aggregate functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the aggregate function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers the type of the aggregation result
   * @param opBinding Operand binding information
   * @return Return type of the aggregation result
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Returns the accumulator type
   * @param typeFactory Type factory for type creation
   * @return Accumulator type
   */
  def getAccumulatorType(typeFactory: RelDataTypeFactory): RelDataType = accType

  /**
   * Validates an aggregate function call
   * @param callBinding Call binding with operand information
   * @return true if the call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
```
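In Flink's `AggregateFunction` contract, the runtime creates an accumulator and feeds each row to `accumulate`. It can also `merge` partial accumulators before reading the result with `getValue`. The sketch below models that lifecycle for an average in plain Scala. `AvgAcc` and `AvgSketch` are hypothetical and do not extend any Flink class.

```scala
// Hypothetical accumulator and aggregate modeled on the AggregateFunction contract
// (createAccumulator / accumulate / merge / getValue). Plain Scala, no Flink classes.
final class AvgAcc(var sum: Long = 0L, var count: Long = 0L)

final class AvgSketch {
  def createAccumulator(): AvgAcc = new AvgAcc()

  /** Called once per input row; NULL inputs are ignored, as SQL AVG does. */
  def accumulate(acc: AvgAcc, value: java.lang.Long): Unit =
    if (value != null) {
      acc.sum += value.longValue
      acc.count += 1
    }

  /** Folds partial accumulators (e.g. from other partitions or merged windows) into acc. */
  def merge(acc: AvgAcc, others: Iterable[AvgAcc]): Unit =
    others.foreach { o =>
      acc.sum += o.sum
      acc.count += o.count
    }

  /** Final result; NULL when no non-NULL input was seen. */
  def getValue(acc: AvgAcc): java.lang.Double =
    if (acc.count == 0) null
    else java.lang.Double.valueOf(acc.sum.toDouble / acc.count)
}
```

Keeping `sum` and `count` separately, rather than a running average, is what makes `merge` a simple field-wise addition.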
### Table Function SQL Integration

SQL integration for table functions (UDTFs). It wraps a `TableFunction` as a Calcite `SqlFunction` so that SQL queries can call it as a table-valued function, for example in a `LATERAL TABLE(...)` clause.

```scala { .api }
/**
 * SQL integration for table functions (UDTFs)
 */
class TableSqlFunction(
    identifier: String,
    displayName: String,
    tableFunction: TableFunction[_],
    resultType: RelDataType,
    typeFactory: FlinkTypeFactory
) extends SqlFunction {

  /**
   * Returns the function kind for table functions
   * @return SqlKind.OTHER_FUNCTION for table functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns the table function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers the return type of the table function
   * @param opBinding Operand binding information
   * @return Row type returned by the table function
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Validates a table function call
   * @param callBinding Call binding with argument types
   * @return true if the call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean

  /**
   * Returns the table function definition for query planning
   * @return TableFunction instance
   */
  def getTableFunction: TableFunction[_] = tableFunction
}
```
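A table function's `eval` can emit zero or more rows per call, and `LATERAL TABLE(...)` joins each input row with the rows its call produced. The plain-Scala model below is hypothetical: `TableFunctionSketch`, `SplitSketch`, and `lateralJoin` are not Flink APIs. It mirrors inner-join behavior, so input rows that produce no output are dropped. In Flink SQL, `LEFT JOIN LATERAL TABLE(...) ON TRUE` would keep them instead.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a table function: eval() emits zero or more rows via collect(),
// similar in spirit to Flink's TableFunction. No Flink classes are used.
abstract class TableFunctionSketch[T] {
  private val buffer = ArrayBuffer.empty[T]

  protected def collect(row: T): Unit = buffer += row

  /** Runs one eval-style call and returns the rows it emitted. */
  def invoke(call: => Unit): List[T] = {
    buffer.clear()
    call
    buffer.toList
  }
}

/** Splits a line into (word, length) rows; a NULL line emits nothing. */
final class SplitSketch(separator: String) extends TableFunctionSketch[(String, Int)] {
  def eval(line: String): Unit =
    if (line != null) line.split(separator).foreach(w => collect((w, w.length)))
}

object LateralSketch {
  /** Inner-join semantics: pair each input row with every row its call emits. */
  def lateralJoin[A, B](rows: Seq[A])(call: A => Seq[B]): Seq[(A, B)] =
    rows.flatMap(a => call(a).map(b => (a, b)))
}
```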
235
236
## Function Call Code Generation
237
238
Integration with the code generation system for optimized function execution:
239
240
```scala { .api }
241
/**
242
* Code generation for function calls
243
*/
244
object FunctionCallCodeGenerator {
245
246
/**
247
* Generates code for scalar function calls
248
* @param scalarFunction Scalar function to generate code for
249
* @param operands Operand expressions
250
* @param resultType Expected result type
251
* @param config Table configuration
252
* @return Generated code expression
253
*/
254
def generateScalarFunctionCall(
255
scalarFunction: ScalarFunction,
256
operands: Seq[GeneratedExpression],
257
resultType: DataType,
258
config: TableConfig
259
): GeneratedExpression
260
261
/**
262
* Generates code for aggregate function calls
263
* @param aggregateFunction Aggregate function
264
* @param operands Input operand expressions
265
* @param accType Accumulator type
266
* @param resultType Result type
267
* @return Generated aggregate handler code
268
*/
269
def generateAggregateFunctionCall(
270
aggregateFunction: AggregateFunction[_, _],
271
operands: Seq[GeneratedExpression],
272
accType: DataType,
273
resultType: DataType
274
): GeneratedAggregateHandler
275
}
276
```
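Code generation turns each expression into a snippet of Java source plus the names of the variables that hold its value and its null flag. The string-based sketch below is hypothetical: `ExprSketch`, `CallCodeGenSketch`, and the variable-naming scheme are made up for illustration. It assumes a reference-typed result, passes operand values straight to `eval`, and treats a `null` return value as SQL NULL.

```scala
// Hypothetical, string-based sketch of generating Java code for a scalar UDF call.
// Each expression carries its code plus the names of its result and null-flag variables.
final case class ExprSketch(code: String, resultTerm: String, nullTerm: String)

object CallCodeGenSketch {
  private var counter = 0

  /** Fresh variable name such as "result$1" (not thread-safe; fine for a sketch). */
  private def newTerm(prefix: String): String = {
    counter += 1
    s"$prefix$$$counter"
  }

  /**
   * Emits the operands' code, then a direct call to functionTerm.eval(...).
   * Assumes a reference result type, so a null return value is mapped to SQL NULL.
   */
  def scalarCall(functionTerm: String, resultType: String, operands: Seq[ExprSketch]): ExprSketch = {
    val result = newTerm("result")
    val isNull = newTerm("isNull")
    val args = operands.map(_.resultTerm).mkString(", ")
    val code = (operands.map(_.code) ++ Seq(
      s"$resultType $result = $functionTerm.eval($args);",
      s"boolean $isNull = ($result == null);"
    )).mkString("\n")
    ExprSketch(code, result, isNull)
  }
}
```

Calling the UDF directly in generated source lets the JVM JIT-compile and inline the call like ordinary code, which is the main point of generating code rather than interpreting expressions.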
277
278
## Built-in Function Integration
279
280
Support for built-in SQL functions and their integration with the planning system:
281
282
```scala { .api }
283
/**
284
* Built-in function definitions and utilities
285
*/
286
object BuiltInFunctionDefinitions {
287
288
// String functions
289
val UPPER: FunctionDefinition
290
val LOWER: FunctionDefinition
291
val SUBSTRING: FunctionDefinition
292
val TRIM: FunctionDefinition
293
val CONCAT: FunctionDefinition
294
295
// Mathematical functions
296
val ABS: FunctionDefinition
297
val CEIL: FunctionDefinition
298
val FLOOR: FunctionDefinition
299
val ROUND: FunctionDefinition
300
val SIN: FunctionDefinition
301
val COS: FunctionDefinition
302
303
// Date/time functions
304
val CURRENT_TIMESTAMP: FunctionDefinition
305
val DATE_FORMAT: FunctionDefinition
306
val EXTRACT: FunctionDefinition
307
308
// Aggregate functions
309
val COUNT: FunctionDefinition
310
val SUM: FunctionDefinition
311
val AVG: FunctionDefinition
312
val MIN: FunctionDefinition
313
val MAX: FunctionDefinition
314
315
// Window functions
316
val ROW_NUMBER: FunctionDefinition
317
val RANK: FunctionDefinition
318
val DENSE_RANK: FunctionDefinition
319
val LAG: FunctionDefinition
320
val LEAD: FunctionDefinition
321
}
322
```
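The three ranking functions among the window functions above differ only in how they number ties. The following is a minimal sketch of standard SQL semantics, not Flink's operator code; `RankingSketch` is a hypothetical name. It works on the rows of one partition, already sorted by the ORDER BY key:

```scala
// Minimal sketch of ROW_NUMBER, RANK, and DENSE_RANK within one partition,
// assuming the keys are already sorted by the window's ORDER BY.
object RankingSketch {
  /** Returns (ROW_NUMBER, RANK, DENSE_RANK) for each position of sortedKeys. */
  def rank[K](sortedKeys: IndexedSeq[K]): IndexedSeq[(Int, Int, Int)] = {
    var currentRank = 0
    var currentDense = 0
    sortedKeys.indices.map { i =>
      // A new peer group starts whenever the key differs from the previous one
      if (i == 0 || sortedKeys(i) != sortedKeys(i - 1)) {
        currentRank = i + 1 // RANK skips positions after ties
        currentDense += 1   // DENSE_RANK does not
      }
      (i + 1, currentRank, currentDense)
    }
  }
}
```

For keys `10, 20, 20, 30` this yields ROW_NUMBER `1, 2, 3, 4`, RANK `1, 2, 2, 4`, and DENSE_RANK `1, 2, 2, 3`.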
323
324
## Function Catalog Integration
325
326
Integration with Flink's function catalog system for function discovery and resolution:
327
328
```java { .api }
329
/**
330
* Function catalog integration (from flink-table-api)
331
* Functions are registered and resolved through FunctionCatalog
332
*/
333
public interface FunctionCatalog {
334
/**
335
* Registers a temporary system function
336
* @param name Function name
337
* @param functionDefinition Function definition
338
*/
339
void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);
340
341
/**
342
* Registers a temporary catalog function
343
* @param objectIdentifier Function identifier with catalog/database/name
344
* @param functionDefinition Function definition
345
* @param ignoreIfExists Whether to ignore if function already exists
346
*/
347
void registerTemporaryCatalogFunction(
348
ObjectIdentifier objectIdentifier,
349
FunctionDefinition functionDefinition,
350
boolean ignoreIfExists
351
);
352
353
/**
354
* Looks up function by identifier
355
* @param objectIdentifier Function identifier
356
* @return Optional function lookup result
357
*/
358
Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);
359
}
360
```
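Flink's documentation gives the lookup order for an unqualified function name. Temporary system functions come first, then built-in system functions, then temporary catalog functions, then catalog functions. The last two are looked up in the current catalog and database. The model below is a hypothetical sketch of that order, not the `FunctionCatalog` implementation. It uses plain strings instead of `FunctionDefinition`s, and it assumes that names are normalized to lowercase.

```scala
// Hypothetical model of resolving an unqualified function name in the documented order.
// Definitions are plain strings; map keys are assumed to be stored in lowercase.
final case class FunctionRegistrySketch(
    temporarySystem: Map[String, String] = Map.empty,
    system: Map[String, String] = Map.empty,
    temporaryCatalog: Map[String, String] = Map.empty, // keyed by "catalog.database.name"
    catalog: Map[String, String] = Map.empty,          // keyed by "catalog.database.name"
    currentCatalog: String = "default_catalog",
    currentDatabase: String = "default_database"
) {
  def resolve(name: String): Option[String] = {
    val normalized = name.toLowerCase
    val qualified = s"$currentCatalog.$currentDatabase.$normalized"
    temporarySystem.get(normalized)          // 1. temporary system function
      .orElse(system.get(normalized))        // 2. built-in system function
      .orElse(temporaryCatalog.get(qualified)) // 3. temporary catalog function
      .orElse(catalog.get(qualified))        // 4. catalog function
  }
}
```

Because temporary system functions are checked first, registering one with a built-in name (as `createTemporarySystemFunction` allows) shadows the built-in for unqualified calls.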
361
362
## Error Handling and Validation
363
364
Function validation ensures proper UDF implementation:
365
366
```scala
367
// Common validation scenarios
368
try {
369
UserDefinedFunctionUtils.validateScalarFunction(myFunction)
370
} catch {
371
case e: ValidationException =>
372
// Handle validation errors:
373
// - Missing eval() method
374
// - Invalid method signatures
375
// - Unsupported parameter types
376
// - Missing result type information
377
}
378
379
// Function instantiation validation
380
try {
381
UserDefinedFunctionUtils.checkForInstantiation(functionClass)
382
} catch {
383
case e: ValidationException =>
384
// Handle instantiation errors:
385
// - No default constructor
386
// - Constructor throws exceptions
387
// - Class is abstract or interface
388
}
389
```
390
391
## Advanced Function Features
392
393
### Generic Function Support
394
395
Support for generic functions with type parameter resolution:
396
397
```scala
398
// Generic function example (handled automatically by utilities)
399
class GenericFunction[T] extends ScalarFunction {
400
def eval(input: T): T = input
401
}
402
403
// Type resolution is handled during function registration
404
val genericFunc = new GenericFunction[String]()
405
UserDefinedFunctionUtils.validateScalarFunction(genericFunc)
406
```
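Plain reflection can show how type erasure affects generic functions. In this self-contained sketch, `Identity`, `StringIdentity`, and `ErasureDemo` are hypothetical names, and no Flink classes are involved. The erased `eval` parameter type is `Object`. The type argument bound by a subclass, however, is still recorded in that subclass's generic superclass:

```scala
import java.lang.reflect.{ParameterizedType, Type}

// Self-contained illustration (no Flink classes): a generic eval() is erased to Object,
// but a type argument fixed by a subclass is kept in the class's generic superclass.
class Identity[T] { def eval(input: T): T = input }
class StringIdentity extends Identity[String]

object ErasureDemo {
  /** Runtime (erased) parameter type of the class's eval method. */
  def erasedEvalParam(clazz: Class[_]): Class[_] =
    clazz.getMethods
      .find(_.getName == "eval")
      .map(_.getParameterTypes.head)
      .getOrElse(throw new NoSuchMethodException(s"${clazz.getName}.eval"))

  /** First type argument bound in the generic superclass, if any. */
  def boundTypeArgument(clazz: Class[_]): Option[Type] =
    clazz.getGenericSuperclass match {
      case p: ParameterizedType => p.getActualTypeArguments.headOption
      case _                    => None
    }
}
```

For `new Identity[String]().getClass` there is no bound argument to find, which is why the subclass pattern is the safer choice.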
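### Deterministic Function Optimization

Calls to deterministic functions with constant arguments can be evaluated once during planning (constant folding) instead of once per row. `isDeterministic()` returns `true` by default. Override it to return `false` for functions whose result can change between calls, such as random or time-dependent functions:

```java
import org.apache.flink.table.functions.ScalarFunction;

public class MyDeterministicFunction extends ScalarFunction {

    // true is the default; it is overridden here only to make the contract explicit
    @Override
    public boolean isDeterministic() {
        return true; // enables constant folding and other optimizations
    }

    public String eval(String input) {
        return input == null ? null : input.toUpperCase();
    }
}
```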
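Constant folding can be sketched on a tiny expression tree. A call is replaced by a literal only when it is deterministic and all of its (already folded) arguments are literals. The ADT and names below are hypothetical and much simpler than the planner's expression reduction.

```scala
// Hypothetical expression tree used to illustrate constant folding; not the planner's API.
sealed trait Expr
final case class Lit(value: Any) extends Expr
final case class Col(name: String) extends Expr
final case class Call(name: String, deterministic: Boolean, fn: Seq[Any] => Any, args: Seq[Expr]) extends Expr

object ConstantFolding {
  /** Folds bottom-up: a call becomes a literal only if it is deterministic and all arguments are literals. */
  def fold(e: Expr): Expr = e match {
    case call: Call =>
      val folded = call.args.map(fold)
      val literalArgs = folded.collect { case Lit(v) => v }
      if (call.deterministic && literalArgs.size == folded.size) Lit(call.fn(literalArgs))
      else call.copy(args = folded)
    case other => other
  }
}
```

A non-deterministic call such as a random-number function is never folded, even with literal arguments. Otherwise every row would see the same value computed at planning time.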