Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.
—
Flink Table API supports custom functions to extend the built-in function library. You can create scalar functions, table functions, aggregate functions, and table aggregate functions to implement domain-specific logic.
User-defined scalar functions take zero, one, or multiple scalar values and return a single scalar value.
/**
 * Base class for scalar user-defined functions.
 *
 * <p>Users must extend this class and implement one or more {@code eval()} methods.
 * This listing declares no members of its own; the contract is described by the
 * comment inside the class body.
 */
public abstract class ScalarFunction extends UserDefinedFunction {
    /**
     * Users implement one or more eval methods with different signatures.
     * The eval method name is fixed - Flink uses reflection to find matching methods.
     *
     * Example signatures:
     * public String eval(String input);
     * public Integer eval(Integer a, Integer b);
     * public Double eval(Double... values);
     */
}
Usage Examples:
// Custom string manipulation function
/**
 * Scalar function that renders a string's hash code as text, optionally with a
 * caller-supplied prefix.
 *
 * <p>Both overloads return {@code null} when {@code input} is {@code null}.
 */
public class StringHashFunction extends ScalarFunction {

    /**
     * @param input value to hash; may be {@code null}
     * @return {@code "hash_"} followed by the non-negative hash, or {@code null} for a null input
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        return "hash_" + nonNegativeHash(input);
    }

    /**
     * @param input value to hash; may be {@code null}
     * @param prefix text placed before the underscore; a {@code null} prefix is rendered as "null"
     * @return {@code prefix + "_"} followed by the non-negative hash, or {@code null} for a null input
     */
    public String eval(String input, String prefix) {
        if (input == null) {
            return null;
        }
        return prefix + "_" + nonNegativeHash(input);
    }

    /**
     * Absolute value of {@code input.hashCode()}, computed in {@code long}.
     *
     * <p>{@code Math.abs(int)} returns {@code Integer.MIN_VALUE} unchanged, so a string whose
     * hash is exactly {@code Integer.MIN_VALUE} would otherwise produce a negative number.
     * Widening first keeps every other result identical and makes that one case positive.
     */
    private static long nonNegativeHash(String input) {
        return Math.abs((long) input.hashCode());
    }
}
// Register and use the function: the instance is bound to the name "string_hash",
// which both the Table API call(...) and the SQL query below refer to.
StringHashFunction hashFunc = new StringHashFunction();
tableEnv.createTemporaryFunction("string_hash", hashFunc);
// Use in Table API: the first call passes one argument, the second passes two
// (the column plus the literal "user"), matching the two eval overloads.
Table result = sourceTable.select(
    $("id"),
    $("name"),
    call("string_hash", $("name")).as("name_hash"),
    call("string_hash", $("name"), lit("user")).as("prefixed_hash")
);
// Use in SQL: only the one-argument form is shown here.
Table sqlResult = tableEnv.sqlQuery(
    "SELECT id, name, string_hash(name) as name_hash " +
    "FROM source_table"
);
// Mathematical function example
/**
 * Scalar function that raises a base to an exponent, with one overload for
 * floating-point arguments and one for integral arguments.
 *
 * <p>Both overloads return {@code null} when either argument is {@code null}.
 */
public class PowerFunction extends ScalarFunction {

    /**
     * @return {@code Math.pow(base, exponent)}, or {@code null} if either argument is null
     */
    public Double eval(Double base, Double exponent) {
        if (base == null || exponent == null) {
            return null;
        }
        return Math.pow(base, exponent);
    }

