Apache Spark Hive integration module: it integrates Spark SQL with the Apache Hive data warehouse software, so that Spark SQL can work with Hive tables, the Hive metastore, and Hive SerDes.
The module provides complete support for Hive UDFs, UDAFs, and UDTFs. Once registered, Hive user-defined functions execute inside Spark queries as native Spark SQL expressions.
Support for simple Hive UDFs that extend org.apache.hadoop.hive.ql.exec.UDF.
/**
* Catalyst expression that evaluates a simple Hive UDF (a class extending
* org.apache.hadoop.hive.ql.exec.UDF) as part of a Spark SQL expression tree.
*
* NOTE(review): this is a signature listing — members appear without bodies, so it
* documents the shape of the API and does not compile as written.
*
* @param name UDF name, used for display purposes
* @param funcWrapper Wrapper identifying the UDF implementation class by name
* @param children Argument expressions passed to the UDF, in order
*/
case class HiveSimpleUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
/**
* Spark SQL type of the values produced by eval.
* Declared as a lazy val, so it is computed on first access rather than at construction.
*/
lazy val dataType: DataType
/**
* Evaluate the UDF against one input row.
* @param input Row that the child expressions are evaluated against (the usage example
*              passes EmptyRow because its only child is a literal)
* @return UDF result (NOTE(review): presumably in Catalyst's internal representation,
*         e.g. UTF8String for strings — confirm)
*/
def eval(input: InternalRow): Any
/**
* Human-readable function name used when displaying the expression.
*/
def prettyName: String
/**
* SQL text rendering of this UDF call.
*/
def sql: String
}Usage Example:
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.hive.HiveFunctionWrapper

// Point a wrapper at the Hive UDF implementation class (fully qualified name).
val wrapper = HiveFunctionWrapper("com.example.MyHiveUDF")

// Build a Catalyst expression that calls the UDF with a single string literal argument.
val udfArgs = Seq(Literal("input_string"))
val udfExpr = HiveSimpleUDF(name = "my_udf", funcWrapper = wrapper, children = udfArgs)

