Integration support for Hive User Defined Functions (UDFs), User Defined Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs) within Spark SQL.
import org.apache.spark.sql.hive.HiveSimpleUDF
import org.apache.spark.sql.hive.HiveGenericUDF
import org.apache.spark.sql.hive.HiveUDAFFunction
import org.apache.spark.sql.hive.HiveGenericUDTF
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder

Wrapper for Hive Simple UDFs that operate on basic data types.
/**
* Wrapper for Hive Simple UDF that operates on basic types
* @param name Function name
* @param funcWrapper Hive UDF instance wrapper
* @param children Input expressions
*/
case class HiveSimpleUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors {
/** Evaluate UDF with input values */
override def eval(input: InternalRow): Any
/** Generate code for UDF evaluation */
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode
/** Data type returned by this UDF */
override def dataType: DataType
/** Whether this UDF is deterministic */
override def deterministic: Boolean
/** User-facing name of this UDF */
override def prettyName: String
}
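
For context, the function that HiveSimpleUDF wraps is a plain Hive UDF: a subclass of org.apache.hadoop.hive.ql.exec.UDF whose evaluate method Hive resolves by reflection. A minimal sketch (the class name MyUpperUDF is illustrative):

import org.apache.hadoop.hive.ql.exec.UDF

// Illustrative simple UDF: Hive locates the evaluate method by reflection,
// so only basic, reflection-friendly types appear in its signature.
class MyUpperUDF extends UDF {
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase
}
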
Wrapper for Hive Generic UDFs with complex type support and object inspectors.

/**
* Wrapper for Hive Generic UDF with complex type support
* @param name Function name
* @param funcWrapper Hive Generic UDF wrapper
* @param children Input expressions
*/
case class HiveGenericUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors {
/** Initialize UDF with object inspectors */
def initialize(objectInspectors: Array[ObjectInspector]): ObjectInspector
/** Evaluate UDF with Hive objects */
override def eval(input: InternalRow): Any
/** UDF return data type */
override def dataType: DataType
/** Whether UDF supports code generation */
override def supportCodegen: Boolean
/** Get UDF usage information */
def getDisplayString(children: Array[String]): String
}
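
For comparison, a Generic UDF negotiates its argument and return types once through object inspectors, which is what enables complex type support. A minimal sketch (class name illustrative; a production implementation would also validate its arguments in initialize):

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Illustrative generic UDF returning the string length of its argument.
class StringLengthUDF extends GenericUDF {
  // Called once per query: inspect argument types, return the output inspector
  override def initialize(args: Array[ObjectInspector]): ObjectInspector =
    PrimitiveObjectInspectorFactory.javaIntObjectInspector

  // Called per row: arguments are materialized lazily via DeferredObject
  override def evaluate(args: Array[DeferredObject]): AnyRef = {
    val value = args(0).get()
    if (value == null) null else Int.box(value.toString.length)
  }

  override def getDisplayString(children: Array[String]): String =
    s"string_length(${children.mkString(", ")})"
}
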
Support for Hive UDAFs that perform aggregation operations.

/**
* Wrapper for Hive User Defined Aggregate Functions
* @param name Function name
* @param funcWrapper Hive UDAF wrapper
* @param children Input expressions
* @param isDistinct Whether aggregation is distinct
*/
case class HiveUDAFFunction(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression],
isDistinct: Boolean
) extends AggregateFunction with HiveInspectors {
/** Initialize aggregation buffer */
def createAggregationBuffer(): AggregationBuffer
/** Update aggregation buffer with new value */
def update(buffer: AggregationBuffer, input: InternalRow): Unit
/** Merge two aggregation buffers */
def merge(buffer1: AggregationBuffer, buffer2: AggregationBuffer): Unit
/** Get final aggregation result */
def evaluate(buffer: AggregationBuffer): Any
/** Aggregation buffer schema */
override def aggBufferSchema: StructType
/** Input aggregation buffer attributes */
override def inputAggBufferAttributes: Seq[AttributeReference]
}
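
The isDistinct flag corresponds to the DISTINCT keyword at the call site. A sketch, assuming the my_collect_set UDAF registered in the examples further below:

// DISTINCT inside the aggregate call surfaces as isDistinct = true
val distinctAgg = spark.sql("""
  SELECT category, my_collect_set(DISTINCT product_name) AS unique_products
  FROM products
  GROUP BY category
""")
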
Support for Hive UDTFs that generate multiple output rows from a single input row.

/**
* Wrapper for Hive User Defined Table Functions
* @param name Function name
* @param funcWrapper Hive UDTF wrapper
* @param children Input expressions
*/
case class HiveGenericUDTF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors {
/** Initialize UDTF with object inspectors */
def initialize(objectInspectors: Array[ObjectInspector]): StructObjectInspector
/** Process input row and generate output rows */
def process(args: Array[AnyRef]): Unit
/** Signal end of input and flush any remaining output */
def close(): Unit
/** Generate output rows from input */
override def eval(input: InternalRow): TraversableOnce[InternalRow]
/** Output schema for generated rows */
override def outputSchema: StructType
/** Emit any remaining output rows after all input has been processed */
override def terminate(): TraversableOnce[InternalRow]
}
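
On the Hive side, a UDTF declares its output schema in initialize and emits rows by calling forward from process. A minimal sketch (class name illustrative):

import java.util.Arrays
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Illustrative UDTF: splits a comma-separated string into one row per token.
class SplitToRowsUDTF extends GenericUDTF {
  // Declare a single output column named "token" of type string
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector =
    ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("token"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaStringObjectInspector))

  // forward() emits one output row per call
  override def process(args: Array[AnyRef]): Unit = {
    val input = if (args(0) == null) "" else args(0).toString
    input.split(',').foreach(token => forward(Array[AnyRef](token.trim)))
  }

  override def close(): Unit = () // nothing buffered, nothing to flush
}
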
Factory for creating UDF expressions from Hive function classes.

/**
* Builder for creating Hive UDF expressions
*/
object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder {
/**
* Create UDF expression from Hive function class
* @param name Function name
* @param clazz Hive UDF class
* @param input Input expressions
* @return Appropriate UDF expression wrapper
*/
override def makeExpression(
name: String,
clazz: Class[_],
input: Seq[Expression]
): Expression
/**
* Check if class is a supported Hive UDF type
* @param clazz Class to check
* @return true if supported UDF type
*/
def isHiveUDF(clazz: Class[_]): Boolean
/**
* Get UDF type from class
* @param clazz UDF class
* @return UDF type string
*/
def getUDFType(clazz: Class[_]): String
}
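
A sketch of the dispatch makeExpression performs, expressed against the wrapper classes above (a hypothetical stand-in, not Spark's actual source; a complete version would also recognize generic UDAF resolvers and defer instantiation):

import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.udf.generic.{GenericUDF, GenericUDTF}

// Hypothetical stand-in for makeExpression: pick a wrapper by UDF class
def makeExpressionSketch(name: String, clazz: Class[_], input: Seq[Expression]): Expression = {
  val wrapper = HiveFunctionWrapper(
    clazz.getName, clazz.getConstructor().newInstance().asInstanceOf[AnyRef])
  if (classOf[UDF].isAssignableFrom(clazz)) {
    HiveSimpleUDF(name, wrapper, input)
  } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
    HiveGenericUDF(name, wrapper, input)
  } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
    HiveGenericUDTF(name, wrapper, input)
  } else if (classOf[UDAF].isAssignableFrom(clazz)) {
    HiveUDAFFunction(name, wrapper, input, isDistinct = false)
  } else {
    throw new UnsupportedOperationException(s"$name: unsupported UDF class ${clazz.getName}")
  }
}
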
/**
 * Register Hive UDF in Spark session
* @param name Function name to register
* @param className Fully qualified UDF class name
* @param database Optional database name
*/
def registerHiveUDF(
sparkSession: SparkSession,
name: String,
className: String,
database: Option[String] = None
): Unit
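
A sketch of calling this helper (the signature above is the one assumed; spark is an active SparkSession and the class name is illustrative):

// Hypothetical usage: make com.example.MyUpperUDF callable as default.my_upper
registerHiveUDF(spark, name = "my_upper",
  className = "com.example.MyUpperUDF", database = Some("default"))
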
/**
 * Discover and load Hive UDFs from classpath
* @param sparkSession Active Spark session
* @param packagePrefix Package prefix to scan
* @returns Map of discovered UDF names to classes
*/
def discoverHiveUDFs(
sparkSession: SparkSession,
packagePrefix: String
): Map[String, Class[_]]
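
A sketch of driving the discovery helper (again the signature above is assumed; com.example.udfs is an illustrative package prefix and spark is the session created in the next snippet):

// Hypothetical usage: scan a package prefix and report what was found
val discovered: Map[String, Class[_]] = discoverHiveUDFs(spark, "com.example.udfs")
discovered.foreach { case (udfName, udfClass) =>
  println(s"discovered UDF '$udfName' -> ${udfClass.getName}")
}
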
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Use Hive built-in functions directly in SQL
val result = spark.sql("""
SELECT
reflect('java.lang.Math', 'abs', -5) as abs_value,
reflect('java.lang.String', 'valueOf', 42) as string_value,
java_method('java.lang.Math', 'sqrt', 16.0) as sqrt_value
FROM VALUES (1) as t(dummy)
""")
result.show()

// Register custom Hive UDF class
spark.sql("""
CREATE FUNCTION my_upper
AS 'com.example.MyUpperUDF'
USING JAR '/path/to/my-udfs.jar'
""")
// Use the registered UDF
val df = spark.sql("""
SELECT my_upper(name) as upper_name
FROM users
""")
df.show()

// Register custom UDAF
spark.sql("""
CREATE FUNCTION my_collect_set
AS 'com.example.MyCollectSetUDAF'
USING JAR '/path/to/my-udafs.jar'
""")
// Use UDAF in aggregation
val aggregated = spark.sql("""
SELECT
category,
my_collect_set(product_name) as unique_products
FROM products
GROUP BY category
""")
aggregated.show()

// Register custom UDTF
spark.sql("""
CREATE FUNCTION explode_json
AS 'com.example.ExplodeJsonUDTF'
USING JAR '/path/to/my-udtfs.jar'
""")
// Use UDTF to generate multiple rows
val exploded = spark.sql("""
SELECT
id,
exploded_col
FROM events
LATERAL VIEW explode_json(json_data) exploded_table AS exploded_col
""")
exploded.show()

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
// Create function definition
val functionDefinition = CatalogFunction(
identifier = FunctionIdentifier("custom_concat", Some("default")),
className = "com.example.CustomConcatUDF",
resources = Seq(FunctionResource(JarResource, "/path/to/udf.jar"))
)
// Register through catalog
spark.sessionState.catalog.createFunction(
functionDefinition,
ignoreIfExists = true
)
// Verify registration
val functions = spark.catalog.listFunctions("default")
functions.filter(_.name == "custom_concat").show()

// UDF that works with complex types (arrays, maps, structs)
spark.sql("""
CREATE FUNCTION array_intersect
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayIntersect'
""")
val complexQuery = spark.sql("""
SELECT
user_id,
array_intersect(user_interests, recommended_items) as matching_interests
FROM (
SELECT
user_id,
split(interests, ',') as user_interests,
split(recommendations, ',') as recommended_items
FROM user_profiles
)
""")
complexQuery.show()

// Use Java reflection for dynamic function calls
val reflectionQuery = spark.sql("""
SELECT
user_name,
reflect('java.lang.String', 'toLowerCase', user_name) as lower_name,
reflect('java.util.UUID', 'randomUUID') as random_id,
java_method('java.lang.System', 'currentTimeMillis') as current_time
FROM users
""")
reflectionQuery.show()

import org.apache.spark.sql.AnalysisException
try {
// Attempt to use non-existent UDF
val result = spark.sql("SELECT non_existent_udf(name) FROM users")
result.collect()
} catch {
case e: AnalysisException if e.getMessage.contains("Undefined function") =>
println("UDF not found or not registered")
case e: ClassNotFoundException =>
println("UDF class not found in classpath")
case e: Exception =>
println(s"UDF execution error: ${e.getMessage}")
}
// Safe UDF usage with null handling
val safeQuery = spark.sql("""
SELECT
user_id,
CASE
WHEN user_data IS NOT NULL
THEN my_custom_udf(user_data)
ELSE NULL
END as processed_data
FROM users
""")/**
* Wrapper for Hive function instances with proper class loading
*/
case class HiveFunctionWrapper(
functionClassName: String,
functionInstance: AnyRef
) {
def createFunction[T](): T
def getMethodInfo(): Array[Method]
}
/**
* Resource specification for UDF dependencies
*/
case class FunctionResource(
resourceType: FunctionResourceType,
uri: String
)
sealed trait FunctionResourceType
case object JarResource extends FunctionResourceType
case object FileResource extends FunctionResourceType
case object ArchiveResource extends FunctionResourceType

/**
* Trait for working with Hive object inspectors
*/
trait HiveInspectors {
def toInspector(dataType: DataType): ObjectInspector
def wrapperFor(inspector: ObjectInspector, dataType: DataType): (Any) => Any
def unwrap(data: Any, inspector: ObjectInspector): AnyRef
}
/**
* Hive object inspector categories
*/
sealed trait ObjectInspectorCategory
case object PrimitiveObjectInspector extends ObjectInspectorCategory
case object ListObjectInspector extends ObjectInspectorCategory
case object MapObjectInspector extends ObjectInspectorCategory
case object StructObjectInspector extends ObjectInspectorCategory
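
To make the categories concrete, the standard Hive serde factories can build such inspectors directly; a small runnable sketch using only Hive classes:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// A primitive inspector for string, and a list inspector built on top of it,
// mirroring the PrimitiveObjectInspector and ListObjectInspector categories.
val stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector
val listOI = ObjectInspectorFactory.getStandardListObjectInspector(stringOI)
println(listOI.getTypeName) // prints: array<string>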