Apache Flink's Table API and SQL module for unified stream and batch processing
—
Flink's Table API provides a comprehensive framework for creating custom scalar, table, and aggregate functions. UDFs enable extending the built-in function library with domain-specific logic while maintaining type safety and performance optimization.
Functions that take zero or more input values and return a single output value.
/**
 * Base class for user-defined scalar functions.
 * NOTE(review): this is an API sketch, not compilable Java — the methods below
 * have no bodies and no {@code abstract} modifier.
 * Users extend this class and implement one or more public eval() methods;
 * eval() is resolved reflectively, which is why it is not declared here.
 */
abstract class ScalarFunction extends UserDefinedFunction {
// Users implement one or more eval() methods with different signatures
// public ReturnType eval(InputType1 input1, InputType2 input2, ...);
/**
 * Gets the function context for accessing runtime information.
 * @return FunctionContext providing runtime context
 */
FunctionContext getFunctionContext();
}
/**
 * Base class for all user-defined functions (scalar, table, aggregate, async).
 * Lifecycle: open() is called once before processing, close() once after.
 */
abstract class UserDefinedFunction implements FunctionDefinition {
/**
 * Optional initialization hook, invoked once before the first evaluation.
 * Acquire non-serializable resources (clients, digests, caches) here.
 * @param context Function context for initialization (metrics, job parameters)
 */
void open(FunctionContext context) throws Exception;
/**
 * Optional teardown hook, invoked once when the task shuts down.
 * Release resources acquired in open() here.
 */
void close() throws Exception;
/**
 * Indicates whether the function is deterministic; the planner may constant-fold
 * or cache calls to deterministic functions.
 * @return true if function always returns same result for same inputs
 */
boolean isDeterministic();
/**
 * Gets the type inference for this function.
 * @return TypeInference defining input/output types
 */
TypeInference getTypeInference();
}Usage Examples:
// Simple scalar function
public class UpperCaseFunction extends ScalarFunction {
    /**
     * Converts {@code input} to upper case.
     * Null-safe: a null input yields a null result.
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        return input.toUpperCase();
    }
}
// Multiple eval signatures for overloading
public class AddFunction extends ScalarFunction {
    /** Integer addition; yields null when either operand is null. */
    public Integer eval(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    /** Double addition; yields null when either operand is null. */
    public Double eval(Double a, Double b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    /** String concatenation; yields null when either operand is null. */
    public String eval(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }
}
// Function with context and lifecycle
public class HashFunction extends ScalarFunction {
    // transient: MessageDigest is not Serializable and Flink distributes UDF
    // instances via Java serialization; the digest is re-created in open()
    // (same pattern as HttpLookupFunction's transient client).
    private transient MessageDigest md5;

    /**
     * Acquires the MD5 digest once per task instance.
     * @param context runtime context (unused here)
     * @throws Exception if the MD5 algorithm is unavailable
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        md5 = MessageDigest.getInstance("MD5");
    }

    /**
     * Returns the hex-encoded MD5 hash of {@code input}, or null for null input.
     * Bytes are taken as UTF-8 so the hash is identical on every JVM; the
     * original no-arg getBytes() used the platform default charset, which makes
     * hashes differ between environments.
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        byte[] hash = md5.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return DatatypeConverter.printHexBinary(hash);
    }
}
// Registration and usage
tableEnv.createTemporarySystemFunction("my_upper", new UpperCaseFunction());
Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");Functions that take zero or more input values and return multiple rows (table-valued functions).
/**
 * Base class for user-defined table functions (table-valued functions).
 * Each eval() call may emit zero or more rows via collect().
 * NOTE(review): API sketch, not compilable Java — methods below are bodiless.
 * @param <T> Type of output rows
 */
abstract class TableFunction<T> extends UserDefinedFunction {
// Users implement one or more eval() methods that call collect()
// public void eval(InputType1 input1, InputType2 input2, ...);
/**
 * Emits a result row from the table function.
 * @param result Result row to emit
 */
protected void collect(T result);
/**
 * Gets the result type of this table function.
 * @return DataType representing the output row structure
 */
DataType getResultType();
}Usage Examples:
// Split string into multiple rows
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {
    /**
     * Emits one row per whitespace-separated token of {@code str}.
     * A null input produces no rows.
     */
    public void eval(String str) {
        if (str == null) {
            return;
        }
        for (String token : str.split("\\s+")) {
            collect(Row.of(token));
        }
    }
}
// Generate number sequence
@FunctionHint(output = @DataTypeHint("ROW<num INT>"))
public class RangeFunction extends TableFunction<Row> {
    /**
     * Emits one row per integer in the inclusive range [start, end].
     * Null bounds produce no rows; an empty range (start > end) also emits nothing.
     */
    public void eval(Integer start, Integer end) {
        if (start == null || end == null) {
            return;
        }
        for (int value = start; value <= end; value++) {
            collect(Row.of(value));
        }
    }
}
// Registration and usage
tableEnv.createTemporarySystemFunction("split_words", new SplitFunction());
// SQL usage with LATERAL TABLE
Table result = tableEnv.sqlQuery(
"SELECT t.word, COUNT(*) as word_count " +
"FROM documents d, LATERAL TABLE(split_words(d.content)) AS t(word) " +
"GROUP BY t.word"
);
// Table API usage
Table documents = tableEnv.from("documents");
Table words = documents
.joinLateral(call("split_words", $("content")).as("word"))
.select($("doc_id"), $("word"));Functions that aggregate multiple input rows into a single output value.
/**
 * Base class for user-defined aggregate functions (many rows in, one value out).
 * NOTE(review): API sketch, not compilable Java.
 * @param <T> Type of the final result
 * @param <ACC> Type of the accumulator (mutable per-group state)
 */
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
/**
 * Creates a new accumulator for aggregation (one per group/window).
 * @return New accumulator instance
 */
public abstract ACC createAccumulator();
/**
 * Extracts the final result from the accumulator.
 * @param accumulator Final accumulator state
 * @return Aggregation result
 */
public abstract T getValue(ACC accumulator);
// Users implement accumulate() method(s), resolved reflectively:
// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
/**
 * Optional: Retracts a value from the accumulator (for changelog streams).
 * @param accumulator Accumulator to retract from
 * @param input Input values to retract
 */
// public void retract(ACC accumulator, InputType1 input1, InputType2 input2, ...);
/**
 * Optional: Merges two accumulators (for batch processing).
 * @param accumulator Target accumulator
 * @param accumulators Source accumulators to merge
 */
// public void merge(ACC accumulator, Iterable<ACC> accumulators);
}Usage Examples:
// Weighted average aggregate function
public class WeightedAvgAccumulator {
// Mutable POJO accumulator: public fields so Flink can (de)serialize it as state.
public double sum = 0.0; // running sum of value * weight
public double weightSum = 0.0; // running sum of weights
}
public class WeightedAverage extends AggregateFunction<Double, WeightedAvgAccumulator> {

    /** Starts a fresh accumulation with both sums at zero. */
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    /** Adds one (value, weight) observation; pairs with a null member are ignored. */
    public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value == null || weight == null) {
            return;
        }
        acc.sum += value * weight;
        acc.weightSum += weight;
    }