// The only argument is a literal, so no real input row is needed.
val result = udfExpr.eval(EmptyRow)
println(s"UDF result: $result")
Support for generic Hive UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
/**
* Catalyst expression that evaluates a generic Hive UDF (a class extending
* org.apache.hadoop.hive.ql.udf.generic.GenericUDF) as part of a Spark SQL expression tree.
*
* NOTE(review): this is a signature listing — members appear without bodies, so it
* documents the shape of the API and does not compile as written.
*
* @param name UDF name, used for display purposes
* @param funcWrapper Wrapper identifying the GenericUDF implementation class by name
* @param children Argument expressions passed to the UDF, in order
*/
case class HiveGenericUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
/**
* Spark SQL type of the values produced by eval.
* Declared as a lazy val, so it is computed on first access rather than at construction.
*/
lazy val dataType: DataType
/**
* Evaluate the UDF against one input row.
* @param input Row that the child expressions are evaluated against
* @return UDF result (NOTE(review): presumably in Catalyst's internal representation — confirm)
*/
def eval(input: InternalRow): Any
/**
* Human-readable function name used when displaying the expression.
*/
def prettyName: String
/**
* SQL text rendering of this UDF call.
*/
def sql: String
}Usage Example:
// Generic UDF implementing CASE WHEN <condition> THEN <value> ELSE <default> END.
// Hive's GenericUDFWhen implements this form. GenericUDFCase is the other form,
// CASE <expr> WHEN <value> THEN ..., and with these arguments it would compare `true`
// against "true_value" instead of testing a condition.
val genericWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen")
val genericUDF = HiveGenericUDF(
  name = "case_when",
  funcWrapper = genericWrapper,
  children = Seq(
    Literal(true),         // WHEN condition
    Literal("true_value"), // THEN value
    Literal("false_value") // ELSE value
  )
)
// Every argument is a literal, so the expression can be evaluated without an input row.
val result = genericUDF.eval(EmptyRow)
Support for Hive UDTFs that generate multiple output rows from single input row.
/**
* Generator expression that runs a Hive UDTF and may produce several output rows for a
* single input row.
*
* Mixes in CodegenFallback (NOTE(review): presumably meaning it is always evaluated through
* the interpreted eval path rather than generated code — confirm).
*
* NOTE(review): this is a signature listing — members appear without bodies, so it
* documents the shape of the API and does not compile as written.
*
* @param name UDTF name, used for display purposes
* @param funcWrapper Wrapper identifying the UDTF implementation class by name
* @param children Argument expressions passed to the UDTF, in order
*/
case class HiveGenericUDTF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression {
/**
* Schema of each row produced by eval and terminate.
* Declared as a lazy val, so it is computed on first access rather than at construction.
*/
lazy val elementSchema: StructType
/**
* Run the UDTF on one input row.
* @param input Row that the child expressions are evaluated against
* @return Rows emitted for this input (zero or more)
*/
def eval(input: InternalRow): IterableOnce[InternalRow]
/**
* Finish the UDTF and return any rows it emits only at completion.
* @return Final output rows (possibly none)
*/
def terminate(): IterableOnce[InternalRow]
/**
* Human-readable function name used when displaying the expression.
*/
def prettyName: String
/**
* SQL text rendering of this UDTF call.
*/
def sql: String
}Usage Example:
// UDTF that emits one output row per element of an array (Hive's explode)
val udtfWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode")
val udtf = HiveGenericUDTF(
  name = "explode",
  funcWrapper = udtfWrapper,
  children = Seq(Literal.create(Array("a", "b", "c"), ArrayType(StringType)))
)
// eval returns an IterableOnce. Materialize it via .iterator, because the
// IterableOnce.toSeq extension is deprecated in Scala 2.13.
val outputRows = udtf.eval(EmptyRow).iterator.toSeq
println(s"UDTF generated ${outputRows.length} rows")
// Terminate the UDTF to collect any rows it emits only at the end.
val finalRows = udtf.terminate().iterator.toSeq
Support for Hive UDAFs that perform custom aggregation operations.
/**
* TypedImperativeAggregate that runs a Hive UDAF, keeping per-group state in a HiveUDAFBuffer.
*
* Lifecycle, as exposed by the members below:
* - createAggregationBuffer starts an empty state;
* - update folds input rows into it;
* - merge combines two partial states;
* - serialize/deserialize convert a state to and from bytes;
* - eval produces the final value.
*
* NOTE(review): this is a signature listing — members appear without bodies, so it
* documents the shape of the API and does not compile as written.
*
* @param name UDAF name, used for display purposes
* @param funcWrapper Wrapper identifying the UDAF implementation class by name
* @param children Argument expressions passed to the UDAF, in order
* @param isUDAFBridgeRequired Whether the UDAF must be invoked through Hive's UDAF bridge
*                             (NOTE(review): presumably for old-style UDAF classes — confirm)
* @param mutableAggBufferOffset Offset of this aggregate's slot in the mutable aggregation buffer
* @param inputAggBufferOffset Offset of this aggregate's slot in the input aggregation buffer
*/
case class HiveUDAFFunction(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression],
isUDAFBridgeRequired: Boolean,
mutableAggBufferOffset: Int,
inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer] with HiveInspectors with UserDefinedExpression {
/**
* Create an empty aggregation state for a new group.
* @return New UDAF buffer
*/
def createAggregationBuffer(): HiveUDAFBuffer
/**
* Fold one input row into the aggregation state.
* @param buffer Current aggregation buffer
* @param input Row that the child expressions are evaluated against
* @return Updated buffer
*/
def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer
/**
* Combine a partial aggregation state into another.
* @param buffer Target buffer
* @param input Source buffer to merge into the target
* @return Merged buffer
*/
def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer
/**
* Produce the final aggregate value from a state.
* @param buffer Final aggregation buffer
* @return Aggregation result
*/
def eval(buffer: HiveUDAFBuffer): Any
/**
* Convert an aggregation state to bytes.
* @param buffer Buffer to serialize
* @return Serialized buffer data, readable by deserialize
*/
def serialize(buffer: HiveUDAFBuffer): Array[Byte]
/**
* Rebuild an aggregation state from bytes produced by serialize.
* @param bytes Serialized buffer data
* @return Deserialized buffer
*/
def deserialize(bytes: Array[Byte]): HiveUDAFBuffer
/**
* Spark SQL type of the aggregation result.
* Declared as a lazy val, so it is computed on first access rather than at construction.
*/
lazy val dataType: DataType
/**
* Human-readable function name used when displaying the expression.
*/
def prettyName: String
/**
* SQL text rendering of this UDAF call.
*/
def sql: String
}Usage Example:
// Custom aggregation function, driven by hand outside a query plan
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.IntegerType

