# Hive Functions

Access to Hive built-in functions including UDF, UDAF, and UDTF through the HiveModule system with version-specific compatibility. Enables seamless usage of Hive's extensive function library within Flink SQL queries.

## Capabilities

### HiveModule

Main module providing access to Hive built-in functions with version compatibility.

```java { .api }
/**
 * Module providing Hive built-in metadata and functions
 * Enables access to Hive UDF, UDAF, and UDTF functions
 */
public class HiveModule implements Module {
    /**
     * Create HiveModule for specific Hive version
     * @param hiveVersion - Hive version string (e.g., "2.3.6")
     */
    public HiveModule(String hiveVersion);

    /**
     * Create HiveModule with default latest supported version
     */
    public HiveModule();

    /**
     * List all available functions in this module
     * @return Set of function names
     */
    public Set<String> listFunctions();

    /**
     * Get function definition by name
     * @param name - Function name to look up
     * @return Optional FunctionDefinition if function exists
     */
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}
```

### HiveModuleFactory

Factory for creating HiveModule instances through service discovery.

```java { .api }
/**
 * Factory for creating HiveModule instances
 * Used by Flink's module loading system
 */
public class HiveModuleFactory implements ModuleFactory {
    /**
     * Get the factory identifier
     * @return "hive" identifier string
     */
    public String factoryIdentifier();

    /**
     * Create HiveModule instance from context
     * @param context - Module creation context with options
     * @return New HiveModule instance
     */
    public Module createModule(Context context);

    /**
     * Get required configuration options
     * @return Set of required ConfigOption objects
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

### Function Wrapper Interfaces

Base interfaces and classes for wrapping Hive functions.

```java { .api }
/**
 * Base interface for all Hive function wrappers
 * Marker interface to identify Hive-originated functions
 */
public interface HiveFunction {
    // Marker interface - no methods
}

/**
 * Wrapper for Hive function implementations
 * Provides access to underlying Hive function instance
 */
public class HiveFunctionWrapper<UDFType> implements Serializable {
    /**
     * Create wrapper for Hive function
     * @param className - Hive function class name
     */
    public HiveFunctionWrapper(String className);

    /**
     * Create instance of the wrapped function
     * @return New instance of the Hive function
     */
    public UDFType createFunction();

