Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility
—
Access to Hive built-in functions including UDF, UDAF, and UDTF through the HiveModule system with version-specific compatibility. Enables seamless usage of Hive's extensive function library within Flink SQL queries.
Main module providing access to Hive built-in functions with version compatibility.
/**
* Module providing Hive built-in metadata and functions
* Enables access to Hive UDF, UDAF, and UDTF functions
*/
public class HiveModule implements Module {
/**
* Create HiveModule for specific Hive version
* @param hiveVersion - Hive version string (e.g., "2.3.6")
*/
public HiveModule(String hiveVersion);
/**
* Create HiveModule with default latest supported version
*/
public HiveModule();
/**
* List all available functions in this module
* @return Set of function names
*/
public Set<String> listFunctions();
/**
* Get function definition by name
* @param name - Function name to look up
* @return Optional FunctionDefinition if function exists
*/
public Optional<FunctionDefinition> getFunctionDefinition(String name);
}Factory for creating HiveModule instances through service discovery.
/**
* Factory for creating HiveModule instances
* Used by Flink's module loading system
*/
public class HiveModuleFactory implements ModuleFactory {
/**
* Get the factory identifier
* @return "hive" identifier string
*/
public String factoryIdentifier();
/**
* Create HiveModule instance from context
* @param context - Module creation context with options
* @return New HiveModule instance
*/
public Module createModule(Context context);
/**
* Get required configuration options
* @return Set of required ConfigOption objects
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Get optional configuration options
* @return Set of optional ConfigOption objects
*/
public Set<ConfigOption<?>> optionalOptions();
}Base interfaces and classes for wrapping Hive functions.
/**
* Base interface for all Hive function wrappers
* Marker interface to identify Hive-originated functions
*/
public interface HiveFunction {
// Marker interface - no methods
}
/**
* Wrapper for Hive function implementations
* Provides access to underlying Hive function instance
*/
public class HiveFunctionWrapper<UDFType> implements Serializable {
/**
* Create wrapper for Hive function
* @param className - Hive function class name
*/
public HiveFunctionWrapper(String className);
/**
* Create instance of the wrapped function
* @return New instance of the Hive function
*/
public UDFType createFunction();
/**
* Get the class name of the wrapped function
* @return Fully qualified class name
*/
public String getClassName();
}Wrappers for Hive UDF (User Defined Function) implementations.
/**
* Wrapper for Hive Generic UDF functions
* Handles complex types and object inspection
*/
public class HiveGenericUDF extends ScalarFunction implements HiveFunction {
/**
* Create wrapper for Generic UDF
* @param hiveFunctionWrapper - Wrapper for the Hive function
* @param hiveShim - Version-specific Hive compatibility shim
*/
public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim);
/**
* Evaluate function with given arguments
* @param arguments - Function arguments
* @return Function result
*/
public Object eval(Object... arguments);
/**
* Get result type information
* @param signature - Function signature
* @return Type information for result
*/
public TypeInformation<?> getResultType(Class<?>[] signature);
}
/**
* Wrapper for Hive Simple UDF functions
* Handles primitive types and simple objects
*/
public class HiveSimpleUDF extends ScalarFunction implements HiveFunction {
/**
* Create wrapper for Simple UDF
* @param hiveFunctionWrapper - Wrapper for the Hive function
* @param hiveShim - Version-specific Hive compatibility shim
*/
public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim);
/**
* Evaluate function with given arguments
* @param arguments - Function arguments
* @return Function result
*/
public Object eval(Object... arguments);
}Wrappers for Hive UDAF (User Defined Aggregate Function) implementations.
/**
* Wrapper for Hive Generic UDAF functions
* Provides aggregation capabilities with accumulators
*/
public class HiveGenericUDAF extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer> implements HiveFunction {
/**
* Create wrapper for Generic UDAF
* @param funcWrapper - Wrapper for the Hive function
* @param hiveShim - Version-specific Hive compatibility shim
*/
public HiveGenericUDAF(HiveFunctionWrapper<GenericUDAF> funcWrapper, HiveShim hiveShim);
/**
* Create accumulator for aggregation
* @return New accumulator instance
*/
public GenericUDAFEvaluator.AggregationBuffer createAccumulator();
/**
* Get final result from accumulator
* @param accumulator - Accumulator with aggregated state
* @return Final aggregation result
*/
public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator);
/**
* Accumulate value into aggregator
* @param accumulator - Current accumulator
* @param input - Input value to accumulate
*/
public void accumulate(GenericUDAFEvaluator.AggregationBuffer accumulator, Object... input);
/**
* Retract value from aggregator (for streaming)
* @param accumulator - Current accumulator
* @param input - Input value to retract
*/
public void retract(GenericUDAFEvaluator.AggregationBuffer accumulator, Object... input);
/**
* Merge two accumulators
* @param accumulator - Target accumulator
* @param iterable - Accumulators to merge
*/
public void merge(GenericUDAFEvaluator.AggregationBuffer accumulator, Iterable<GenericUDAFEvaluator.AggregationBuffer> iterable);
}Wrappers for Hive UDTF (User Defined Table Function) implementations.
/**
* Wrapper for Hive Generic UDTF functions
* Provides table-valued function capabilities
*/
public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction {
/**
* Create wrapper for Generic UDTF
* @param hiveFunctionWrapper - Wrapper for the Hive function
* @param hiveShim - Version-specific Hive compatibility shim
*/
public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim);
/**
* Evaluate function and emit results
* @param args - Function arguments
*/
public void eval(Object... args);
/**
* Get result type information
* @param signature - Function signature
* @return Type information for result rows
*/
public TypeInformation<Row> getResultType(Class<?>[] signature);
}Classes for handling type conversion between Flink and Hive data types.
/**
* Factory for creating Hive object conversion utilities
*/
public class HiveInspectors {
/**
* Get object inspector for Flink data type
* @param dataType - Flink data type
* @return Hive ObjectInspector for the type
*/
public static ObjectInspector getObjectInspector(DataType dataType);
/**
* Get primitive object inspector for Java class
* @param clazz - Java class
* @return PrimitiveObjectInspector for the class
*/
public static PrimitiveObjectInspector getPrimitiveJavaObjectInspector(Class<?> clazz);
}
/**
* Interface for converting between Hive and Flink object representations
*/
public interface HiveObjectConversion {
/**
* Convert Flink object to Hive representation
* @param flinkObject - Flink object to convert
* @return Hive-compatible object
*/
Object toHiveObject(Object flinkObject);
/**
* Convert Hive object to Flink representation
* @param hiveObject - Hive object to convert
* @return Flink-compatible object
*/
Object toFlinkObject(Object hiveObject);
}
/**
* Identity conversion that performs no transformation
*/
public class IdentityConversion implements HiveObjectConversion {
public Object toHiveObject(Object flinkObject);
public Object toFlinkObject(Object hiveObject);
}Factory for creating Flink function definitions from Hive functions.
/**
* Factory for creating FunctionDefinition from Hive functions
*/
public class HiveFunctionDefinitionFactory {
/**
* Create function definition from Hive function
* @param name - Function name
* @param functionInfo - Hive function information
* @param hiveShim - Version-specific compatibility shim
* @param classLoader - Class loader for function classes
* @return FunctionDefinition for use in Flink
*/
public static FunctionDefinition createFunctionDefinitionFromHiveFunction(
String name,
FunctionInfo functionInfo,
HiveShim hiveShim,
ClassLoader classLoader
);
}Usage Examples:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Set up table environment with Hive module
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");
// Load Hive module to access Hive functions
HiveModule hiveModule = new HiveModule("2.3.6");
tableEnv.loadModule("hive", hiveModule);
// Use Hive built-in functions in SQL
Table result = tableEnv.sqlQuery(
"SELECT " +
" customer_id," +
" CONCAT(first_name, ' ', last_name) as full_name," + // Hive CONCAT function
" REGEXP_REPLACE(phone, '[^0-9]', '') as clean_phone," + // Hive REGEXP_REPLACE
"  SIZE(order_items) as item_count," + // Hive SIZE function
"  EXPLODE(order_items) as item " + // Hive EXPLODE UDTF — NOTE: in Flink SQL a UDTF must be invoked via a LATERAL TABLE join, not in the plain SELECT list; verify against the Flink docs
"FROM hive.customers.customer_orders" // catalog was registered as "hive" above
);
result.execute().print();