    /**
     * Integral power.
     *
     * <p>Going through {@code double} loses precision once the result exceeds 2^53, so for
     * a non-negative exponent and {@code |base| >= 2} the result is computed with exact
     * {@code long} multiplication. If that multiplication overflows, or for any other
     * input (negative exponent, base in {-1, 0, 1}), the original
     * {@code (long) Math.pow(...)} computation is used unchanged.
     *
     * @return the power as a {@code Long}, or {@code null} if either argument is null
     */
    public Long eval(Long base, Long exponent) {
        if (base == null || exponent == null) {
            return null;
        }
        long b = base;
        long e = exponent;
        if (e >= 0 && (b > 1 || b < -1)) {
            long result = 1L;
            boolean overflow = false;
            // |b| >= 2 guarantees overflow within 64 steps, so this loop is short even for huge e.
            for (long i = 0; i < e; i++) {
                long next = result * b;
                // With |b| >= 2 a wrapped product can never divide back to the previous value.
                if (next / b != result) {
                    overflow = true;
                    break;
                }
                result = next;
            }
            if (!overflow) {
                return result;
            }
        }
        return (long) Math.pow(base, exponent);
    }
}
User-defined table functions take zero, one, or multiple scalar values and return multiple rows (table).
/**
 * Base class for table user-defined functions.
 *
 * <p>Users must extend this class and implement {@code eval()} methods, calling
 * {@link #collect(Object)} once for every output row.
 *
 * @param <T> type of the rows passed to {@code collect}
 */
public abstract class TableFunction<T> extends UserDefinedFunction {
    /**
     * Emits a result row to the output table.
     * NOTE(review): shown as a signature only (no body) — this listing is an API summary.
     * @param result Row data to emit
     */
    protected void collect(T result);
    /**
     * Users implement eval methods that call collect() for each output row.
     * The eval method name is fixed - Flink uses reflection to find matching methods.
     */
}
Usage Examples:
// Split string into multiple rows
/**
 * Table function that splits a string on a separator and emits one row per part.
 *
 * <p>Each emitted row holds the trimmed part and its zero-based index. Nothing is
 * emitted when either argument is {@code null}.
 */
public class SplitFunction extends TableFunction<Row> {

    /**
     * @param str text to split; may be {@code null}
     * @param separator literal separator text; may be {@code null}
     */
    public void eval(String str, String separator) {
        if (str == null || separator == null) {
            return;
        }
        // String.split interprets its argument as a regular expression. Quote it so that
        // separators such as "|", "." or "*" are matched literally instead of as regex syntax.
        String[] parts = str.split(java.util.regex.Pattern.quote(separator));
        for (int i = 0; i < parts.length; i++) {
            collect(Row.of(parts[i].trim(), i));
        }
    }
}
// Register and use table function under the name "split_string".
SplitFunction splitFunc = new SplitFunction();
tableEnv.createTemporaryFunction("split_string", splitFunc);
// Use with LATERAL TABLE in SQL; the alias s(word, position) names the two
// fields of every row the function emits.
Table result = tableEnv.sqlQuery(
    "SELECT t.id, t.name, s.word, s.position " +
    "FROM source_table t, " +
    "LATERAL TABLE(split_string(t.tags, ',')) AS s(word, position)"
);
// Use in Table API with joinLateral; the emitted fields are referenced as f0 and f1
// (presumably the default positional field names of the Row — confirm).
Table lateralResult = sourceTable
    .joinLateral(call("split_string", $("tags"), lit(",")))
    .select($("id"), $("name"), $("f0").as("word"), $("f1").as("position"));
// Generate series function
/**
 * Table function that emits an inclusive series of integers.
 *
 * <p>Nothing is emitted when any argument is {@code null} or when the step is zero.
 */
public class GenerateSeriesFunction extends TableFunction<Integer> {

    /**
     * Emits {@code start, start + 1, ..., end}; emits nothing if {@code start > end}.
     */
    public void eval(Integer start, Integer end) {
        if (start == null || end == null) {
            return;
        }
        emitSeries(start, end, 1L);
    }

    /**
     * Emits {@code start, start + step, ...} up to and including {@code end} for a positive
     * step, or down to and including {@code end} for a negative step.
     */
    public void eval(Integer start, Integer end, Integer step) {
        if (start == null || end == null || step == null || step == 0) {
            return;
        }
        emitSeries(start, end, step);
    }

