Support for Hive UDFs, UDAFs, and UDTFs, including registration and execution within Spark queries. This module integrates Hive user-defined functions so they can be used directly inside Spark SQL expressions.
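As a quick orientation, here is a minimal sketch of what this integration enables, assuming a Hive-enabled session and the UDF class on the classpath (GenericUDFUpper is a Hive built-in generic UDF, used here purely for illustration):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("HiveUDFIntegration")
.enableHiveSupport()
.getOrCreate()
// Register a Hive UDF by class name; Spark wraps it in the
// expression classes documented below
spark.sql("CREATE TEMPORARY FUNCTION hive_upper AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper'")
// The Hive function is now callable like any built-in SQL function
spark.sql("SELECT hive_upper('hello')").show()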
Support for simple Hive UDFs that extend org.apache.hadoop.hive.ql.exec.UDF.
/**
* Expression wrapper for simple Hive UDFs
* @param name UDF name for display purposes
* @param funcWrapper Wrapper containing the UDF class
* @param children Input expressions to the UDF
*/
case class HiveSimpleUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
/**
* Return data type of the UDF
*/
lazy val dataType: DataType
/**
* Evaluate the UDF with given input
* @param input Input row
* @return UDF result
*/
def eval(input: InternalRow): Any
/**
* Pretty name for display
*/
def prettyName: String
/**
* SQL representation of the UDF call
*/
def sql: String
}
Usage Example:
import org.apache.spark.sql.hive.HiveFunctionWrapper
import org.apache.spark.sql.catalyst.expressions._
// Create wrapper for a Hive UDF class
val wrapper = HiveFunctionWrapper("com.example.MyHiveUDF")
// Create expression for the UDF
val udfExpr = HiveSimpleUDF(
name = "my_udf",
funcWrapper = wrapper,
children = Seq(Literal("input_string"))
)
// Use in expression evaluation
val result = udfExpr.eval(EmptyRow)
println(s"UDF result: $result")Support for generic Hive UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
Support for generic Hive UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
/**
* Expression wrapper for generic Hive UDFs
* @param name UDF name for display purposes
* @param funcWrapper Wrapper containing the UDF class
* @param children Input expressions to the UDF
*/
case class HiveGenericUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
/**
* Return data type of the UDF
*/
lazy val dataType: DataType
/**
* Evaluate the UDF with given input
* @param input Input row
* @return UDF result
*/
def eval(input: InternalRow): Any
/**
* Pretty name for display
*/
def prettyName: String
/**
* SQL representation of the UDF call
*/
def sql: String
}
Usage Example:
// Generic UDF for complex type handling
val genericWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase")
val genericUDF = HiveGenericUDF(
name = "case_when",
funcWrapper = genericWrapper,
children = Seq(
Literal(true), // CASE expression
Literal(true), // WHEN value
Literal("true_value"), // THEN result
Literal("false_value") // ELSE result
)
)
val result = genericUDF.eval(EmptyRow)
Support for Hive UDTFs that generate multiple output rows from a single input row.
/**
* Expression wrapper for Hive UDTFs
* @param name UDTF name for display purposes
* @param funcWrapper Wrapper containing the UDTF class
* @param children Input expressions to the UDTF
*/
case class HiveGenericUDTF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression {
/**
* Output schema of the UDTF
*/
lazy val elementSchema: StructType
/**
* Evaluate the UDTF and generate output rows
* @param input Input row
* @return Iterator of output rows
*/
def eval(input: InternalRow): IterableOnce[InternalRow]
/**
* Terminate the UDTF and return any final rows
* @return Iterator of final output rows
*/
def terminate(): IterableOnce[InternalRow]
/**
* Pretty name for display
*/
def prettyName: String
/**
* SQL representation of the UDTF call
*/
def sql: String
}
Usage Example:
import org.apache.spark.sql.types._
// UDTF that explodes an array into one output row per element
val udtfWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode")
val udtf = HiveGenericUDTF(
name = "explode",
funcWrapper = udtfWrapper,
children = Seq(Literal.create(Array("a", "b", "c"), ArrayType(StringType)))
)
// Evaluate UDTF
val outputRows = udtf.eval(EmptyRow).iterator.toSeq
println(s"UDTF generated ${outputRows.length} rows")
// Terminate to get any final rows
val finalRows = udtf.terminate().iterator.toSeq
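The output schema the UDTF will produce can be inspected up front via elementSchema, which the wrapper derives from the UDTF's own initialization:
// Inspect the UDTF's output schema before evaluation
println(s"UDTF output schema: ${udtf.elementSchema}")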
Support for Hive UDAFs that perform custom aggregation operations.
/**
* Expression wrapper for Hive UDAFs
* @param name UDAF name for display purposes
* @param funcWrapper Wrapper containing the UDAF class
* @param children Input expressions to the UDAF
* @param isUDAFBridgeRequired Whether UDAF bridge is needed
* @param mutableAggBufferOffset Offset in mutable aggregation buffer
* @param inputAggBufferOffset Offset in input aggregation buffer
*/
case class HiveUDAFFunction(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression],
isUDAFBridgeRequired: Boolean,
mutableAggBufferOffset: Int,
inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer] with HiveInspectors with UserDefinedExpression {
/**
* Create a new aggregation buffer
* @return New UDAF buffer
*/
def createAggregationBuffer(): HiveUDAFBuffer
/**
* Update aggregation buffer with new input
* @param buffer Current aggregation buffer
* @param input Input row
* @return Updated buffer
*/
def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer
/**
* Merge two aggregation buffers
* @param buffer Target buffer
* @param input Source buffer to merge
* @return Merged buffer
*/
def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer
/**
* Get final result from aggregation buffer
* @param buffer Final aggregation buffer
* @return Aggregation result
*/
def eval(buffer: HiveUDAFBuffer): Any
/**
* Serialize aggregation buffer
* @param buffer Buffer to serialize
* @return Serialized buffer data
*/
def serialize(buffer: HiveUDAFBuffer): Array[Byte]
/**
* Deserialize aggregation buffer
* @param bytes Serialized buffer data
* @return Deserialized buffer
*/
def deserialize(bytes: Array[Byte]): HiveUDAFBuffer
/**
* Return data type of the aggregation result
*/
lazy val dataType: DataType
/**
* Pretty name for display
*/
def prettyName: String
/**
* SQL representation of the UDAF call
*/
def sql: String
}
Usage Example:
import org.apache.spark.sql.functions.col
// Custom aggregation function
val udafWrapper = HiveFunctionWrapper("com.example.MyHiveUDAF")
val udaf = HiveUDAFFunction(
name = "my_aggregate",
funcWrapper = udafWrapper,
children = Seq(col("value").expr), // Column -> catalyst Expression
isUDAFBridgeRequired = false,
mutableAggBufferOffset = 0,
inputAggBufferOffset = 0
)
// Use in aggregation context
val buffer = udaf.createAggregationBuffer()
val updatedBuffer = udaf.update(buffer, inputRow) // inputRow: some InternalRow of input values
val result = udaf.eval(updatedBuffer)
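The same API supports Spark's partial aggregation flow: each task updates its own buffer, partial buffers are merged, and eval produces the final value. A hedged sketch, where row1 and row2 stand in for arbitrary input InternalRows:
// row1 and row2: assumed InternalRows of input values
val partial1 = udaf.update(udaf.createAggregationBuffer(), row1)
val partial2 = udaf.update(udaf.createAggregationBuffer(), row2)
// Combine the partial states, then compute the final result
val merged = udaf.merge(partial1, partial2)
val finalResult = udaf.eval(merged)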
Buffer type for managing UDAF aggregation state.
/**
* Buffer for Hive UDAF operations
* @param buf Hive aggregation buffer
* @param canDoMerge Whether buffer supports merge operations
*/
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)
Usage Example:
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
// Create UDAF buffer
val hiveBuffer: AggregationBuffer = ??? // in practice, created by the Hive UDAF evaluator
val buffer = HiveUDAFBuffer(hiveBuffer, canDoMerge = true)
// Use in UDAF operations
val serialized = udaf.serialize(buffer)
val deserialized = udaf.deserialize(serialized)
Wrapper class for Hive function classes.
/**
* Wrapper for Hive function classes
* @param functionClassName Fully qualified class name of the Hive function
*/
case class HiveFunctionWrapper(functionClassName: String) {
/**
* Create instance of the wrapped function
* @return Instance of the Hive function
*/
def createFunction[T]: T
/**
* Get the function class
* @return Class object for the function
*/
def functionClass: Class[_]
}
Usage Example:
import org.apache.hadoop.hive.ql.exec.UDF
// Create wrapper
val wrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.UDFLength")
// Create function instance
val udfInstance = wrapper.createFunction[UDF]
// Get function class for reflection
val functionClass = wrapper.functionClass
println(s"Function class: ${functionClass.getName}")Integration with Spark SQL for automatic UDF registration.
// Illustrative helper for registering a Hive UDF by class name.
// Note: with Hive support enabled, the usual route is the SQL statement
// CREATE TEMPORARY FUNCTION name AS 'className'; the wrapper below is a
// simplified one-argument sketch that evaluates the UDF eagerly.
def registerHiveUDF(
spark: SparkSession,
name: String,
className: String
): Unit = {
val wrapper = HiveFunctionWrapper(className)
// Register a one-argument Spark UDF that delegates to the Hive UDF
spark.udf.register(name, (input: String) => {
val udf = HiveSimpleUDF(name, wrapper, Seq(Literal(input)))
String.valueOf(udf.eval(EmptyRow))
})
}
Usage Example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("HiveUDFExample")
.enableHiveSupport()
.getOrCreate()
// Register custom Hive UDF
registerHiveUDF(spark, "my_length", "com.example.MyLengthUDF")
// Use in SQL
spark.sql("SELECT my_length(name) FROM users").show()
// Use in DataFrame API
import spark.implicits._
import org.apache.spark.sql.functions.callUDF
val df = Seq("hello", "world").toDF("text")
df.select(callUDF("my_length", $"text")).show()
Support for complex UDF scenarios.
// UDF with complex input/output types
def createComplexUDF(
name: String,
className: String,
inputTypes: Seq[DataType],
outputType: DataType
): HiveGenericUDF = {
val wrapper = HiveFunctionWrapper(className)
val children = inputTypes.zipWithIndex.map { case (dataType, index) =>
BoundReference(index, dataType, nullable = true)
}
// Note: the expression derives its own result type (dataType); the
// outputType parameter is kept so callers can validate against it
HiveGenericUDF(name, wrapper, children)
}
// UDTF with multiple output columns
def createMultiColumnUDTF(
name: String,
className: String,
inputExpression: Expression,
outputSchema: StructType
): HiveGenericUDTF = {
val wrapper = HiveFunctionWrapper(className)
// Note: the UDTF derives its actual output schema itself (exposed as
// elementSchema); outputSchema is kept so callers can validate against it
HiveGenericUDTF(name, wrapper, Seq(inputExpression))
}
Usage Example:
import org.apache.spark.sql.types._
// Complex UDF with struct input/output
val complexUDF = createComplexUDF(
name = "process_struct",
className = "com.example.StructProcessorUDF",
inputTypes = Seq(StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true)
))),
outputType = StringType
)
// Multi-column UDTF
val outputSchema = StructType(Seq(
StructField("key", StringType, false),
StructField("value", StringType, true)
))
val multiColUDTF = createMultiColumnUDTF(
name = "split_pairs",
className = "com.example.SplitPairsUDTF",
inputExpression = Literal("key1:value1,key2:value2"),
outputSchema = outputSchema
)
Common error patterns in UDF execution.
import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.metadata.HiveException
// Handle UDF execution errors
def safeEvaluateUDF(udf: HiveSimpleUDF, input: InternalRow): Option[Any] = {
try {
Some(udf.eval(input))
} catch {
case _: UDFArgumentException =>
println(s"Invalid arguments for UDF ${udf.name}")
None
case _: HiveException =>
println(s"Hive execution error in UDF ${udf.name}")
None
case e: Exception =>
println(s"Unexpected error in UDF ${udf.name}: ${e.getMessage}")
None
}
}
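Usage Example (a sketch reusing the udfExpr expression built in the simple-UDF example above):
safeEvaluateUDF(udfExpr, EmptyRow) match {
case Some(value) => println(s"UDF succeeded: $value")
case None => println("UDF evaluation failed")
}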
Optimization tips for UDF usage.
import scala.collection.mutable
// Cache UDF instances for repeated use
class UDFCache {
private val cache = mutable.Map[String, HiveFunctionWrapper]()
def getOrCreateWrapper(className: String): HiveFunctionWrapper = {
cache.getOrElseUpdate(className, HiveFunctionWrapper(className))
}
}
// Batch UDF evaluation
def batchEvaluateUDF(
udf: HiveSimpleUDF,
inputs: Seq[InternalRow]
): Seq[Any] = {
// The Hive function instance and inspectors are initialized lazily
// on the first eval and then reused across calls
val preparedUDF = udf
// Evaluate for all inputs
inputs.map(preparedUDF.eval)
}
Usage Example:
val cache = new UDFCache()
// Reuse wrapper for multiple UDFs of same class
val wrapper1 = cache.getOrCreateWrapper("com.example.MyUDF")
val wrapper2 = cache.getOrCreateWrapper("com.example.MyUDF")
assert(wrapper1 eq wrapper2) // Same instance
// Batch evaluation
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
val inputs = Seq(
InternalRow(UTF8String.fromString("hello")),
InternalRow(UTF8String.fromString("world"))
)
val udf = HiveSimpleUDF("length", wrapper1, Seq(BoundReference(0, StringType, true)))
val results = batchEvaluateUDF(udf, inputs)
println(s"Batch results: ${results.mkString(", ")}")