    /** Final result: the weighted mean, or null when no weight was accumulated. */
    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        if (acc.weightSum == 0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    /** Removes a previously accumulated observation (changelog/retract streams). */
    public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value == null || weight == null) {
            return;
        }
        acc.sum -= value * weight;
        acc.weightSum -= weight;
    }

    /** Folds other partial accumulators into {@code acc}. */
    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.weightSum += other.weightSum;
        }
    }
}
// Registration and usage
tableEnv.createTemporarySystemFunction("weighted_avg", new WeightedAverage());
Table result = tableEnv.sqlQuery(
"SELECT product_category, weighted_avg(price, quantity) as avg_price " +
"FROM sales " +
"GROUP BY product_category"
);Functions that aggregate multiple input rows into multiple output rows.
/**
 * Base class for user-defined table aggregate functions
 * (many rows in, many rows out per group).
 * NOTE(review): API sketch, not compilable Java.
 * @param <T> Type of the output rows
 * @param <ACC> Type of the accumulator
 */
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
/**
 * Creates a new accumulator for aggregation (one per group/window).
 * @return New accumulator instance
 */
public abstract ACC createAccumulator();
// Users implement accumulate() method(s), resolved reflectively:
// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
/**
 * Emits the final result rows from the accumulator.
 * @param accumulator Final accumulator state
 * @param out Collector for emitting results
 */
