0
# User-Defined Functions
1
2
Flink Table API supports custom functions to extend the built-in function library. You can create scalar functions, table functions, aggregate functions, and table aggregate functions to implement domain-specific logic.
3
4
## Capabilities
5
6
### Scalar Functions
7
8
User-defined scalar functions take zero, one, or multiple scalar values and return a single scalar value.
9
10
```java { .api }
11
/**
12
* Base class for scalar user-defined functions
13
* Users must extend this class and implement eval() methods
14
*/
15
public abstract class ScalarFunction extends UserDefinedFunction {
16
/**
17
* Users implement one or more eval methods with different signatures
18
* The eval method name is fixed - Flink uses reflection to find matching methods
19
*
20
* Example signatures:
21
* public String eval(String input);
22
* public Integer eval(Integer a, Integer b);
23
* public Double eval(Double... values);
24
*/
25
}
26
```
27
28
**Usage Examples:**
29
30
```java
31
// Custom string manipulation function
32
public class StringHashFunction extends ScalarFunction {
33
public String eval(String input) {
34
if (input == null) {
35
return null;
36
}
37
return "hash_" + Math.abs(input.hashCode());
38
}
39
40
public String eval(String input, String prefix) {
41
if (input == null) {
42
return null;
43
}
44
return prefix + "_" + Math.abs(input.hashCode());
45
}
46
}
47
48
// Register and use the function
49
StringHashFunction hashFunc = new StringHashFunction();
50
tableEnv.createTemporaryFunction("string_hash", hashFunc);
51
52
// Use in Table API
53
Table result = sourceTable.select(
54
$("id"),
55
$("name"),
56
call("string_hash", $("name")).as("name_hash"),
57
call("string_hash", $("name"), lit("user")).as("prefixed_hash")
58
);
59
60
// Use in SQL
61
Table sqlResult = tableEnv.sqlQuery(
62
"SELECT id, name, string_hash(name) as name_hash " +
63
"FROM source_table"
64
);
65
66
// Mathematical function example
67
public class PowerFunction extends ScalarFunction {
68
public Double eval(Double base, Double exponent) {
69
if (base == null || exponent == null) {
70
return null;
71
}
72
return Math.pow(base, exponent);
73
}
74
75
public Long eval(Long base, Long exponent) {
76
if (base == null || exponent == null) {
77
return null;
78
}
79
return (long) Math.pow(base, exponent);
80
}
81
}
82
```
83
84
### Table Functions
85
86
User-defined table functions take zero, one, or multiple scalar values and return any number of rows, which together form a table.
87
88
```java { .api }
89
/**
90
* Base class for table user-defined functions
91
* Users must extend this class and implement eval() methods
92
* Use collect() to emit output rows
93
*/
94
public abstract class TableFunction<T> extends UserDefinedFunction {
95
/**
96
* Emits a result row to the output table
97
* @param result Row data to emit
98
*/
99
protected void collect(T result);
100
101
/**
102
* Users implement eval methods that call collect() for each output row
103
* The eval method name is fixed - Flink uses reflection to find matching methods
104
*/
105
}
106
```
107
108
**Usage Examples:**
109
110
```java
111
// Split string into multiple rows
112
public class SplitFunction extends TableFunction<Row> {
113
public void eval(String str, String separator) {
114
if (str == null || separator == null) {
115
return;
116
}
117
118
String[] parts = str.split(separator);
119
for (int i = 0; i < parts.length; i++) {
120
collect(Row.of(parts[i].trim(), i));
121
}
122
}
123
}
124
125
// Register and use table function
126
SplitFunction splitFunc = new SplitFunction();
127
tableEnv.createTemporaryFunction("split_string", splitFunc);
128
129
// Use with LATERAL TABLE in SQL
130
Table result = tableEnv.sqlQuery(
131
"SELECT t.id, t.name, s.word, s.position " +
132
"FROM source_table t, " +
133
"LATERAL TABLE(split_string(t.tags, ',')) AS s(word, position)"
134
);
135
136
// Use in Table API with joinLateral
137
Table lateralResult = sourceTable
138
.joinLateral(call("split_string", $("tags"), lit(",")))
139
.select($("id"), $("name"), $("f0").as("word"), $("f1").as("position"));
140
141
// Generate series function
142
public class GenerateSeriesFunction extends TableFunction<Integer> {
143
public void eval(Integer start, Integer end) {
144
if (start == null || end == null) {
145
return;
146
}
147
148
for (int i = start; i <= end; i++) {
149
collect(i);
150
}
151
}
152
153
public void eval(Integer start, Integer end, Integer step) {
154
if (start == null || end == null || step == null || step == 0) {
155
return;
156
}
157
158
if (step > 0) {
159
for (int i = start; i <= end; i += step) {
160
collect(i);
161
}
162
} else {
163
for (int i = start; i >= end; i += step) {
164
collect(i);
165
}
166
}
167
}
168
}
169
```
170
171
### Aggregate Functions
172
173
User-defined aggregate functions take multiple rows and compute a single aggregate result.
174
175
```java { .api }
176
/**
177
* Base class for aggregate user-defined functions
178
* Users must implement accumulator management methods
179
*/
180
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
181
/**
182
* Creates a new accumulator for aggregation
183
* @return New accumulator instance
184
*/
185
public abstract ACC createAccumulator();
186
187
/**
188
* Accumulates input values into the accumulator
189
* @param accumulator Current accumulator state
190
* @param input Input values to accumulate (one or more parameters)
191
*/
192
public abstract void accumulate(ACC accumulator, Object... input);
193
194
/**
195
* Extracts the final result from the accumulator
196
* @param accumulator Final accumulator state
197
* @return Aggregate result
198
*/
199
public abstract T getValue(ACC accumulator);
200
201
/**
202
* Retracts input values from the accumulator (optional)
203
* Only needed for streaming scenarios with retractions
204
* @param accumulator Current accumulator state
205
* @param input Input values to retract
206
*/
207
public void retract(ACC accumulator, Object... input) {
208
// Optional - implement if retraction is needed
209
}
210
211
/**
212
* Merges two accumulators (optional)
213
* Needed for session windows and some optimization scenarios
214
* @param accumulator Target accumulator
215
* @param other Source accumulator to merge from
216
*/
217
public void merge(ACC accumulator, Iterable<ACC> other) {
218
// Optional - implement if merging is needed
219
}
220
}
221
```
222
223
**Usage Examples:**
224
225
```java
226
// Custom average function with accumulator
227
public class WeightedAverageFunction extends AggregateFunction<Double, WeightedAverageAccumulator> {
228
229
// Accumulator class
230
public static class WeightedAverageAccumulator {
231
public double sum = 0.0;
232
public double weightSum = 0.0;
233
}
234
235
@Override
236
public WeightedAverageAccumulator createAccumulator() {
237
return new WeightedAverageAccumulator();
238
}
239
240
@Override
241
public void accumulate(WeightedAverageAccumulator acc, Double value, Double weight) {
242
if (value != null && weight != null) {
243
acc.sum += value * weight;
244
acc.weightSum += weight;
245
}
246
}
247
248
@Override
249
public Double getValue(WeightedAverageAccumulator acc) {
250
if (acc.weightSum == 0.0) {
251
return null;
252
}
253
return acc.sum / acc.weightSum;
254
}
255
256
@Override
257
public void retract(WeightedAverageAccumulator acc, Double value, Double weight) {
258
if (value != null && weight != null) {
259
acc.sum -= value * weight;
260
acc.weightSum -= weight;
261
}
262
}
263
264
@Override
265
public void merge(WeightedAverageAccumulator acc, Iterable<WeightedAverageAccumulator> others) {
266
for (WeightedAverageAccumulator other : others) {
267
acc.sum += other.sum;
268
acc.weightSum += other.weightSum;
269
}
270
}
271
}
272
273
// Register and use aggregate function
274
WeightedAverageFunction weightedAvg = new WeightedAverageFunction();
275
tableEnv.createTemporaryFunction("weighted_avg", weightedAvg);
276
277
// Use in Table API
278
Table result = sourceTable
279
.groupBy($("category"))
280
.select(
281
$("category"),
282
call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price")
283
);
284
285
// Use in SQL
286
Table sqlResult = tableEnv.sqlQuery(
287
"SELECT category, weighted_avg(price, quantity) as weighted_avg_price " +
288
"FROM source_table " +
289
"GROUP BY category"
290
);
291
292
// Custom string concatenation aggregate
293
public class StringConcatFunction extends AggregateFunction<String, StringBuilder> {
294
295
@Override
296
public StringBuilder createAccumulator() {
297
return new StringBuilder();
298
}
299
300
@Override
301
public void accumulate(StringBuilder acc, String value, String separator) {
302
if (value != null) {
303
if (acc.length() > 0 && separator != null) {
304
acc.append(separator);
305
}
306
acc.append(value);
307
}
308
}
309
310
@Override
311
public String getValue(StringBuilder acc) {
312
return acc.toString();
313
}
314
}
315
```
316
317
### Table Aggregate Functions
318
319
Table aggregate functions take multiple rows and return multiple rows (like table functions but with aggregation semantics).
320
321
```java { .api }
322
/**
323
* Base class for table aggregate functions
324
* Combines aspects of both table functions and aggregate functions
325
*/
326
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {
327
/**
328
* Creates a new accumulator
329
* @return New accumulator instance
330
*/
331
public abstract ACC createAccumulator();
332
333
/**
334
* Accumulates input values
335
* @param accumulator Current accumulator state
336
* @param input Input values
337
*/
338
public abstract void accumulate(ACC accumulator, Object... input);
339
340
/**
341
* Emits result rows from the final accumulator state
342
* @param accumulator Final accumulator state
343
* @param out Collector for emitting output rows
344
*/
345
public abstract void emitValue(ACC accumulator, Collector<T> out);
346
347
/**
348
* Emits updated result rows during streaming processing
349
* @param accumulator Current accumulator state
350
* @param out Collector for emitting output rows
351
*/
352
public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out) {
353
// Optional - implement for streaming scenarios with retractions
354
}
355
}
356
```
357
358
**Usage Examples:**
359
360
```java
361
// Top-N function returning multiple rows per group
362
public class TopNFunction extends TableAggregateFunction<Row, TopNAccumulator> {
363
364
private int n;
365
366
public TopNFunction(int n) {
367
this.n = n;
368
}
369
370
public static class TopNAccumulator {
371
public List<Double> topValues = new ArrayList<>();
372
}
373
374
@Override
375
public TopNAccumulator createAccumulator() {
376
return new TopNAccumulator();
377
}
378
379
@Override
380
public void accumulate(TopNAccumulator acc, Double value) {
381
if (value != null) {
382
acc.topValues.add(value);
383
acc.topValues.sort((a, b) -> Double.compare(b, a)); // Descending order
384
385
// Keep only top N values
386
if (acc.topValues.size() > n) {
387
acc.topValues = acc.topValues.subList(0, n);
388
}
389
}
390
}
391
392
@Override
393
public void emitValue(TopNAccumulator acc, Collector<Row> out) {
394
for (int i = 0; i < acc.topValues.size(); i++) {
395
out.collect(Row.of(acc.topValues.get(i), i + 1));
396
}
397
}
398
}
399
400
// Register and use table aggregate function
401
TopNFunction topN = new TopNFunction(3);
402
tableEnv.createTemporaryFunction("top_n", topN);
403
404
// Use in Table API
405
Table topResults = sourceTable
406
.groupBy($("category"))
407
.flatAggregate(call("top_n", $("score")))
408
.select($("category"), $("f0").as("score"), $("f1").as("rank"));
409
```
410
411
### Function Registration Methods
412
413
Various ways to register functions in the table environment.
414
415
```java { .api }
416
/**
417
* Register a function instance with the given name
418
* @param name Function name for SQL and Table API usage
419
* @param function Function instance
420
*/
421
public void createTemporaryFunction(String name, UserDefinedFunction function);
422
423
/**
424
* Register a function class by name
425
* @param name Function name
426
* @param functionClass Function class
427
*/
428
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
429
430
/**
431
* Register function in specific catalog and database
432
* @param path Catalog path (catalog.database.function)
433
* @param function Function instance
434
*/
435
public void createFunction(String path, UserDefinedFunction function);
436
437
/**
438
* Drop a temporary function
439
* @param name Function name to drop
440
* @return true if function was dropped
441
*/
442
public boolean dropTemporaryFunction(String name);
443
```
444
445
**Usage Examples:**
446
447
```java
448
// Register with instance
449
MyCustomFunction customFunc = new MyCustomFunction();
450
tableEnv.createTemporaryFunction("my_func", customFunc);
451
452
// Register with class
453
tableEnv.createTemporarySystemFunction("power_func", PowerFunction.class);
454
455
// Register in specific catalog
456
tableEnv.createFunction("my_catalog.my_db.custom_func", customFunc);
457
458
// Register via SQL DDL
459
tableEnv.executeSql(
460
"CREATE TEMPORARY FUNCTION my_hash AS 'com.example.StringHashFunction'"
461
);
462
463
// Drop function
464
boolean dropped = tableEnv.dropTemporaryFunction("my_func");
465
466
// Drop via SQL
467
tableEnv.executeSql("DROP TEMPORARY FUNCTION my_hash");
468
```
469
470
### Advanced Function Features
471
472
Advanced patterns for function development and optimization.
473
474
```java { .api }
475
/**
476
* Function with type inference - override getResultType for complex return types
477
*/
478
public abstract class UserDefinedFunction {
479
/**
480
* Provides type information for the function result
481
* @param signature Method signature information
482
* @return TypeInformation for the result type
483
*/
484
public TypeInformation<?> getResultType(Class<?>[] signature) {
485
// Override to provide custom type information
486
return null;
487
}
488
489
/**
490
* Provides parameter type information
491
* @param signature Method signature information
492
* @return Array of TypeInformation for parameters
493
*/
494
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
495
// Override to provide custom parameter type information
496
return null;
497
}
498
}
499
```
500
501
**Usage Examples:**
502
503
```java
504
// Function with custom type handling
505
public class ComplexReturnFunction extends ScalarFunction {
506
507
// Return a complex type (Row with multiple fields)
508
public Row eval(String input) {
509
if (input == null) {
510
return null;
511
}
512
513
String[] parts = input.split(":");
514
return Row.of(parts[0], Integer.parseInt(parts[1]), Double.parseDouble(parts[2]));
515
}
516
517
@Override
518
public TypeInformation<?> getResultType(Class<?>[] signature) {
519
return Types.ROW(
520
Types.STRING, // field 0
521
Types.INT, // field 1
522
Types.DOUBLE // field 2
523
);
524
}
525
}
526
527
// Function with configuration
528
public class ConfigurableFunction extends ScalarFunction {
529
private String prefix;
530
531
public ConfigurableFunction(String prefix) {
532
this.prefix = prefix;
533
}
534
535
public String eval(String input) {
536
return prefix + "_" + input;
537
}
538
}
539
540
// Stateful function with open/close lifecycle
541
public class ResourceFunction extends ScalarFunction {
542
private transient SomeResource resource;
543
544
@Override
545
public void open(FunctionContext context) throws Exception {
546
super.open(context);
547
// Initialize resources (database connections, etc.)
548
this.resource = new SomeResource();
549
}
550
551
@Override
552
public void close() throws Exception {
553
super.close();
554
// Clean up resources
555
if (resource != null) {
556
resource.close();
557
}
558
}
559
560
public String eval(String input) {
561
return resource.process(input);
562
}
563
}
564
```
565
566
### Best Practices and Performance
567
568
Guidelines for efficient function implementation.
569
570
**Usage Examples:**
571
572
```java
573
// Efficient function with null handling
574
public class EfficientStringFunction extends ScalarFunction {
575
576
// Handle null inputs early
577
public String eval(String input) {
578
if (input == null) {
579
return null;
580
}
581
582
// Avoid creating unnecessary objects
583
if (input.isEmpty()) {
584
return "";
585
}
586
587
// Use StringBuilder for string concatenation
588
StringBuilder sb = new StringBuilder(input.length() + 10);
589
sb.append("processed_").append(input);
590
return sb.toString();
591
}
592
593
// Provide overloaded methods for different input types
594
public String eval(Integer input) {
595
if (input == null) {
596
return null;
597
}
598
return "processed_" + input;
599
}
600
}
601
602
// Reusable accumulator for better performance
603
public class EfficientAggregateFunction extends AggregateFunction<Double, EfficientAggregateFunction.Acc> {
604
605
public static class Acc {
606
public double sum = 0.0;
607
public long count = 0L;
608
609
// Reset for reuse
610
public void reset() {
611
sum = 0.0;
612
count = 0L;
613
}
614
}
615
616
@Override
617
public Acc createAccumulator() {
618
return new Acc();
619
}
620
621
@Override
622
public void accumulate(Acc acc, Double value) {
623
if (value != null) {
624
acc.sum += value;
625
acc.count++;
626
}
627
}
628
629
@Override
630
public Double getValue(Acc acc) {
631
return acc.count > 0 ? acc.sum / acc.count : null;
632
}
633
}
634
```