# User-Defined Functions

Framework for extending Apache Flink's Table API and SQL with custom functions: scalar, table, aggregate, asynchronous, and process table functions for specialized processing requirements.

## Capabilities

### ScalarFunction

Base class for custom scalar functions, which take zero or more input arguments and return a single value per call.

```java { .api }
/**
12
* Base class for user-defined scalar functions
13
*/
14
public abstract class ScalarFunction extends UserDefinedFunction {
15
/**
16
* Evaluation method that must be implemented for the function logic.
17
* Method can be overloaded for different parameter combinations.
18
* @param args Input arguments (types must match function signature)
19
* @return Computed result value
20
*/
21
public abstract Object eval(Object... args);
22
23
/**
24
* Optional method to specify the result type when it cannot be inferred
25
* @param signature Array of argument types
26
* @return DataType of the function result
27
*/
28
public DataType getResultType(DataType[] signature);
29
30
/**
31
* Optional method to specify type inference for parameters
32
* @return TypeInference specification
33
*/
34
public TypeInference getTypeInference();
35
}
```

**ScalarFunction Example:**

```java
// Custom hash function
42
public class HashFunction extends ScalarFunction {
43
public String eval(String input) {
44
if (input == null) {
45
return null;
46
}
47
return Integer.toHexString(input.hashCode());
48
}
49
50
// Overloaded version for multiple inputs
51
public String eval(String input1, String input2) {
52
if (input1 == null || input2 == null) {
53
return null;
54
}
55
return Integer.toHexString((input1 + input2).hashCode());
56
}
57
}
58
59
// Register and use the function
60
tEnv.createTemporarySystemFunction("hash", HashFunction.class);
61
62
Table result = orders
63
.select($("order_id"),
64
call("hash", $("customer_email")).as("customer_hash"),
65
call("hash", $("order_id").cast(DataTypes.STRING()),
66
$("customer_email")).as("order_hash"));
```

### TableFunction

Base class for custom table functions, which take zero or more input arguments and emit zero or more rows per call (a one-to-many transformation).

```java { .api }
/**
75
* Base class for user-defined table functions
76
* @param <T> Type of the output rows
77
*/
78
public abstract class TableFunction<T> extends UserDefinedFunction {
79
/**
80
* Evaluation method that must be implemented for the function logic.
81
* Use collect() method to emit output rows.
82
* @param args Input arguments (types must match function signature)
83
*/
84
public abstract void eval(Object... args);
85
86
/**
87
* Emit an output row from the table function
88
* @param result Row to emit
89
*/
90
protected void collect(T result);
91
92
/**
93
* Optional method to specify the result type when it cannot be inferred
94
* @param signature Array of argument types
95
* @return DataType of the function result rows
96
*/
97
public DataType getResultType(DataType[] signature);
98
}
```

**TableFunction Example:**

```java
// Function to split comma-separated values into rows
105
@FunctionHint(output = @DataTypeHint("STRING"))
106
public class SplitFunction extends TableFunction<String> {
107
public void eval(String str) {
108
if (str != null) {
109
for (String s : str.split(",")) {
110
collect(s.trim());
111
}
112
}
113
}
114
}
115
116
// Function returning structured rows
117
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
118
public class WordAnalyzer extends TableFunction<Row> {
119
public void eval(String sentence) {
120
if (sentence != null) {
121
for (String word : sentence.split("\\s+")) {
122
collect(Row.of(word, word.length()));
123
}
124
}
125
}
126
}
127
128
// Register and use table functions
129
tEnv.createTemporarySystemFunction("split", SplitFunction.class);
130
tEnv.createTemporarySystemFunction("analyze_words", WordAnalyzer.class);
131
132
// Use with LATERAL JOIN
133
Table result = orders
134
.joinLateral(call("split", $("product_tags")).as("tag"))
135
.select($("order_id"), $("tag"));
136
137
// Use with LEFT JOIN LATERAL for optional results
138
Table analysis = documents
139
.leftOuterJoinLateral(call("analyze_words", $("title")).as("word", "length"))
140
.select($("document_id"), $("word"), $("length"));
```

