UDF Integration

Apache Spark Hive integration provides comprehensive support for executing Hive User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table-Generating Functions (UDTFs) within Spark SQL queries.

Overview

The UDF integration system allows Spark to execute existing Hive UDFs without modification, providing seamless compatibility with existing Hive-based data processing pipelines. Spark wraps Hive UDFs in specialized expression classes that handle the translation between Spark's internal row format and Hive's object format.

HiveSimpleUDF

Wrapper for Hive simple UDFs that extend org.apache.hadoop.hive.ql.exec.UDF.

case class HiveSimpleUDF(
  name: String, 
  funcWrapper: HiveFunctionWrapper, 
  children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}

Usage Example

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register a simple Hive UDF
spark.sql("""
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Use the UDF in queries
val result = spark.sql("""
SELECT my_upper(name) as upper_name 
FROM employee
""")

result.show()

Creating Custom Simple UDFs

To create a Hive UDF that works with Spark:

package com.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UpperCaseUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) return null;
        return new Text(input.toString().toUpperCase());
    }
}

HiveGenericUDF

Wrapper for Hive generic UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.

case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper, 
  children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType  
  def nullable: Boolean
  def prettyName: String = name
  def foldable: Boolean
  override def toString: String
}

Usage Example

// Register a generic UDF (here a user-provided GenericUDF implementation)
spark.sql("""
CREATE TEMPORARY FUNCTION json_extract AS 'com.example.JsonExtractUDF'
""")

// Use the generic UDF
val result = spark.sql("""
SELECT json_extract(json_column, '$.name') as extracted_name
FROM json_table
""")

Generic UDF Advantages

Generic UDFs provide more flexibility than simple UDFs (see the sketch after this list):

  • Support for complex data types (arrays, maps, structs)
  • Runtime type checking and conversion
  • Better performance through object inspector framework
  • Support for variable arguments

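To illustrate the first two points, the following is a minimal sketch of a generic UDF (a hypothetical ConcatAllUDF, not part of the examples above) that accepts a variable number of STRING arguments and validates their types at initialization time:

package com.example;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

public class ConcatAllUDF extends GenericUDF {
    private StringObjectInspector[] inputOIs;
    private final Text result = new Text(); // reused output object

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Runtime type checking: accept any number of STRING arguments
        inputOIs = new StringObjectInspector[arguments.length];
        for (int i = 0; i < arguments.length; i++) {
            if (!(arguments[i] instanceof StringObjectInspector)) {
                throw new UDFArgumentTypeException(i, "Only string arguments are supported");
            }
            inputOIs[i] = (StringObjectInspector) arguments[i];
        }
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Concatenate all non-null arguments using the object inspectors from initialize()
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < arguments.length; i++) {
            String value = inputOIs[i].getPrimitiveJavaObject(arguments[i].get());
            if (value != null) sb.append(value);
        }
        result.set(sb.toString());
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "concat_all(" + String.join(", ", children) + ")";
    }
}
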
HiveGenericUDTF

Wrapper for Hive User-Defined Table-Generating Functions that produce multiple rows from a single input row.

case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with CodegenFallback with Logging {

  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
  def dataType: DataType
  def nullable: Boolean  
  def prettyName: String = name
  override def toString: String
}

Usage Example

// Register a UDTF (e.g., explode-like function)
spark.sql("""
CREATE TEMPORARY FUNCTION split_words AS 'com.example.SplitWordsUDTF'
""")

// Use UDTF in LATERAL VIEW
val result = spark.sql("""
SELECT word
FROM sentences 
LATERAL VIEW split_words(text) exploded_table AS word
""")

UDTF Lifecycle

UDTFs follow a specific lifecycle (see the sketch after this list):

  1. Initialize: Setup phase that receives the input object inspectors and declares the output schema
  2. Process: Called for each input row; may forward zero or more output rows
  3. Terminate: Called once at the end; may emit any remaining buffered rows

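The sketch below shows what a class like the com.example.SplitWordsUDTF registered above might look like; it is a minimal, assumed implementation that splits a string column into one row per word, mapping the lifecycle steps to GenericUDTF's initialize(), process(), and close() methods:

package com.example;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class SplitWordsUDTF extends GenericUDTF {
    private StringObjectInspector inputOI;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Initialize: remember the input inspector and declare a single output column "word"
        inputOI = (StringObjectInspector) argOIs[0];
        List<String> fieldNames = Arrays.asList("word");
        List<ObjectInspector> fieldOIs =
            Arrays.asList((ObjectInspector) PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Process: emit zero or more output rows per input row via forward()
        String text = inputOI.getPrimitiveJavaObject(args[0]);
        if (text == null) return;
        for (String word : text.split("\\s+")) {
            forward(new Object[] { word });
        }
    }

    @Override
    public void close() throws HiveException {
        // Terminate: nothing buffered to flush in this example
    }
}
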
HiveUDAFFunction

Wrapper for Hive User-Defined Aggregate Functions that perform aggregation operations.

case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isUDAFBridgeRequired: Boolean = false,
  mutableAggBufferOffset: Int = 0,
  inputAggBufferOffset: Int = 0
) extends TypedImperativeAggregate[GenericUDAFEvaluator.AggregationBuffer] with Logging {

  def createAggregationBuffer(): AggregationBuffer
  def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer  
  def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer
  def eval(buffer: AggregationBuffer): Any
  def serialize(buffer: AggregationBuffer): Array[Byte]
  def deserialize(storageFormat: Array[Byte]): AggregationBuffer
  def prettyName: String = name
}

Usage Example

// Register a UDAF
spark.sql("""
CREATE TEMPORARY FUNCTION my_avg AS 'com.example.AverageUDAF'
""")

// Use UDAF in aggregation query
val result = spark.sql("""
SELECT department, my_avg(salary) as avg_salary
FROM employee
GROUP BY department
""")

UDAF Aggregation Process

UDAFs implement a distributed aggregation process (see the sketch after this list):

  1. Partial aggregation: Each partition computes partial results
  2. Merge: Partial results are combined across partitions
  3. Final evaluation: Final result is computed from merged state

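As a minimal sketch, a class like the com.example.AverageUDAF registered above could be written against Hive's legacy UDAF/UDAFEvaluator API; the exact implementation is an assumption here, and such legacy classes are the ones run through the UDAF bridge (the isUDAFBridgeRequired flag on HiveUDAFFunction):

package com.example;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class AverageUDAF extends UDAF {

    // Partial state shipped between partitions
    public static class PartialResult {
        public double sum;
        public long count;
    }

    public static class AverageEvaluator implements UDAFEvaluator {
        private double sum;
        private long count;

        public AverageEvaluator() { init(); }

        // Reset the aggregation state
        public void init() { sum = 0.0; count = 0; }

        // Partial aggregation: called once per input row on each partition
        public boolean iterate(Double value) {
            if (value != null) { sum += value; count++; }
            return true;
        }

        // Produce the partial result for this partition
        public PartialResult terminatePartial() {
            PartialResult partial = new PartialResult();
            partial.sum = sum;
            partial.count = count;
            return partial;
        }

        // Merge: combine a partial result from another partition
        public boolean merge(PartialResult other) {
            if (other != null) { sum += other.sum; count += other.count; }
            return true;
        }

        // Final evaluation: compute the result from the merged state
        public Double terminate() {
            return count == 0 ? null : sum / count;
        }
    }
}
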
HiveFunctionWrapper

Core wrapper class that loads and manages Hive function instances.

case class HiveFunctionWrapper(
  className: String,
  instance: AnyRef
) extends Serializable {

  def createFunction[T](): T
  def getMethodName(): String
  def getParameterTypes(): Array[Class[_]]
}

Function Loading

// Create function wrapper
val wrapper = HiveFunctionWrapper("com.example.MyUDF", null)

// Create function instance
val udfInstance = wrapper.createFunction[UDF]()

Function Registration

Temporary Functions

Register functions for the current session:

// Register UDF from JAR
spark.sql("""
CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
""")

// Register with an explicit JAR location
spark.sql("""
CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
USING JAR '/path/to/udf.jar'
""")

Permanent Functions

Register functions in Hive metastore:

// Create permanent function
spark.sql("""
CREATE FUNCTION my_database.my_func AS 'com.example.MyFunction'
USING JAR 'hdfs://path/to/udf.jar'
""")

// Use permanent function
val result = spark.sql("""
SELECT my_database.my_func(column) FROM table
""")

Function Discovery

List and inspect registered functions:

// List all functions
spark.sql("SHOW FUNCTIONS").show()

// List functions matching pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// Describe function
spark.sql("DESCRIBE FUNCTION my_func").show()

// Show extended info
spark.sql("DESCRIBE FUNCTION EXTENDED my_func").show()

Data Type Mapping

Mapping between Hive and Spark data types in UDF integration:

Primitive Types

  • BOOLEAN ↔ BooleanType
  • TINYINT ↔ ByteType
  • SMALLINT ↔ ShortType
  • INT ↔ IntegerType
  • BIGINT ↔ LongType
  • FLOAT ↔ FloatType
  • DOUBLE ↔ DoubleType
  • STRING ↔ StringType
  • BINARY ↔ BinaryType
  • TIMESTAMP ↔ TimestampType
  • DATE ↔ DateType

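As an illustration of these primitive mappings, a simple UDF can declare its evaluate signature with the corresponding Java types; the sketch below (a hypothetical HalfUDF, not referenced elsewhere in this page) receives a BIGINT column as java.lang.Long and returns a java.lang.Double, which maps back to DOUBLE:

package com.example;

import org.apache.hadoop.hive.ql.exec.UDF;

public class HalfUDF extends UDF {
    // BIGINT argument arrives as Long; the Double return value maps to DOUBLE
    public Double evaluate(Long input) {
        if (input == null) return null;
        return input / 2.0;
    }
}
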
Complex Types

  • ARRAY<T> ↔ ArrayType(T)
  • MAP<K,V> ↔ MapType(K,V)
  • STRUCT<...> ↔ StructType(...)

Usage in UDFs

// Java generic UDF handling complex types (ARRAY input)
public class ComplexUDF extends GenericUDF {
    private ListObjectInspector listOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Object inspectors describing the inputs arrive here, not in evaluate()
        listOI = (ListObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Handle array input using the inspector captured in initialize()
        List<?> inputList = listOI.getList(arguments[0].get());
        // Process and return result (here: the number of elements)
        return inputList == null ? null : inputList.size();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "complex_udf(" + String.join(", ", children) + ")";
    }
}

Performance Considerations

UDF Performance Tips

  1. Use Generic UDFs: Better performance than simple UDFs for complex operations
  2. Minimize Object Creation: Reuse objects where possible in UDF evaluation
  3. Leverage Vectorization: Some UDFs can benefit from vectorized execution
  4. Consider Native Functions: Use Spark's built-in functions when available

Example Optimized UDF

public class OptimizedUDF extends GenericUDF {
    private final Text result = new Text(); // Reuse the output object across rows

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String input = arguments[0].get().toString();
        result.set(processString(input)); // Write into the reused Text instead of allocating a new one
        return result;
    }

    // initialize() and getDisplayString() are omitted here for brevity
}

Error Handling

Common UDF Errors

ClassNotFoundException: UDF class not found

// Solution: Add JAR to classpath
spark.sql("ADD JAR '/path/to/udf.jar'")

Method Not Found: Incorrect UDF method signature

// Ensure UDF implements correct evaluate() method
public Object evaluate(DeferredObject[] args) throws HiveException

Serialization Issues: UDF not serializable for distributed execution

// Make UDF implement Serializable or use transient fields
public class MyUDF extends GenericUDF implements Serializable

Exception Handling in UDFs

public class SafeUDF extends GenericUDF {
    // LOG is assumed to be a logger for this class (e.g., an org.slf4j.Logger)

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        try {
            // UDF logic
            return processInput(arguments[0].get());
        } catch (Exception e) {
            // Handle errors gracefully instead of failing the whole query
            LOG.warn("UDF error: " + e.getMessage());
            return null; // or an appropriate default value
        }
    }

    // initialize() and getDisplayString() are omitted here for brevity
}

Testing UDFs

Unit Testing

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession

class UDFIntegrationSuite extends SparkFunSuite with SharedSparkSession {
  test("custom UDF execution") {
    spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 'com.example.TestUDF'")
    
    val result = spark.sql("SELECT test_udf('input') as output").collect()
    assert(result(0).getString(0) == "expected_output")
  }
}

Integration Testing

// Test with actual Hive UDFs
class HiveUDFSuite extends SparkFunSuite with SharedSparkSession {
  test("hive builtin UDF") {
    val result = spark.sql("SELECT upper('hello') as upper_hello").collect()
    assert(result(0).getString(0) == "HELLO")
  }
}

Migration and Compatibility

Migrating from Hive

Most Hive UDFs work without modification in Spark:

  1. Copy JAR files to Spark classpath
  2. Register functions using CREATE FUNCTION
  3. Test functionality with representative data
  4. Monitor performance and optimize if needed

Version Compatibility

UDF compatibility across Hive versions:

  • Simple UDFs: Generally compatible across versions
  • Generic UDFs: May require Hive version-specific compilation
  • Built-in UDFs: Spark provides compatibility layer

Types

// Base expression interface
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generator for table-generating functions
trait Generator extends Expression {
  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
}

// Aggregate function interface
trait TypedImperativeAggregate[T] extends ImperativeAggregate {
  def createAggregationBuffer(): T
  def update(buffer: T, input: InternalRow): T
  def merge(buffer: T, input: T): T
  def eval(buffer: T): Any
}

// Hive UDAF aggregation buffer
trait AggregationBuffer {
  def reset(): Unit
  def copy(): AggregationBuffer
}

// Object inspector for Hive type system
trait ObjectInspector {
  def getCategory(): ObjectInspector.Category
  def getTypeName(): String
}

// Function wrapper for Hive functions
case class HiveFunctionWrapper(className: String, instance: AnyRef) extends Serializable {
  def createFunction[T](): T
}

// Internal row representation
trait InternalRow {
  def numFields: Int
  def get(ordinal: Int, dataType: DataType): Any
  def isNullAt(ordinal: Int): Boolean
}