Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.
—
Framework for extending Apache Flink with custom scalar functions, table functions, and aggregate functions for specialized processing requirements.
Base class for creating custom scalar functions that map zero or more input values to a single result value.
/**
 * Base class for user-defined scalar functions.
 *
 * <p>A scalar function maps its input values to exactly one result value per call.
 *
 * <p>The evaluation logic is deliberately NOT declared here as an abstract method. A subclass
 * declares one or more public methods named {@code eval} with concrete parameter and return
 * types, and may overload them for different argument combinations, for example:
 *
 * <pre>{@code
 * public String eval(String input) { ... }
 * public String eval(String input1, String input2) { ... }
 * }</pre>
 *
 * <p>The argument types of the chosen {@code eval} must match the call site. A single
 * {@code abstract Object eval(Object... args)} cannot be declared here: typed overloads like the
 * ones above do not implement that varargs signature, so every such subclass would fail to
 * compile with "does not override abstract method eval(Object...)".
 */
public abstract class ScalarFunction extends UserDefinedFunction {
    /**
     * Optional method to specify the result type when it cannot be inferred.
     * @param signature Array of argument types
     * @return DataType of the function result
     */
    public DataType getResultType(DataType[] signature);
    /**
     * Optional method to specify type inference for parameters.
     * NOTE(review): recent Flink releases declare this as getTypeInference(DataTypeFactory);
     * confirm the signature against the targeted release.
     * @return TypeInference specification
     */
    public TypeInference getTypeInference();
}
ScalarFunction Example:
// Custom hash function
/**
 * Scalar function returning the hexadecimal form of {@link String#hashCode()} for its input.
 * Any NULL argument yields NULL.
 */
public class HashFunction extends ScalarFunction {
    /** Hashes a single string; NULL in, NULL out. */
    public String eval(String input) {
        return input == null ? null : Integer.toHexString(input.hashCode());
    }

    /**
     * Hashes the concatenation of two strings; NULL if either argument is NULL.
     * Note: the arguments are joined without a separator, so ("ab", "c") and ("a", "bc")
     * hash identically.
     */
    public String eval(String input1, String input2) {
        boolean anyNull = input1 == null || input2 == null;
        return anyNull ? null : eval(input1 + input2);
    }
}
// Register HashFunction under the name "hash", then call both of its eval overloads
tEnv.createTemporarySystemFunction("hash", HashFunction.class);
Table result = orders
    .select(
        $("order_id"),
        // one-argument overload: hash of the e-mail address alone
        call("hash", $("customer_email"))
            .as("customer_hash"),
        // two-argument overload: order id (cast to STRING) combined with the e-mail address
        call("hash",
                $("order_id").cast(DataTypes.STRING()),
                $("customer_email"))
            .as("order_hash"));
Base class for creating custom table functions that take one or more input parameters and return multiple rows (one-to-many transformation).
/**
 * Base class for user-defined table functions.
 *
 * <p>A table function maps its input values to zero, one, or many output rows. The evaluation
 * logic lives in public {@code void} methods named {@code eval}, which a subclass declares with
 * concrete parameter types (overloads allowed). Each output row is emitted by calling
 * {@link #collect}; for example, {@code public void eval(String str)} may call
 * {@code collect(s)} once per token.
 *
 * <p>A single {@code abstract void eval(Object... args)} cannot be declared here: typed methods
 * such as {@code eval(String)} do not implement that varargs signature, so every such subclass
 * would fail to compile.
 *
 * @param <T> Type of the output rows
 */