### AggregateFunction

Base class for custom aggregate functions, which accumulate values over multiple rows and return a single result per group.

```java { .api }
/**
149
* Base class for user-defined aggregate functions
150
* @param <T> Type of the aggregation result
151
* @param <ACC> Type of the accumulator used during aggregation
152
*/
153
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
154
/**
155
* Create and initialize a new accumulator
156
* @return New accumulator instance
157
*/
158
public abstract ACC createAccumulator();
159
160
/**
161
* Accumulate input values into the accumulator
162
* @param accumulator Current accumulator state
163
* @param args Input values to accumulate
164
*/
165
public abstract void accumulate(ACC accumulator, Object... args);
166
167
/**
168
* Extract the final result from the accumulator
169
* @param accumulator Final accumulator state
170
* @return Aggregated result
171
*/
172
public abstract T getValue(ACC accumulator);
173
174
/**
175
* Retract input values from the accumulator (for streaming updates)
176
* @param accumulator Current accumulator state
177
* @param args Input values to retract
178
*/
179
public void retract(ACC accumulator, Object... args) {
180
// Optional: implement for streaming scenarios with retractions
181
}
182
183
/**
184
* Merge two accumulators (for distributed aggregation)
185
* @param accumulator Target accumulator
186
* @param others Accumulators to merge from
187
*/
188
public void merge(ACC accumulator, Iterable<ACC> others) {
189
// Optional: implement for distributed scenarios
190
}
191
192
/**
193
* Reset the accumulator to initial state
194
* @param accumulator Accumulator to reset
195
*/
196
public void resetAccumulator(ACC accumulator) {
197
// Optional: implement for reusing accumulators
198
}
199
}
```

**AggregateFunction Example:**

```java
// Custom weighted average function
206
public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
207
208
// Accumulator class
209
public static class WeightedAvgAccumulator {
210
public double sum = 0;
211
public double weightSum = 0;
212
}
213
214
@Override
215
public WeightedAvgAccumulator createAccumulator() {
216
return new WeightedAvgAccumulator();
217
}
218
219
@Override
220
public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
221
if (value != null && weight != null) {
222
acc.sum += value * weight;
223
acc.weightSum += weight;
224
}
225
}
226
227
@Override
228
public Double getValue(WeightedAvgAccumulator acc) {
229
if (acc.weightSum == 0) {
230
return null;
231
}
232
return acc.sum / acc.weightSum;
233
}
234
235
@Override
236
public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
237
if (value != null && weight != null) {
238
acc.sum -= value * weight;
239
acc.weightSum -= weight;
240
}
241
}
242
243
@Override
244
public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
245
for (WeightedAvgAccumulator other : others) {
246
acc.sum += other.sum;
247
acc.weightSum += other.weightSum;
248
}
249
}
250
}
251
252
// Register and use aggregate function
253
tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);
254
255
Table result = sales
256
.groupBy($("product_category"))
257
.select($("product_category"),
258
call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price"));
```

### AsyncScalarFunction

Base class for asynchronous scalar functions, which can perform non-blocking I/O (for example, calls to external services) so that multiple calls can be in flight at the same time.

```java { .api }
/**
267
* Base class for asynchronous user-defined scalar functions
268
*/
269
public abstract class AsyncScalarFunction extends UserDefinedFunction {
270
/**
271
* Asynchronous evaluation method
272
* @param resultFuture CompletableFuture to complete with the result
273
* @param args Input arguments
274
*/
275
public abstract void eval(CompletableFuture<Object> resultFuture, Object... args);
276
277
/**
278
* Optional method to specify timeout for async operations
279
* @return Timeout duration in milliseconds
280
*/
281
public long getTimeout() {
282
return 60000; // Default 60 seconds
283
}
284
}
```

**AsyncScalarFunction Example:**

