CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-uber

Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.

Pending
Overview
Eval results
Files

docs/functions.md

User-Defined Functions

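Framework for extending Apache Flink's Table API and SQL with custom functions: scalar functions, table functions, aggregate functions, their asynchronous variants, and process table functions, for processing that the built-in functions do not cover.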

Capabilities

ScalarFunction

Base class for custom scalar functions, which map zero or more input values to exactly one result value per call.

/**
 * Base class for user-defined scalar functions: zero or more input values are mapped to exactly
 * one result value per call.
 *
 * <p>The evaluation logic is deliberately NOT declared as an abstract method here. A subclass
 * defines one or more {@code public} methods named {@code eval} with concrete parameter and
 * return types, and may overload them (see {@code HashFunction} below). A single abstract
 * {@code eval(Object... args)} on this class would force every subclass to implement exactly that
 * varargs signature, and typed overloads such as {@code eval(String)} would not satisfy it.
 *
 * <p>NOTE(review): Flink locates {@code eval} methods by reflection; confirm the exact lookup rules
 * (visibility, static-ness, varargs) for the target Flink version.
 */
public abstract class ScalarFunction extends UserDefinedFunction {

    // public <R> eval(<typed args>)  -- defined by subclasses, not declared on the base class.

    /**
     * Optional hook to specify the result type when it cannot be inferred from the {@code eval}
     * signature.
     *
     * <p>NOTE(review): current Flink versions prefer {@code @FunctionHint} / {@code @DataTypeHint}
     * annotations over legacy result-type hooks; confirm this method exists in the target version.
     *
     * @param signature argument types of the call
     * @return {@code DataType} of the function result
     */
    public DataType getResultType(DataType[] signature);

    /**
     * Optional hook to describe argument and result type inference programmatically.
     *
     * <p>NOTE(review): in Flink this method is declared as
     * {@code getTypeInference(DataTypeFactory typeFactory)}; confirm the signature for the target
     * version.
     *
     * @return the {@code TypeInference} specification
     */
    public TypeInference getTypeInference();
}

ScalarFunction Example:

// Custom hash function: returns the hex form of a String hash code, or NULL for NULL input.
// NOTE: String#hashCode is a 32-bit, non-cryptographic hash. Do not rely on it to pseudonymize
// sensitive values such as e-mail addresses.
public class HashFunction extends ScalarFunction {

    /**
     * Hashes a single value.
     *
     * @param input value to hash; may be null
     * @return hex string of {@code input.hashCode()}, or null if {@code input} is null
     */
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        return Integer.toHexString(input.hashCode());
    }

    /**
     * Hashes an ordered pair of values.
     *
     * <p>The two hash codes are combined positionally instead of hashing {@code input1 + input2}.
     * Concatenation is ambiguous: ("ab", "c") and ("a", "bc") concatenate to the same string and
     * would therefore always collide.
     *
     * @param input1 first value; may be null
     * @param input2 second value; may be null
     * @return hex string of the combined hash, or null if either input is null
     */
    public String eval(String input1, String input2) {
        if (input1 == null || input2 == null) {
            return null;
        }
        // int overflow is intentional here: it simply wraps, as in String#hashCode itself
        return Integer.toHexString(31 * input1.hashCode() + input2.hashCode());
    }
}

// Make HashFunction callable by the name "hash" from both the Table API and SQL.
tEnv.createTemporarySystemFunction("hash", HashFunction.class);

// The one-argument overload hashes the e-mail; the two-argument overload hashes the
// (order_id as STRING, e-mail) pair.
Table result =
    orders.select(
        $("order_id"),
        call("hash", $("customer_email")).as("customer_hash"),
        call("hash", $("order_id").cast(DataTypes.STRING()), $("customer_email"))
            .as("order_hash"));

TableFunction

Base class for custom table functions, which take zero or more input values and emit zero or more rows per call (a one-to-many transformation).

/**
 * Base class for user-defined table functions: each call may emit zero, one, or many rows.
 *
 * <p>The evaluation logic is supplied by one or more {@code public void eval(...)} methods that
 * subclasses define with concrete parameter types (see {@code SplitFunction} and
 * {@code WordAnalyzer} below). No abstract {@code eval(Object... args)} is declared here, because
 * such typed methods would not implement it. Rows are emitted from inside {@code eval} by calling
 * {@link #collect(Object)}.
 *
 * <p>NOTE(review): Flink locates {@code eval} methods by reflection; confirm the exact rules for the
 * target Flink version.
 *
 * @param <T> type of the emitted rows
 */
public abstract class TableFunction<T> extends UserDefinedFunction {

    // public void eval(<typed args>)  -- defined by subclasses, not declared on the base class.

    /**
     * Emits one output row for the current invocation. It may be called any number of times,
     * including zero, per {@code eval} call.
     *
     * @param result row to emit
     */
    protected void collect(T result);

    /**
     * Optional hook to specify the row type when it cannot be inferred.
     *
     * <p>NOTE(review): current Flink versions prefer {@code @FunctionHint(output = ...)} (as used by
     * the examples below); confirm this hook exists in the target version.
     *
     * @param signature argument types of the call
     * @return {@code DataType} of the emitted rows
     */
    public DataType getResultType(DataType[] signature);
}

TableFunction Example:

// Function to split comma-separated values into rows: one trimmed, non-empty token per row.
// Tokens that are empty after trimming (from input such as "a,,b", "a, ," or "") are skipped
// instead of being emitted as empty-string rows. NULL input emits no rows.
@FunctionHint(output = @DataTypeHint("STRING"))
public class SplitFunction extends TableFunction<String> {
    public void eval(String str) {
        if (str == null) {
            return;
        }
        for (String token : str.split(",")) {
            String tag = token.trim();
            // String.split keeps interior empty tokens, and "".split(",") yields [""]
            if (!tag.isEmpty()) {
                collect(tag);
            }
        }
    }
}

// Function returning structured rows: one (word, length) row per whitespace-separated word.
// Empty tokens are skipped. Splitting on "\\s+" yields a leading "" when the sentence starts
// with whitespace, and "" itself splits to [""]; either would otherwise emit a
// (word = "", length = 0) row. NULL input emits no rows.
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class WordAnalyzer extends TableFunction<Row> {
    public void eval(String sentence) {
        if (sentence == null) {
            return;
        }
        for (String word : sentence.split("\\s+")) {
            if (!word.isEmpty()) {
                // length() is an int (UTF-16 code units), boxed to Integer for the INT field
                collect(Row.of(word, word.length()));
            }
        }
    }
}

// Make both table functions callable by name.
tEnv.createTemporarySystemFunction("split", SplitFunction.class);
tEnv.createTemporarySystemFunction("analyze_words", WordAnalyzer.class);

// Lateral join: each order is paired with every tag that "split" emits for it.
Table result =
    orders.joinLateral(call("split", $("product_tags")).as("tag"))
        .select($("order_id"), $("tag"));

// Left outer lateral join: documents are kept even when "analyze_words" emits no rows.
Table analysis =
    documents.leftOuterJoinLateral(call("analyze_words", $("title")).as("word", "length"))
        .select($("document_id"), $("word"), $("length"));

AggregateFunction

Base class for creating custom aggregate functions that accumulate values over multiple rows and return a single result.

/**
 * Base class for user-defined aggregate functions: input rows are folded into an accumulator,
 * and a single result is read from it.
 *
 * <p>{@code accumulate} is deliberately NOT declared here. Subclasses define one or more
 * {@code public void accumulate(ACC accumulator, <typed args>)} methods (see {@code WeightedAvg}
 * below). An abstract {@code accumulate(ACC, Object...)} would not be implemented by such typed
 * methods.
 *
 * <p>NOTE(review): in Flink, {@code accumulate}, {@code retract}, {@code merge} and
 * {@code resetAccumulator} are located by reflection rather than inherited; confirm against the
 * target Flink version.
 *
 * @param <T> type of the aggregation result
 * @param <ACC> type of the accumulator used during aggregation
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Creates a new, empty accumulator.
     *
     * @return new accumulator instance
     */
    public abstract ACC createAccumulator();

    // public void accumulate(ACC accumulator, <typed args>)  -- defined by subclasses.

    /**
     * Extracts the final result from the accumulator.
     *
     * @param accumulator final accumulator state
     * @return aggregated result
     */
    public abstract T getValue(ACC accumulator);

    /**
     * Removes previously accumulated input values (needed when the input contains retractions,
     * e.g. updates in streaming queries).
     *
     * <p>Fails loudly unless overridden. An empty default would silently ignore retractions and
     * yield wrong aggregates.
     *
     * @param accumulator current accumulator state
     * @param args input values to retract
     * @throws UnsupportedOperationException unless overridden
     */
    public void retract(ACC accumulator, Object... args) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support retract()");
    }

    /**
     * Merges partial accumulators into {@code accumulator} (for distributed aggregation).
     *
     * <p>Fails loudly unless overridden. An empty default would silently drop the partial results
     * held in {@code others}.
     *
     * @param accumulator target accumulator
     * @param others accumulators to merge from
     * @throws UnsupportedOperationException unless overridden
     */
    public void merge(ACC accumulator, Iterable<ACC> others) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support merge()");
    }

    /**
     * Resets the accumulator to its initial state so it can be reused.
     *
     * <p>Fails loudly unless overridden. An empty default would leave stale state in a
     * supposedly reset accumulator.
     *
     * @param accumulator accumulator to reset
     * @throws UnsupportedOperationException unless overridden
     */
    public void resetAccumulator(ACC accumulator) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support resetAccumulator()");
    }
}

AggregateFunction Example:

// Custom weighted average function: sum(value * weight) / sum(weight) over non-NULL pairs.
// The accumulator must be written as WeightedAvg.WeightedAvgAccumulator in the extends clause:
// the simple name of a member class is only in scope inside the class body.
public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.WeightedAvgAccumulator> {

    // Accumulator class
    public static class WeightedAvgAccumulator {
        // running sum of value * weight
        public double sum = 0;
        // running sum of weights
        public double weightSum = 0;
        // number of (value, weight) pairs currently accumulated
        public long count = 0;
    }

    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    // @Override is omitted on accumulate/retract/merge. accumulate and retract take typed
    // parameters, so they do not override any base-class method (a base declaration, where
    // present, takes Object... varargs). merge is left unannotated as well so the example also
    // compiles against a base class that does not declare it.

    /** Adds one (value, weight) pair; pairs with a NULL component are ignored. */
    public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum += value * weight;
            acc.weightSum += weight;
            acc.count++;
        }
    }

    /** Returns the weighted average, or NULL when nothing is accumulated or weights sum to 0. */
    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0 || acc.weightSum == 0) {
            return null;
        }
        return acc.sum / acc.weightSum;
    }

    /** Removes a previously accumulated (value, weight) pair; NULL pairs are ignored. */
    public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {
        if (value != null && weight != null) {
            acc.sum -= value * weight;
            acc.weightSum -= weight;
            acc.count--;
            if (acc.count == 0) {
                // Floating-point add/subtract sequences can leave a tiny non-zero residue. Reset
                // so getValue never divides by such a residue once every row has been retracted.
                acc.sum = 0;
                acc.weightSum = 0;
            }
        }
    }

    /** Folds partial accumulators from {@code others} into {@code acc}. */
    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.weightSum += other.weightSum;
            acc.count += other.count;
        }
    }
}

// Make WeightedAvg callable by the name "weighted_avg".
tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);

// One weighted_avg_price per product_category, each price weighted by its quantity.
Table result =
    sales.groupBy($("product_category"))
        .select(
            $("product_category"),
            call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price"));

AsyncScalarFunction

Base class for creating asynchronous scalar functions that can perform non-blocking I/O operations.

/**
 * Base class for asynchronous user-defined scalar functions, intended for calls that wait on
 * external I/O without blocking the calling thread.
 *
 * <p>Subclasses define one or more
 * {@code public void eval(CompletableFuture<R> resultFuture, <typed args>)} methods (see
 * {@code AsyncEnrichFunction} below). They must complete {@code resultFuture}, normally or
 * exceptionally, on every code path; a future that is never completed leaves the call pending
 * (presumably until the async timeout fires). No abstract
 * {@code eval(CompletableFuture<Object>, Object...)} is declared here, because typed methods such as
 * {@code eval(CompletableFuture<String>, String)} would not implement it.
 *
 * <p>NOTE(review): Flink locates {@code eval} methods by reflection; confirm for the target version.
 */
public abstract class AsyncScalarFunction extends UserDefinedFunction {

    // public void eval(CompletableFuture<R> resultFuture, <typed args>)  -- defined by subclasses.

    /**
     * Optional hook for the timeout of a single asynchronous call.
     *
     * <p>NOTE(review): in Flink the async scalar timeout appears to be set through table
     * configuration (e.g. {@code table.exec.async-scalar.timeout}) rather than by overriding a
     * method; confirm before relying on this hook.
     *
     * @return timeout in milliseconds; 60000 (60 seconds) by default
     */
    public long getTimeout() {
        return 60000; // Default 60 seconds
    }
}

AsyncScalarFunction Example:

// Async function for external service lookup: GETs https://api.example.com/users/{userId} and
// returns the response body. Returns NULL for a NULL id, a non-2xx response, or a failed request.
public class AsyncEnrichFunction extends AsyncScalarFunction {
    private transient HttpClient httpClient;

    @Override
    public void open(FunctionContext context) throws Exception {
        httpClient = HttpClient.newHttpClient();
    }

    public void eval(CompletableFuture<String> resultFuture, String userId) {
        if (userId == null) {
            resultFuture.complete(null);
            return;
        }

        // Percent-encode the id before placing it in the path. With raw concatenation, a value
        // like "a b" makes URI.create throw (leaving resultFuture uncompleted), and characters such
        // as '/' or '?' change which resource is requested. URLEncoder uses form encoding, so its
        // '+' for space is rewritten to %20 for use inside a path segment.
        String encodedId =
            java.net.URLEncoder.encode(userId, java.nio.charset.StandardCharsets.UTF_8)
                .replace("+", "%20");
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://api.example.com/users/" + encodedId))
            .build();

        httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .whenComplete((response, error) -> {
                if (error != null || response.statusCode() < 200 || response.statusCode() >= 300) {
                    // Best-effort enrichment: request failures and error responses become NULL
                    // instead of failing the query, and error-page bodies are never returned as data.
                    resultFuture.complete(null);
                } else {
                    resultFuture.complete(response.body());
                }
            });
    }

    @Override
    public void close() throws Exception {
        // Drop the client reference so this instance issues no further requests.
        // NOTE(review): HttpClient implements AutoCloseable only from Java 21; on 21+ it could be
        // closed here explicitly.
        httpClient = null;
    }
}

AsyncTableFunction

Base class for creating asynchronous table functions for non-blocking one-to-many transformations.

/**
 * Base class for asynchronous user-defined table functions: a non-blocking call that delivers all
 * of its output rows at once by completing a future with a collection.
 *
 * <p>Subclasses define one or more
 * {@code public void eval(CompletableFuture<Collection<T>> resultFuture, <typed args>)} methods with
 * concrete parameter types. No abstract {@code eval(..., Object... args)} is declared here, because
 * such typed methods would not implement it. {@code resultFuture} should be completed on every
 * code path, e.g. with an empty collection when there are no rows.
 *
 * <p>NOTE(review): Flink locates {@code eval} methods by reflection; confirm for the target version.
 *
 * @param <T> type of the output rows
 */
public abstract class AsyncTableFunction<T> extends UserDefinedFunction {

    // public void eval(CompletableFuture<Collection<T>> resultFuture, <typed args>)
    //     -- defined by subclasses, not declared on the base class.
}

ProcessTableFunction

Advanced table function for complex transformations with access to multiple input tables and state.

/**
 * Base class for process table functions with advanced capabilities: access to state and time
 * through a {@link ProcessContext}.
 *
 * <p>NOTE(review): this is a simplified sketch. Flink's actual {@code ProcessTableFunction} API
 * (table arguments, state declaration, timers) differs in detail; check the Flink documentation
 * for the target version before coding against these signatures.
 *
 * @param <T> type of the output rows
 */
public abstract class ProcessTableFunction<T> extends UserDefinedFunction {
    /**
     * Processes one invocation with access to the processing context.
     *
     * @param ctx processing context with state and time access; emits rows of type {@code T}
     * @param args input arguments
     * @throws Exception any failure raised while processing
     */
    public abstract void eval(ProcessContext<T> ctx, Object... args) throws Exception;

    /**
     * Processing context passed to {@code eval}.
     *
     * <p>The interface declares its own type parameter because a member interface is implicitly
     * {@code static}. It therefore cannot refer to the enclosing class's {@code T`; doing so is a
     * compile error.
     *
     * @param <OUT> type of the rows emitted through {@link #collect(Object)}
     */
    public interface ProcessContext<OUT> {
        /**
         * Get keyed state for maintaining function state
         * @param stateDescriptor State descriptor
         * @return State instance
         */
        <S extends State> S getState(StateDescriptor<S, ?> stateDescriptor);

        /**
         * Emit an output row
         * @param result Row to emit
         */
        void collect(OUT result);

        /**
         * Get current processing time
         * @return Current processing time timestamp
         */
        long currentProcessingTime();

        /**
         * Get current watermark
         * @return Current event time watermark
         */
        long currentWatermark();
    }
}

Function Registration

Methods for registering user-defined functions in the TableEnvironment.

/**
 * Registers a function under the given name as a temporary system function, identified by its
 * implementation class.
 *
 * <p>Once registered, the function can be called by that name from SQL and from the Table API via
 * {@code call(name, ...)}.
 *
 * @param name name used to call the function
 * @param functionClass class that implements the function
 */
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

/**
 * Registers a function under the given name as a temporary system function, using an existing
 * instance instead of a class.
 *
 * @param name name used to call the function
 * @param functionInstance instance that implements the function
 */
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);

/**
 * Registers a function under a catalog path rather than as a system function.
 *
 * @param path path under which the function is registered, e.g. {@code catalog.database.function}
 * @param functionClass class that implements the function
 */
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

/**
 * Removes a temporary system function that was registered under the given name.
 *
 * @param name name of the function to remove
 * @return {@code true} if a function was registered under {@code name} and has been removed,
 *     {@code false} otherwise
 */
public boolean dropTemporarySystemFunction(String name);

Type Hints and Annotations

Annotations for providing type information to the Flink runtime.

/**
 * Annotation that supplies type hints (input and output types) for a user-defined function.
 *
 * <p>Allowed on classes and on methods. On a class it describes the function as a whole, as in the
 * examples in this document. NOTE(review): on an individual {@code eval} method it is expected to
 * apply to that overload only; confirm against the Flink type-inference documentation.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface FunctionHint {
    /**
     * Type hints for the input parameters, in parameter order.
     */
    DataTypeHint[] input() default {};

    /**
     * Type hint for the function output; for table functions this is the type of each emitted row.
     */
    DataTypeHint output() default @DataTypeHint();

    /**
     * Whether the function is deterministic.
     *
     * <p>NOTE(review): Flink's {@code FunctionHint} may not define this attribute; determinism is
     * normally declared by overriding {@code UserDefinedFunction#isDeterministic()}. Confirm.
     */
    boolean isDeterministic() default true;
}

/**
 * Annotation that supplies a data type hint, either as the value of a {@code FunctionHint}
 * attribute or directly on a type, method, field, or parameter.
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataTypeHint {
    /**
     * Data type in SQL type syntax, e.g. {@code "STRING"} or
     * {@code "ROW<word STRING, length INT>"}. The empty default means no explicit type is given.
     */
    String value() default "";

    /**
     * Controls whether RAW types may be used as a fallback when a data type cannot otherwise be
     * extracted from a class. This is not a nullability flag.
     *
     * <p>NOTE(review): in Flink this attribute is believed to be declared as
     * {@code HintFlag allowRawGlobally() default HintFlag.UNDEFINED}; confirm the type and default
     * shown here against the target version.
     */
    DefaultBoolean allowRawGlobally() default DefaultBoolean.TRUE;
}

Type Hint Examples:

// Returns an array holding `count` copies of `str`.
// NULL input yields NULL, following the NULL-in/NULL-out convention of HashFunction above.
// A non-positive count yields an empty array.
@FunctionHint(
    input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},
    output = @DataTypeHint("ARRAY<STRING>")
)
public class RepeatString extends ScalarFunction {
    public String[] eval(String str, Integer count) {
        if (str == null || count == null) {
            return null;
        }
        if (count <= 0) {
            return new String[0];
        }
        String[] result = new String[count];
        Arrays.fill(result, str);
        return result;
    }
}

// Complex type hint for a nested structure: every emitted row has a STRING field "name" and a
// nested ROW field "stats" made of a BIGINT "count" and a DOUBLE "avg".
@FunctionHint(
    output = @DataTypeHint("ROW<name STRING, stats ROW<count BIGINT, avg DOUBLE>>")
)
public class AnalyzeData extends TableFunction<Row> {
    public void eval(String data) {
        // Placeholder body: a real implementation emits Row objects shaped like the output hint,
        // i.e. a name followed by a nested (count, avg) row.
    }
}

Function Lifecycle

Methods for managing function resources and state.

/**
 * Common base class of every function kind shown in this document. It provides the lifecycle
 * hooks {@code open}/{@code close} and the determinism flag.
 *
 * <p>NOTE(review): the examples mark resource fields {@code transient}, which suggests function
 * instances are serialized before {@code open} runs; confirm for the target Flink version.
 */
public abstract class UserDefinedFunction {

    /**
     * Reports whether the function always produces the same output for the same input.
     *
     * @return {@code true} unless a subclass overrides this method
     */
    public boolean isDeterministic() {
        return true;
    }

    /**
     * Lifecycle hook called when the function is first used; acquire resources here.
     *
     * @param context access to metrics and job parameters
     * @throws Exception if initialization fails
     */
    public void open(FunctionContext context) throws Exception {
        // Intentionally empty: subclasses override when they hold resources.
    }

    /**
     * Lifecycle hook called when the function is no longer needed; release resources here.
     *
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception {
        // Intentionally empty: subclasses override when they hold resources.
    }
}

/**
 * Context passed to {@code UserDefinedFunction#open}, giving access to metrics and job parameters.
 */
public interface FunctionContext {
    /**
     * Returns the metric group in which the function can register custom metrics,
     * e.g. {@code getMetricGroup().counter("lookup_count")}.
     *
     * @return MetricGroup for registering custom metrics
     */
    MetricGroup getMetricGroup();

    /**
     * Returns the job parameters as a configuration object.
     *
     * <p>NOTE(review): Flink's {@code FunctionContext} exposes job parameters per key via
     * {@link #getJobParameter(String, String)}; a no-argument variant returning a
     * {@code Configuration} may not exist in the target version. Confirm before relying on it.
     *
     * @return Configuration object with job and function parameters
     */
    Configuration getJobParameter();

    /**
     * Returns the value of a single job parameter.
     *
     * @param key name of the job parameter
     * @param defaultValue value returned when the parameter is not set
     * @return the parameter value, or {@code defaultValue} if it is absent
     */
    String getJobParameter(String key, String defaultValue);
}

Function Lifecycle Example:

// Async lookup of a single value from a JDBC table.
// The blocking JDBC call runs on a dedicated single-thread executor rather than the default
// CompletableFuture pool (ForkJoinPool.commonPool()), which blocking I/O would starve. The single
// thread also serializes access to the one shared java.sql.Connection, which many JDBC drivers do
// not support using concurrently. For parallel lookups, use a connection pool and a larger executor.
public class DatabaseLookupFunction extends AsyncScalarFunction {
    private transient Connection connection;
    private transient Counter lookupCounter;
    private transient java.util.concurrent.ExecutorService executor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Initialize database connection
        // NOTE(review): assumes "database.url" is set; a missing value makes getConnection fail.
        String jdbcUrl = context.getJobParameter().getString("database.url");
        connection = DriverManager.getConnection(jdbcUrl);
        executor = java.util.concurrent.Executors.newSingleThreadExecutor();

        // Register metrics
        lookupCounter = context.getMetricGroup().counter("lookup_count");
    }

    public void eval(CompletableFuture<String> resultFuture, String key) {
        lookupCounter.inc();

        // "key = NULL" never matches in SQL, so answer NULL without a database round trip.
        if (key == null) {
            resultFuture.complete(null);
            return;
        }

        CompletableFuture.supplyAsync(() -> lookup(key), executor)
            .whenComplete((value, error) -> {
                // Propagate failures explicitly. With thenAccept alone, a failed query skips the
                // completion step and leaves resultFuture pending forever.
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(value);
                }
            });
    }

    /** Runs the lookup query; returns the first row's value, or null when no row matches. */
    private String lookup(String key) {
        try (PreparedStatement stmt =
                connection.prepareStatement("SELECT value FROM lookup WHERE key = ?")) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? rs.getString("value") : null;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        // Stop accepting new lookups before closing the connection they use.
        if (executor != null) {
            executor.shutdown();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber

docs

connectors.md

data-types.md

datastream-bridge.md

expressions.md

functions.md

index.md

sql-gateway.md

table-operations.md

tile.json