    /**
     * Get the class name of the wrapped function
     * @return Fully qualified class name
     */
    public String getClassName();
}
```
116
117
### Scalar Function Wrappers
118
119
Wrappers for Hive UDF (User Defined Function) implementations.
120
121
```java { .api }
122
/**
123
* Wrapper for Hive Generic UDF functions
124
* Handles complex types and object inspection
125
*/
126
public class HiveGenericUDF extends ScalarFunction implements HiveFunction {
127
/**
128
* Create wrapper for Generic UDF
129
* @param hiveFunctionWrapper - Wrapper for the Hive function
130
* @param hiveShim - Version-specific Hive compatibility shim
131
*/
132
public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim);
133
134
/**
135
* Evaluate function with given arguments
136
* @param arguments - Function arguments
137
* @return Function result
138
*/
139
public Object eval(Object... arguments);
140
141
/**
142
* Get result type information
143
* @param signature - Function signature
144
* @return Type information for result
145
*/
146
public TypeInformation<?> getResultType(Class<?>[] signature);
147
}
148
149
/**
150
* Wrapper for Hive Simple UDF functions
151
* Handles primitive types and simple objects
152
*/
153
public class HiveSimpleUDF extends ScalarFunction implements HiveFunction {
154
/**
155
* Create wrapper for Simple UDF
156
* @param hiveFunctionWrapper - Wrapper for the Hive function
157
* @param hiveShim - Version-specific Hive compatibility shim
158
*/
159
public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim);
160
161
/**
162
* Evaluate function with given arguments
163
* @param arguments - Function arguments
164
* @return Function result
165
*/
166
public Object eval(Object... arguments);
167
}
168
```
169
170
### Aggregate Function Wrappers
171
172
Wrappers for Hive UDAF (User Defined Aggregate Function) implementations.
173
174
```java { .api }
175
/**
176
* Wrapper for Hive Generic UDAF functions
177
* Provides aggregation capabilities with accumulators
178
*/
179
public class HiveGenericUDAF extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer> implements HiveFunction {
180
/**
181
* Create wrapper for Generic UDAF
182
* @param funcWrapper - Wrapper for the Hive function
183
* @param hiveShim - Version-specific Hive compatibility shim
184
*/
185
public HiveGenericUDAF(HiveFunctionWrapper<GenericUDAF> funcWrapper, HiveShim hiveShim);
186
187
/**
188
* Create accumulator for aggregation
189
* @return New accumulator instance
190
*/
191
public GenericUDAFEvaluator.AggregationBuffer createAccumulator();
192
193
/**
194
* Get final result from accumulator
195
* @param accumulator - Accumulator with aggregated state
196
* @return Final aggregation result
197
*/
198
public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator);
199
200
/**
201
* Accumulate value into aggregator
202
* @param accumulator - Current accumulator
203
* @param input - Input value to accumulate
204
*/
205
public void accumulate(GenericUDAFEvaluator.AggregationBuffer accumulator, Object... input);
206
207
/**
208
* Retract value from aggregator (for streaming)
209
* @param accumulator - Current accumulator
210
* @param input - Input value to retract
211
*/
212
public void retract(GenericUDAFEvaluator.AggregationBuffer accumulator, Object... input);
213
214
/**
215
* Merge two accumulators
216
* @param accumulator - Target accumulator
217
* @param iterable - Accumulators to merge
218
*/
219
public void merge(GenericUDAFEvaluator.AggregationBuffer accumulator, Iterable<GenericUDAFEvaluator.AggregationBuffer> iterable);
220
}
221
```
222
223
### Table Function Wrappers
224
225
Wrappers for Hive UDTF (User Defined Table Function) implementations.
226
227
```java { .api }
228
/**
229
* Wrapper for Hive Generic UDTF functions
230
* Provides table-valued function capabilities
231
*/
232
public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction {
233
/**
234
* Create wrapper for Generic UDTF
235
* @param hiveFunctionWrapper - Wrapper for the Hive function
236
* @param hiveShim - Version-specific Hive compatibility shim
237
*/
238
public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim);
239
240
/**
241
* Evaluate function and emit results
242
* @param args - Function arguments
243
*/
244
public void eval(Object... args);
245
246
/**
247
* Get result type information
248
* @param signature - Function signature
249
* @return Type information for result rows
250
*/
251
public TypeInformation<Row> getResultType(Class<?>[] signature);
252
}
253
```
254
255
### Object Conversion and Inspection
256
257
Classes for handling type conversion between Flink and Hive data types.
258
259
```java { .api }
260
/**
261
* Factory for creating Hive object conversion utilities
262
*/
263
public class HiveInspectors {
264
/**
265
* Get object inspector for Flink data type
266
* @param dataType - Flink data type
267
* @return Hive ObjectInspector for the type
268
*/
269
public static ObjectInspector getObjectInspector(DataType dataType);
270
271
/**
272
* Get primitive object inspector for Java class
273
* @param clazz - Java class
274
* @return PrimitiveObjectInspector for the class
275
*/
276
public static PrimitiveObjectInspector getPrimitiveJavaObjectInspector(Class<?> clazz);
277
}
278
279
/**
280
* Interface for converting between Hive and Flink object representations
281
*/
282
public interface HiveObjectConversion {
283
/**
284
* Convert Flink object to Hive representation
285
* @param flinkObject - Flink object to convert
286
* @return Hive-compatible object
287
*/
288
Object toHiveObject(Object flinkObject);
289
290
/**
291
* Convert Hive object to Flink representation
292
* @param hiveObject - Hive object to convert
293
* @return Flink-compatible object
294
*/
295
Object toFlinkObject(Object hiveObject);
296
}
297
298
/**
299
* Identity conversion that performs no transformation
300
*/
301
public class IdentityConversion implements HiveObjectConversion {
302
public Object toHiveObject(Object flinkObject);
303
public Object toFlinkObject(Object hiveObject);
304
}
305
```
306
307
### Function Definition Factory
308
309
Factory for creating Flink function definitions from Hive functions.
310
311
```java { .api }
312
/**
313
* Factory for creating FunctionDefinition from Hive functions
314
*/
315
public class HiveFunctionDefinitionFactory {
316
/**
317
* Create function definition from Hive function
318
* @param name - Function name
319
* @param functionInfo - Hive function information
320
* @param hiveShim - Version-specific compatibility shim
321
* @param classLoader - Class loader for function classes
322
* @return FunctionDefinition for use in Flink
323
*/
324
public static FunctionDefinition createFunctionDefinitionFromHiveFunction(
325
String name,
326
FunctionInfo functionInfo,
327
HiveShim hiveShim,
328
ClassLoader classLoader
329
);
330
}
331
```

**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Set up table environment with Hive module
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Load Hive module to access Hive functions
HiveModule hiveModule = new HiveModule("2.3.6");
tableEnv.loadModule("hive", hiveModule);

// Use Hive built-in functions in SQL.
// Note: a UDTF such as EXPLODE cannot appear in a plain SELECT list;
// it must be applied via a LATERAL TABLE join in Flink SQL.
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  customer_id," +
    "  CONCAT(first_name, ' ', last_name) as full_name," + // Hive CONCAT function
    "  REGEXP_REPLACE(phone, '[^0-9]', '') as clean_phone," + // Hive REGEXP_REPLACE
    "  SIZE(order_items) as item_count " + // Hive SIZE function
    "FROM hive.customers.customer_orders"
);

result.execute().print();
```

