Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
—
Module system that provides access to Hive built-in functions within Flink SQL environments. The HiveModule enables compatibility with existing Hive UDFs while maintaining function behavior consistency across Hive and Flink systems.
Primary module implementation that loads and provides Hive built-in functions to Flink SQL.
/**
* Module that provides Hive built-in functions to Flink SQL
* Enables access to Hive UDFs and maintains function behavior consistency
*/
public class HiveModule implements Module {
/**
* Creates a HiveModule with default Hive version
*/
public HiveModule();
/**
* Creates a HiveModule with specific Hive version
* @param hiveVersion Hive version for function compatibility (e.g., "3.1.2")
*/
public HiveModule(String hiveVersion);
/**
* Returns all function definitions provided by this module
* @return Set of function definition names
*/
public Set<String> listFunctions();
/**
* Gets a specific function definition by name
* @param name Function name (case-insensitive)
* @return Optional containing FunctionDefinition if found
*/
public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

Usage Examples:
-- Load Hive module to access Hive functions
LOAD MODULE hive WITH ('hive-version' = '3.1.2');
-- Use Hive built-in string functions
SELECT
substr(name, 1, 3) as name_prefix,
upper(category) as category_upper,
concat(first_name, ' ', last_name) as full_name,
regexp_replace(phone, '(\\d{3})(\\d{3})(\\d{4})', '($1) $2-$3') as formatted_phone
FROM customer_table;
-- Use Hive date/time functions
SELECT
unix_timestamp(date_string, 'yyyy-MM-dd') as timestamp_value,
from_unixtime(unix_timestamp()) as current_datetime,
date_add(current_date(), 7) as week_from_now,
date_format(event_time, 'yyyy-MM-dd HH:mm:ss') as formatted_time
FROM events_table;
-- Use Hive aggregate functions
SELECT
category,
collect_list(product_name) as product_names,
collect_set(brand) as unique_brands,
percentile_approx(price, 0.5) as median_price,
histogram_numeric(price, 10) as price_distribution
FROM products_table
GROUP BY category;

// Programmatic module loading
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Load Hive module with specific version
tableEnv.loadModule("hive", new HiveModule("3.1.2"));
// List loaded modules
String[] modules = tableEnv.listModules();
System.out.println("Loaded modules: " + Arrays.toString(modules));
// Use Hive functions in Table API
Table result = tableEnv.from("source_table")
.select($("name"), call("upper", $("category")).as("category_upper"))
    .where(call("length", $("name")).isGreater(5));

The HiveModule provides access to hundreds of Hive built-in functions across multiple categories:
-- Text manipulation and pattern matching
substr(string, start, length) -- Extract substring
upper(string), lower(string) -- Case conversion
concat(string1, string2, ...) -- String concatenation
concat_ws(separator, string1, ...) -- Concatenate with separator
regexp_replace(string, pattern, replacement) -- Regex replacement
regexp_extract(string, pattern, group) -- Regex extraction
split(string, pattern) -- Split string into array
trim(string), ltrim(string), rtrim(string) -- Whitespace removal
length(string) -- String length
reverse(string) -- Reverse string

-- Date/time manipulation and formatting
unix_timestamp() -- Current Unix timestamp
unix_timestamp(string, pattern) -- Parse date string to timestamp
from_unixtime(bigint) -- Convert timestamp to string
from_unixtime(bigint, pattern) -- Convert with custom format
current_date() -- Current date
current_timestamp() -- Current timestamp
date_add(date, days) -- Add days to date
date_sub(date, days) -- Subtract days from date
datediff(date1, date2) -- Difference in days
date_format(date, pattern) -- Format date as string
year(date), month(date), day(date) -- Extract date parts
hour(timestamp), minute(timestamp) -- Extract time parts

-- Numeric operations and calculations
round(double, digits) -- Round to specified digits
ceil(double), floor(double) -- Ceiling and floor functions
abs(double) -- Absolute value
pow(double, double) -- Power function
sqrt(double) -- Square root
log(double), log10(double) -- Logarithm functions
sin(double), cos(double), tan(double) -- Trigonometric functions
rand(), rand(seed) -- Random number generation

-- Conditional logic and case handling
if(condition, value_if_true, value_if_false) -- Simple conditional
case when condition1 then value1
when condition2 then value2
else default_value end -- Multi-condition case
coalesce(value1, value2, ...) -- Return first non-null value
nvl(value, default_value) -- Null value substitution
isnull(value), isnotnull(value) -- Null checking

-- Advanced aggregation beyond standard SQL
collect_list(column) -- Collect values into array
collect_set(column) -- Collect unique values into array
percentile_approx(column, percentile) -- Approximate percentile
histogram_numeric(column, bins) -- Numeric histogram
var_pop(column), var_samp(column) -- Population/sample variance
stddev_pop(column), stddev_samp(column) -- Population/sample std deviation

-- Complex data type manipulation
array_contains(array, value) -- Check if array contains value
size(array_or_map) -- Get array/map size
sort_array(array) -- Sort array elements
map_keys(map), map_values(map) -- Get map keys/values

When multiple modules provide functions with the same name, Flink uses module loading order for resolution:
// Module loading order affects function resolution
tableEnv.loadModule("core", CoreModule.INSTANCE); // Loaded first
tableEnv.loadModule("hive", new HiveModule()); // Loaded second
// If both modules have a function named "upper":
// - Core module's "upper" takes precedence
// - Use "hive.upper" to explicitly call Hive version
//   (NOTE: verify against your Flink version — resolution is normally controlled
//   by module order via USE MODULES; module-prefixed function calls may not be supported)

-- Explicit module prefixing
SELECT
upper(name) as core_upper, -- Uses core module's upper()
hive.upper(name) as hive_upper, -- Explicitly uses Hive's upper()
hive.collect_list(values) as list -- Hive-specific function
FROM source_table;

The HiveModule supports version-specific function loading to maintain compatibility:
/**
* Version compatibility constants and utilities
*/
public class HiveShimLoader {
public static final String HIVE_VERSION_V2_3_0 = "2.3.0";
public static final String HIVE_VERSION_V2_3_6 = "2.3.6";
public static final String HIVE_VERSION_V3_1_0 = "3.1.0";
public static final String HIVE_VERSION_V3_1_2 = "3.1.2";
public static final String HIVE_VERSION_V3_1_3 = "3.1.3";
/**
* Gets the default Hive version for the current environment
* @return Default Hive version string
*/
public static String getHiveVersion();
}

Version-Specific Considerations:
The HiveModule also provides access to custom Hive UDFs when they are available in the Hive metastore:
-- Register custom UDF in Hive (external to Flink)
-- CREATE FUNCTION my_custom_function AS 'com.company.MyUDF';
-- Use custom UDF in Flink after loading Hive module
LOAD MODULE hive WITH ('hive-version' = '3.1.2');
SELECT
id,
my_custom_function(input_data) as processed_data
FROM source_table;

// Handle function loading errors
try {
tableEnv.loadModule("hive", new HiveModule("3.1.2"));
} catch (Exception e) {
// Handle module loading failures
// Common causes: incompatible Hive version, missing dependencies
log.error("Failed to load Hive module", e);
}
// List available functions for debugging
tableEnv.listUserDefinedFunctions(); // Shows loaded UDFs
tableEnv.listModules();            // Shows loaded modules

Common Issues:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12