CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

Pending
Overview
Eval results
Files

docs/user-defined-functions.md

User-Defined Functions

Flink's Table API provides a comprehensive framework for creating custom scalar, table, and aggregate functions. UDFs enable extending the built-in function library with domain-specific logic while maintaining type safety and performance optimization.

Capabilities

Scalar Functions

Functions that take one or more input values and return a single output value.

/**
 * Base class for user-defined scalar functions: each invocation maps its
 * input values to a single output value.
 *
 * Users extend this class and implement one or more eval() methods; overloads
 * with different parameter lists may coexist in the same function class.
 */
abstract class ScalarFunction extends UserDefinedFunction {
    // Users implement one or more eval() methods with different signatures, e.g.:
    // public ReturnType eval(InputType1 input1, InputType2 input2, ...);
    
    /**
     * Gets the function context for accessing runtime information.
     *
     * NOTE(review): the examples in this guide obtain the context through
     * open(FunctionContext) rather than through this accessor - confirm that
     * this method exists in the documented Flink version.
     *
     * @return FunctionContext providing runtime context
     */
    FunctionContext getFunctionContext();
}

/**
 * Base class for all user-defined functions.
 *
 * Declares the lifecycle hooks (open/close) and the metadata accessors
 * (determinism, type inference) shared by the function kinds in this guide.
 */
abstract class UserDefinedFunction implements FunctionDefinition {
    /**
     * Optional lifecycle hook called when the function is opened; the examples
     * in this guide use it to create resources such as digests or HTTP clients.
     *
     * @param context Function context for initialization
     * @throws Exception if initialization fails
     */
    void open(FunctionContext context) throws Exception;
    
    /**
     * Optional lifecycle hook called when the function is closed; the place to
     * release whatever open() acquired.
     *
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;
    
    /**
     * Indicates whether the function is deterministic.
     *
     * @return true if function always returns same result for same inputs
     */
    boolean isDeterministic();
    
    /**
     * Gets the type inference for this function.
     *
     * @return TypeInference defining input/output types
     */
    TypeInference getTypeInference();
}

Usage Examples:

// Simple scalar function
public class UpperCaseFunction extends ScalarFunction {
    /**
     * Converts the input string to upper case.
     *
     * @param input string to convert; may be null
     * @return the upper-cased string, or null when the input is null
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM's default locale
        // (e.g. under a Turkish locale "i" would otherwise become a dotted capital I).
        return input.toUpperCase(java.util.Locale.ROOT);
    }
}

// Multiple eval signatures for overloading
public class AddFunction extends ScalarFunction {
    /**
     * Adds two integers.
     *
     * @return the sum, or null if either operand is null
     */
    public Integer eval(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    /**
     * Adds two doubles.
     *
     * @return the sum, or null if either operand is null
     */
    public Double eval(Double a, Double b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }

    /**
     * Concatenates two strings.
     *
     * @return a followed by b, or null if either operand is null
     */
    public String eval(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return a + b;
    }
}

// Function with context and lifecycle
public class HashFunction extends ScalarFunction {
    /** Upper-case digits, matching the format the example has always produced. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    // transient: MessageDigest is not Serializable and is created in open(),
    // so it must never be part of the serialized function instance.
    private transient MessageDigest md5;

    /**
     * Creates the MD5 digest once per function instance.
     *
     * @param context function context (unused here)
     * @throws Exception if the MD5 algorithm is unavailable
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        md5 = MessageDigest.getInstance("MD5");
    }

    /**
     * Computes the MD5 hash of the UTF-8 encoding of the input.
     *
     * @param input string to hash; may be null
     * @return the hash as upper-case hexadecimal, or null when the input is null
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }

        // Explicit charset: the no-arg getBytes() uses the platform default,
        // which would make hashes differ between machines.
        byte[] hash = md5.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return toUpperHex(hash);
    }

    /** Hex-encodes bytes without javax.xml.bind, which is absent from JDK 11+. */
    private static String toUpperHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            hex.append(HEX_DIGITS[(b >> 4) & 0x0F]);
            hex.append(HEX_DIGITS[b & 0x0F]);
        }
        return hex.toString();
    }
}