public abstract class TableFunction<T> extends UserDefinedFunction {
    /**
     * Emits one output row; called from within an {@code eval} method.
     * @param result Row to emit
     */
    protected void collect(T result);
    /**
     * Optional method to specify the result row type when it cannot be inferred.
     * @param signature Array of argument types
     * @return DataType of the function result rows
     */
    public DataType getResultType(DataType[] signature);
}
TableFunction Example:
// Function to split comma-separated values into rows
/**
 * Table function emitting one row per comma-separated token of its input, with surrounding
 * whitespace trimmed. A NULL input emits no rows.
 *
 * <p>Empty tokens between commas (e.g. {@code "a,,b"}) are emitted as empty strings, while
 * trailing empty tokens are dropped by {@link String#split(String)}.
 */
@FunctionHint(output = @DataTypeHint("STRING"))
public class SplitFunction extends TableFunction<String> {
    public void eval(String str) {
        if (str == null) {
            return;
        }
        String[] parts = str.split(",");
        for (int i = 0; i < parts.length; i++) {
            collect(parts[i].trim());
        }
    }
}
// Function returning structured rows
/**
 * Table function emitting one (word, length) row per whitespace-separated word of a sentence.
 *
 * <p>A NULL sentence emits no rows. Splitting on {@code \s+} yields an empty leading token for
 * input with leading whitespace (e.g. {@code " hello"}) and a single empty token for {@code ""}.
 * Those empty tokens are skipped so no bogus {@code ("", 0)} rows are produced.
 * {@code length} is {@link String#length()}, i.e. the number of UTF-16 code units.
 */
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class WordAnalyzer extends TableFunction<Row> {
    /** Compiled once; String.split("\\s+") would recompile this regex on every call. */
    private static final java.util.regex.Pattern WHITESPACE =
        java.util.regex.Pattern.compile("\\s+");

    public void eval(String sentence) {
        if (sentence == null) {
            return;
        }
        for (String word : WHITESPACE.split(sentence)) {
            // Only the leading token can be empty here; Pattern.split drops trailing empties.
            if (!word.isEmpty()) {
                collect(Row.of(word, word.length()));
            }
        }
    }
}
// Make both table functions callable by name
tEnv.createTemporarySystemFunction("split", SplitFunction.class);
tEnv.createTemporarySystemFunction("analyze_words", WordAnalyzer.class);
// Inner lateral join (LATERAL JOIN): an order whose tags produce no rows is dropped
Table result = orders
    .joinLateral(
        call("split", $("product_tags")).as("tag"))
    .select($("order_id"), $("tag"));
// Left outer lateral join (LEFT JOIN LATERAL): documents that produce no rows are kept,
// with NULL in the word/length columns
Table analysis = documents
    .leftOuterJoinLateral(
        call("analyze_words", $("title")).as("word", "length"))
    .select($("document_id"), $("word"), $("length"));
Base class for creating custom aggregate functions that accumulate values over multiple rows and return a single result.
/**
 * Base class for user-defined aggregate functions.
 *
 * <p>An aggregate function folds the values of many rows into a mutable accumulator
 * ({@code ACC}) and derives a single result ({@code T}) from it.
 *
 * <p>Besides the methods declared below, a subclass must provide at least one public
 * {@code void accumulate} method. It takes the accumulator first and then the typed input
 * values, e.g. {@code public void accumulate(MyAcc acc, Double value, Double weight)}, and may
 * be overloaded. It is deliberately not declared here as
 * {@code abstract void accumulate(ACC, Object...)}: a typed method like the one above does not
 * implement that varargs signature, so every such subclass would fail to compile with
 * "does not override abstract method".
 *
 * @param <T> Type of the aggregation result
 * @param <ACC> Type of the accumulator used during aggregation
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates and initializes a new, empty accumulator.
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    /**
     * Extracts the final result from the accumulator.
     * @param accumulator Final accumulator state
     * @return Aggregated result
     */
    public abstract T getValue(ACC accumulator);
    /**
     * Retracts input values from the accumulator (for streaming updates).
     *
     * <p>The default implementation does nothing, i.e. retractions are silently ignored.
     * Aggregations over updating input need a real implementation. A typed method such as
     * {@code retract(MyAcc acc, Double value, Double weight)} overloads this varargs signature
     * rather than overriding it, so it must not be annotated {@code @Override}.
     *
     * @param accumulator Current accumulator state
     * @param args Input values to retract
     */
    public void retract(ACC accumulator, Object... args) {
        // Optional: implement for streaming scenarios with retractions
    }
    /**
     * Merges other accumulators into {@code accumulator} (for distributed aggregation).
     * The default implementation does nothing.
     * @param accumulator Target accumulator
     * @param others Accumulators to merge from
     */
    public void merge(ACC accumulator, Iterable<ACC> others) {
        // Optional: implement for distributed scenarios
    }
    /**
     * Resets the accumulator to its initial state. The default implementation does nothing.
     * @param accumulator Accumulator to reset
     */
    public void resetAccumulator(ACC accumulator) {
        // Optional: implement for reusing accumulators
    }
}
AggregateFunction Example:
// Custom weighted average function
/**
 * Aggregate function computing the weighted average {@code sum(value * weight) / sum(weight)}.
 *
 * <p>Rows with a NULL value or weight are ignored. The result is NULL when no rows are
 * currently accumulated or when the accumulated weights sum to zero.
 *
 * <p>The accumulator type is referenced as {@code WeightedAvg.WeightedAvgAccumulator} in the
 * {@code extends} clause because a member class's simple name is only in scope inside the class
 * body, not in its header.
 */
