or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-integration.mdconfiguration.mdfunction-module.mdindex.mdpartition-management.mdtable-source-sink.md
tile.json

function-module.mddocs/

Function Module Integration

Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. The function module provides seamless compatibility with Hive's extensive library of built-in functions and enables registration of custom user-defined functions.

Capabilities

HiveModule

Module implementation that provides access to Hive's built-in functions within Flink SQL.

/**
 * Module providing Hive built-in functions for Flink SQL
 * Enables transparent use of Hive functions in Flink queries
 */
public class HiveModule implements Module {
    
    /**
     * Create HiveModule with default Hive version (detected from classpath)
     */
    public HiveModule();
    
    /**
     * Create HiveModule with specific Hive version
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     */
    public HiveModule(String hiveVersion);
    
    /**
     * Create HiveModule with specific Hive version and class loader
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     * @param classLoader Class loader for function loading
     */
    public HiveModule(String hiveVersion, ClassLoader classLoader);
    
    /**
     * Create HiveModule with specific Hive version, configuration, and class loader
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     * @param config Readable configuration for module settings
     * @param classLoader Class loader for function loading
     */
    public HiveModule(String hiveVersion, ReadableConfig config, ClassLoader classLoader);
    
    /**
     * List all available Hive built-in functions
     * @return Set of function names available through this module
     */
    public Set<String> listFunctions();
    