```java
// Async function for external service lookup
291
public class AsyncEnrichFunction extends AsyncScalarFunction {
292
private transient HttpClient httpClient;
293
294
@Override
295
public void open(FunctionContext context) throws Exception {
296
httpClient = HttpClient.newHttpClient();
297
}
298
299
public void eval(CompletableFuture<String> resultFuture, String userId) {
300
if (userId == null) {
301
resultFuture.complete(null);
302
return;
303
}
304
305
HttpRequest request = HttpRequest.newBuilder()
306
.uri(URI.create("https://api.example.com/users/" + userId))
307
.build();
308
309
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
310
.thenApply(HttpResponse::body)
311
.thenAccept(resultFuture::complete)
312
.exceptionally(throwable -> {
313
resultFuture.complete(null); // Handle errors gracefully
314
return null;
315
});
316
}
317
318
@Override
319
public void close() throws Exception {
320
if (httpClient != null) {
321
// Cleanup resources
322
}
323
}
324
}
```

### AsyncTableFunction

Base class for asynchronous table functions that perform non-blocking one-to-many transformations, typically asynchronous lookups against external systems.

```java { .api }
/**
333
* Base class for asynchronous user-defined table functions
334
* @param <T> Type of the output rows
335
*/
336
public abstract class AsyncTableFunction<T> extends UserDefinedFunction {
337
/**
338
* Asynchronous evaluation method
339
* @param resultFuture CompletableFuture to complete with collection of results
340
* @param args Input arguments
341
*/
342
public abstract void eval(CompletableFuture<Collection<T>> resultFuture, Object... args);
343
}
```

### ProcessTableFunction

Advanced table function for complex transformations, with access to table arguments, per-key state, and event-time timers.

```java { .api }
/**
352
* Base class for process table functions with advanced capabilities
353
* @param <T> Type of the output rows
354
*/
355
public abstract class ProcessTableFunction<T> extends UserDefinedFunction {
356
/**
357
* Process method with access to context
358
* @param ctx Processing context with state and timer access
359
* @param args Input arguments
360
*/
361
public abstract void eval(ProcessContext ctx, Object... args) throws Exception;
362
363
/**
364
* Processing context interface
365
*/
366
public interface ProcessContext {
367
/**
368
* Get keyed state for maintaining function state
369
* @param stateDescriptor State descriptor
370
* @return State instance
371
*/
372
<S extends State> S getState(StateDescriptor<S, ?> stateDescriptor);
373
374
/**
375
* Emit an output row
376
* @param result Row to emit
377
*/
378
void collect(T result);
379
380
/**
381
* Get current processing time
382
* @return Current processing time timestamp
383
*/
384
long currentProcessingTime();
385
386
/**
387
* Get current watermark
388
* @return Current event time watermark
389
*/
390
long currentWatermark();
391
}
392
}
```

### Function Registration

Methods on `TableEnvironment` for registering and dropping user-defined functions.

```java { .api }
/**
401
* Register a function class as a temporary system function
402
* @param name Function name for SQL usage
403
* @param functionClass Function implementation class
404
*/
405
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
406
407
/**
408
* Register a function instance as a temporary system function
409
* @param name Function name for SQL usage
410
* @param functionInstance Function implementation instance
411
*/
412
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
413
414
/**
415
* Register a function in a specific catalog and database
416
* @param path Full path to the function (catalog.database.function)
417
* @param functionClass Function implementation class
418
*/
419
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
420
421
/**
422
* Drop a temporary system function
423
* @param name Function name to drop
424
* @return true if function existed and was dropped
425
*/
426
public boolean dropTemporarySystemFunction(String name);
```

### Type Hints and Annotations

Annotations that give Flink's type inference explicit type information when it cannot be extracted reflectively from a function's signature.