public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.WeightedAvgAccumulator> {
    /**
     * Mutable accumulator. {@code count} tracks how many rows are currently accumulated so that
     * an empty group is detected exactly. After retractions, floating-point round-off can leave
     * {@code weightSum} slightly non-zero even when every row has been removed.
     */
    public static class WeightedAvgAccumulator {
        public double sum = 0;
        public double weightSum = 0;
        public long count = 0;
    }

    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    // No @Override on accumulate/retract/merge. Flink locates these contract methods by name
    // (AggregateFunction does not declare them), and typed signatures like these cannot override
    // a varargs declaration anyway.
    public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum += value * weight;
            acc.weightSum += weight;
            acc.count++;
        }
    }

    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0 || acc.weightSum == 0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum -= value * weight;
            acc.weightSum -= weight;
            acc.count--;
        }
    }

    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.weightSum += other.weightSum;
            acc.count += other.count;
        }
    }
}
// Register the aggregate, then compute a quantity-weighted average price per category
tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);
Table result = sales
    .groupBy($("product_category"))
    .select(
        $("product_category"),
        call("weighted_avg", $("price"), $("quantity"))
            .as("weighted_avg_price"));
Base class for creating asynchronous scalar functions that can perform non-blocking I/O operations.
/**
 * Base class for asynchronous user-defined scalar functions.
 *
 * <p>A subclass declares one or more public {@code void eval} methods. Each takes a
 * {@code CompletableFuture<R>} first (where {@code R} is the concrete result type), followed by
 * the typed input arguments, e.g.
 * {@code public void eval(CompletableFuture<String> resultFuture, String userId)}.
 * The method is expected to start the non-blocking work and complete (or complete
 * exceptionally) the future once the result is available.
 *
 * <p>An {@code abstract void eval(CompletableFuture<Object>, Object...)} cannot be declared here:
 * typed methods like the one above do not implement it, because
 * {@code CompletableFuture<String>} is not a {@code CompletableFuture<Object>} and typed
 * parameters do not match {@code Object...}.
 */
public abstract class AsyncScalarFunction extends UserDefinedFunction {
    /**
     * Optional method to specify the timeout for async operations.
     * NOTE(review): verify this hook against the targeted Flink release; the timeout may instead
     * be configured through job options.
     * @return Timeout duration in milliseconds (60000 by default)
     */
    public long getTimeout() {
        return 60000; // Default 60 seconds
    }
}
AsyncScalarFunction Example:
// Async function for external service lookup
/**
 * Async scalar function that enriches a user id with the response body of an external HTTP
 * service.
 *
 * <p>The result is NULL for a NULL id, for a non-2xx response, or when the request fails. Errors
 * are mapped to NULL rather than completing the future exceptionally.
 */