    /**
     * Shared loop for both overloads. The counter is a {@code long}: with an {@code int}
     * counter, reaching {@code Integer.MAX_VALUE} (or {@code MIN_VALUE} when descending)
     * wraps around and the loop condition never becomes false.
     */
    private void emitSeries(int start, int end, long step) {
        if (step > 0) {
            for (long i = start; i <= end; i += step) {
                collect((int) i);
            }
        } else {
            for (long i = start; i >= end; i += step) {
                collect((int) i);
            }
        }
    }
}
User-defined aggregate functions take multiple rows and compute a single aggregate result.
/**
 * Base class for aggregate user-defined functions.
 *
 * <p>Users must implement accumulator management methods. In this listing
 * {@code createAccumulator}, {@code accumulate} and {@code getValue} are abstract, while
 * {@code retract} and {@code merge} have empty default bodies.
 *
 * <p>NOTE(review): the usage examples implement {@code accumulate} and {@code retract}
 * with concrete typed parameters (e.g. {@code Double value, Double weight}), which does
 * not override the {@code Object...} signatures shown here — confirm whether these
 * varargs declarations are illustrative only.
 *
 * @param <T> type of the aggregate result
 * @param <ACC> type of the accumulator
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates a new accumulator for aggregation.
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    /**
     * Accumulates input values into the accumulator.
     * @param accumulator Current accumulator state
     * @param input Input values to accumulate (one or more parameters)
     */
    public abstract void accumulate(ACC accumulator, Object... input);
    /**
     * Extracts the final result from the accumulator.
     * @param accumulator Final accumulator state
     * @return Aggregate result
     */
    public abstract T getValue(ACC accumulator);
    /**
     * Retracts input values from the accumulator (optional).
     * Only needed for streaming scenarios with retractions.
     * The default body shown here does nothing.
     * @param accumulator Current accumulator state
     * @param input Input values to retract
     */
    public void retract(ACC accumulator, Object... input) {
        // Optional - implement if retraction is needed
    }
    /**
     * Merges accumulators (optional).
     * Needed for session windows and some optimization scenarios.
     * The default body shown here does nothing.
     * @param accumulator Target accumulator
     * @param other Source accumulators to merge from
     */
    public void merge(ACC accumulator, Iterable<ACC> other) {
        // Optional - implement if merging is needed
    }
}
Usage Examples:
// Custom average function with accumulator
/**
 * Aggregate function computing a weighted average: sum(value * weight) / sum(weight).
 *
 * <p>The accumulator type is written as {@code WeightedAverageFunction.WeightedAverageAccumulator}
 * in the class header because a nested class is not in scope inside its enclosing class's
 * own {@code extends} clause.
 */
public class WeightedAverageFunction
        extends AggregateFunction<Double, WeightedAverageFunction.WeightedAverageAccumulator> {

    /** Mutable aggregation state: running weighted sum and running sum of weights. */
    public static class WeightedAverageAccumulator {
        public double sum = 0.0;
        public double weightSum = 0.0;
    }

    @Override
    public WeightedAverageAccumulator createAccumulator() {
        return new WeightedAverageAccumulator();
    }

    // accumulate/retract/merge carry no @Override: Flink's AggregateFunction does not declare
    // them; they are looked up by name and argument types, so @Override would not compile.

    /** Adds one (value, weight) pair; pairs with a null component are ignored. */
    public void accumulate(WeightedAverageAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum += value * weight;
            acc.weightSum += weight;
        }
    }

    /**
     * @return the weighted average, or {@code null} when the accumulated weight is exactly zero
     */
    @Override
    public Double getValue(WeightedAverageAccumulator acc) {
        if (acc.weightSum == 0.0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    /** Removes one previously accumulated (value, weight) pair; null components are ignored. */
    public void retract(WeightedAverageAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum -= value * weight;
            acc.weightSum -= weight;
        }
    }

    /** Folds every accumulator in {@code others} into {@code acc}. */
    public void merge(WeightedAverageAccumulator acc, Iterable<WeightedAverageAccumulator> others) {
        for (WeightedAverageAccumulator other : others) {
            acc.sum += other.sum;
            acc.weightSum += other.weightSum;
        }
    }
}
// Register and use aggregate function under the name "weighted_avg".
WeightedAverageFunction weightedAvg = new WeightedAverageFunction();
tableEnv.createTemporaryFunction("weighted_avg", weightedAvg);
// Use in Table API: one result row per category; price is passed as the value
// and quantity as the weight.
Table result = sourceTable
    .groupBy($("category"))
    .select(
        $("category"),
        call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price")
    );