```java { .api }
/**
435
* Annotation for providing function-level type hints
436
*/
437
@Target(ElementType.TYPE)
438
@Retention(RetentionPolicy.RUNTIME)
439
public @interface FunctionHint {
440
/**
441
* Hint for function input parameters
442
*/
443
DataTypeHint[] input() default {};
444
445
/**
446
* Hint for function output type
447
*/
448
DataTypeHint output() default @DataTypeHint();
449
450
/**
451
* Whether function is deterministic
452
*/
453
boolean isDeterministic() default true;
454
}
455
456
/**
457
* Annotation for providing data type hints
458
*/
459
@Target({ElementType.PARAMETER, ElementType.METHOD})
460
@Retention(RetentionPolicy.RUNTIME)
461
public @interface DataTypeHint {
462
/**
463
* Data type specification string
464
*/
465
String value() default "";
466
467
/**
468
* Whether the type is nullable
469
*/
470
DefaultBoolean allowRawGlobally() default DefaultBoolean.TRUE;
471
}
```

**Type Hint Examples:**

```java
@FunctionHint(
478
input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
479
output = @DataTypeHint("ARRAY<STRING>")
480
)
481
public class RepeatString extends ScalarFunction {
482
public String[] eval(String str, Integer count) {
483
if (str == null || count == null || count <= 0) {
484
return new String[0];
485
}
486
String[] result = new String[count];
487
Arrays.fill(result, str);
488
return result;
489
}
490
}
491
492
// Complex type hint for nested structure
493
@FunctionHint(output = @DataTypeHint("ROW<name STRING, stats ROW<count BIGINT, avg DOUBLE>>"))
494
public class AnalyzeData extends TableFunction<Row> {
495
public void eval(String data) {
496
// Implementation that emits Row objects matching the hint
497
}
498
}
```

### Function Lifecycle

Lifecycle hooks for acquiring and releasing resources in user-defined functions, and the context passed to them.

```java { .api }
/**
507
* Base class providing lifecycle methods for all user-defined functions
508
*/
509
public abstract class UserDefinedFunction {
510
/**
511
* Initialize function resources when function is first used
512
* @param context Function context with configuration and metrics
513
*/
514
public void open(FunctionContext context) throws Exception {
515
// Override to initialize resources
516
}
517
518
/**
519
* Clean up function resources when function is no longer needed
520
*/
521
public void close() throws Exception {
522
// Override to clean up resources
523
}
524
525
/**
526
* Check if function calls are deterministic
527
* @return true if function is deterministic (same input = same output)
528
*/
529
public boolean isDeterministic() {
530
return true;
531
}
532
}
533
534
/**
535
* Function context providing access to configuration and metrics
536
*/
537
public interface FunctionContext {
538
/**
539
* Get metric group for function metrics
540
* @return MetricGroup for registering custom metrics
541
*/
542
MetricGroup getMetricGroup();
543
544
/**
545
* Get function configuration
546
* @return Configuration object with job and function parameters
547
*/
548
Configuration getJobParameter();
549
}
```

**Function Lifecycle Example:**

```java
public class DatabaseLookupFunction extends AsyncScalarFunction {
556
private transient Connection connection;
557
private transient Counter lookupCounter;
558
559
@Override
560
public void open(FunctionContext context) throws Exception {
561
// Initialize database connection
562
String jdbcUrl = context.getJobParameter().getString("database.url");
563
connection = DriverManager.getConnection(jdbcUrl);
564
565
// Register metrics
566
lookupCounter = context.getMetricGroup().counter("lookup_count");
567
}
568
569
public void eval(CompletableFuture<String> resultFuture, String key) {
570
lookupCounter.inc();
571
572
CompletableFuture.supplyAsync(() -> {
573
try (PreparedStatement stmt = connection.prepareStatement("SELECT value FROM lookup WHERE key = ?")) {
574
stmt.setString(1, key);
575
ResultSet rs = stmt.executeQuery();
576
return rs.next() ? rs.getString("value") : null;
577
} catch (SQLException e) {
578
throw new RuntimeException(e);
579
}
580
}).thenAccept(resultFuture::complete);
581
}
582
583
@Override
584
public void close() throws Exception {
585
if (connection != null) {
586
connection.close();
587
}
588
}
589
}
590
```