Hive built-in function support enables the use of Hive functions within Flink SQL queries. The function module provides compatibility with Hive's extensive library of built-in functions and, together with catalog-based registration, access to custom user-defined functions.
HiveModule is the module implementation that exposes Hive's built-in functions to Flink SQL:
/**
* Module providing Hive built-in functions for Flink SQL
* Enables transparent use of Hive functions in Flink queries
*/
public class HiveModule implements Module {
/**
* Create HiveModule with default Hive version (detected from classpath)
*/
public HiveModule();
/**
* Create HiveModule with specific Hive version
* @param hiveVersion Hive version string (e.g., "2.3.9")
*/
public HiveModule(String hiveVersion);
/**
* Create HiveModule with specific Hive version and class loader
* @param hiveVersion Hive version string (e.g., "2.3.9")
* @param classLoader Class loader for function loading
*/
public HiveModule(String hiveVersion, ClassLoader classLoader);
/**
* Create HiveModule with specific Hive version, configuration, and class loader
* @param hiveVersion Hive version string (e.g., "2.3.9")
* @param config Readable configuration for module settings
* @param classLoader Class loader for function loading
*/
public HiveModule(String hiveVersion, ReadableConfig config, ClassLoader classLoader);
/**
* List all available Hive built-in functions
* @return Set of function names available through this module
*/
public Set<String> listFunctions();
/**
* Get function definition for a specific function name
* @param name Function name (case-insensitive)
* @return Optional containing function definition if available
*/
public Optional<FunctionDefinition> getFunctionDefinition(String name);
}
Usage Examples:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.hive.HiveModule;

import java.util.Optional;
import java.util.Set;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Load Hive module with default version (detected from the Hive jars on the classpath)
HiveModule hiveModule = new HiveModule();
tableEnv.loadModule("hive", hiveModule);
// Or specify an explicit Hive version; a module name can only be loaded once,
// so unload the previous instance first
tableEnv.unloadModule("hive");
HiveModule specificModule = new HiveModule("2.3.9");
tableEnv.loadModule("hive", specificModule);
// List available functions
Set<String> functions = hiveModule.listFunctions();
System.out.println("Available Hive functions: " + functions.size());
// Check if specific function exists
Optional<FunctionDefinition> concatWs = hiveModule.getFunctionDefinition("concat_ws");
if (concatWs.isPresent()) {
System.out.println("concat_ws function is available");
}
// Use Hive functions in SQL queries
Table result = tableEnv.sqlQuery("""
SELECT
concat_ws('|', first_name, last_name) as full_name,
upper(email) as email_upper,
size(split(address, ' ')) as address_parts,
from_unixtime(created_timestamp) as created_date,
if(active = 1, 'ACTIVE', 'INACTIVE') as status
FROM users
""");The HiveModule provides access to a comprehensive set of Hive built-in functions:
-- String manipulation functions
SELECT
concat('Hello', ' ', 'World') as greeting,
concat_ws('|', col1, col2, col3) as pipe_separated,
upper(name) as name_upper,
lower(email) as email_lower,
length(description) as desc_length,
substr(text, 1, 10) as first_10_chars,
trim(padded_string) as trimmed,
regexp_replace(phone, '[^0-9]', '') as clean_phone,
split(csv_data, ',') as array_values,
reverse(string_col) as reversed
FROM my_table;
-- Date/time manipulation functions
SELECT
from_unixtime(unix_timestamp) as formatted_date,
unix_timestamp('2024-01-01 12:00:00') as unix_ts,
year(date_col) as year_part,
month(date_col) as month_part,
day(date_col) as day_part,
date_add(date_col, 30) as thirty_days_later,
date_sub(current_date(), 7) as week_ago,
datediff(end_date, start_date) as days_between,
date_format(datetime_col, 'yyyy-MM-dd') as formatted
FROM events;
-- Mathematical and statistical functions
SELECT
abs(negative_value) as absolute,
round(decimal_value, 2) as rounded,
ceil(float_value) as ceiling,
floor(float_value) as floored,
greatest(val1, val2, val3) as maximum,
least(val1, val2, val3) as minimum,
rand() as random_value,
pow(base, exponent) as power,
sqrt(number) as square_root,
sin(angle) as sine_value
FROM calculations;
-- Array and map functions
SELECT
  size(array_col) as array_length,
  array_contains(tags, 'important') as has_important_tag,
  sort_array(string_array) as sorted_array,
  map_keys(properties) as property_keys,
  map_values(properties) as property_values
FROM structured_data;
-- Note: explode() is a table-generating function (UDTF) and cannot be mixed
-- with scalar expressions in the same SELECT list; see the sketch below.
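Hive UDTFs such as explode produce multiple rows per input row, so they are invoked through a lateral join rather than the plain SELECT list. A sketch of the usual Flink pattern (table and column names here are illustrative; with the Hive SQL dialect enabled, Hive's LATERAL VIEW syntax should work as well):
-- Expand each array element into its own row via a lateral table join
SELECT s.id, t.individual_element
FROM structured_data s,
     LATERAL TABLE(explode(s.array_col)) AS t(individual_element);
-- Type casting and conversion functions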
SELECT
cast(string_number as int) as integer_value,
cast(timestamp_col as date) as date_only,
string(numeric_col) as string_representation,
int(boolean_col) as boolean_as_int,
double(string_decimal) as decimal_value
FROM mixed_types;
-- Conditional and null-handling functions
SELECT
if(score > 80, 'PASS', 'FAIL') as result,
case
when grade >= 90 then 'A'
when grade >= 80 then 'B'
when grade >= 70 then 'C'
else 'F'
end as letter_grade,
coalesce(preferred_name, first_name, 'Unknown') as display_name,
nvl(optional_field, 'N/A') as with_default,
isnull(nullable_col) as is_null_check,
isnotnull(nullable_col) as is_not_null_check
FROM student_grades;
While HiveModule provides built-in functions, you can also register custom Hive UDFs:
// Register a custom UDF with the table environment.
// Note: createTemporaryFunction expects a Flink UserDefinedFunction subclass;
// UDFs written against Hive's own UDF API are resolved through a HiveCatalog instead.
tableEnv.createTemporaryFunction("my_custom_udf", MyHiveUDF.class);
// Use in SQL queries
Table result = tableEnv.sqlQuery("""
SELECT
user_id,
my_custom_udf(input_data) as processed_data
FROM user_data
""");Configure module loading order to control function resolution:
import java.util.Arrays;

// The core module is loaded by default; loading "hive" appends it after "core"
tableEnv.loadModule("hive", new HiveModule("2.3.9"));
// List loaded modules
String[] modules = tableEnv.listModules();
System.out.println("Loaded modules: " + Arrays.toString(modules));
// Function resolution follows module order: functions in earlier modules take precedence
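To let Hive's implementations take precedence over Flink's built-ins of the same name, reorder the declared modules with useModules, which also controls which loaded modules are used at all. A brief sketch:
// Resolve functions against the Hive module first, then Flink's core module
tableEnv.useModules("hive", "core");
System.out.println("Resolution order: " + Arrays.toString(tableEnv.listModules()));
Combine with Hive catalog for comprehensive function access: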
// Set up both catalog and module
HiveCatalog catalog = new HiveCatalog("hive", "default", "/etc/hive/conf", null, "2.3.9");
HiveModule module = new HiveModule("2.3.9");
tableEnv.registerCatalog("hive", catalog);
tableEnv.loadModule("hive", module);
tableEnv.useCatalog("hive");
// Access both built-in and user-defined functions
Table result = tableEnv.sqlQuery("""
SELECT
-- Built-in Hive function from module
concat_ws('|', first_name, last_name) as full_name,
-- Custom UDF registered in catalog
my_database.my_custom_function(data) as processed,
-- Standard Flink function
CURRENT_TIMESTAMP as processing_time
FROM user_profiles
""");// Configure function execution for performance
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();
// Enable object reuse to reduce serialization overhead in UDF-heavy pipelines
config.setBoolean("pipeline.object-reuse", true);
// Configure a state backend for stateful operators (e.g., windowed aggregations)
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "hdfs://namenode:9000/checkpoints");
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance()
.withConfiguration(config)
.build()
);
// Complex data processing with Hive functions
Table processedData = tableEnv.sqlQuery("""
WITH parsed_logs AS (
SELECT
regexp_extract(log_line, '(\\d{4}-\\d{2}-\\d{2})', 1) as log_date,
regexp_extract(log_line, 'level=(\\w+)', 1) as log_level,
split(log_line, '\\|') as log_parts,
size(split(log_line, '\\|')) as part_count
FROM raw_logs
WHERE log_line IS NOT NULL
),
enhanced_logs AS (
SELECT
*,
from_unixtime(unix_timestamp(log_date, 'yyyy-MM-dd')) as parsed_date,
if(log_level IN ('ERROR', 'FATAL'), 1, 0) as is_error,
map('date', log_date, 'level', log_level) as log_metadata
FROM parsed_logs
WHERE part_count >= 3
)
SELECT
date_format(parsed_date, 'yyyy-MM') as month,
log_level,
count(*) as log_count,
sum(is_error) as error_count,
collect_list(log_metadata) as monthly_logs
FROM enhanced_logs
GROUP BY date_format(parsed_date, 'yyyy-MM'), log_level
ORDER BY month DESC, log_level
""");
// Execute and print results
processedData.execute().print();
-- Create temporary function from Java class
CREATE TEMPORARY FUNCTION my_hash AS 'com.company.udfs.HashFunction';
-- Create catalog function (persisted in Hive metastore)
CREATE FUNCTION my_catalog.analytics.custom_aggregator AS 'com.company.udfs.CustomAggregator'
USING JAR 'hdfs://namenode:9000/udf-jars/custom-functions.jar';
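Whether a function registered this way is resolvable can be checked from the same session; SHOW FUNCTIONS lists built-in, module, and catalog functions together. A quick sanity check, assuming the table environment from above:
// Verify that the newly created functions are visible
tableEnv.executeSql("SHOW FUNCTIONS").print();
// Use Hive functions in streaming queries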
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Load Hive module for streaming
tableEnv.loadModule("hive", new HiveModule("2.3.9"));
// Process streaming data with Hive functions
Table streamResult = tableEnv.sqlQuery("""
SELECT
window_start,
window_end,
concat_ws(':', user_id, session_id) as user_session,
count(*) as event_count,
collect_list(event_type) as event_types,
max(from_unixtime(event_timestamp)) as latest_event
FROM TABLE(
HOP(TABLE source_stream, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id, session_id
""");