// Use in SQL: same aggregation expressed with GROUP BY.
Table sqlResult = tableEnv.sqlQuery(
    "SELECT category, weighted_avg(price, quantity) as weighted_avg_price " +
    "FROM source_table " +
    "GROUP BY category"
);
// Custom string concatenation aggregate
/**
 * Aggregate function that concatenates non-null string values, inserting a separator
 * between them.
 */
public class StringConcatFunction extends AggregateFunction<String, StringBuilder> {

    @Override
    public StringBuilder createAccumulator() {
        return new StringBuilder();
    }

    /**
     * Appends {@code value} to the accumulator; null values are skipped.
     *
     * <p>No {@code @Override}: Flink's AggregateFunction does not declare {@code accumulate};
     * it is looked up by name and argument types, so {@code @Override} would not compile.
     *
     * <p>The separator is written only when the buffer already contains text and the
     * separator itself is non-null, so nothing precedes the first appended value.
     */
    public void accumulate(StringBuilder acc, String value, String separator) {
        if (value != null) {
            if (acc.length() > 0 && separator != null) {
                acc.append(separator);
            }
            acc.append(value);
        }
    }

    /** @return the concatenated text; an empty string if nothing was accumulated */
    @Override
    public String getValue(StringBuilder acc) {
        return acc.toString();
    }
}
Table aggregate functions take multiple rows and return multiple rows (like table functions but with aggregation semantics).
/**
 * Base class for table aggregate functions.
 * Combines aspects of both table functions and aggregate functions.
 *
 * <p>NOTE(review): the usage example implements {@code accumulate} with a concrete typed
 * parameter ({@code Double value}), which does not override the {@code Object...}
 * signature shown here — confirm whether this declaration is illustrative only.
 *
 * @param <T> type of the emitted rows
 * @param <ACC> type of the accumulator
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates a new accumulator.
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    /**
     * Accumulates input values.
     * @param accumulator Current accumulator state
     * @param input Input values
     */
    public abstract void accumulate(ACC accumulator, Object... input);
    /**
     * Emits result rows from the final accumulator state.
     * @param accumulator Final accumulator state
     * @param out Collector for emitting output rows
     */
    public abstract void emitValue(ACC accumulator, Collector<T> out);
    /**
     * Emits updated result rows during streaming processing.
     * The default body shown here does nothing.
     * @param accumulator Current accumulator state
     * @param out Collector for emitting output rows
     */
    public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out) {
        // Optional - implement for streaming scenarios with retractions
    }
}
Usage Examples:
// Top-N function returning multiple rows per group
/**
 * Table aggregate function that keeps the {@code n} largest values of a group and emits
 * each of them together with its 1-based rank.
 *
 * <p>The accumulator type is written as {@code TopNFunction.TopNAccumulator} in the class
 * header because a nested class is not in scope inside its enclosing class's own
 * {@code extends} clause.
 */
public class TopNFunction extends TableAggregateFunction<Row, TopNFunction.TopNAccumulator> {

    private final int n;

    /**
     * @param n maximum number of values retained per group
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public TopNFunction(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        this.n = n;
    }

    /** Aggregation state: the retained values, kept sorted in descending order. */
    public static class TopNAccumulator {
        public List<Double> topValues = new ArrayList<>();
    }

    @Override
    public TopNAccumulator createAccumulator() {
        return new TopNAccumulator();
    }

    // accumulate/emitValue carry no @Override: Flink's TableAggregateFunction does not declare
    // them; they are looked up by name and argument types.

