Comprehensive support for Hive User-Defined Functions including simple UDFs, generic UDFs, table-generating functions (UDTFs), and aggregate functions (UDAFs). This enables seamless integration of existing Hive UDFs within Spark SQL queries.
Wrapper for simple Hive UDFs that work with basic data types.
/**
* Expression wrapper for simple Hive UDFs
* Handles basic data type conversion between Hive and Catalyst
*/
case class HiveSimpleUDF(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {
override def dataType: DataType
override def nullable: Boolean = true
override def eval(input: InternalRow): Any
override def prettyName: String
}
Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Register and use simple Hive UDF
spark.sql("CREATE TEMPORARY FUNCTION simple_upper AS 'org.apache.hadoop.hive.ql.udf.UDFUpper'")
spark.sql("SELECT simple_upper('hello world') as result").show()
// Result: HELLO WORLD
// Use built-in Hive simple UDFs
spark.sql("SELECT substr('Apache Spark', 1, 6) as result").show()
// Result: Apache
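For reference, the "simple" contract is straightforward to implement yourself: a simple Hive UDF is a class extending org.apache.hadoop.hive.ql.exec.UDF with one or more evaluate() methods. A minimal sketch, assuming a hypothetical class com.example.MyUpper packaged into a JAR on the classpath:
import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical simple UDF: Hive/Spark pick the evaluate() overload by
// argument types via reflection and call it once per input row
class MyUpper extends UDF {
  // Null handling is the UDF's responsibility
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase
}
// After packaging: CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpper'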
Wrapper for generic Hive UDFs that can handle complex data types and advanced operations.
/**
* Expression wrapper for generic Hive UDFs
* Supports complex data types and provides full ObjectInspector integration
*/
case class HiveGenericUDF(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {
override def dataType: DataType
override def nullable: Boolean = true
override def eval(input: InternalRow): Any
override def prettyName: String
}
Usage Examples:
// Register a generic Hive UDF for JSON processing (this shadows Spark's built-in get_json_object)
spark.sql("""
CREATE TEMPORARY FUNCTION get_json_object
AS 'org.apache.hadoop.hive.ql.udf.UDFJson'
""")
// Use generic UDF
val jsonData = """{"name": "Alice", "age": 25, "city": "NYC"}"""
spark.sql(s"""
SELECT
get_json_object('$jsonData', '$$.name') as name,
get_json_object('$jsonData', '$$.age') as age
""").show()
// Register custom generic UDF for array operations
spark.sql("""
CREATE TEMPORARY FUNCTION array_contains
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayContains'
""")
spark.sql("""
SELECT array_contains(array(1,2,3,4), 3) as contains_three
""").show()
// Result: true
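Authoring a generic UDF means implementing initialize/evaluate/getDisplayString against Hive ObjectInspectors: initialize() validates argument types once and declares the return type, after which evaluate() runs per row. A minimal sketch, assuming a hypothetical class com.example.StrLenUDF:
import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class StrLenUDF extends GenericUDF {
  override def initialize(args: Array[ObjectInspector]): ObjectInspector = {
    if (args.length != 1) throw new UDFArgumentException("strlen expects one argument")
    // Declare the return type: a Java int
    PrimitiveObjectInspectorFactory.javaIntObjectInspector
  }
  override def evaluate(args: Array[GenericUDF.DeferredObject]): AnyRef = {
    val v = args(0).get()
    if (v == null) null else Integer.valueOf(v.toString.length)
  }
  override def getDisplayString(children: Array[String]): String =
    s"strlen(${children.mkString(", ")})"
}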
Support for Hive UDTFs that generate multiple rows from a single input row.
/**
* Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)
* Converts single input rows into multiple output rows
*/
case class HiveGenericUDTF(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with Logging {
override def elementSchema: StructType
override def eval(input: InternalRow): TraversableOnce[InternalRow]
override def prettyName: String
override def terminate(): TraversableOnce[InternalRow]
override def close(): Unit
}
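A custom UDTF declares its output schema in initialize() and emits rows through the protected forward() method. A minimal sketch, assuming a hypothetical class com.example.SplitToRows (this overrides the older ObjectInspector[] variant of initialize(), which newer Hive versions deprecate in favor of a StructObjectInspector argument):
import java.util.Arrays
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class SplitToRows extends GenericUDTF {
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector =
    // One output column named "item" of type string
    ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("item"),
      Arrays.asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector: ObjectInspector))

  override def process(args: Array[AnyRef]): Unit =
    // Emit one output row per comma-separated token
    if (args(0) != null) {
      args(0).toString.split(",").foreach(t => forward(Array[AnyRef](t)))
    }

  override def close(): Unit = () // nothing buffered to flush
}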
Usage Examples:
// Register explode UDTF for array expansion
spark.sql("""
CREATE TEMPORARY FUNCTION hive_explode
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'
""")
// Use UDTF to expand arrays
spark.sql("""
SELECT hive_explode(array('a', 'b', 'c')) as item
""").show()
// Results:
// +----+
// |item|
// +----+
// | a |
// | b |
// | c |
// +----+
// Register stack UDTF for pivoting data
spark.sql("""
CREATE TEMPORARY FUNCTION stack
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack'
""")
spark.sql("""
SELECT stack(2, 'A', 1, 'B', 2) as (letter, number)
""").show()
// Results:
// +------+------+
// |letter|number|
// +------+------+
// | A| 1|
// | B| 2|
// +------+------+
Support for Hive UDAFs that perform custom aggregation operations.
/**
* Aggregate function wrapper for Hive UDAFs
* Provides custom aggregation logic with proper state management
*/
case class HiveUDAFFunction(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {
override def nullable: Boolean = true
override def dataType: DataType
override def prettyName: String
override def createAggregationBuffer(): Any
override def update(buffer: Any, input: InternalRow): Any
override def merge(buffer: Any, input: Any): Any
override def eval(buffer: Any): Any
override def serialize(buffer: Any): Array[Byte]
override def deserialize(storageFormat: Array[Byte]): Any
}
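The methods above follow Spark's partial-aggregation protocol: update() folds input rows into a per-partition buffer, merge() combines partial buffers across partitions, eval() produces the final value, and serialize()/deserialize() move buffers between executors. A Spark-free sketch of that lifecycle for a simple average (illustrative names only, not Spark APIs):
// Mutable partial-aggregation state
final case class AvgBuffer(var sum: Double = 0.0, var count: Long = 0L)

// Per-partition: fold one input value into the buffer
def update(buffer: AvgBuffer, value: Double): AvgBuffer = {
  buffer.sum += value; buffer.count += 1; buffer
}

// Cross-partition: combine two partial buffers
def merge(left: AvgBuffer, right: AvgBuffer): AvgBuffer = {
  left.sum += right.sum; left.count += right.count; left
}

// Final result from the fully merged buffer
def eval(buffer: AvgBuffer): Option[Double] =
  if (buffer.count == 0) None else Some(buffer.sum / buffer.count)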
Usage Examples:
// Register custom UDAF for advanced statistics
spark.sql("""
CREATE TEMPORARY FUNCTION variance
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance'
""")
// Use UDAF in aggregation query
spark.sql("""
SELECT
department,
variance(salary) as salary_variance
FROM employees
GROUP BY department
""").show()
// Register percentile UDAF
spark.sql("""
CREATE TEMPORARY FUNCTION percentile_approx
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox'
""")
spark.sql("""
SELECT
percentile_approx(age, 0.5) as median_age,
percentile_approx(age, 0.95) as p95_age
FROM users
""").show()Wrapper for Hive UDTFs that generate multiple rows from single input rows, extending the Generator interface.
/**
* Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)
* Converts single input rows into multiple output rows with full schema support
*/
case class HiveGenericUDTF(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with Logging {
override def elementSchema: StructType
override def eval(input: InternalRow): TraversableOnce[InternalRow]
override def prettyName: String
override def terminate(): TraversableOnce[InternalRow]
override def close(): Unit
}Usage Examples:
// Register custom UDTF for data expansion
spark.sql("""
CREATE TEMPORARY FUNCTION my_explode_json
AS 'com.example.JsonExplodeUDTF'
""")
// Use UDTF in LATERAL VIEW
spark.sql("""
SELECT t.id, exploded.key, exploded.value
FROM my_table t
LATERAL VIEW my_explode_json(t.json_data) exploded AS key, value
""").show()
// Built-in UDTF examples
spark.sql("""
SELECT explode(array('a', 'b', 'c')) as item
""").show()
spark.sql("""
SELECT stack(3, 'col1', 1, 'col2', 2, 'col3', 3) as (name, value)
""").show()Wrapper for Hive UDAFs providing custom aggregation with proper state management and distributed execution support.
/**
* Aggregate function wrapper for Hive UDAFs
* Provides custom aggregation logic with proper state management for distributed execution
*/
case class HiveUDAFFunction(
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {
override def nullable: Boolean = true
override def dataType: DataType
override def prettyName: String
override def createAggregationBuffer(): Any
override def update(buffer: Any, input: InternalRow): Any
override def merge(buffer: Any, input: Any): Any
override def eval(buffer: Any): Any
override def serialize(buffer: Any): Array[Byte]
override def deserialize(storageFormat: Array[Byte]): Any
}Usage Examples:
// Register custom UDAF for advanced analytics
spark.sql("""
CREATE TEMPORARY FUNCTION geometric_mean
AS 'com.example.GeometricMeanUDAF'
""")
// Use UDAF in aggregation queries
spark.sql("""
SELECT
department,
geometric_mean(salary) as geo_mean_salary,
percentile_approx(salary, 0.5) as median_salary
FROM employees
GROUP BY department
""").show()
// Window function usage
spark.sql("""
SELECT
name,
salary,
variance(salary) OVER (PARTITION BY department) as dept_variance
FROM employees
""").show()Utilities for registering and managing Hive UDFs in Spark sessions.
/**
* Function wrapper for Hive functions with class loading support
*/
case class HiveFunctionWrapper(functionClassName: String) {
def createFunction[UDFType](): UDFType
}
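Conceptually, createFunction reflectively instantiates the named class; a rough sketch only (the actual Spark implementation additionally handles the session class loader and re-instantiation after serialization):
// Illustrative only, not the Spark source
def createFunctionSketch[UDFType](functionClassName: String): UDFType =
  Class.forName(functionClassName)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[UDFType]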
Registration Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Register UDF from JAR
spark.sql("ADD JAR /path/to/custom-udfs.jar")
spark.sql("""
CREATE TEMPORARY FUNCTION my_custom_function
AS 'com.example.MyCustomUDF'
""")
// Register permanent function in metastore
spark.sql("""
CREATE FUNCTION my_db.my_permanent_function
AS 'com.example.MyPermanentUDF'
USING JAR '/path/to/custom-udfs.jar'
""")
// List available functions
spark.sql("SHOW FUNCTIONS LIKE '*custom*'").show()
// Get function information
spark.sql("DESCRIBE FUNCTION my_custom_function").show(truncate = false)Support for complex Hive data types in UDF operations.
Support for complex Hive data types in UDF operations.
/**
* HiveInspectors trait provides conversion utilities between
* Hive ObjectInspectors and Catalyst data types
*/
trait HiveInspectors {
/** Convert Java type to Catalyst DataType */
def javaTypeToDataType(clz: Type): DataType
/** Create wrapper for converting Catalyst data to Hive format */
def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
/** Create unwrapper for converting Hive data to Catalyst format */
def unwrapperFor(objectInspector: ObjectInspector): Any => Any
/** Convert Catalyst DataType to Hive ObjectInspector */
def toInspector(dataType: DataType): ObjectInspector
/** Convert Hive ObjectInspector to Catalyst DataType */
def inspectorToDataType(inspector: ObjectInspector): DataType
}
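On the Catalyst side, the type names involved in this mapping can be inspected with public APIs; catalogString renders a DataType in Hive-style type syntax:
import org.apache.spark.sql.types._

val dt: DataType = MapType(StringType, ArrayType(IntegerType))
println(dt.catalogString) // map<string,array<int>>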
Complex Type Examples:
// Working with array types in UDFs
spark.sql("""
SELECT
collect_list(name) as names,
size(collect_list(name)) as count
FROM users
GROUP BY department
""").show()
// Working with map types
spark.sql("""
SELECT
str_to_map('key1:value1,key2:value2', ',', ':') as parsed_map
""").show()
// Working with struct types
spark.sql("""
SELECT
named_struct('name', 'Alice', 'age', 25) as person_info
""").show()Access to extensive set of built-in Hive functions.
Access to an extensive set of built-in Hive functions.
String Functions:
// String manipulation functions
spark.sql("SELECT concat('Hello', ' ', 'World') as greeting").show()
spark.sql("SELECT upper('apache spark') as upper_case").show()
spark.sql("SELECT regexp_replace('abc123def', '[0-9]+', 'XXX') as replaced").show()Date/Time Functions:
// Date and time functions
spark.sql("SELECT from_unixtime(unix_timestamp()) as current_time").show()
spark.sql("SELECT date_add('2023-01-01', 30) as future_date").show()
spark.sql("SELECT datediff('2023-12-31', '2023-01-01') as days_diff").show()Mathematical Functions:
// Mathematical functions
spark.sql("SELECT round(3.14159, 2) as rounded").show()
spark.sql("SELECT pow(2, 3) as power").show()
spark.sql("SELECT greatest(1, 5, 3, 2) as max_value").show()Conditional Functions:
// Conditional functions
spark.sql("""
SELECT
name,
CASE
WHEN age < 18 THEN 'Minor'
WHEN age < 65 THEN 'Adult'
ELSE 'Senior'
END as age_group
FROM users
""").show()
spark.sql("SELECT nvl(null_column, 'default_value') as coalesced").show()Common error patterns and exception handling for UDF operations:
import org.apache.spark.sql.AnalysisException
try {
// Attempt to use non-existent UDF
spark.sql("SELECT non_existent_udf('test')").show()
} catch {
case e: AnalysisException if e.getMessage.toLowerCase.contains("undefined function") =>
println("UDF not found - check function registration")
case e: Exception =>
println(s"UDF execution error: ${e.getMessage}")
}
// Handle UDF registration errors. Spark generally reports class-loading problems as
// an AnalysisException (sometimes only when the function is first resolved), rather
// than letting a raw ClassNotFoundException escape from spark.sql
try {
  spark.sql("CREATE TEMPORARY FUNCTION bad_udf AS 'invalid.class.name'")
  spark.sql("SELECT bad_udf('test')").show()
} catch {
  case e: AnalysisException =>
    println(s"UDF class not found or invalid: ${e.getMessage}")
  case e: Exception =>
    println(s"UDF registration failed: ${e.getMessage}")
}
Best practices for UDF performance:
// Prefer built-in functions over custom UDFs when possible
// GOOD: Use built-in functions
spark.sql("SELECT upper(name) FROM users")
// LESS OPTIMAL: Custom UDF for same functionality
// spark.sql("SELECT my_upper_udf(name) FROM users")
// Hive UDF wrappers extend CodegenFallback, so they are evaluated row-by-row with
// Hive<->Catalyst data conversion, while built-ins participate in whole-stage codegen;
// keep Hive UDF calls out of hot paths where a built-in exists
// Register UDFs once per session to avoid repeated registration overhead
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUDF'")
// Reuse throughout session
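As a quick, unscientific check, SparkSession.time can wrap both variants (assumes a users table and a registered my_udf as above):
spark.time { spark.sql("SELECT upper(name) FROM users").count() }  // codegen path
spark.time { spark.sql("SELECT my_udf(name) FROM users").count() } // CodegenFallback path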