import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
// Register custom Hive UDF in Flink
HiveFunctionWrapper<GenericUDF> wrapper = new HiveFunctionWrapper<>("com.example.MyCustomUDF");
// hiveShim is the version-specific shim, e.g. obtained via HiveShimLoader.loadHiveShim("2.3.6")
HiveGenericUDF customUDF = new HiveGenericUDF(wrapper, hiveShim);
// Register the function in table environment
tableEnv.createTemporaryFunction("my_custom_udf", customUDF);
// Use the custom function in SQL
Table result = tableEnv.sqlQuery(
"SELECT customer_id, my_custom_udf(customer_data) as processed_data " +
"FROM hive.customers.raw_data" // catalog was registered as "hive" above
);

// List all available Hive functions
HiveModule hiveModule = new HiveModule("2.3.6");
Set<String> functions = hiveModule.listFunctions();
System.out.println("Available Hive functions:");
functions.stream()
.sorted()
.forEach(System.out::println);
// Get specific function definition
Optional<FunctionDefinition> concatDef = hiveModule.getFunctionDefinition("concat");
if (concatDef.isPresent()) {
System.out.println("Found CONCAT function: " + concatDef.get());
}

public interface Module {
/**
* List all functions provided by this module
* @return Set of function names
*/
Set<String> listFunctions();
/**
* Get function definition by name
* @param name - Function name
* @return Optional function definition
*/
Optional<FunctionDefinition> getFunctionDefinition(String name);
}
public interface ModuleFactory extends Factory {
/**
* Create module from context
* @param context - Creation context
* @return Module instance
*/
Module createModule(Context context);
}
public abstract class ScalarFunction extends UserDefinedFunction {
/**
* Evaluation method for scalar functions
* @param args - Function arguments
* @return Function result
*/
public abstract Object eval(Object... args);
}
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
/**
* Create accumulator for aggregation
* @return New accumulator instance
*/
public abstract ACC createAccumulator();
/**
* Get final result from accumulator
* @param accumulator - Final accumulator state
* @return Aggregation result
*/
public abstract T getValue(ACC accumulator);
/**
* Accumulate input into accumulator
* @param accumulator - Current accumulator
* @param input - Input to accumulate
*/
public abstract void accumulate(ACC accumulator, Object... input);
}
public abstract class TableFunction<T> extends UserDefinedFunction {
/**
* Emit result rows from table function
* @param result - Result to emit
*/
protected void collect(T result);
/**
* Evaluation method for table functions
* @param args - Function arguments
*/
public abstract void eval(Object... args);
}
public class FlinkHiveUDFException extends RuntimeException {
public FlinkHiveUDFException(String message);
public FlinkHiveUDFException(String message, Throwable cause);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11