public class AsyncEnrichFunction extends AsyncScalarFunction {
    private transient HttpClient httpClient;

    @Override
    public void open(FunctionContext context) throws Exception {
        httpClient = HttpClient.newHttpClient();
    }

    /**
     * Fetches {@code https://api.example.com/users/<userId>} and completes {@code resultFuture}
     * with the response body when the status is 2xx, otherwise with NULL.
     */
    public void eval(CompletableFuture<String> resultFuture, String userId) {
        if (userId == null) {
            resultFuture.complete(null);
            return;
        }
        // Percent-encode the id. Raw characters like ' ', '/', '?' or '#' would otherwise make
        // URI.create throw synchronously (leaving the future incomplete) or alter the request
        // path. URLEncoder is form-encoding, so '+' (an encoded space) is rewritten to "%20".
        String encodedId = java.net.URLEncoder
            .encode(userId, java.nio.charset.StandardCharsets.UTF_8)
            .replace("+", "%20");
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://api.example.com/users/" + encodedId))
            .build();
        httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            // An error page (404, 500, ...) is not an enrichment value.
            .thenApply(response -> isSuccess(response.statusCode()) ? response.body() : null)
            .thenAccept(resultFuture::complete)
            .exceptionally(throwable -> {
                resultFuture.complete(null); // Handle errors gracefully
                return null;
            });
    }

    /** Returns whether {@code statusCode} is in the 2xx success range. */
    private static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    @Override
    public void close() throws Exception {
        // java.net.http.HttpClient only became AutoCloseable in Java 21. On older JDKs there is
        // no close() to call, so only the reference is released here.
        httpClient = null;
    }
}
Base class for creating asynchronous table functions for non-blocking one-to-many transformations.
/**
 * Base class for asynchronous user-defined table functions.
 *
 * <p>A subclass declares one or more public {@code void eval} methods. Each takes a
 * {@code CompletableFuture<Collection<T>>} first, followed by the typed input arguments, e.g.
 * {@code public void eval(CompletableFuture<Collection<T>> resultFuture, String key)}. It
 * completes the future with all output rows for that invocation once the non-blocking work
 * finishes.
 *
 * <p>An {@code abstract void eval(CompletableFuture<Collection<T>>, Object...)} cannot be
 * declared here, because typed methods like the one above do not implement that varargs
 * signature.
 *
 * @param <T> Type of the output rows
 */
public abstract class AsyncTableFunction<T> extends UserDefinedFunction {
    // Evaluation methods are declared by subclasses, as described in the class documentation.
}
Advanced table function for complex transformations with access to multiple input tables and state.
/**
 * Base class for process table functions with advanced capabilities.
 *
 * @param <T> Type of the output rows
 */
public abstract class ProcessTableFunction<T> extends UserDefinedFunction {
    /**
     * Process method with access to context.
     * @param ctx Processing context giving state access and time information; emits rows of
     *            type {@code T}
     * @param args Input arguments
     * @throws Exception if processing fails
     */
    public abstract void eval(ProcessContext<T> ctx, Object... args) throws Exception;
    /**
     * Processing context interface.
     *
     * <p>It has its own type parameter {@code OUT} because a member interface is implicitly
     * {@code static}. It therefore cannot refer to the enclosing class's type variable
     * {@code T}; {@code void collect(T result)} here would not compile.
     *
     * @param <OUT> Type of the rows emitted through {@link #collect}
     */
    public interface ProcessContext<OUT> {
        /**
         * Get keyed state for maintaining function state.
         * @param stateDescriptor State descriptor
         * @return State instance
         */
        <S extends State> S getState(StateDescriptor<S, ?> stateDescriptor);
        /**
         * Emit an output row.
         * @param result Row to emit
         */
        void collect(OUT result);
        /**
         * Get current processing time.
         * @return Current processing time timestamp
         */
        long currentProcessingTime();
        /**
         * Get current watermark.
         * @return Current event time watermark
         */
        long currentWatermark();
    }
}
Methods for registering user-defined functions in the TableEnvironment.
// TableEnvironment methods for registering and removing user-defined functions.

