# Function Module Integration

Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. The function module provides seamless compatibility with Hive's extensive library of built-in functions and enables registration of custom user-defined functions.

## Capabilities

### HiveModule

Module implementation that provides access to Hive's built-in functions within Flink SQL.

```java { .api }
/**
 * Module providing Hive built-in functions for Flink SQL
 * Enables transparent use of Hive functions in Flink queries
 */
public class HiveModule implements Module {

    /**
     * Create HiveModule with default Hive version (detected from classpath)
     */
    public HiveModule();

    /**
     * Create HiveModule with specific Hive version
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     */
    public HiveModule(String hiveVersion);

    /**
     * Create HiveModule with specific Hive version and class loader
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     * @param classLoader Class loader for function loading
     */
    public HiveModule(String hiveVersion, ClassLoader classLoader);

    /**
     * Create HiveModule with specific Hive version, configuration, and class loader
     * @param hiveVersion Hive version string (e.g., "2.3.9")
     * @param config Readable configuration for module settings
     * @param classLoader Class loader for function loading
     */
    public HiveModule(String hiveVersion, ReadableConfig config, ClassLoader classLoader);

    /**
     * List all available Hive built-in functions
     * @return Set of function names available through this module
     */
    public Set<String> listFunctions();

    /**
     * Get function definition for a specific function name
     * @param name Function name (case-insensitive)
     * @return Optional containing function definition if available
     */
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}
```
**Usage Examples:**

```java
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.api.TableEnvironment;

// Create table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Load Hive module with default version
HiveModule hiveModule = new HiveModule();
tableEnv.loadModule("hive", hiveModule);

// Or specify explicit Hive version
HiveModule specificModule = new HiveModule("2.3.9");
tableEnv.loadModule("hive", specificModule);

// List available functions
Set<String> functions = hiveModule.listFunctions();
System.out.println("Available Hive functions: " + functions.size());

// Check if specific function exists
Optional<FunctionDefinition> concatWs = hiveModule.getFunctionDefinition("concat_ws");
if (concatWs.isPresent()) {
    System.out.println("concat_ws function is available");
}

// Use Hive functions in SQL queries
Table result = tableEnv.sqlQuery("""
    SELECT
        concat_ws('|', first_name, last_name) as full_name,
        upper(email) as email_upper,
        size(split(address, ' ')) as address_parts,
        from_unixtime(created_timestamp) as created_date,
        if(active = 1, 'ACTIVE', 'INACTIVE') as status
    FROM users
    """);
```
### Available Hive Functions

The HiveModule provides access to a comprehensive set of Hive built-in functions:

#### String Functions

```sql
-- String manipulation functions
SELECT
    concat('Hello', ' ', 'World') as greeting,
    concat_ws('|', col1, col2, col3) as pipe_separated,
    upper(name) as name_upper,
    lower(email) as email_lower,
    length(description) as desc_length,
    substr(text, 1, 10) as first_10_chars,
    trim(padded_string) as trimmed,
    regexp_replace(phone, '[^0-9]', '') as clean_phone,
    split(csv_data, ',') as array_values,
    reverse(string_col) as reversed
FROM my_table;
```

#### Date and Time Functions

```sql
-- Date/time manipulation functions
SELECT
    from_unixtime(unix_timestamp) as formatted_date,
    unix_timestamp('2024-01-01 12:00:00') as unix_ts,
    year(date_col) as year_part,
    month(date_col) as month_part,
    day(date_col) as day_part,
    date_add(date_col, 30) as thirty_days_later,
    date_sub(current_date(), 7) as week_ago,
    datediff(end_date, start_date) as days_between,
    date_format(datetime_col, 'yyyy-MM-dd') as formatted
FROM events;
```

#### Mathematical Functions

```sql
-- Mathematical and statistical functions
SELECT
    abs(negative_value) as absolute,
    round(decimal_value, 2) as rounded,
    ceil(float_value) as ceiling,
    floor(float_value) as floored,
    greatest(val1, val2, val3) as maximum,
    least(val1, val2, val3) as minimum,
    rand() as random_value,
    pow(base, exponent) as power,
    sqrt(number) as square_root,
    sin(angle) as sine_value
FROM calculations;
```

#### Collection Functions

```sql
-- Array and map functions
SELECT
    size(array_col) as array_length,
    array_contains(tags, 'important') as has_important_tag,
    sort_array(string_array) as sorted_array,
    map_keys(properties) as property_keys,
    map_values(properties) as property_values,
    explode(array_col) as individual_elements
FROM structured_data;
```

#### Type Conversion Functions

```sql
-- Type casting and conversion functions
SELECT
    cast(string_number as int) as integer_value,
    cast(timestamp_col as date) as date_only,
    string(numeric_col) as string_representation,
    int(boolean_col) as boolean_as_int,
    double(string_decimal) as decimal_value
FROM mixed_types;
```

#### Conditional Functions