public abstract void emitValue(ACC accumulator, Collector<T> out);
/**
 * Optional: Emits incremental updates with retraction instead of re-emitting
 * the full result on every change.
 * @param accumulator Current accumulator state
 * @param out Collector for emitting results with retraction
 */
// public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
}Usage Examples:
// Top N aggregate function
public class TopNAccumulator {
// Mutable POJO accumulator: public fields so Flink can (de)serialize it as state.
public List<Tuple2<Integer, String>> topN = new ArrayList<>(); // kept sorted, highest score first
public int n; // capacity, copied from the TopN function in createAccumulator()
}
public class TopN extends TableAggregateFunction<Tuple2<Integer, String>, TopNAccumulator> {

    /** Number of top entries to keep, fixed at construction time. */
    private int n;

    public TopN(int n) {
        this.n = n;
    }

    /** Fresh accumulator pre-configured with this function's capacity. */
    @Override
    public TopNAccumulator createAccumulator() {
        TopNAccumulator acc = new TopNAccumulator();
        acc.n = n;
        return acc;
    }

    /** Records a (score, name) pair, retaining only the N highest scores. */
    public void accumulate(TopNAccumulator acc, Integer score, String name) {
        if (score == null || name == null) {
            return;
        }
        acc.topN.add(Tuple2.of(score, name));
        // Highest score first; List.sort is stable, so ties keep arrival order.
        acc.topN.sort((left, right) -> right.f0.compareTo(left.f0));
        if (acc.topN.size() > acc.n) {
            acc.topN.remove(acc.topN.size() - 1);
        }
    }

    /** Emits the current top-N rows, best score first. */
    @Override
    public void emitValue(TopNAccumulator acc, Collector<Tuple2<Integer, String>> out) {
        for (Tuple2<Integer, String> entry : acc.topN) {
            out.collect(entry);
        }
    }
}
// Registration and usage
tableEnv.createTemporarySystemFunction("top3", new TopN(3));
// NOTE: table aggregate functions cannot be called from SQL — Flink has no
// FLAT_AGGREGATE SQL syntax. Use the Table API flatAggregate instead:
Table result = tableEnv
    .from("players")
    .flatAggregate(call("top3", $("score"), $("name")).as("score", "name"))
    .select($("score"), $("name"));Functions that perform asynchronous operations for I/O bound tasks.
/**
 * Async scalar function for I/O bound operations (e.g. external lookups).
 * Results are returned as futures, letting many requests be in flight per subtask.
 * NOTE(review): API sketch, not compilable Java.
 */
class AsyncScalarFunction extends UserDefinedFunction {
// Users implement evalAsync() methods that return CompletableFuture
// public CompletableFuture<ReturnType> evalAsync(InputType1 input1, InputType2 input2, ...);
}
/**
 * Base class for async table functions (typically asynchronous lookup joins).
 * NOTE(review): API sketch, not compilable Java.
 * @param <T> Type of output rows
 */
abstract class AsyncTableFunction<T> extends UserDefinedFunction {
// Users implement evalAsync() methods that use AsyncCollector
// public void evalAsync(InputType1 input1, AsyncCollector<T> collector);
}Usage Examples:
// Async HTTP lookup function
public class HttpLookupFunction extends AsyncScalarFunction {
    // transient: the HTTP client is rebuilt per task instance in open().
    private transient AsyncHttpClient httpClient;

    /** Creates the shared async HTTP client for this task. */
    @Override
    public void open(FunctionContext context) throws Exception {
        httpClient = Dsl.asyncHttpClient();
    }

    /**
     * Fetches {@code url} asynchronously; the future completes with the
     * response body. A null url completes immediately with null.
     */
    public CompletableFuture<String> evalAsync(String url) {
        if (url == null) {
            return CompletableFuture.completedFuture(null);
        }
        return httpClient
            .prepareGet(url)
            .execute()
            .toCompletableFuture()
            .thenApply(resp -> resp.getResponseBody());
    }