    /** Inserts {@code value} (nulls are ignored) and drops anything beyond the top {@code n}. */
    public void accumulate(TopNAccumulator acc, Double value) {
        if (value != null) {
            acc.topValues.add(value);
            acc.topValues.sort((a, b) -> Double.compare(b, a)); // Descending order
            // Trim in place. Reassigning the field to subList(0, n) would store a *view* of the
            // old list; doing that on every call stacks views on views, keeps the whole backing
            // list alive, and replaces the ArrayList with a non-serializable sub-list.
            while (acc.topValues.size() > n) {
                acc.topValues.remove(acc.topValues.size() - 1);
            }
        }
    }

    /** Emits one row per retained value: (value, rank), where rank starts at 1. */
    public void emitValue(TopNAccumulator acc, Collector<Row> out) {
        for (int i = 0; i < acc.topValues.size(); i++) {
            out.collect(Row.of(acc.topValues.get(i), i + 1));
        }
    }
}
// Register and use table aggregate function; this instance keeps the top 3 values per group.
TopNFunction topN = new TopNFunction(3);
tableEnv.createTemporaryFunction("top_n", topN);
// Use in Table API: flatAggregate applies the function per category; the two emitted
// fields are referenced as f0 and f1 and renamed to score and rank.
Table topResults = sourceTable
    .groupBy($("category"))
    .flatAggregate(call("top_n", $("score")))
    .select($("category"), $("f0").as("score"), $("f1").as("rank"));
Various ways to register functions in the table environment.
/**
 * Registers a function instance as a temporary function with the given name.
 * @param name Function name for SQL and Table API usage
 * @param function Function instance
 */
public void createTemporaryFunction(String name, UserDefinedFunction function);
/**
 * Registers a function class as a temporary system function with the given name.
 * @param name Function name
 * @param functionClass Function class
 */
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
/**
 * Registers a function class as a catalog function in a specific catalog and database.
 *
 * <p>Catalog functions are registered by class rather than by instance;
 * {@code TableEnvironment.createFunction} has no overload that takes a function instance.
 *
 * @param path Catalog path (catalog.database.function)
 * @param functionClass Function class to register
 */
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
/**
 * Drops a temporary function.
 * @param name Function name to drop
 * @return true if function was dropped
 */
public boolean dropTemporaryFunction(String name);
Usage Examples:
// Register with instance
MyCustomFunction customFunc = new MyCustomFunction();
tableEnv.createTemporaryFunction("my_func", customFunc);
// Register with class
tableEnv.createTemporarySystemFunction("power_func", PowerFunction.class);
// Register in specific catalog: catalog functions are registered by class,
// because createFunction has no overload that accepts a function instance.
tableEnv.createFunction("my_catalog.my_db.custom_func", MyCustomFunction.class);
// Register via SQL DDL
tableEnv.executeSql(
    "CREATE TEMPORARY FUNCTION my_hash AS 'com.example.StringHashFunction'"
);
// Drop function
boolean dropped = tableEnv.dropTemporaryFunction("my_func");
// Drop via SQL
tableEnv.executeSql("DROP TEMPORARY FUNCTION my_hash");
Advanced patterns for function development and optimization.
/**
 * Function with type inference - override getResultType for complex return types.
 *
 * <p>Both hooks in this listing return {@code null} unless overridden.
 * NOTE(review): how a {@code null} return is interpreted is not shown here, and
 * whether these hooks are honored by every registration method should be confirmed
 * against the Flink version in use.
 */
public abstract class UserDefinedFunction {
    /**
     * Provides type information for the function result.
     * @param signature Method signature information
     * @return TypeInformation for the result type ({@code null} in this default body)
     */
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // Override to provide custom type information
        return null;
    }
    /**
     * Provides parameter type information.
     * @param signature Method signature information
     * @return Array of TypeInformation for parameters ({@code null} in this default body)
     */
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        // Override to provide custom parameter type information
        return null;
    }
}
Usage Examples:
// Function with custom type handling
/**
 * Scalar function that parses a colon-separated string into a three-field Row
 * (text, integer, double) and declares that row type through getResultType.
 */
public class ComplexReturnFunction extends ScalarFunction {