```java
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;

// Register custom Hive UDF in Flink
HiveFunctionWrapper<GenericUDF> wrapper = new HiveFunctionWrapper<>("com.example.MyCustomUDF");
HiveGenericUDF customUDF = new HiveGenericUDF(wrapper, hiveShim);

// Register the function in table environment
tableEnv.createTemporaryFunction("my_custom_udf", customUDF);

// Use the custom function in SQL
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, my_custom_udf(customer_data) as processed_data " +
    "FROM hive.customers.raw_data"
);
```

```java
// List all available Hive functions
HiveModule hiveModule = new HiveModule("2.3.6");
Set<String> functions = hiveModule.listFunctions();

System.out.println("Available Hive functions:");
functions.stream()
    .sorted()
    .forEach(System.out::println);

// Get specific function definition
Optional<FunctionDefinition> concatDef = hiveModule.getFunctionDefinition("concat");
if (concatDef.isPresent()) {
    System.out.println("Found CONCAT function: " + concatDef.get());
}
```

## Types

```java { .api }
public interface Module {
    /**
     * List all functions provided by this module
     * @return Set of function names
     */
    Set<String> listFunctions();

    /**
     * Get function definition by name
     * @param name - Function name
     * @return Optional function definition
     */
    Optional<FunctionDefinition> getFunctionDefinition(String name);
}

public interface ModuleFactory extends Factory {
    /**
     * Create module from context
     * @param context - Creation context
     * @return Module instance
     */
    Module createModule(Context context);
}

public abstract class ScalarFunction extends UserDefinedFunction {
    /**
     * Evaluation method for scalar functions
     * @param args - Function arguments
     * @return Function result
     */
    public abstract Object eval(Object... args);
}

public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /**
     * Create accumulator for aggregation
     * @return New accumulator instance
     */
    public abstract ACC createAccumulator();

    /**
     * Get final result from accumulator
     * @param accumulator - Final accumulator state
     * @return Aggregation result
     */
    public abstract T getValue(ACC accumulator);

    /**
     * Accumulate input into accumulator
     * @param accumulator - Current accumulator
     * @param input - Input to accumulate
     */
    public abstract void accumulate(ACC accumulator, Object... input);
}

public abstract class TableFunction<T> extends UserDefinedFunction {
    /**
     * Emit result rows from table function
     * @param result - Result to emit
     */
    protected void collect(T result);

    /**
     * Evaluation method for table functions
     * @param args - Function arguments
     */
    public abstract void eval(Object... args);
}

public class FlinkHiveUDFException extends RuntimeException {
    public FlinkHiveUDFException(String message);
    public FlinkHiveUDFException(String message, Throwable cause);
}
```