val udafWrapper = HiveFunctionWrapper("com.example.MyHiveUDAF")
val udaf = HiveUDAFFunction(
  name = "my_aggregate",
  funcWrapper = udafWrapper,
  // children must be Catalyst Expressions, not DataFrame Columns, so `col("value")` does
  // not type-check here. The argument is bound to ordinal 0 of each input row. The INT
  // type is an assumption about MyHiveUDAF; use the type it actually accepts.
  children = Seq(BoundReference(0, IntegerType, nullable = true)),
  isUDAFBridgeRequired = false,
  mutableAggBufferOffset = 0,
  inputAggBufferOffset = 0
)
// Create a buffer, fold one input row into it, then read the aggregate value.
val inputRow = InternalRow(42)
val buffer = udaf.createAggregationBuffer()
val updatedBuffer = udaf.update(buffer, inputRow)
val result = udaf.eval(updatedBuffer)
Buffer type for managing UDAF aggregation state.
/**
* Aggregation state for a Hive UDAF. Pairs the Hive aggregation buffer with a flag that
* records whether that buffer supports merge operations.
* @param buf Hive aggregation buffer (GenericUDAFEvaluator.AggregationBuffer)
* @param canDoMerge Whether buf supports merge operations
*/
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)Usage Example:
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
// Hive AggregationBuffers come from a Hive UDAF evaluator. Take one from a buffer the
// UDAF itself created (the `udaf` from the previous example) rather than building it by hand.
val created = udaf.createAggregationBuffer()
val hiveBuffer: AggregationBuffer = created.buf
// Re-wrap the Hive buffer. canDoMerge describes this particular buffer, so carry it over
// instead of asserting a value that may not hold.
val buffer = HiveUDAFBuffer(hiveBuffer, canDoMerge = created.canDoMerge)
// Round-trip the buffer through its serialized byte form.
val serialized = udaf.serialize(buffer)
val deserialized = udaf.deserialize(serialized)
Wrapper class for Hive function classes.
/**
* Identifies a Hive function implementation by its fully qualified class name and
* creates instances of it.
*
* NOTE(review): this is a signature listing — members appear without bodies, so it
* documents the shape of the API and does not compile as written.
*
* @param functionClassName Fully qualified class name of the Hive function
*/
case class HiveFunctionWrapper(functionClassName: String) {
/**
* Create an instance of the wrapped function class.
* @tparam T Type the instance is returned as (e.g. org.apache.hadoop.hive.ql.exec.UDF)
* @return Instance of the Hive function
*/
def createFunction[T]: T
/**
* Class object for functionClassName, e.g. for reflection.
* @return Class object for the function
*/
def functionClass: Class[_]
}Usage Example:
import org.apache.hadoop.hive.ql.exec.UDF
// Wrap a Hive simple UDF class by its fully qualified name
val wrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.UDFLength")
// Instantiate it as a UDF, the base class of Hive simple UDFs (imported above)
val udfInstance = wrapper.createFunction[UDF]
// The Class object is available for reflection
val functionClass = wrapper.functionClass
println(s"Function class: ${functionClass.getName}")
Integration with Spark SQL for automatic UDF registration.
// Example of registering a Hive UDF in a Spark session
/**
 * Registers a Hive function class as a temporary SQL function called `name` in the
 * given session.
 *
 * Issues Spark SQL's `CREATE TEMPORARY FUNCTION <name> AS '<className>'`, so Spark
 * resolves the Hive class itself and plans calls to it like any other function.
 *
 * Wrapping the Hive UDF in a Scala closure passed to `spark.udf.register` does not work:
 * - Spark cannot derive a SQL type for a closure returning `Any`;
 * - such a function would take one array argument instead of the UDF's own arguments.
 *
 * @param spark session to register the function in (built with Hive support enabled)
 * @param name SQL name of the function; it is backtick-quoted, so it is used verbatim
 * @param className fully qualified JVM class name of the Hive function
 * @throws IllegalArgumentException if `className` is not a syntactically valid class name.
 *         This also prevents it from breaking out of the SQL string literal.
 */