/**
 * Registers a function implementation class as a temporary system function, so it can be called
 * by {@code name} from SQL and the Table API.
 *
 * @param name name under which the function is called
 * @param functionClass implementation class of the function
 */
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
/**
 * Registers an already-constructed function object as a temporary system function under
 * {@code name}.
 *
 * @param name name under which the function is called
 * @param functionInstance the function object to register
 */
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
/**
 * Removes the temporary system function registered under {@code name}.
 *
 * @param name name of the function to remove
 * @return {@code true} if such a function existed and was dropped
 */
public boolean dropTemporarySystemFunction(String name);
/**
 * Registers a function implementation class under a catalog path
 * ({@code catalog.database.function}).
 *
 * @param path full path of the function
 * @param functionClass implementation class of the function
 */
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
Annotations for providing type information to the Flink runtime.
/**
 * Annotation for providing function-level type hints.
 *
 * <p>It can be placed on the function class, where it describes the function as a whole. It can
 * also be placed on an individual evaluation method (for example one {@code eval} overload) to
 * describe only that signature.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface FunctionHint {
    /**
     * Hints for the function's input parameters, one per parameter in declaration order.
     */
    DataTypeHint[] input() default {};
    /**
     * Hint for the function's output type. This is the return type of a scalar function or the
     * row type emitted by a table function.
     */
    DataTypeHint output() default @DataTypeHint();
    /**
     * Whether the function is deterministic.
     * NOTE(review): UserDefinedFunction also exposes isDeterministic(); confirm which of the two
     * the targeted Flink release honors.
     */
    boolean isDeterministic() default true;
}
/**
 * Annotation for providing data type hints.
 *
 * <p>Besides method return values and parameters, it can annotate fields (e.g. accumulator
 * fields) and classes. It is also used as an element value inside {@code @FunctionHint}.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataTypeHint {
    /**
     * Data type specification string, e.g. {@code "STRING"} or
     * {@code "ROW<word STRING, length INT>"}. Defaults to the empty string.
     */
    String value() default "";
    /**
     * Whether classes whose data type cannot be extracted may fall back to an opaque RAW type.
     * This is not a nullability flag.
     * NOTE(review): confirm this default against the targeted Flink release.
     */
    DefaultBoolean allowRawGlobally() default DefaultBoolean.TRUE;
}
Type Hint Examples:
/**
 * Scalar function returning an array that holds {@code count} copies of {@code str}.
 *
 * <p>A SQL NULL in either argument yields NULL, propagating NULL like the other scalar examples.
 * This keeps "unknown input" distinguishable from "zero repetitions". A non-positive count
 * yields an empty array.
 */
@FunctionHint(
    input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
    output = @DataTypeHint("ARRAY<STRING>")
)
public class RepeatString extends ScalarFunction {
    public String[] eval(String str, Integer count) {
        if (str == null || count == null) {
            return null;
        }
        if (count <= 0) {
            return new String[0];
        }
        String[] result = new String[count];
        Arrays.fill(result, str);
        return result;
    }
}
// Complex type hint for nested structure
@FunctionHint(output = @DataTypeHint("ROW<name STRING, stats ROW<count BIGINT, avg DOUBLE>>"))
public class AnalyzeData extends TableFunction<Row> {
    /**
     * Placeholder evaluation method. A real implementation would emit rows shaped like the hint
     * above, i.e. a name plus a nested (count, avg) row, for example
     * {@code collect(Row.of(name, Row.of(count, avg)))}.
     */
    public void eval(String data) {
        // intentionally empty in this example: no rows are emitted
    }
}
Methods for managing function resources and state.
/**
 * Common base class of all user-defined functions. It supplies lifecycle hooks, which do nothing
 * unless overridden, and a determinism flag.
 */