    /**
     * Get function definition for a specific function name
     * @param name Function name (case-insensitive)
     * @return Optional containing function definition if available
     */
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

Usage Examples:

import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.api.TableEnvironment;

// Create table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Load Hive module with default version
HiveModule hiveModule = new HiveModule();
tableEnv.loadModule("hive", hiveModule);

// Or specify explicit Hive version
HiveModule specificModule = new HiveModule("2.3.9");
tableEnv.loadModule("hive", specificModule);

// List available functions
Set<String> functions = hiveModule.listFunctions();
System.out.println("Available Hive functions: " + functions.size());

// Check if specific function exists
Optional<FunctionDefinition> concatWs = hiveModule.getFunctionDefinition("concat_ws");
if (concatWs.isPresent()) {
    System.out.println("concat_ws function is available");
}

// Use Hive functions in SQL queries
Table result = tableEnv.sqlQuery("""
    SELECT 
        concat_ws('|', first_name, last_name) as full_name,
        upper(email) as email_upper,
        size(split(address, ' ')) as address_parts,
        from_unixtime(created_timestamp) as created_date,
        if(active = 1, 'ACTIVE', 'INACTIVE') as status
    FROM users
""");

Available Hive Functions

The HiveModule provides access to a comprehensive set of Hive built-in functions:

String Functions

-- String manipulation functions
SELECT 
    concat('Hello', ' ', 'World') as greeting,
    concat_ws('|', col1, col2, col3) as pipe_separated,
    upper(name) as name_upper,
    lower(email) as email_lower,
    length(description) as desc_length,
    substr(text, 1, 10) as first_10_chars,
    trim(padded_string) as trimmed,
    regexp_replace(phone, '[^0-9]', '') as clean_phone,
    split(csv_data, ',') as array_values,
    reverse(string_col) as reversed
FROM my_table;

Date and Time Functions

-- Date/time manipulation functions
SELECT 
    from_unixtime(unix_timestamp) as formatted_date,
    unix_timestamp('2024-01-01 12:00:00') as unix_ts,
    year(date_col) as year_part,
    month(date_col) as month_part,
    day(date_col) as day_part,
    date_add(date_col, 30) as thirty_days_later,
    date_sub(current_date(), 7) as week_ago,
    datediff(end_date, start_date) as days_between,
    date_format(datetime_col, 'yyyy-MM-dd') as formatted
FROM events;

Mathematical Functions

-- Mathematical and statistical functions
SELECT 
    abs(negative_value) as absolute,
    round(decimal_value, 2) as rounded,
    ceil(float_value) as ceiling,
    floor(float_value) as floored,
    greatest(val1, val2, val3) as maximum,
    least(val1, val2, val3) as minimum,
    rand() as random_value,
    pow(base, exponent) as power,
    sqrt(number) as square_root,
    sin(angle) as sine_value
FROM calculations;

Collection Functions

-- Array and map functions
SELECT 
    size(array_col) as array_length,
    array_contains(tags, 'important') as has_important_tag,
    sort_array(string_array) as sorted_array,
    map_keys(properties) as property_keys,
    map_values(properties) as property_values,
    explode(array_col) as individual_elements
FROM structured_data;

Type Conversion Functions

-- Type casting and conversion functions
SELECT 
    cast(string_number as int) as integer_value,
    cast(timestamp_col as date) as date_only,
    string(numeric_col) as string_representation,
    int(boolean_col) as boolean_as_int,
    double(string_decimal) as decimal_value
FROM mixed_types;

Conditional Functions

-- Conditional and null-handling functions
SELECT 
    if(score > 80, 'PASS', 'FAIL') as result,
    case 
        when grade >= 90 then 'A'
        when grade >= 80 then 'B'
        when grade >= 70 then 'C'
        else 'F'
    end as letter_grade,
    coalesce(preferred_name, first_name, 'Unknown') as display_name,
    nvl(optional_field, 'N/A') as with_default,
    isnull(nullable_col) as is_null_check,
    isnotnull(nullable_col) as is_not_null_check
FROM student_grades;

Advanced Function Integration

Custom UDF Registration

While HiveModule provides built-in functions, you can also register custom Hive UDFs:

// Register custom Hive UDF
tableEnv.createTemporaryFunction("my_custom_udf", MyHiveUDF.class);

// Use in SQL queries
Table result = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        my_custom_udf(input_data) as processed_data
    FROM user_data
""");

Function Resolution Order

Configure module loading order to control function resolution:

// Load modules in specific order
tableEnv.loadModule("hive", new HiveModule("2.3.9"));
tableEnv.loadModule("core", CoreModule.INSTANCE);

// List loaded modules
String[] modules = tableEnv.listModules();
System.out.println("Loaded modules: " + Arrays.toString(modules));

// Use module order for function resolution
// Functions in earlier modules take precedence

Function Catalog Integration

Combine with Hive catalog for comprehensive function access:

// Set up both catalog and module
HiveCatalog catalog = new HiveCatalog("hive", "default", "/etc/hive/conf", null, "2.3.9");
HiveModule module = new HiveModule("2.3.9");

tableEnv.registerCatalog("hive", catalog);
tableEnv.loadModule("hive", module);
tableEnv.useCatalog("hive");

// Access both built-in and user-defined functions
Table result = tableEnv.sqlQuery("""
    SELECT 
        -- Built-in Hive function from module
        concat_ws('|', first_name, last_name) as full_name,
        -- Custom UDF registered in catalog
        my_database.my_custom_function(data) as processed,
        -- Standard Flink function
        CURRENT_TIMESTAMP as processing_time
    FROM user_profiles
""");

Performance Considerations

// Configure function execution for performance
Configuration config = new Configuration();

// Enable object reuse for UDF performance
config.setBoolean("table.exec.resource.default-parallelism", true);

// Configure state backend for stateful UDFs
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "hdfs://namenode:9000/checkpoints");

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .withConfiguration(config)
        .build()
);

Complex Function Usage Examples

// Complex data processing with Hive functions
Table processedData = tableEnv.sqlQuery("""
    WITH parsed_logs AS (
        SELECT 
            regexp_extract(log_line, '(\\d{4}-\\d{2}-\\d{2})', 1) as log_date,
            regexp_extract(log_line, 'level=(\\w+)', 1) as log_level,
            split(log_line, '\\|') as log_parts,
            size(split(log_line, '\\|')) as part_count
        FROM raw_logs
        WHERE log_line IS NOT NULL
    ),
    enhanced_logs AS (
        SELECT 
            *,
            from_unixtime(unix_timestamp(log_date, 'yyyy-MM-dd')) as parsed_date,
            if(log_level IN ('ERROR', 'FATAL'), 1, 0) as is_error,
            map('date', log_date, 'level', log_level) as log_metadata
        FROM parsed_logs
        WHERE part_count >= 3
    )
    SELECT 
        date_format(parsed_date, 'yyyy-MM') as month,
        log_level,
        count(*) as log_count,
        sum(is_error) as error_count,
        collect_list(log_metadata) as monthly_logs
    FROM enhanced_logs
    GROUP BY date_format(parsed_date, 'yyyy-MM'), log_level
    ORDER BY month DESC, log_level
""");

// Execute and print results
processedData.execute().print();

Integration Patterns

SQL DDL Function Definition

-- Create temporary function from Java class
CREATE TEMPORARY FUNCTION my_hash AS 'com.company.udfs.HashFunction';

-- Create catalog function (persisted in Hive metastore)
CREATE FUNCTION my_catalog.analytics.custom_aggregator AS 'com.company.udfs.CustomAggregator'
USING JAR 'hdfs://namenode:9000/udf-jars/custom-functions.jar';

Streaming Function Usage

// Use Hive functions in streaming queries
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Load Hive module for streaming
tableEnv.loadModule("hive", new HiveModule("2.3.9"));

// Process streaming data with Hive functions
Table streamResult = tableEnv.sqlQuery("""
    SELECT 
        window_start,
        window_end,
        concat_ws(':', user_id, session_id) as user_session,
        count(*) as event_count,
        collect_list(event_type) as event_types,
        max(from_unixtime(event_timestamp)) as latest_event
    FROM TABLE(
        HOP(TABLE source_stream, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, user_id, session_id
""");