def registerHiveUDF(
    spark: SparkSession,
    name: String,
    className: String
): Unit = {
  require(
    className.matches("""[\w$]+(\.[\w$]+)*"""),
    s"Not a valid Hive function class name: $className")
  // Backticks inside a quoted identifier are escaped by doubling them.
  val quotedName = "`" + name.replace("`", "``") + "`"
  spark.sql(s"CREATE TEMPORARY FUNCTION $quotedName AS '$className'")
}
Usage Example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.call_udf

val spark = SparkSession.builder()
  .appName("HiveUDFExample")
  .enableHiveSupport()
  .getOrCreate()
// Register custom Hive UDF
registerHiveUDF(spark, "my_length", "com.example.MyLengthUDF")
// Use in SQL
spark.sql("SELECT my_length(name) FROM users").show()
// Use in DataFrame API. call_udf (Spark 3.2+) looks the function up by name and
// replaces the deprecated callUDF.
import spark.implicits._
val df = Seq("hello", "world").toDF("text")
df.select(call_udf("my_length", $"text")).show()
Support for complex UDF scenarios.
// UDF with complex input/output types
/**
 * Builds a HiveGenericUDF whose arguments are bound, in order, to the columns of the input
 * row. Argument i reads ordinal i with type inputTypes(i) and is marked nullable.
 *
 * NOTE(review): outputType is accepted but not used. The result type is whatever the
 * returned expression's dataType reports, and it is not checked against outputType.
 *
 * @param name display name of the UDF
 * @param className fully qualified class name of the Hive GenericUDF
 * @param inputTypes data type of each argument, by input-row ordinal
 * @param outputType expected result type (currently unused)
 * @return the UDF expression, not yet evaluated
 */
def createComplexUDF(
    name: String,
    className: String,
    inputTypes: Seq[DataType],
    outputType: DataType
): HiveGenericUDF = {
  val boundArgs =
    for ((argType, ordinal) <- inputTypes.zipWithIndex)
      yield BoundReference(ordinal, argType, nullable = true)
  HiveGenericUDF(name, HiveFunctionWrapper(className), boundArgs)
}
// UDTF with multiple output columns
/**
 * Builds a HiveGenericUDTF applied to a single input expression.
 *
 * NOTE(review): outputSchema is accepted but not used. The output columns are whatever
 * the returned generator's elementSchema reports, and they are not compared with outputSchema.
 *
 * @param name display name of the UDTF
 * @param className fully qualified class name of the Hive UDTF
 * @param inputExpression the UDTF's only argument
 * @param outputSchema expected output columns (currently unused)
 * @return the UDTF generator expression, not yet evaluated
 */
def createMultiColumnUDTF(
    name: String,
    className: String,
    inputExpression: Expression,
    outputSchema: StructType
): HiveGenericUDTF =
  HiveGenericUDTF(name, HiveFunctionWrapper(className), children = Seq(inputExpression))
