Apache Spark Hive integration provides comprehensive support for executing Hive User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table-Generating Functions (UDTFs) within Spark SQL queries.
The UDF integration system allows Spark to execute existing Hive UDFs without modification, preserving compatibility with Hive-based data processing pipelines. Spark wraps Hive UDFs in specialized expression classes that translate between Spark's internal row format and Hive's object format.
Wrapper for Hive simple UDFs that extend org.apache.hadoop.hive.ql.exec.UDF.
case class HiveSimpleUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {
  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}

Registering and using a simple Hive UDF from Spark SQL:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
// Register a simple Hive UDF
spark.sql("""
  CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Use the UDF in queries
val result = spark.sql("""
  SELECT my_upper(name) as upper_name
  FROM employee
""")

result.show()

To create a Hive UDF that works with Spark:
package com.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UpperCaseUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) return null;
        return new Text(input.toString().toUpperCase());
    }
}

Wrapper for Hive generic UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
case class HiveGenericUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {
  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  def foldable: Boolean
  override def toString: String
}

// Register a generic UDF (UDFJson implements Hive's built-in get_json_object)
spark.sql("""
CREATE TEMPORARY FUNCTION json_extract AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFJsonExtract'
""")
// Use the generic UDF
val result = spark.sql("""
SELECT json_extract(json_column, '$.name') as extracted_name
FROM json_table
""")Generic UDFs provide more flexibility than simple UDFs:
Wrapper for Hive User-Defined Table-Generating Functions that produce multiple rows from a single input row.
case class HiveGenericUDTF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Generator with CodegenFallback with Logging {
  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}

// Register a UDTF (e.g., an explode-like function)
spark.sql("""
CREATE TEMPORARY FUNCTION split_words AS 'com.example.SplitWordsUDTF'
""")
// Use UDTF in LATERAL VIEW
val result = spark.sql("""
SELECT word
FROM sentences
LATERAL VIEW split_words(text) exploded_table AS word
""")UDTFs follow a specific lifecycle:
Wrapper for Hive User-Defined Aggregate Functions that perform aggregation operations.
case class HiveUDAFFunction(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression],
    isUDAFBridgeRequired: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0
) extends TypedImperativeAggregate[GenericUDAFEvaluator.AggregationBuffer] with Logging {
  def createAggregationBuffer(): AggregationBuffer
  def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer
  def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer
  def eval(buffer: AggregationBuffer): Any
  def serialize(buffer: AggregationBuffer): Array[Byte]
  def deserialize(storageFormat: Array[Byte]): AggregationBuffer
  def prettyName: String = name
}

// Register a UDAF
spark.sql("""
CREATE TEMPORARY FUNCTION my_avg AS 'com.example.AverageUDAF'
""")
// Use UDAF in aggregation query
val result = spark.sql("""
SELECT department, my_avg(salary) as avg_salary
FROM employee
GROUP BY department
""")UDAFs implement a distributed aggregation process:
Core wrapper class that loads and manages Hive function instances.
case class HiveFunctionWrapper(
    className: String,
    instance: AnyRef
) extends Serializable {
  def createFunction[T](): T
  def getMethodName(): String
  def getParameterTypes(): Array[Class[_]]
}

// Create a function wrapper
val wrapper = HiveFunctionWrapper("com.example.MyUDF", null)

// Instantiate the wrapped function
val udfInstance = wrapper.createFunction[UDF]()

Register functions for the current session:
// Register a temporary function from a class already on the classpath
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
""")

// Register a temporary function with an explicit JAR location
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
  USING JAR '/path/to/udf.jar'
""")

Register functions in the Hive metastore:
// Create a permanent function backed by a JAR on shared storage
spark.sql("""
  CREATE FUNCTION my_database.my_func AS 'com.example.MyFunction'
  USING JAR 'hdfs://path/to/udf.jar'
""")

// Use the permanent function
val result = spark.sql("""
  SELECT my_database.my_func(column) FROM table
""")

List and inspect registered functions:
// List all functions
spark.sql("SHOW FUNCTIONS").show()

// List functions matching a pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// Describe a function
spark.sql("DESCRIBE FUNCTION my_func").show()

// Show extended information
spark.sql("DESCRIBE FUNCTION EXTENDED my_func").show()

The UDF integration maps between Hive and Spark SQL data types; the common cases are:
- Hive string, int, bigint, double, and boolean map to StringType, IntegerType, LongType, DoubleType, and BooleanType.
- Hive decimal, timestamp, and binary map to DecimalType, TimestampType, and BinaryType.
- Hive array<T>, map<K, V>, and struct map to ArrayType, MapType, and StructType, accessed through ObjectInspectors as in the example below.
// Java UDF handling complex types: the object inspector is bound once in
// initialize(), not re-derived from the arguments on every row
public class ComplexUDF extends GenericUDF {
    private ListObjectInspector listOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        listOI = (ListObjectInspector) arguments[0]; // handle array input
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        List<?> inputList = listOI.getList(arguments[0].get());
        // Process and return a result (here, the element count)
        return inputList == null ? null : inputList.size();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "complex_udf(" + children[0] + ")";
    }
}
Reuse output objects to avoid allocating a new instance per row:

public class OptimizedUDF extends GenericUDF {
    private final Text result = new Text(); // reused across calls

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object input = arguments[0].get();
        if (input == null) return null;
        // processString() stands in for the function's own transformation logic
        result.set(processString(input.toString())); // reuse the Text object
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "optimized_udf(" + children[0] + ")";
    }
}

ClassNotFoundException: UDF class not found
// Solution: add the JAR to the classpath
spark.sql("ADD JAR '/path/to/udf.jar'")

Method not found: incorrect UDF method signature
// Ensure the UDF implements the correct evaluate() signature
public Object evaluate(DeferredObject[] args) throws HiveException

Serialization issues: UDF not serializable for distributed execution
// Make the UDF serializable, or mark non-serializable fields transient
public class MyUDF extends GenericUDF implements Serializable

Handle failures inside the UDF so a single bad record does not fail the whole query:

public class SafeUDF extends GenericUDF {
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        try {
            // UDF logic
            return processInput(arguments[0].get());
        } catch (Exception e) {
            // Handle errors gracefully instead of propagating them
            LOG.warn("UDF error: " + e.getMessage());
            return null; // or an appropriate default value
        }
    }
}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession
class UDFIntegrationSuite extends SparkFunSuite with SharedSparkSession {
  test("custom UDF execution") {
    spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 'com.example.TestUDF'")
    val result = spark.sql("SELECT test_udf('input') as output").collect()
    assert(result(0).getString(0) == "expected_output")
  }
}

// Test with actual Hive UDFs
class HiveUDFSuite extends SparkFunSuite with SharedSparkSession {
  test("hive builtin UDF") {
    val result = spark.sql("SELECT upper('hello') as upper_hello").collect()
    assert(result(0).getString(0) == "HELLO")
  }
}

Most Hive UDFs work in Spark without modification: the wrapper classes described above translate arguments and results between Spark's internal row format and Hive ObjectInspectors at runtime, so functions written against the standard UDF, GenericUDF, GenericUDTF, and GenericUDAFResolver interfaces run unchanged.

UDF compatibility across Hive versions depends on the Hive client libraries Spark runs with: functions compiled against the Hive APIs bundled with Spark generally work as-is, and the metastore client version can be pinned independently of the execution classes.
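For example, a session can pin the metastore client version when it is built (a sketch; the version value is illustrative, and the supported range depends on the Spark release):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Version of the Hive metastore client; illustrative value
  .config("spark.sql.hive.metastore.version", "2.3.9")
  // Download matching Hive client JARs from Maven at startup
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()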
The wrappers above build on the following simplified internal interfaces:

// Base expression interface
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generator for table-generating functions
trait Generator extends Expression {
  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
}

// Aggregate function interface
trait TypedImperativeAggregate[T] extends ImperativeAggregate {
  def createAggregationBuffer(): T
  def update(buffer: T, input: InternalRow): T
  def merge(buffer: T, input: T): T
  def eval(buffer: T): Any
}
// Hive UDAF aggregation buffer: an opaque marker interface whose state is
// managed by the Hive GenericUDAFEvaluator
trait AggregationBuffer
// Object inspector for Hive's type system
trait ObjectInspector {
  def getCategory(): ObjectInspector.Category
  def getTypeName(): String
}

// Function wrapper for Hive functions
case class HiveFunctionWrapper(className: String, instance: AnyRef) extends Serializable {
  def createFunction[T](): T
}

// Internal row representation
trait InternalRow {
  def numFields: Int
  def get(ordinal: Int, dataType: DataType): Any
  def isNullAt(ordinal: Int): Boolean
}