0
# User-Defined Functions
1
2
Flink's Table API lets you define custom scalar, table, aggregate, table aggregate, and asynchronous functions. User-defined functions (UDFs) extend the built-in function library with domain-specific logic while still taking part in Flink's type inference and query optimization.
3
4
## Capabilities
5
6
### Scalar Functions
7
8
Functions that map zero or more input values to exactly one output value per call.
9
10
```java { .api }
11
/**
12
* Base class for user-defined scalar functions
13
* Users extend this class and implement eval() methods
14
*/
15
abstract class ScalarFunction extends UserDefinedFunction {
16
// Users implement one or more eval() methods with different signatures
17
// public ReturnType eval(InputType1 input1, InputType2 input2, ...);
18
19
/**
20
* Gets the function context for accessing runtime information
21
* @return FunctionContext providing runtime context
22
*/
23
FunctionContext getFunctionContext();
24
}
25
26
/**
27
* Base class for all user-defined functions
28
*/
29
abstract class UserDefinedFunction implements FunctionDefinition {
30
/**
31
* Optional method called when function is opened
32
* @param context Function context for initialization
33
*/
34
void open(FunctionContext context) throws Exception;
35
36
/**
37
* Optional method called when function is closed
38
*/
39
void close() throws Exception;
40
41
/**
42
* Indicates whether the function is deterministic
43
* @return true if function always returns same result for same inputs
44
*/
45
boolean isDeterministic();
46
47
/**
48
* Gets the type inference for this function
49
* @return TypeInference defining input/output types
50
*/
51
TypeInference getTypeInference();
52
}
53
```
54
55
**Usage Examples:**
56
57
```java
58
// Simple scalar function
59
public class UpperCaseFunction extends ScalarFunction {
60
public String eval(String input) {
61
return input != null ? input.toUpperCase() : null;
62
}
63
}
64
65
// Multiple eval signatures for overloading
66
public class AddFunction extends ScalarFunction {
67
public Integer eval(Integer a, Integer b) {
68
return (a != null && b != null) ? a + b : null;
69
}
70
71
public Double eval(Double a, Double b) {
72
return (a != null && b != null) ? a + b : null;
73
}
74
75
public String eval(String a, String b) {
76
return (a != null && b != null) ? a + b : null;
77
}
78
}
79
80
// Function with context and lifecycle
81
public class HashFunction extends ScalarFunction {
82
private MessageDigest md5;
83
84
@Override
85
public void open(FunctionContext context) throws Exception {
86
md5 = MessageDigest.getInstance("MD5");
87
}
88
89
public String eval(String input) {
90
if (input == null) return null;
91
92
byte[] hash = md5.digest(input.getBytes());
93
return DatatypeConverter.printHexBinary(hash);
94
}
95
}
96
97
// Registration and usage
98
tableEnv.createTemporarySystemFunction("my_upper", new UpperCaseFunction());
99
Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");
100
```
101
102
### Table Functions
103
104
Functions that take zero or more input values and emit zero, one, or many rows per call (table-valued functions).
105
106
```java { .api }
107
/**
108
* Base class for user-defined table functions
109
* @param <T> Type of output rows
110
*/
111
abstract class TableFunction<T> extends UserDefinedFunction {
112
// Users implement one or more eval() methods that call collect()
113
// public void eval(InputType1 input1, InputType2 input2, ...);
114
115
/**
116
* Emits a result row from the table function
117
* @param result Result row to emit
118
*/
119
protected void collect(T result);
120
121
/**
122
* Gets the result type of this table function
123
* @return DataType representing the output row structure
124
*/
125
DataType getResultType();
126
}
127
```
128
129
**Usage Examples:**
130
131
```java
132
// Split string into multiple rows
133
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
134
public class SplitFunction extends TableFunction<Row> {
135
136
public void eval(String str) {
137
if (str != null) {
138
for (String word : str.split("\\s+")) {
139
collect(Row.of(word));
140
}
141
}
142
}
143
}
144
145
// Generate number sequence
146
@FunctionHint(output = @DataTypeHint("ROW<num INT>"))
147
public class RangeFunction extends TableFunction<Row> {
148
149
public void eval(Integer start, Integer end) {
150
if (start != null && end != null) {
151
for (int i = start; i <= end; i++) {
152
collect(Row.of(i));
153
}
154
}
155
}
156
}
157
158
// Registration and usage
159
tableEnv.createTemporarySystemFunction("split_words", new SplitFunction());
160
161
// SQL usage with LATERAL TABLE
162
Table result = tableEnv.sqlQuery(
163
"SELECT t.word, COUNT(*) as word_count " +
164
"FROM documents d, LATERAL TABLE(split_words(d.content)) AS t(word) " +
165
"GROUP BY t.word"
166
);
167
168
// Table API usage
169
Table documents = tableEnv.from("documents");
170
Table words = documents
171
.joinLateral(call("split_words", $("content")).as("word"))
172
.select($("doc_id"), $("word"));
173
```
174
175
### Aggregate Functions
176
177
Functions that aggregate multiple input rows into a single output value.
178
179
```java { .api }
180
/**
181
* Base class for user-defined aggregate functions
182
* @param <T> Type of the final result
183
* @param <ACC> Type of the accumulator
184
*/
185
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
186
/**
187
* Creates a new accumulator for aggregation
188
* @return New accumulator instance
189
*/
190
public abstract ACC createAccumulator();
191
192
/**
193
* Extracts the final result from the accumulator
194
* @param accumulator Final accumulator state
195
* @return Aggregation result
196
*/
197
public abstract T getValue(ACC accumulator);
198
199
// Users implement accumulate() method(s)
200
// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
201
202
/**
203
* Optional: Retracts a value from the accumulator (for changelog streams)
204
* @param accumulator Accumulator to retract from
205
* @param input Input values to retract
206
*/
207
// public void retract(ACC accumulator, InputType1 input1, InputType2 input2, ...);
208
209
/**
210
* Optional: Merges two accumulators (for batch processing)
211
* @param accumulator Target accumulator
212
* @param accumulators Source accumulators to merge
213
*/
214
// public void merge(ACC accumulator, Iterable<ACC> accumulators);
215
}
216
```
217
218
**Usage Examples:**
219
220
```java
221
// Weighted average aggregate function
/** Mutable accumulator for WeightedAverage; a plain POJO with public fields and a no-arg constructor. */
public class WeightedAvgAccumulator {
    /** Running sum of value * weight over all accumulated rows. */
    public double sum = 0.0;
    /** Running sum of the weights of all accumulated rows. */
    public double weightSum = 0.0;
}
226
227
public class WeightedAverage extends AggregateFunction<Double, WeightedAvgAccumulator> {
228
229
@Override
230
public WeightedAvgAccumulator createAccumulator() {
231
return new WeightedAvgAccumulator();
232
}
233
234
public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
235
if (value != null && weight != null) {
236
acc.sum += value * weight;
237
acc.weightSum += weight;
238
}
239
}
240
241
@Override
242
public Double getValue(WeightedAvgAccumulator acc) {
243
return acc.weightSum != 0 ? acc.sum / acc.weightSum : null;
244
}
245
246
public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
247
if (value != null && weight != null) {
248
acc.sum -= value * weight;
249
acc.weightSum -= weight;
250
}
251
}
252
253
public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
254
for (WeightedAvgAccumulator other : others) {
255
acc.sum += other.sum;
256
acc.weightSum += other.weightSum;
257
}
258
}
259
}
260
261
// Registration and usage
262
tableEnv.createTemporarySystemFunction("weighted_avg", new WeightedAverage());
263
264
Table result = tableEnv.sqlQuery(
265
"SELECT product_category, weighted_avg(price, quantity) as avg_price " +
266
"FROM sales " +
267
"GROUP BY product_category"
268
);
269
```
270
271
### Table Aggregate Functions
272
273
Functions that aggregate the rows of a group into zero or more output rows (for example, the top N entries per group).
274
275
```java { .api }
276
/**
277
* Base class for user-defined table aggregate functions
278
* @param <T> Type of the output rows
279
* @param <ACC> Type of the accumulator
280
*/
281
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
282
/**
283
* Creates a new accumulator for aggregation
284
* @return New accumulator instance
285
*/
286
public abstract ACC createAccumulator();
287
288
// Users implement accumulate() method(s)
289
// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
290
291
/**
292
* Emits the final result from the accumulator
293
* @param accumulator Final accumulator state
294
* @param out Collector for emitting results
295
*/
296
public abstract void emitValue(ACC accumulator, Collector<T> out);
297
298
/**
299
* Optional: Emits incremental updates with retraction
300
* @param accumulator Current accumulator state
301
* @param out Collector for emitting results with retraction
302
*/
303
// public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
304
}
305
```
306
307
**Usage Examples:**
308
309
```java
310
// Top N aggregate function
311
public class TopNAccumulator {
312
public List<Tuple2<Integer, String>> topN = new ArrayList<>();
313
public int n;
314
}
315
316
public class TopN extends TableAggregateFunction<Tuple2<Integer, String>, TopNAccumulator> {
317
private int n;
318
319
public TopN(int n) {
320
this.n = n;
321
}
322
323
@Override
324
public TopNAccumulator createAccumulator() {
325
TopNAccumulator acc = new TopNAccumulator();
326
acc.n = n;
327
return acc;
328
}
329
330
public void accumulate(TopNAccumulator acc, Integer score, String name) {
331
if (score != null && name != null) {
332
acc.topN.add(Tuple2.of(score, name));
333
acc.topN.sort((a, b) -> b.f0.compareTo(a.f0)); // Sort descending
334
if (acc.topN.size() > acc.n) {
335
acc.topN.remove(acc.topN.size() - 1);
336
}
337
}
338
}
339
340
@Override
341
public void emitValue(TopNAccumulator acc, Collector<Tuple2<Integer, String>> out) {
342
for (Tuple2<Integer, String> item : acc.topN) {
343
out.collect(item);
344
}
345
}
346
}
347
348
// Registration and usage
349
tableEnv.createTemporarySystemFunction("top3", new TopN(3));
350
351
Table result = tableEnv.sqlQuery(
352
"SELECT score, name " +
353
"FROM (SELECT score, name FROM players) " +
354
"GROUP BY () " +
355
"FLAT_AGGREGATE(top3(score, name)) AS (score, name)"
356
);
357
```
358
359
### Async Functions
360
361
Functions that run I/O-bound work (such as remote service lookups) asynchronously and deliver their results through a `CompletableFuture`.
362
363
```java { .api }
364
/**
365
* Async scalar function for I/O bound operations
366
*/
367
class AsyncScalarFunction extends UserDefinedFunction {
368
// Users implement evalAsync() methods that return CompletableFuture
369
// public CompletableFuture<ReturnType> evalAsync(InputType1 input1, InputType2 input2, ...);
370
}
371
372
/**
373
* Base class for async table functions
374
* @param <T> Type of output rows
375
*/
376
abstract class AsyncTableFunction<T> extends UserDefinedFunction {
377
// Users implement evalAsync() methods that use AsyncCollector
378
// public void evalAsync(InputType1 input1, AsyncCollector<T> collector);
379
}
380
```
381
382
**Usage Examples:**
383
384
```java
385
// Async HTTP lookup function
386
public class HttpLookupFunction extends AsyncScalarFunction {
387
private transient AsyncHttpClient httpClient;
388
389
@Override
390
public void open(FunctionContext context) throws Exception {
391
httpClient = Dsl.asyncHttpClient();
392
}
393
394
public CompletableFuture<String> evalAsync(String url) {
395
if (url == null) {
396
return CompletableFuture.completedFuture(null);
397
}
398
399
return httpClient
400
.prepareGet(url)
401
.execute()
402
.toCompletableFuture()
403
.thenApply(response -> response.getResponseBody());
404
}
405
406
@Override
407
public void close() throws Exception {
408
if (httpClient != null) {
409
httpClient.close();
410
}
411
}
412
}
413
414
// Registration and usage
415
tableEnv.createTemporarySystemFunction("http_get", new HttpLookupFunction());
416
```
417
418
### Function Type Inference
419
420
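Annotations that control type inference when input or output types cannot be derived from the Java signature alone (for example, `Row` results or precise SQL types such as `DECIMAL(10, 2)`).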
421
422
```java { .api }
423
/**
424
* Annotation for providing type hints to functions
425
*/
426
@Target({ElementType.TYPE, ElementType.METHOD})
427
@Retention(RetentionPolicy.RUNTIME)
428
@interface FunctionHint {
429
DataTypeHint[] input() default {};
430
DataTypeHint output() default @DataTypeHint();
431
boolean isDeterministic() default true;
432
}
433
434
/**
 * Annotation for specifying the data type of a class, field, method result, or method parameter.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@interface DataTypeHint {
    /** SQL type string such as "DECIMAL(10, 2)" or "ROW<id BIGINT>"; empty means "not specified". */
    String value() default "";

    /** Java class the data type is bridged to at runtime; void.class means "not specified". */
    Class<?> bridgedTo() default void.class;

    /** Whether types that cannot be mapped to a SQL type may fall back to a RAW type globally. */
    boolean allowRawGlobally() default false;
}
444
```
445
446
**Usage Examples:**
447
448
```java
449
// Function with explicit type hints
450
@FunctionHint(
451
input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
452
output = @DataTypeHint("ARRAY<STRING>")
453
)
454
public class RepeatFunction extends TableFunction<String[]> {
455
456
public void eval(String str, Integer count) {
457
if (str != null && count != null && count > 0) {
458
String[] result = new String[count];
459
Arrays.fill(result, str);
460
collect(result);
461
}
462
}
463
}
464
465
// Complex return type with row structure
466
@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, name STRING, score DOUBLE>"))
467
public class ParseResultFunction extends TableFunction<Row> {
468
469
public void eval(String jsonString) {
470
// Parse JSON and emit structured rows
471
JsonObject json = JsonParser.parseString(jsonString).getAsJsonObject();
472
collect(Row.of(
473
json.get("id").getAsLong(),
474
json.get("name").getAsString(),
475
json.get("score").getAsDouble()
476
));
477
}
478
}
479
```
480
481
## Types
482
483
### Function Base Classes
484
485
```java { .api }
486
abstract class UserDefinedFunction implements FunctionDefinition {
487
void open(FunctionContext context) throws Exception;
488
void close() throws Exception;
489
boolean isDeterministic();
490
TypeInference getTypeInference();
491
}
492
493
abstract class ScalarFunction extends UserDefinedFunction {
494
// Implementation-specific eval() methods
495
}
496
497
abstract class TableFunction<T> extends UserDefinedFunction {
498
protected void collect(T result);
499
DataType getResultType();
500
}
501
502
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
503
public abstract ACC createAccumulator();
504
public abstract T getValue(ACC accumulator);
505
// Optional: public void retract(ACC accumulator, ...);
506
// Optional: public void merge(ACC accumulator, Iterable<ACC> others);
507
}
508
509
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
510
public abstract ACC createAccumulator();
511
public abstract void emitValue(ACC accumulator, Collector<T> out);
512
// Optional: public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
513
}
514
```
515
516
### Function Context
517
518
```java { .api }
519
interface FunctionContext {
520
MetricGroup getMetricGroup();
521
int getIndexOfThisSubtask();
522
int getNumberOfParallelSubtasks();
523
String getJobParameter(String key, String defaultValue);
524
525
// Access to distributed cache
526
File getCachedFile(String name);
527
}
528
```
529
530
### Collectors and Async Support
531
532
```java { .api }
533
/**
 * Sink for records emitted by a function.
 *
 * @param <T> type of the collected records
 */
interface Collector<T> {
    /**
     * Emits a record.
     *
     * @param record the record to emit
     */
    void collect(T record);

    /** Closes the collector; records collected afterwards may be discarded. */
    void close();
}
536
537
interface RetractableCollector<T> extends Collector<T> {
538
void retract(T record);
539
}
540
541
/**
 * Callback for delivering asynchronously computed results as a batch.
 *
 * <p>NOTE(review): current Flink {@code AsyncTableFunction}s complete a
 * {@code CompletableFuture<Collection<T>>} passed to eval() instead of using a collector like
 * this — confirm whether this type is still needed for the Flink version documented here.
 *
 * @param <T> type of the result records
 */
interface AsyncCollector<T> {
    /**
     * Delivers a batch of results.
     *
     * @param result the result records
     */
    void collect(Collection<T> result);

    /**
     * Delivers the final batch of results.
     *
     * @param result the result records
     */
    void complete(Collection<T> result);
}
545
```