Usage Example:
import org.apache.spark.sql.types._

// Complex UDF taking a struct<id: int not null, name: string> and returning a string
val structInput = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)
))
val complexUDF = createComplexUDF(
  name = "process_struct",
  className = "com.example.StructProcessorUDF",
  inputTypes = Seq(structInput),
  outputType = StringType
)

// Multi-column UDTF emitting (key, value) pairs parsed from one string
val outputSchema = StructType(Seq(
  StructField("key", StringType, nullable = false),
  StructField("value", StringType, nullable = true)
))
val multiColUDTF = createMultiColumnUDTF(
  name = "split_pairs",
  className = "com.example.SplitPairsUDTF",
  inputExpression = Literal("key1:value1,key2:value2"),
  outputSchema = outputSchema
)
Common error patterns in UDF execution.
// Handle UDF execution errors
/**
 * Evaluates a simple Hive UDF against one row, turning evaluation failures into None.
 *
 * Each caught failure is reported on stdout together with the UDF name and the exception
 * message. Only non-fatal Exceptions are caught: InterruptedException still propagates,
 * so thread interruption is not silently swallowed, and Errors are never caught.
 *
 * @param udf the UDF expression to evaluate
 * @param input row that the UDF's children are evaluated against
 * @return Some(result) on success (the result itself may be null), None on failure
 */
def safeEvaluateUDF(udf: HiveSimpleUDF, input: InternalRow): Option[Any] = {
  import scala.util.control.NonFatal
  try {
    Some(udf.eval(input))
  } catch {
    // UDFArgumentException is a subtype of HiveException, so it must be matched first.
    case e: UDFArgumentException =>
      println(s"Invalid arguments for UDF ${udf.name}: ${e.getMessage}")
      None
    case e: HiveException =>
      println(s"Hive execution error in UDF ${udf.name}: ${e.getMessage}")
      None
    case e: Exception if NonFatal(e) =>
      println(s"Unexpected error in UDF ${udf.name}: ${e.getMessage}")
      None
  }
}
Optimization tips for UDF usage.
// Cache function wrappers so repeated lookups of one class name share a single wrapper
/**
 * Memoizes HiveFunctionWrapper instances by function class name.
 *
 * Backed by an unsynchronized mutable map, so an instance should not be shared across
 * threads without external synchronization.
 */
class UDFCache {
  private val wrappersByClass = mutable.HashMap.empty[String, HiveFunctionWrapper]

  /** Returns the cached wrapper for `className`, creating and caching it on first use. */
  def getOrCreateWrapper(className: String): HiveFunctionWrapper =
    wrappersByClass.getOrElseUpdate(className, HiveFunctionWrapper(className))
}
// Batch UDF evaluation
/**
 * Evaluates one UDF expression against each row in turn, reusing the same expression
 * instance for every row.
 *
 * @param udf the UDF expression to evaluate
 * @param inputs rows to evaluate the UDF against
 * @return one result per input row, in input order
 */
def batchEvaluateUDF(
    udf: HiveSimpleUDF,
    inputs: Seq[InternalRow]
): Seq[Any] =
  inputs.map(row => udf.eval(row))
Usage Example:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

val cache = new UDFCache()
// Reuse wrapper for multiple UDFs of same class
val wrapper1 = cache.getOrCreateWrapper("com.example.MyUDF")
val wrapper2 = cache.getOrCreateWrapper("com.example.MyUDF")
assert(wrapper1 eq wrapper2) // Same instance
// Batch evaluation: rows hold Catalyst internal values, so strings are UTF8String
val inputs = Seq(
  InternalRow(UTF8String.fromString("hello")),
  InternalRow(UTF8String.fromString("world"))
)
val udf = HiveSimpleUDF("length", wrapper1, Seq(BoundReference(0, StringType, true)))
val results = batchEvaluateUDF(udf, inputs)
println(s"Batch results: ${results.mkString(", ")}")
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-hive-2-13