    /**
     * Splits {@code input} on ":" and converts the second and third pieces to numbers.
     *
     * @return the parsed row, or {@code null} when {@code input} is {@code null}
     */
    public Row eval(String input) {
        if (input != null) {
            String[] pieces = input.split(":");
            String text = pieces[0];
            int number = Integer.parseInt(pieces[1]);
            double fraction = Double.parseDouble(pieces[2]);
            return Row.of(text, number, fraction);
        }
        return null;
    }

    /** Describes the row produced by eval: STRING, INT, DOUBLE in that order. */
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        TypeInformation<?> rowType = Types.ROW(
            Types.STRING, // field 0
            Types.INT,    // field 1
            Types.DOUBLE  // field 2
        );
        return rowType;
    }
}
// Function with configuration
/**
 * Scalar function configured at construction time with a prefix that is placed,
 * followed by an underscore, in front of every input.
 */
public class ConfigurableFunction extends ScalarFunction {

    private String prefix;

    /** @param prefix text to put before the underscore */
    public ConfigurableFunction(String prefix) {
        this.prefix = prefix;
    }

    /** @return the prefix, an underscore and the input joined together */
    public String eval(String input) {
        StringBuilder joined = new StringBuilder();
        joined.append(prefix);
        joined.append("_");
        joined.append(input);
        return joined.toString();
    }
}
// Stateful function with open/close lifecycle
/**
 * Scalar function that owns an external resource: the resource is created in
 * {@code open}, used by {@code eval}, and released in {@code close}.
 */
public class ResourceFunction extends ScalarFunction {

    // Transient, and only ever assigned in open().
    private transient SomeResource resource;

    /** Initializes the resource after the base class has been opened. */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        // Initialize resources (database connections, etc.)
        this.resource = new SomeResource();
    }

    /**
     * Releases the resource.
     *
     * <p>The cleanup runs in a {@code finally} block so the resource is still closed when
     * {@code super.close()} throws, and the field is cleared so a second call is a no-op.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Clean up resources
            if (resource != null) {
                resource.close();
                resource = null;
            }
        }
    }

    /** Delegates processing of {@code input} to the resource. */
    public String eval(String input) {
        return resource.process(input);
    }
}
Guidelines for efficient function implementation.
Usage Examples:
// Efficient function with null handling
/**
 * Scalar function that prepends "processed_" to its input, with overloads for
 * string and integer arguments. Null inputs yield null.
 */
public class EfficientStringFunction extends ScalarFunction {

    /**
     * @return {@code null} for a null input, an empty string for an empty input,
     *     otherwise "processed_" followed by the input
     */
    public String eval(String input) {
        // Null is handled before anything else touches the value.
        if (input == null) {
            return null;
        }
        // Empty input short-circuits without allocating a builder.
        return input.isEmpty()
            ? ""
            : new StringBuilder(input.length() + 10)
                .append("processed_")
                .append(input)
                .toString();
    }

    /** Integer overload: {@code null} for a null input, otherwise "processed_" plus the number. */
    public String eval(Integer input) {
        return input == null ? null : "processed_" + input;
    }
}
// Reusable accumulator for better performance
/**
 * Aggregate function computing the arithmetic mean of the non-null input values.
 */
public class EfficientAggregateFunction extends AggregateFunction<Double, EfficientAggregateFunction.Acc> {

    /** Aggregation state: running sum and number of accumulated values. */
    public static class Acc {
        public double sum = 0.0;
        public long count = 0L;

        /** Resets both fields to zero so the instance can be reused. */
        public void reset() {
            sum = 0.0;
            count = 0L;
        }
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    /**
     * Adds one value; nulls are ignored.
     *
     * <p>No {@code @Override}: Flink's AggregateFunction does not declare {@code accumulate};
     * it is looked up by name and argument types, so {@code @Override} would not compile.
     */
    public void accumulate(Acc acc, Double value) {
        if (value != null) {
            acc.sum += value;
            acc.count++;
        }
    }

    /** @return the mean of the accumulated values, or {@code null} if none were accumulated */
    @Override
    public Double getValue(Acc acc) {
        return acc.count > 0 ? acc.sum / acc.count : null;
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java