```sql
-- Conditional and null-handling functions
SELECT
    if(score > 80, 'PASS', 'FAIL') as result,
    case
        when grade >= 90 then 'A'
        when grade >= 80 then 'B'
        when grade >= 70 then 'C'
        else 'F'
    end as letter_grade,
    coalesce(preferred_name, first_name, 'Unknown') as display_name,
    nvl(optional_field, 'N/A') as with_default,
    isnull(nullable_col) as is_null_check,
    isnotnull(nullable_col) as is_not_null_check
FROM student_grades;
```
## Advanced Function Integration

### Custom UDF Registration

While HiveModule provides built-in functions, you can also register custom Hive UDFs:

```java
// Register custom Hive UDF
tableEnv.createTemporaryFunction("my_custom_udf", MyHiveUDF.class);

// Use in SQL queries
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        my_custom_udf(input_data) as processed_data
    FROM user_data
    """);
```

### Function Resolution Order

Configure module loading order to control function resolution:

```java
// Load modules in specific order
tableEnv.loadModule("hive", new HiveModule("2.3.9"));
tableEnv.loadModule("core", CoreModule.INSTANCE);

// List loaded modules
String[] modules = tableEnv.listModules();
System.out.println("Loaded modules: " + Arrays.toString(modules));

// Use module order for function resolution
// Functions in earlier modules take precedence
```

### Function Catalog Integration

Combine with Hive catalog for comprehensive function access:

```java
// Set up both catalog and module
HiveCatalog catalog = new HiveCatalog("hive", "default", "/etc/hive/conf", null, "2.3.9");
HiveModule module = new HiveModule("2.3.9");

tableEnv.registerCatalog("hive", catalog);
tableEnv.loadModule("hive", module);
tableEnv.useCatalog("hive");

// Access both built-in and user-defined functions
Table result = tableEnv.sqlQuery("""
    SELECT
        -- Built-in Hive function from module
        concat_ws('|', first_name, last_name) as full_name,
        -- Custom UDF registered in catalog
        my_database.my_custom_function(data) as processed,
        -- Standard Flink function
        CURRENT_TIMESTAMP as processing_time
    FROM user_profiles
    """);
```

### Performance Considerations

```java
// Configure function execution for performance
Configuration config = new Configuration();

// Enable object reuse for UDF performance
config.setBoolean("pipeline.object-reuse", true);

// Configure state backend for stateful UDFs
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "hdfs://namenode:9000/checkpoints");

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .withConfiguration(config)
        .build()
);
```
### Complex Function Usage Examples

```java
// Complex data processing with Hive functions
Table processedData = tableEnv.sqlQuery("""
    WITH parsed_logs AS (
        SELECT
            regexp_extract(log_line, '(\\d{4}-\\d{2}-\\d{2})', 1) as log_date,
            regexp_extract(log_line, 'level=(\\w+)', 1) as log_level,
            split(log_line, '\\|') as log_parts,
            size(split(log_line, '\\|')) as part_count
        FROM raw_logs
        WHERE log_line IS NOT NULL
    ),
    enhanced_logs AS (
        SELECT
            *,
            from_unixtime(unix_timestamp(log_date, 'yyyy-MM-dd')) as parsed_date,
            if(log_level IN ('ERROR', 'FATAL'), 1, 0) as is_error,
            map('date', log_date, 'level', log_level) as log_metadata
        FROM parsed_logs
        WHERE part_count >= 3
    )
    SELECT
        date_format(parsed_date, 'yyyy-MM') as month,
        log_level,
        count(*) as log_count,
        sum(is_error) as error_count,
        collect_list(log_metadata) as monthly_logs
    FROM enhanced_logs
    GROUP BY date_format(parsed_date, 'yyyy-MM'), log_level
    ORDER BY month DESC, log_level
    """);

// Execute and print results
processedData.execute().print();
```
## Integration Patterns

### SQL DDL Function Definition

```sql
-- Create temporary function from Java class
CREATE TEMPORARY FUNCTION my_hash AS 'com.company.udfs.HashFunction';

-- Create catalog function (persisted in Hive metastore)
CREATE FUNCTION my_catalog.analytics.custom_aggregator AS 'com.company.udfs.CustomAggregator'
USING JAR 'hdfs://namenode:9000/udf-jars/custom-functions.jar';
```

### Streaming Function Usage

```java
// Use Hive functions in streaming queries
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Load Hive module for streaming
tableEnv.loadModule("hive", new HiveModule("2.3.9"));

// Process streaming data with Hive functions
Table streamResult = tableEnv.sqlQuery("""
    SELECT
        window_start,
        window_end,
        concat_ws(':', user_id, session_id) as user_session,
        count(*) as event_count,
        collect_list(event_type) as event_types,
        max(from_unixtime(event_timestamp)) as latest_event
    FROM TABLE(
        HOP(TABLE source_stream, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, user_id, session_id
    """);
```