// Registration and usage
UpperCaseFunction upperCase = new UpperCaseFunction();
tableEnv.createTemporarySystemFunction("my_upper", upperCase);
// The function is invoked in SQL under the name it was registered with
String upperCaseQuery = "SELECT my_upper(name) FROM users";
Table result = tableEnv.sqlQuery(upperCaseQuery);

Table Functions

Functions that take zero or more input values and return multiple rows (table-valued functions).

/**
 * Base class for user-defined table functions, which may emit any number of
 * rows per invocation.
 *
 * Users extend this class and implement one or more eval() methods that call
 * collect() once per output row instead of returning a value.
 *
 * @param <T> Type of output rows
 */
abstract class TableFunction<T> extends UserDefinedFunction {
    // Users implement one or more eval() methods that call collect(), e.g.:
    // public void eval(InputType1 input1, InputType2 input2, ...);
    
    /**
     * Emits a result row from the table function.
     *
     * @param result Result row to emit
     */
    protected void collect(T result);
    
    /**
     * Gets the result type of this table function.
     *
     * NOTE(review): the examples in this guide declare their output type via
     * @FunctionHint/@DataTypeHint instead of overriding this method - confirm
     * the signature and status of this accessor against the Flink version in use.
     *
     * @return DataType representing the output row structure
     */
    DataType getResultType();
}

Usage Examples:

// Split string into multiple rows
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {

    /**
     * Emits one single-column row per whitespace-separated word.
     *
     * @param str text to split; null, empty or blank input emits no rows
     */
    public void eval(String str) {
        if (str == null) {
            return;
        }
        for (String word : str.split("\\s+")) {
            // split() returns an empty token for empty input or leading whitespace;
            // an empty string is not a word, so it is skipped.
            if (!word.isEmpty()) {
                collect(Row.of(word));
            }
        }
    }
}

// Generate number sequence
@FunctionHint(output = @DataTypeHint("ROW<num INT>"))
public class RangeFunction extends TableFunction<Row> {

    /**
     * Emits one row per integer in the inclusive range [start, end].
     *
     * @param start first value to emit; null emits no rows
     * @param end last value to emit; null emits no rows. Nothing is emitted
     *            when end is smaller than start.
     */
    public void eval(Integer start, Integer end) {
        if (start == null || end == null) {
            return;
        }
        // A long counter is required: with an int counter and end == Integer.MAX_VALUE,
        // i++ wraps to a negative value and "i <= end" never becomes false.
        for (long i = start; i <= end; i++) {
            collect(Row.of((int) i));
        }
    }
}

// Registration and usage
SplitFunction splitWords = new SplitFunction();
tableEnv.createTemporarySystemFunction("split_words", splitWords);

// SQL usage: LATERAL TABLE joins each document with the rows the function emits for it
String wordCountQuery =
    "SELECT t.word, COUNT(*) as word_count "
        + "FROM documents d, LATERAL TABLE(split_words(d.content)) AS t(word) "
        + "GROUP BY t.word";
Table result = tableEnv.sqlQuery(wordCountQuery);

// Table API usage: the same lateral join expressed with joinLateral()
Table documents = tableEnv.from("documents");
Table joinedWithWords = documents.joinLateral(call("split_words", $("content")).as("word"));
Table words = joinedWithWords.select($("doc_id"), $("word"));

Aggregate Functions

Functions that aggregate multiple input rows into a single output value.

/**
 * Base class for user-defined aggregate functions, which fold many input rows
 * into a single result value by way of an accumulator.
 *
 * Besides the two abstract methods, implementations supply accumulate() and,
 * optionally, retract() and merge(); these are shown as comments because their
 * parameter lists depend on the function's inputs.
 *
 * @param <T> Type of the final result
 * @param <ACC> Type of the accumulator
 */
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    /**
     * Creates a new accumulator for aggregation.
     *
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    
    /**
     * Extracts the final result from the accumulator.
     *
     * @param accumulator Final accumulator state
     * @return Aggregation result
     */
    public abstract T getValue(ACC accumulator);
    
    // Users implement accumulate() method(s), which fold one input row into the accumulator:
    // public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
    
    /**
     * Optional: Retracts a value from the accumulator (for changelog streams).
     *
     * @param accumulator Accumulator to retract from
     * @param input Input values to retract
     */
    // public void retract(ACC accumulator, InputType1 input1, InputType2 input2, ...);
    
    /**
     * Optional: Merges two accumulators (for batch processing).
     *
     * @param accumulator Target accumulator
     * @param accumulators Source accumulators to merge
     */
    // public void merge(ACC accumulator, Iterable<ACC> accumulators);
}

Usage Examples:

// Weighted average aggregate function
public class WeightedAvgAccumulator {
    /** Running total of value * weight over all accumulated rows. */
    public double sum = 0.0d;

    /** Running total of the weights; the divisor of the final average. */
    public double weightSum = 0.0d;
}

public class WeightedAverage extends AggregateFunction<Double, WeightedAvgAccumulator> {

    /** Starts every aggregation from an empty accumulator. */
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    /** Adds one (value, weight) pair; pairs containing a null are ignored. */
    public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value == null || weight == null) {
            return;
        }
        acc.sum += value * weight;
        acc.weightSum += weight;
    }

    /**
     * Computes the weighted average accumulated so far.
     *
     * @return sum / weightSum, or null when the total weight is zero
     */
    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        if (acc.weightSum == 0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    /** Removes a previously accumulated (value, weight) pair; nulls are ignored. */
    public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value == null || weight == null) {
            return;
        }
        acc.sum -= value * weight;
        acc.weightSum -= weight;
    }

    /** Folds the totals of every accumulator in others into acc. */
    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator partial : others) {
            acc.sum += partial.sum;
            acc.weightSum += partial.weightSum;
        }
    }
}

// Registration and usage
WeightedAverage weightedAverage = new WeightedAverage();
tableEnv.createTemporarySystemFunction("weighted_avg", weightedAverage);

// price is the averaged value, quantity its weight
String avgPriceQuery =
    "SELECT product_category, weighted_avg(price, quantity) as avg_price "
        + "FROM sales "
        + "GROUP BY product_category";
Table result = tableEnv.sqlQuery(avgPriceQuery);

Table Aggregate Functions

Functions that aggregate multiple input rows into multiple output rows.

/**
 * Base class for user-defined table aggregate functions, which fold many input
 * rows into an accumulator and then emit any number of output rows from it.
 *
 * Implementations additionally supply accumulate(); it is shown as a comment
 * because its parameter list depends on the function's inputs.
 *
 * @param <T> Type of the output rows
 * @param <ACC> Type of the accumulator
 */
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    /**
     * Creates a new accumulator for aggregation.
     *
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    
    // Users implement accumulate() method(s), which fold one input row into the accumulator:
    // public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
    
    /**
     * Emits the final result from the accumulator.
     *
     * NOTE(review): declared abstract here; confirm whether the documented Flink
     * version declares emitValue on the base class or resolves it by convention.
     *
     * @param accumulator Final accumulator state
     * @param out Collector for emitting results
     */
    public abstract void emitValue(ACC accumulator, Collector<T> out);
    
    /**
     * Optional: Emits incremental updates with retraction.
     *
     * @param accumulator Current accumulator state
     * @param out Collector for emitting results with retraction
     */
    // public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
}

Usage Examples:

// Top N aggregate function
public class TopNAccumulator {
    /** Retained (score, name) entries; TopN keeps this sorted by descending score. */
    public List<Tuple2<Integer, String>> topN = new ArrayList<>();

    /** Maximum number of entries to retain; assigned by TopN.createAccumulator(). */
    public int n;
}

public class TopN extends TableAggregateFunction<Tuple2<Integer, String>, TopNAccumulator> {
    private int n;

    /**
     * @param n how many of the highest-scoring entries to keep
     */
    public TopN(int n) {
        this.n = n;
    }

    /** Creates an empty accumulator that carries this function's limit. */
    @Override
    public TopNAccumulator createAccumulator() {
        TopNAccumulator fresh = new TopNAccumulator();
        fresh.n = this.n;
        return fresh;
    }

    /**
     * Inserts one (score, name) entry and trims the list back to the limit.
     * Entries with a null score or name are ignored.
     */
    public void accumulate(TopNAccumulator acc, Integer score, String name) {
        if (score == null || name == null) {
            return;
        }
        acc.topN.add(Tuple2.of(score, name));
        // Highest score first, so the entry to discard is always the last one
        acc.topN.sort((left, right) -> right.f0.compareTo(left.f0));
        if (acc.topN.size() > acc.n) {
            acc.topN.remove(acc.topN.size() - 1);
        }
    }

    /** Emits the retained entries in descending score order. */
    @Override
    public void emitValue(TopNAccumulator acc, Collector<Tuple2<Integer, String>> out) {
        acc.topN.forEach(out::collect);
    }
}

// Registration and usage
tableEnv.createTemporarySystemFunction("top3", new TopN(3));

// Table aggregate functions are invoked through the Table API's flatAggregate();
// Flink SQL has no FLAT_AGGREGATE clause, so the call cannot be written as a SQL string.
Table result = tableEnv
    .from("players")
    // as(...) names the two fields of the Tuple2 rows that TopN emits
    .flatAggregate(call("top3", $("score"), $("name")).as("score", "name"))
    .select($("score"), $("name"));

Async Functions

Functions that perform asynchronous operations for I/O bound tasks.

/**
 * Async scalar function for I/O bound operations: the result is delivered
 * through a CompletableFuture rather than returned directly.
 *
 * NOTE(review): this guide names the user method evalAsync() and has it return
 * the future; confirm against the Flink version in use, which may instead expect
 * eval(CompletableFuture<ReturnType> future, ...) with the future as first argument.
 */
class AsyncScalarFunction extends UserDefinedFunction {
    // Users implement evalAsync() methods that return CompletableFuture
    // public CompletableFuture<ReturnType> evalAsync(InputType1 input1, InputType2 input2, ...);
}

/**
 * Base class for async table functions, which hand their output rows to a
 * collector instead of returning them.
 *
 * NOTE(review): the evalAsync()/AsyncCollector shape shown below is this
 * guide's description; confirm the method name and callback type against the
 * Flink version in use.
 *
 * @param <T> Type of output rows
 */
abstract class AsyncTableFunction<T> extends UserDefinedFunction {
    // Users implement evalAsync() methods that use AsyncCollector
    // public void evalAsync(InputType1 input1, AsyncCollector<T> collector);
}

Usage Examples:

// Async HTTP lookup function
public class HttpLookupFunction extends AsyncScalarFunction {
    // Created in open() and released in close(); never serialized with the function
    private transient AsyncHttpClient httpClient;

    /** Creates the HTTP client used for all lookups of this instance. */
    @Override
    public void open(FunctionContext context) throws Exception {
        this.httpClient = Dsl.asyncHttpClient();
    }

    /**
     * Issues a GET request without blocking the caller.
     *
     * @param url address to fetch; may be null
     * @return future holding the response body, or an already-completed
     *         future holding null when url is null
     */
    public CompletableFuture<String> evalAsync(String url) {
        if (url != null) {
            return httpClient
                .prepareGet(url)
                .execute()
                .toCompletableFuture()
                .thenApply(httpResponse -> httpResponse.getResponseBody());
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Closes the HTTP client if open() created one. */
    @Override
    public void close() throws Exception {
        if (httpClient == null) {
            return;
        }
        httpClient.close();
    }
}

// Registration: makes the async lookup available under the name "http_get"
tableEnv.createTemporarySystemFunction("http_get", new HttpLookupFunction());

Function Type Inference

Advanced type inference for complex function signatures.

/**
 * Annotation for providing type hints to functions. It can be placed on a
 * function class or on an individual method and is retained at runtime.
 *
 * NOTE(review): confirm the isDeterministic element against the Flink version
 * in use; elsewhere in this guide determinism is exposed through
 * UserDefinedFunction.isDeterministic().
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface FunctionHint {
    // Data types of the arguments, in order; empty by default
    DataTypeHint[] input() default {};
    // Data type of the result; defaults to an empty @DataTypeHint
    DataTypeHint output() default @DataTypeHint();
    boolean isDeterministic() default true;
}

/**
 * Annotation for specifying data types. It can be placed on a type, a method
 * or a field and is retained at runtime.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface DataTypeHint {
    // Type string such as "STRING" or "ROW<word STRING>", as used in this guide's examples
    String value() default "";
    // void.class is the default; presumably it means no bridging class is set - confirm
    Class<?> bridgedTo() default void.class;
    boolean allowRawGlobally() default false;
}

Usage Examples:

// Function with explicit type hints
@FunctionHint(
    input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
    output = @DataTypeHint("ARRAY<STRING>")
)
public class RepeatFunction extends TableFunction<String[]> {

    /**
     * Emits a single array holding count copies of str.
     *
     * @param str string to repeat; null emits nothing
     * @param count number of copies; null, zero or negative emits nothing
     */
    public void eval(String str, Integer count) {
        if (str == null || count == null || count <= 0) {
            return;
        }
        String[] repeated = new String[count];
        for (int slot = 0; slot < repeated.length; slot++) {
            repeated[slot] = str;
        }
        collect(repeated);
    }
}

// Complex return type with row structure
@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, name STRING, score DOUBLE>"))
public class ParseResultFunction extends TableFunction<Row> {

