Module implementation that provides access to Hive built-in functions — including string, date, and mathematical operations — within Flink SQL, enabling compatibility with existing Hive queries and workflows.
/**
* Provides Hive built-in functions to Flink
* Enables Hive function compatibility within Flink SQL queries
*/
class HiveModule implements Module {
/**
* Create HiveModule with default Hive version detection
* Automatically detects Hive version from classpath
*/
HiveModule();
/**
* Create HiveModule with specific Hive version
* @param hiveVersion - Hive version string (e.g., "2.3.6")
*/
HiveModule(String hiveVersion);
/**
* List all available Hive functions
* @return Set of function names available through this module
*/
Set<String> listFunctions();
/**
* Get function definition for a specific function name
* @param name - Function name (case-insensitive)
* @return Optional containing FunctionDefinition if function exists
*/
Optional<FunctionDefinition> getFunctionDefinition(String name);
/**
* Get the Hive version used by this module
* @return Hive version string
*/
String getHiveVersion();
}

Usage Examples:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Load Hive module with default version
tableEnv.loadModule("hive", new HiveModule());
// Load Hive module with specific version
tableEnv.loadModule("hive", new HiveModule("2.3.6"));
// List loaded modules
String[] modules = tableEnv.listModules();
System.out.println("Loaded modules: " + Arrays.toString(modules));
// Use Hive functions in SQL
tableEnv.executeSql(
"SELECT " +
" customer_name, " +
" concat('Customer: ', customer_name) as formatted_name, " +
" upper(customer_name) as upper_name, " +
" length(customer_name) as name_length, " +
" year(registration_date) as reg_year " +
"FROM customer_data"
).print();
// Use Hive date functions
tableEnv.executeSql(
"SELECT " +
" order_date, " +
" date_format(order_date, 'yyyy-MM') as year_month, " +
" date_add(order_date, 30) as plus_30_days, " +
" datediff(CURRENT_DATE, order_date) as days_ago " +
"FROM orders"
).print();

Configuration options for HiveModule behavior and version selection.
/**
* Configuration options for HiveModule
* Controls module behavior and Hive version compatibility
*/
class HiveModuleOptions {
/**
* Hive version configuration option
* Used to specify which Hive version compatibility to use
*/
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
static final ConfigOption<String> HIVE_VERSION = ConfigOptions
.key("hive-version")
.stringType()
.defaultValue("2.3.6")
.withDescription("Hive version to use for function compatibility");
}

Usage Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.module.hive.HiveModuleOptions;
// Configure Hive version via configuration
Configuration config = new Configuration();
config.setString(HiveModuleOptions.HIVE_VERSION.key(), "2.3.6");
// Apply configuration to table environment
tableEnv.getConfig().addConfiguration(config);
// Load module with configuration
tableEnv.loadModule("hive", new HiveModule());

Bridge interface for Hive UDF integration, enabling custom Hive functions to work within Flink.
/**
* Bridge interface for Hive UDF integration
* Connects Hive UDF, GenericUDF, and GenericUDTF with Flink's type system
*/
interface HiveFunction {
/**
* Set argument types and constant values for function execution
* Called during query planning to prepare function with type information
* @param constantArguments - Array of constant argument values (null for non-constants)
* @param argTypes - Array of argument data types
*/
void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
/**
* Get the result data type for this Hive function
* Returns the Flink data type corresponding to the Hive function's return type
* @param constantArguments - Array of constant argument values
* @param argTypes - Array of argument data types
* @return DataType representing the function's return type
*/
DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
}

The HiveModule provides access to a comprehensive set of Hive built-in functions:
-- String manipulation functions
SELECT
concat('Hello', ' ', 'World') as greeting,
upper('foobar') as upper_case,
lower('FOOBAR') as lower_case,
length('Hello World') as str_length,
substr('Hello World', 1, 5) as substring,
trim(' spaces ') as trimmed,
ltrim(' left spaces') as left_trimmed,
rtrim('right spaces ') as right_trimmed,
reverse('Hello') as reversed,
regexp_replace('foo123bar', '[0-9]+', 'XXX') as regex_replaced
FROM dual;

-- Date and time manipulation functions
SELECT
year(order_date) as order_year,
month(order_date) as order_month,
dayofweek(order_date) as day_of_week,
date_format(order_date, 'yyyy-MM-dd') as formatted_date,
date_add(order_date, 30) as plus_30_days,
date_sub(order_date, 7) as minus_7_days,
datediff(CURRENT_DATE, order_date) as days_since_order,
unix_timestamp(order_date) as unix_ts,
from_unixtime(1609459200) as from_unix
FROM orders;

-- Mathematical and numerical functions
SELECT
abs(-42) as absolute_value,
ceil(3.7) as ceiling,
floor(3.7) as floor_value,
round(3.14159, 2) as rounded,
sqrt(25) as square_root,
pow(2, 3) as power,
exp(1) as exponential,
ln(2.718) as natural_log,
log10(100) as log_base_10,
sin(3.14159/2) as sine,
cos(0) as cosine,
rand() as random_number
FROM dual;

-- Conditional and logical functions
SELECT
customer_id,
if(amount > 1000, 'High Value', 'Regular') as customer_type,
case
when amount > 5000 then 'Premium'
when amount > 1000 then 'Gold'
else 'Standard'
end as tier,
coalesce(discount_amount, 0) as final_discount,
nvl(customer_notes, 'No notes') as notes,
greatest(amount, tax_amount, shipping_cost) as max_cost,
least(amount, tax_amount, shipping_cost) as min_cost
FROM orders;

-- Aggregate functions (work in GROUP BY contexts)
SELECT
region,
count(*) as order_count,
sum(amount) as total_amount,
avg(amount) as avg_amount,
min(amount) as min_amount,
max(amount) as max_amount,
stddev(amount) as amount_stddev,
variance(amount) as amount_variance,
percentile_approx(amount, 0.5) as median_amount,
collect_list(customer_id) as customer_list,
collect_set(product_category) as categories
FROM orders
GROUP BY region;

-- Array and map manipulation functions
SELECT
array(1, 2, 3, 4, 5) as number_array,
size(tags) as tag_count,
array_contains(tags, 'promotion') as has_promotion,
sort_array(tags) as sorted_tags,
map('key1', 'value1', 'key2', 'value2') as example_map,
map_keys(metadata) as meta_keys,
map_values(metadata) as meta_values
FROM products;

-- Type conversion functions
SELECT
cast('123' as int) as str_to_int,
cast(123.45 as string) as num_to_str,
cast('2023-12-01' as date) as str_to_date,
cast(amount as decimal(10,2)) as formatted_amount,
binary('Hello World') as to_binary
FROM transactions;

// Module loading order affects function resolution priority
tableEnv.loadModule("core", CoreModule.INSTANCE); // Built-in Flink functions
tableEnv.loadModule("hive", new HiveModule("2.3.6")); // Hive functions
// Function resolution follows module load order: the first loaded module that
// defines a name wins, so with "core" loaded first, Flink's built-in functions
// take precedence over same-named Hive functions.
// To let Hive functions take precedence instead, reload so "hive" comes first:
tableEnv.unloadModule("core");
tableEnv.unloadModule("hive");
tableEnv.loadModule("hive", new HiveModule("2.3.6"));
tableEnv.loadModule("core", CoreModule.INSTANCE);
// Check function resolution
tableEnv.executeSql("SHOW FUNCTIONS").print();

Existing Hive UDFs can be used directly in Flink:
// Register custom Hive UDF
tableEnv.executeSql(
"CREATE TEMPORARY FUNCTION my_custom_func AS 'com.example.MyHiveUDF' " +
"USING JAR '/path/to/my-udf.jar'"
);
// Use custom UDF in SQL
tableEnv.executeSql(
"SELECT customer_id, my_custom_func(customer_data) as processed_data " +
"FROM customers"
).print();

Different Hive versions provide different sets of functions:
The connector automatically provides version-appropriate function implementations based on the specified Hive version.