public abstract class UserDefinedFunction {
    /**
     * Reports whether the function always produces the same output for the same input.
     *
     * @return {@code true} unless overridden
     */
    public boolean isDeterministic() {
        return true;
    }
    /**
     * Lifecycle hook for setting up resources before the function is first used.
     * The default implementation does nothing.
     *
     * @param context function context exposing configuration and metrics
     * @throws Exception if setup fails
     */
    public void open(FunctionContext context) throws Exception {
        // no-op by default; override to acquire resources
    }
    /**
     * Lifecycle hook for releasing resources once the function is no longer needed.
     * The default implementation does nothing.
     *
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception {
        // no-op by default; override to release resources
    }
}
/**
 * Context handed to {@code UserDefinedFunction#open(FunctionContext)}. It gives a function
 * access to job-level parameters and to its metric group.
 */
public interface FunctionContext {
    /**
     * Returns the job parameters as a configuration object.
     * NOTE(review): recent Flink releases expose job parameters as
     * getJobParameter(String key, String defaultValue); confirm this accessor against the
     * targeted release.
     *
     * @return Configuration object with job and function parameters
     */
    Configuration getJobParameter();
    /**
     * Returns the metric group in which the function can register custom metrics, e.g.
     * {@code getMetricGroup().counter("lookup_count")}.
     *
     * @return MetricGroup for registering custom metrics
     */
    MetricGroup getMetricGroup();
}
Function Lifecycle Example:
/**
 * Async scalar function that looks up {@code value} for a given {@code key} in the JDBC table
 * {@code lookup}.
 *
 * <p>JDBC calls block, so they run on a dedicated single-thread executor created in
 * {@link #open} rather than on the shared {@code ForkJoinPool.commonPool()}. Using one thread
 * also serializes all use of the single {@link Connection}, which JDBC does not guarantee to be
 * safe for concurrent use.
 */
public class DatabaseLookupFunction extends AsyncScalarFunction {
    /** Upper bound on how long close() waits for in-flight lookups before interrupting them. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    private transient Connection connection;
    private transient Counter lookupCounter;
    private transient java.util.concurrent.ExecutorService executor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Initialize database connection from the "database.url" job parameter
        String jdbcUrl = context.getJobParameter().getString("database.url");
        connection = DriverManager.getConnection(jdbcUrl);
        executor = java.util.concurrent.Executors.newSingleThreadExecutor();
        // Register metrics
        lookupCounter = context.getMetricGroup().counter("lookup_count");
    }

    /**
     * Looks up the value stored for {@code key}. The future completes with NULL when no row
     * matches or the key is NULL, and exceptionally when the query fails.
     */
    public void eval(CompletableFuture<String> resultFuture, String key) {
        lookupCounter.inc();
        if (key == null) {
            // "key = NULL" never matches in SQL, so skip the database round trip.
            resultFuture.complete(null);
            return;
        }
        CompletableFuture.supplyAsync(() -> lookup(key), executor)
            .whenComplete((value, error) -> {
                if (error != null) {
                    // Without this, a failed query would leave resultFuture incomplete forever.
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(value);
                }
            });
    }

    /** Runs the blocking query for one key; SQLException is wrapped to cross the lambda boundary. */
    private String lookup(String key) {
        try (PreparedStatement stmt = connection.prepareStatement("SELECT value FROM lookup WHERE key = ?")) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? rs.getString("value") : null;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        if (executor != null) {
            // Let in-flight lookups finish before the connection they use is closed.
            executor.shutdown();
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        }
        if (connection != null) {
            connection.close();
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber