CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java

Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.

Pending
Overview
Eval results
Files

docs/user-defined-functions.md

User-Defined Functions

Flink Table API supports custom functions to extend the built-in function library. You can create scalar functions, table functions, aggregate functions, and table aggregate functions to implement domain-specific logic.

Capabilities

Scalar Functions

User-defined scalar functions take zero, one, or multiple scalar values and return a single scalar value.

/**
 * Base class for scalar user-defined functions.
 *
 * <p>Users must extend this class and implement one or more {@code eval()} methods.
 * The class body shown here declares no methods of its own; the evaluation entry
 * points are supplied entirely by the subclass.
 */
public abstract class ScalarFunction extends UserDefinedFunction {
    /**
     * Users implement one or more eval methods with different signatures.
     * The eval method name is fixed - Flink uses reflection to find matching methods,
     * so overloads are selected by their parameter lists.
     * 
     * Example signatures:
     * public String eval(String input);
     * public Integer eval(Integer a, Integer b);
     * public Double eval(Double... values);
     */
}

Usage Examples:

/**
 * Custom string manipulation function that turns a string into a label built
 * from the absolute value of its {@link String#hashCode()}.
 */
public class StringHashFunction extends ScalarFunction {

    /**
     * Hashes {@code input} with the fixed prefix {@code "hash"}.
     *
     * @param input string to hash; may be null
     * @return {@code "hash_" + |hashCode|}, or null when {@code input} is null
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        return "hash_" + nonNegativeHash(input);
    }

    /**
     * Hashes {@code input} with a caller-supplied prefix.
     *
     * @param input string to hash; may be null
     * @param prefix text placed before the underscore; a null prefix is rendered as "null"
     * @return {@code prefix + "_" + |hashCode|}, or null when {@code input} is null
     */
    public String eval(String input, String prefix) {
        if (input == null) {
            return null;
        }
        return prefix + "_" + nonNegativeHash(input);
    }

    /** Returns the absolute value of the string's hash code, guaranteed non-negative. */
    private static long nonNegativeHash(String input) {
        // Widen to long first: Math.abs(Integer.MIN_VALUE) overflows and stays negative
        // in int arithmetic (e.g. "polygenelubricants".hashCode() == Integer.MIN_VALUE).
        return Math.abs((long) input.hashCode());
    }
}

// Register the function under the name used by both the Table API and SQL
tableEnv.createTemporaryFunction("string_hash", new StringHashFunction());

// Table API: call the one-argument and the two-argument overload
Table result = sourceTable.select(
    $("id"),
    $("name"),
    call("string_hash", $("name")).as("name_hash"),
    call("string_hash", $("name"), lit("user")).as("prefixed_hash")
);

// SQL: the same function, referenced by its registered name
String hashQuery = "SELECT id, name, string_hash(name) as name_hash FROM source_table";
Table sqlResult = tableEnv.sqlQuery(hashQuery);

/**
 * Mathematical function example: raises a base to an exponent, with one overload
 * for floating-point operands and one for integral operands.
 */
public class PowerFunction extends ScalarFunction {

    /**
     * Floating-point power.
     *
     * @param base value to raise; may be null
     * @param exponent power to raise to; may be null
     * @return {@code Math.pow(base, exponent)}, or null if either argument is null
     */
    public Double eval(Double base, Double exponent) {
        if (base == null || exponent == null) {
            return null;
        }
        return Math.pow(base, exponent);
    }

    /**
     * Integral power.
     *
     * <p>For non-negative exponents whose result fits in a {@code long}, the result is
     * computed with exact integer arithmetic. Going through {@code double} rounds any
     * value above 2^53 (for example 3^39). In every other case (negative exponent, or
     * a result that overflows {@code long}) the previous behaviour is kept: the
     * {@code double} result of {@code Math.pow} is cast to {@code long}.
     *
     * @param base value to raise; may be null
     * @param exponent power to raise to; may be null
     * @return the power as a long, or null if either argument is null
     */
    public Long eval(Long base, Long exponent) {
        if (base == null || exponent == null) {
            return null;
        }
        if (exponent >= 0) {
            try {
                // Exponentiation by squaring; multiplyExact throws instead of wrapping.
                long result = 1L;
                long factor = base;
                long remaining = exponent;
                while (remaining > 0) {
                    if ((remaining & 1L) == 1L) {
                        result = Math.multiplyExact(result, factor);
                    }
                    remaining >>= 1;
                    // Only square when a higher bit still needs this factor, so a
                    // representable result never trips a spurious overflow.
                    if (remaining > 0) {
                        factor = Math.multiplyExact(factor, factor);
                    }
                }
                return result;
            } catch (ArithmeticException ignored) {
                // Result does not fit in a long: fall back to the original cast below.
            }
        }
        return (long) Math.pow(base, exponent);
    }
}

Table Functions

User-defined table functions take zero, one, or multiple scalar values and return an arbitrary number of rows (a table) as output.

/**
 * Base class for table user-defined functions.
 *
 * <p>Users must extend this class and implement {@code eval()} methods.
 * Each call to {@code collect()} from within {@code eval()} emits one output row.
 *
 * @param <T> type of the rows passed to {@code collect()}
 */
public abstract class TableFunction<T> extends UserDefinedFunction {
    /**
     * Emits a result row to the output table.
     * This is a signature sketch only; the implementation is not shown here.
     * @param result Row data to emit
     */
    protected void collect(T result);
    
    /**
     * Users implement eval methods that call collect() for each output row.
     * The eval method name is fixed - Flink uses reflection to find matching methods.
     */
}

Usage Examples:

/**
 * Splits a string into multiple rows, one per token.
 *
 * <p>Each emitted row holds the trimmed token and its zero-based index.
 * NOTE(review): depending on the Flink version, a {@code Row} output may need an
 * explicit type hint for its fields - confirm against the target version.
 */
public class SplitFunction extends TableFunction<Row> {

    /**
     * Emits one row per token of {@code str}.
     *
     * @param str text to split; nothing is emitted when null
     * @param separator literal separator text; nothing is emitted when null
     */
    public void eval(String str, String separator) {
        if (str == null || separator == null) {
            return;
        }

        // String.split interprets its argument as a regular expression, so separators
        // such as "|", "." or "*" would split incorrectly or throw. Quote the separator
        // so it is always matched literally.
        String[] parts = str.split(java.util.regex.Pattern.quote(separator));
        for (int i = 0; i < parts.length; i++) {
            collect(Row.of(parts[i].trim(), i));
        }
    }
}

// Register and use table function
SplitFunction splitFunc = new SplitFunction();
tableEnv.createTemporaryFunction("split_string", splitFunc);

// Use with LATERAL TABLE in SQL.
// POSITION is a reserved keyword in Flink SQL, so the column alias must be
// escaped with backticks wherever it appears in the statement.
Table result = tableEnv.sqlQuery(
    "SELECT t.id, t.name, s.word, s.`position` " +
    "FROM source_table t, " +
    "LATERAL TABLE(split_string(t.tags, ',')) AS s(word, `position`)"
);

// Use in Table API with joinLateral (no escaping needed: the alias is not parsed as SQL)
Table lateralResult = sourceTable
    .joinLateral(call("split_string", $("tags"), lit(",")))
    .select($("id"), $("name"), $("f0").as("word"), $("f1").as("position"));

/**
 * Generate series function: emits every integer from {@code start} to {@code end}
 * (both inclusive), optionally advancing by a custom step.
 */
public class GenerateSeriesFunction extends TableFunction<Integer> {

    /**
     * Emits {@code start, start + 1, ..., end}.
     *
     * @param start first value; nothing is emitted when null
     * @param end last value (inclusive); nothing is emitted when null
     */
    public void eval(Integer start, Integer end) {
        if (start == null || end == null) {
            return;
        }
        emitSeries(start, end, 1);
    }

    /**
     * Emits {@code start, start + step, ...} while the value has not passed {@code end}.
     *
     * @param start first value; nothing is emitted when null
     * @param end inclusive bound; nothing is emitted when null
     * @param step increment, positive or negative; nothing is emitted when null or zero
     */
    public void eval(Integer start, Integer end, Integer step) {
        if (start == null || end == null || step == null || step == 0) {
            return;
        }
        emitSeries(start, end, step);
    }

    /** Emits the series; {@code step} must be non-zero. */
    private void emitSeries(int start, int end, int step) {
        // The counter is a long: with an int counter, a bound at or near
        // Integer.MAX_VALUE / Integer.MIN_VALUE makes the counter wrap around and the
        // loop never terminates.
        if (step > 0) {
            for (long value = start; value <= end; value += step) {
                collect((int) value);
            }
        } else {
            for (long value = start; value >= end; value += step) {
                collect((int) value);
            }
        }
    }
}

Aggregate Functions

User-defined aggregate functions take multiple rows and compute a single aggregate result.

/**
 * Base class for aggregate user-defined functions.
 *
 * <p>Only accumulator creation and result extraction are declared on the class.
 * The accumulation methods have user-defined parameter lists, so they are not
 * declared here; Flink looks them up by name via reflection.
 *
 * @param <T> type of the aggregate result
 * @param <ACC> type of the accumulator holding intermediate state
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates a new accumulator for aggregation
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();

    /**
     * Extracts the final result from the accumulator
     * @param accumulator Final accumulator state
     * @return Aggregate result
     */
    public abstract T getValue(ACC accumulator);

    /**
     * Methods implemented by convention (found via reflection by their fixed names).
     * They are not declared on this class, so implementations must be public,
     * non-static, and must NOT be annotated with @Override.
     *
     * Required - accumulates input values into the accumulator
     * (accumulator first, then one or more user-defined input parameters):
     * public void accumulate(ACC accumulator, [user-defined inputs]);
     *
     * Optional - retracts input values from the accumulator; only needed for
     * streaming scenarios with retractions:
     * public void retract(ACC accumulator, [user-defined inputs]);
     *
     * Optional - merges other accumulators into the target accumulator; needed
     * for session windows and some optimization scenarios:
     * public void merge(ACC accumulator, Iterable<ACC> others);
     */
}

Usage Examples:

/**
 * Custom weighted-average aggregate: sum(value * weight) / sum(weight).
 *
 * <p>{@code accumulate}, {@code retract} and {@code merge} are resolved by Flink via
 * reflection and are not declared on {@code AggregateFunction}; they therefore carry
 * no {@code @Override} annotation (adding one is a compile error).
 */
// The nested accumulator is qualified here because member types are not in scope
// inside the enclosing class's own extends clause.
public class WeightedAverageFunction
        extends AggregateFunction<Double, WeightedAverageFunction.WeightedAverageAccumulator> {

    /** Accumulator: running weighted sum and running sum of weights. */
    public static class WeightedAverageAccumulator {
        public double sum = 0.0;
        public double weightSum = 0.0;
    }

    /** @return a fresh accumulator with both sums at zero */
    @Override
    public WeightedAverageAccumulator createAccumulator() {
        return new WeightedAverageAccumulator();
    }

    /**
     * Adds one weighted value. Rows where either argument is null are skipped.
     *
     * @param acc accumulator to update
     * @param value value to average
     * @param weight weight of the value
     */
    public void accumulate(WeightedAverageAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum += value * weight;
            acc.weightSum += weight;
        }
    }

    /**
     * @param acc final accumulator state
     * @return the weighted average, or null when the total weight is zero
     */
    @Override
    public Double getValue(WeightedAverageAccumulator acc) {
        if (acc.weightSum == 0.0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    /**
     * Removes a previously accumulated weighted value. Rows where either argument
     * is null are skipped, mirroring {@code accumulate}.
     *
     * @param acc accumulator to update
     * @param value value to remove
     * @param weight weight of the value to remove
     */
    public void retract(WeightedAverageAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum -= value * weight;
            acc.weightSum -= weight;
        }
    }

    /**
     * Folds the sums of the other accumulators into {@code acc}.
     *
     * @param acc target accumulator
     * @param others accumulators to merge from
     */
    public void merge(WeightedAverageAccumulator acc, Iterable<WeightedAverageAccumulator> others) {
        for (WeightedAverageAccumulator other : others) {
            acc.sum += other.sum;
            acc.weightSum += other.weightSum;
        }
    }
}

// Make the aggregate available under the name "weighted_avg"
tableEnv.createTemporaryFunction("weighted_avg", new WeightedAverageFunction());

// Table API: one weighted average per category
Table result = sourceTable
    .groupBy($("category"))
    .select(
        $("category"),
        call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price")
    );

// SQL: the equivalent grouped query
String weightedAvgQuery =
    "SELECT category, weighted_avg(price, quantity) as weighted_avg_price FROM source_table GROUP BY category";
Table sqlResult = tableEnv.sqlQuery(weightedAvgQuery);

/**
 * Custom string concatenation aggregate: joins non-null values in arrival order,
 * inserting the separator between consecutive values.
 *
 * <p>{@code accumulate} is resolved by Flink via reflection and is not declared on
 * {@code AggregateFunction}, so it carries no {@code @Override} annotation.
 */
public class StringConcatFunction extends AggregateFunction<String, StringBuilder> {

    /** @return an empty builder */
    @Override
    public StringBuilder createAccumulator() {
        return new StringBuilder();
    }

    /**
     * Appends one value. Null values are skipped; a null separator appends the
     * value with nothing in between.
     *
     * @param acc builder holding the text so far
     * @param value value to append
     * @param separator text placed before every value except the first
     */
    public void accumulate(StringBuilder acc, String value, String separator) {
        if (value != null) {
            if (acc.length() > 0 && separator != null) {
                acc.append(separator);
            }
            acc.append(value);
        }
    }

    /**
     * @param acc final accumulator state
     * @return the concatenated text (empty string when nothing was accumulated)
     */
    @Override
    public String getValue(StringBuilder acc) {
        return acc.toString();
    }
}

Table Aggregate Functions

Table aggregate functions take multiple rows and return multiple rows (like table functions but with aggregation semantics).

/**
 * Base class for table aggregate functions
 * Combines aspects of both table functions and aggregate functions
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates a new accumulator
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();
    
    /**
     * Accumulates input values
     * @param accumulator Current accumulator state
     * @param input Input values
     */
    public abstract void accumulate(ACC accumulator, Object... input);
    
    /**
     * Emits result rows from the final accumulator state
     * @param accumulator Final accumulator state
     * @param out Collector for emitting output rows
     */
    public abstract void emitValue(ACC accumulator, Collector<T> out);
    
    /**
     * Emits updated result rows during streaming processing
     * @param accumulator Current accumulator state
     * @param out Collector for emitting output rows
     */
    public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out) {
        // Optional - implement for streaming scenarios with retractions
    }
}

Usage Examples:

/**
 * Top-N function returning multiple rows per group: the {@code n} largest values
 * seen so far, each paired with its 1-based rank.
 *
 * <p>{@code accumulate} and {@code emitValue} are resolved by Flink via reflection
 * and are not declared on {@code TableAggregateFunction}, so they carry no
 * {@code @Override} annotation.
 */
// The nested accumulator is qualified here because member types are not in scope
// inside the enclosing class's own extends clause.
public class TopNFunction extends TableAggregateFunction<Row, TopNFunction.TopNAccumulator> {

    private final int n;

    /**
     * @param n number of top values to keep per group; must not be negative
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public TopNFunction(int n) {
        if (n < 0) {
            // Fail at construction instead of on the first accumulated row.
            throw new IllegalArgumentException("n must not be negative: " + n);
        }
        this.n = n;
    }

    /** Accumulator: the retained values, sorted in descending order. */
    public static class TopNAccumulator {
        public List<Double> topValues = new ArrayList<>();
    }

    /** @return a fresh accumulator with no retained values */
    @Override
    public TopNAccumulator createAccumulator() {
        return new TopNAccumulator();
    }

    /**
     * Adds one value and keeps only the {@code n} largest. Null values are skipped.
     *
     * @param acc accumulator to update
     * @param value candidate value
     */
    public void accumulate(TopNAccumulator acc, Double value) {
        if (value != null) {
            acc.topValues.add(value);
            acc.topValues.sort((a, b) -> Double.compare(b, a)); // Descending order

            // Keep only top N values. Trim the list in place: assigning the subList
            // view itself would replace the ArrayList with a view of a view of a view
            // (one extra layer per row), which keeps every backing list alive and
            // leaves a non-standalone list inside the accumulator state.
            if (acc.topValues.size() > n) {
                acc.topValues.subList(n, acc.topValues.size()).clear();
            }
        }
    }

    /**
     * Emits one row per retained value: (value, rank), where rank starts at 1.
     *
     * @param acc final accumulator state
     * @param out collector receiving the output rows
     */
    public void emitValue(TopNAccumulator acc, Collector<Row> out) {
        for (int i = 0; i < acc.topValues.size(); i++) {
            out.collect(Row.of(acc.topValues.get(i), i + 1));
        }
    }
}

// Register a top-3 instance under the name "top_n"
tableEnv.createTemporaryFunction("top_n", new TopNFunction(3));

// Table API: flatAggregate expands each group into its top rows
Table topResults = sourceTable
    .groupBy($("category"))
    .flatAggregate(call("top_n", $("score")))
    .select(
        $("category"),
        $("f0").as("score"),
        $("f1").as("rank")
    );

Function Registration Methods

Various ways to register functions in the table environment.

/**
 * Register a function instance as a temporary catalog function with the given name
 * @param name Function name for SQL and Table API usage
 * @param function Function instance
 */
public void createTemporaryFunction(String name, UserDefinedFunction function);

/**
 * Register a function class as a temporary system function.
 * System functions are referenced by a plain name, without catalog or database.
 * @param name Function name
 * @param functionClass Function class
 */
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

/**
 * Register a function class as a catalog function in a specific catalog and database.
 * Catalog functions are registered by class rather than by instance.
 * @param path Catalog path (catalog.database.function)
 * @param functionClass Function class
 */
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

/**
 * Drop a temporary function
 * @param name Function name to drop
 * @return true if function was dropped
 */
public boolean dropTemporaryFunction(String name);

Usage Examples:

// Register with instance
MyCustomFunction customFunc = new MyCustomFunction();
tableEnv.createTemporaryFunction("my_func", customFunc);

// Register with class
tableEnv.createTemporarySystemFunction("power_func", PowerFunction.class);

// Register in specific catalog.
// Catalog functions are registered by class, not by instance, so pass the class.
tableEnv.createFunction("my_catalog.my_db.custom_func", MyCustomFunction.class);

// Register via SQL DDL
tableEnv.executeSql(
    "CREATE TEMPORARY FUNCTION my_hash AS 'com.example.StringHashFunction'"
);

// Drop function
boolean dropped = tableEnv.dropTemporaryFunction("my_func");

// Drop via SQL
tableEnv.executeSql("DROP TEMPORARY FUNCTION my_hash");

Advanced Function Features

Advanced patterns for function development and optimization.

/**
 * Function with type inference - override getResultType for complex return types.
 *
 * <p>Both hooks shown here return null by default, i.e. they supply no explicit
 * type information unless a subclass overrides them.
 *
 * NOTE(review): these TypeInformation-based hooks look like the legacy type
 * extraction API; confirm that the targeted Flink version still supports them and
 * that they are declared on this class.
 */
public abstract class UserDefinedFunction {
    /**
     * Provides type information for the function result
     * @param signature Method signature information
     * @return TypeInformation for the result type; null in this default implementation
     */
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // Override to provide custom type information
        return null;
    }
    
    /**
     * Provides parameter type information
     * @param signature Method signature information
     * @return Array of TypeInformation for parameters; null in this default implementation
     */
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        // Override to provide custom parameter type information
        return null;
    }
}

Usage Examples:

/**
 * Function with custom type handling: parses a colon-separated string into a
 * three-field row (string, int, double) and declares that row type explicitly.
 */
public class ComplexReturnFunction extends ScalarFunction {

    /**
     * Parses {@code input} into a row built from its first three colon-separated fields.
     *
     * @param input text to parse; may be null
     * @return the parsed row, or null when {@code input} is null
     */
    public Row eval(String input) {
        if (input == null) {
            return null;
        }

        String[] fields = input.split(":");
        String label = fields[0];
        int count = Integer.parseInt(fields[1]);
        double ratio = Double.parseDouble(fields[2]);
        return Row.of(label, count, ratio);
    }

    /** Declares the result as a row of (STRING, INT, DOUBLE), matching eval's field order. */
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING, Types.INT, Types.DOUBLE);
    }
}

/**
 * Function with configuration: prepends a prefix, fixed at construction time,
 * to every input.
 */
public class ConfigurableFunction extends ScalarFunction {
    // Set once in the constructor and never reassigned.
    private final String prefix;

    /**
     * @param prefix text placed before the underscore in every result
     */
    public ConfigurableFunction(String prefix) {
        this.prefix = prefix;
    }

    /**
     * @param input text to prefix; may be null
     * @return {@code prefix + "_" + input}, or null when {@code input} is null
     */
    public String eval(String input) {
        // Propagate SQL NULL like the other scalar examples, instead of producing
        // the literal text "<prefix>_null".
        if (input == null) {
            return null;
        }
        return prefix + "_" + input;
    }
}

/**
 * Stateful function with an open/close lifecycle: the resource is created in
 * {@code open}, used by {@code eval}, and released in {@code close}.
 */
public class ResourceFunction extends ScalarFunction {
    // transient: created in open() rather than carried along with the function instance
    private transient SomeResource resource;

    /** Initializes resources (database connections, etc.) after the superclass has opened. */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        resource = new SomeResource();
    }

    /** Closes the superclass first, then releases the resource if one was created. */
    @Override
    public void close() throws Exception {
        super.close();
        if (resource == null) {
            return;
        }
        resource.close();
    }

    /**
     * @param input value handed to the resource
     * @return whatever the resource produces for {@code input}
     */
    public String eval(String input) {
        return resource.process(input);
    }
}

Best Practices and Performance

Guidelines for efficient function implementation.

Usage Examples:

/**
 * Efficient function with null handling: prefixes its input with
 * {@code "processed_"}, with overloads for string and integer input.
 */
public class EfficientStringFunction extends ScalarFunction {

    /**
     * @param input text to process; may be null
     * @return null for null input, "" for empty input, otherwise "processed_" + input
     */
    public String eval(String input) {
        // Null and empty inputs are answered up front, without allocating anything
        if (input == null) {
            return null;
        }
        if (input.isEmpty()) {
            return "";
        }

        // Builder pre-sized for the prefix plus the input
        return new StringBuilder(input.length() + 10)
            .append("processed_")
            .append(input)
            .toString();
    }

    /**
     * Overload for integer input.
     *
     * @param input number to process; may be null
     * @return null for null input, otherwise "processed_" + input
     */
    public String eval(Integer input) {
        return input == null ? null : "processed_" + input;
    }
}

/**
 * Average aggregate with a compact, resettable accumulator.
 *
 * <p>{@code accumulate} is resolved by Flink via reflection and is not declared on
 * {@code AggregateFunction}, so it carries no {@code @Override} annotation.
 */
public class EfficientAggregateFunction extends AggregateFunction<Double, EfficientAggregateFunction.Acc> {

    /** Accumulator: running sum and number of accumulated values. */
    public static class Acc {
        public double sum = 0.0;
        public long count = 0L;

        /** Resets both fields to zero so the instance can be reused. */
        public void reset() {
            sum = 0.0;
            count = 0L;
        }
    }

    /** @return a fresh accumulator with sum and count at zero */
    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    /**
     * Adds one value. Null values are skipped and do not affect the count.
     *
     * @param acc accumulator to update
     * @param value value to add
     */
    public void accumulate(Acc acc, Double value) {
        if (value != null) {
            acc.sum += value;
            acc.count++;
        }
    }

    /**
     * @param acc final accumulator state
     * @return the arithmetic mean, or null when no value was accumulated
     */
    @Override
    public Double getValue(Acc acc) {
        return acc.count > 0 ? acc.sum / acc.count : null;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java

docs

aggregation-grouping.md

catalog-management.md

expressions.md

index.md

sql-integration.md

table-environment.md

table-operations.md

user-defined-functions.md

window-operations.md

tile.json