    /**
     * Parses a JSON object and emits one (id, name, score) row.
     *
     * @param jsonString JSON object with "id", "name" and "score" members;
     *                   null emits no rows
     */
    public void eval(String jsonString) {
        // Null input yields no rows instead of failing inside the parser
        if (jsonString == null) {
            return;
        }
        // Parse JSON and emit structured rows
        JsonObject json = JsonParser.parseString(jsonString).getAsJsonObject();
        collect(Row.of(
            json.get("id").getAsLong(),
            json.get("name").getAsString(),
            json.get("score").getAsDouble()
        ));
    }
}

Types

Function Base Classes

// Common base of all user-defined functions: lifecycle hooks plus metadata accessors.
abstract class UserDefinedFunction implements FunctionDefinition {
    // Lifecycle: called when the function is opened; receives the runtime context
    void open(FunctionContext context) throws Exception;
    // Lifecycle: called when the function is closed
    void close() throws Exception;
    // Whether the function is deterministic
    boolean isDeterministic();
    // Type inference describing the function's input and output types
    TypeInference getTypeInference();
}

// Scalar function base: subclasses add public eval() methods, one per supported signature.
abstract class ScalarFunction extends UserDefinedFunction {
    // Implementation-specific eval() methods
}

// Table function base: eval() implementations emit rows of type T through collect().
abstract class TableFunction<T> extends UserDefinedFunction {
    // Emits one output row
    protected void collect(T result);
    // Data type describing the output rows
    DataType getResultType();
}

// Aggregate function base: T is the result type, ACC the accumulator type.
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    // Creates a new, empty accumulator
    public abstract ACC createAccumulator();
    // Extracts the aggregation result from the accumulator
    public abstract T getValue(ACC accumulator);
    // Optional: public void retract(ACC accumulator, ...);
    // Optional: public void merge(ACC accumulator, Iterable<ACC> others);
}

// Table aggregate function base: T is the output row type, ACC the accumulator type.
abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    // Creates a new, empty accumulator
    public abstract ACC createAccumulator();
    // Emits the result rows held by the accumulator through the collector
    public abstract void emitValue(ACC accumulator, Collector<T> out);
    // Optional: public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);
}

Function Context

// Runtime information handed to UserDefinedFunction.open().
// NOTE(review): confirm this member list against the Flink version in use.
interface FunctionContext {
    // Metric group, presumably for registering function-level metrics - confirm
    MetricGroup getMetricGroup();
    // Index of the subtask running this function instance
    int getIndexOfThisSubtask();
    // Number of parallel subtasks
    int getNumberOfParallelSubtasks();
    // Job parameter lookup; defaultValue is presumably returned when key is not set - confirm
    String getJobParameter(String key, String defaultValue);
    
    // Access to distributed cache
    File getCachedFile(String name);
}

Collectors and Async Support

// Sink for emitted records; TableAggregateFunction.emitValue() writes its results to one.
interface Collector<T> {
    // Emits one record
    void collect(T record);
}

// Collector that can additionally retract a record; used by the optional emitUpdateWithRetract().
interface RetractableCollector<T> extends Collector<T> {
    // Retracts one record
    void retract(T record);
}

// Callback through which an async table function hands back its result rows.
// NOTE(review): the difference between collect() and complete() is not shown in
// this guide - confirm their semantics against the Flink version in use.
interface AsyncCollector<T> {
    void collect(Collection<T> result);
    void complete(Collection<T> result);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table

docs

catalog-system.md

datastream-integration.md

index.md

sql-execution.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

tile.json