    /** Releases the HTTP client when the task shuts down. */
    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }
}
// Registration and usage
tableEnv.createTemporarySystemFunction("http_get", new HttpLookupFunction());Advanced type inference for complex function signatures.
/**
 * Annotation for providing type hints to functions.
 * May be placed on the function class or on individual eval() methods.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface FunctionHint {
DataTypeHint[] input() default {}; // expected argument types, in order
DataTypeHint output() default @DataTypeHint(); // declared result type
boolean isDeterministic() default true; // planner optimization hint
}
/**
 * Annotation for specifying data types.
 * The value is a SQL type string such as "STRING", "INT", or "ROW&lt;...&gt;".
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface DataTypeHint {
String value() default ""; // SQL type string, e.g. "ARRAY<STRING>"
Class<?> bridgedTo() default void.class; // Java class the type bridges to
boolean allowRawGlobally() default false; // presumably permits RAW types during extraction — TODO confirm
}Usage Examples:
// Function with explicit type hints
@FunctionHint(
    input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
    output = @DataTypeHint("ARRAY<STRING>")
)
public class RepeatFunction extends TableFunction<String[]> {
    /**
     * Emits a single array containing {@code str} repeated {@code count} times.
     * Null arguments or a non-positive count produce no output.
     */
    public void eval(String str, Integer count) {
        if (str == null || count == null || count <= 0) {
            return;
        }
        String[] repeated = new String[count];
        for (int i = 0; i < count; i++) {
            repeated[i] = str;
        }
        collect(repeated);
    }
}
// Complex return type with row structure
@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, name STRING, score DOUBLE>"))
public class ParseResultFunction extends TableFunction<Row> {
public void eval(String jsonString) {
    // Null guard, consistent with every other example in this document;
    // JsonParser.parseString(null) would throw instead of skipping the row.
    if (jsonString == null) {
        return;
    }
    // Parse JSON and emit one structured row (id BIGINT, name STRING, score DOUBLE).
    JsonObject json = JsonParser.parseString(jsonString).getAsJsonObject();
    collect(Row.of(
        json.get("id").getAsLong(),
        json.get("name").getAsString(),
        json.get("score").getAsDouble()
    ));
}
}abstract class UserDefinedFunction implements FunctionDefinition {
// NOTE(review): recap of the UserDefinedFunction contract documented above;
// the class header sits on the previous (fused) line.
void open(FunctionContext context) throws Exception; // one-time init hook
void close() throws Exception; // one-time teardown hook
boolean isDeterministic(); // planner optimization hint
TypeInference getTypeInference(); // input/output type resolution
}
// Recap: scalar functions — one output value per call.
abstract class ScalarFunction extends UserDefinedFunction {
// Implementation-specific eval() methods
}
// Recap: table functions — zero or more output rows per call via collect().
abstract class TableFunction<T> extends UserDefinedFunction {
protected void collect(T result); // emit one output row
DataType getResultType(); // output row structure
}
// Recap: aggregate functions — many rows in, one value out.
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
public abstract ACC createAccumulator(); // fresh per-group state
public abstract T getValue(ACC accumulator); // final result from state
// Optional: public void retract(ACC accumulator, ...);
// Optional: public void merge(ACC accumulator, Iterable<ACC> others);
}
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
public abstract ACC createAccumulator();
public abstract void emitValue(ACC accumulator, Collector<T> out);
// Optional: public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
}interface FunctionContext {
// NOTE(review): members of the FunctionContext recap; the interface header
// sits on the previous (fused) line.
MetricGroup getMetricGroup(); // per-function metrics
int getIndexOfThisSubtask(); // 0-based subtask index
int getNumberOfParallelSubtasks(); // current parallelism
String getJobParameter(String key, String defaultValue); // global job config lookup
// Access to distributed cache
File getCachedFile(String name);
}interface Collector<T> {
void collect(T record);
}
// Collector that can also retract (withdraw) previously emitted records,
// used by emitUpdateWithRetract for incremental table-aggregate output.
interface RetractableCollector<T> extends Collector<T> {
void retract(T record); // withdraw a record emitted earlier
}
interface AsyncCollector<T> {
void collect(Collection<T> result);
void complete(Collection<T> result);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table