or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md, data-conversion.md, external-catalog.md, hive-client.md, index.md, udf-support.md
tile.json

docs/udf-support.md

User-Defined Function Support

Support for executing Hive UDFs, UDAFs, and UDTFs within Spark queries. Each kind of Hive function is wrapped in a Spark Catalyst expression, so Hive user-defined functions can be used directly in Spark SQL expressions once they have been registered (see UDF Registration and Usage).

Capabilities

Hive Simple UDF

Support for simple Hive UDFs that extend org.apache.hadoop.hive.ql.exec.UDF.

/**
 * Catalyst expression that evaluates a simple Hive UDF, i.e. a class extending
 * org.apache.hadoop.hive.ql.exec.UDF, inside a Spark SQL query.
 *
 * Signature sketch: member bodies are omitted; only names and types are shown.
 * NOTE(review): in Spark this class is believed to be `private[hive]` — confirm it is
 * reachable from the calling code before using it directly.
 *
 * @param name UDF name for display purposes
 * @param funcWrapper Wrapper holding the fully qualified class name of the Hive UDF
 * @param children Input expressions; their values are the UDF's arguments
 */
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
  
  /**
   * Result type of the UDF. Declared `lazy`, so it is only computed on first access.
   */
  lazy val dataType: DataType
  
  /**
   * Evaluate the UDF for one input row
   * @param input Row the child expressions are evaluated against (the usage examples
   *              pass `EmptyRow` when every child is a literal)
   * @return UDF result; presumably a Catalyst value of type `dataType` — confirm
   */
  def eval(input: InternalRow): Any
  
  /**
   * Pretty name for display
   */
  def prettyName: String
  
  /**
   * SQL representation of the UDF call
   */
  def sql: String
}

Usage Example:

// HiveFunctionWrapper is nested in Spark's `HiveShim` object; HiveSimpleUDF sits directly in
// package org.apache.spark.sql.hive. NOTE(review): both are declared `private[hive]` in Spark,
// so this snippet only compiles from inside package `org.apache.spark.sql.hive`.
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.hive.HiveSimpleUDF
import org.apache.spark.sql.catalyst.expressions._

// Create wrapper for a Hive UDF class
val wrapper = HiveFunctionWrapper("com.example.MyHiveUDF")

// Create expression for the UDF
val udfExpr = HiveSimpleUDF(
  name = "my_udf",
  funcWrapper = wrapper,
  children = Seq(Literal("input_string"))
)

// Use in expression evaluation; EmptyRow is sufficient because every child is a Literal
val result = udfExpr.eval(EmptyRow)
println(s"UDF result: $result")

Hive Generic UDF

Support for generic Hive UDFs that extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.

/**
 * Catalyst expression that evaluates a generic Hive UDF, i.e. a class extending
 * org.apache.hadoop.hive.ql.udf.generic.GenericUDF, inside a Spark SQL query.
 *
 * Signature sketch: member bodies are omitted; only names and types are shown.
 * NOTE(review): in Spark this class is believed to be `private[hive]` — confirm it is
 * reachable from the calling code before using it directly.
 *
 * @param name UDF name for display purposes
 * @param funcWrapper Wrapper holding the fully qualified class name of the GenericUDF
 * @param children Input expressions; their values are the UDF's arguments
 */
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {
  
  /**
   * Result type of the UDF. Declared `lazy`, so it is only computed on first access.
   */
  lazy val dataType: DataType
  
  /**
   * Evaluate the UDF for one input row
   * @param input Row the child expressions are evaluated against
   * @return UDF result; presumably a Catalyst value of type `dataType` — confirm
   */
  def eval(input: InternalRow): Any
  
  /**
   * Pretty name for display
   */
  def prettyName: String
  
  /**
   * SQL representation of the UDF call
   */
  def sql: String
}

Usage Example:

// GenericUDFWhen implements `CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END`, so the
// children below are read as (condition, value-if-true, else-value). GenericUDFCase would
// be wrong here: it implements `CASE a WHEN b THEN c ...` and would compare `true` with
// the string "true_value".
val genericWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen")

val genericUDF = HiveGenericUDF(
  name = "case_when",
  funcWrapper = genericWrapper,
  children = Seq(
    Literal(true),          // WHEN condition
    Literal("true_value"),  // THEN value
    Literal("false_value")  // ELSE value
  )
)

// Expected to evaluate to "true_value" (as a Catalyst UTF8String), since the condition is true
val result = genericUDF.eval(EmptyRow)

Hive User-Defined Table Function (UDTF)

Support for Hive UDTFs, which can generate multiple output rows from a single input row.

/**
 * Catalyst generator that runs a Hive UDTF (a GenericUDTF class), turning each input
 * row into an iterable of output rows.
 *
 * Signature sketch: member bodies are omitted; only names and types are shown.
 * NOTE(review): in Spark this class is believed to be `private[hive]` — confirm it is
 * reachable from the calling code before using it directly.
 *
 * @param name UDTF name for display purposes
 * @param funcWrapper Wrapper holding the fully qualified class name of the UDTF
 * @param children Input expressions; their values are the UDTF's arguments
 */
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression {
  
  /**
   * Schema of each row the UDTF emits. Declared `lazy`, so it is only computed on
   * first access.
   */
  lazy val elementSchema: StructType
  
  /**
   * Process one input row and return the rows the UDTF produced for it
   * @param input Row the child expressions are evaluated against
   * @return Output rows for this input (an `IterableOnce`, so traverse it only once)
   */
  def eval(input: InternalRow): IterableOnce[InternalRow]
  
  /**
   * Terminate the UDTF and return any final rows.
   * Presumably called once, after the last `eval` (as the usage example does) — confirm.
   * @return Final output rows
   */
  def terminate(): IterableOnce[InternalRow]
  
  /**
   * Pretty name for display
   */
  def prettyName: String
  
  /**
   * SQL representation of the UDTF call
   */
  def sql: String
}

Usage Example:

// GenericUDTFExplode emits one output row per element of its array argument
// (it does not split strings: pass an array, not a delimited string)
val udtfWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode")

val udtf = HiveGenericUDTF(
  name = "explode",
  funcWrapper = udtfWrapper,
  children = Seq(Literal.create(Array("a", "b", "c"), ArrayType(StringType)))
)

// Evaluate UDTF. eval returns an IterableOnce: go through .iterator (calling toSeq directly
// on IterableOnce is deprecated in Scala 2.13) and materialize eagerly with toList.
val outputRows = udtf.eval(EmptyRow).iterator.toList
println(s"UDTF generated ${outputRows.length} rows")

// Terminate after all input has been processed to get any final rows
val finalRows = udtf.terminate().iterator.toList

Hive User-Defined Aggregate Function (UDAF)

Support for Hive UDAFs that perform custom aggregation operations.

/**
 * Catalyst aggregate that runs a Hive UDAF. Per-group state is held in a
 * [[HiveUDAFBuffer]] (the `TypedImperativeAggregate` type parameter).
 *
 * Typical lifecycle, as suggested by the member list: `createAggregationBuffer`, then
 * `update` per input row, `merge` of partial buffers, and finally `eval`;
 * `serialize`/`deserialize` convert a buffer to and from bytes.
 *
 * Signature sketch: member bodies are omitted; only names and types are shown.
 * NOTE(review): Spark's own declaration is believed to give the last three parameters
 * defaults (`false`, `0`, `0`) — verify against the Spark version in use.
 *
 * @param name UDAF name for display purposes
 * @param funcWrapper Wrapper holding the fully qualified class name of the UDAF
 * @param children Input expressions to the UDAF
 * @param isUDAFBridgeRequired Whether UDAF bridge is needed (presumably `true` for
 *                             classes written against Hive's legacy UDAF API — confirm)
 * @param mutableAggBufferOffset Offset in mutable aggregation buffer
 * @param inputAggBufferOffset Offset in input aggregation buffer
 */
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isUDAFBridgeRequired: Boolean,
  mutableAggBufferOffset: Int,
  inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer] with HiveInspectors with UserDefinedExpression {
  
  /**
   * Create a new, empty aggregation buffer
   * @return New UDAF buffer
   */
  def createAggregationBuffer(): HiveUDAFBuffer
  
  /**
   * Update aggregation buffer with new input
   * @param buffer Current aggregation buffer
   * @param input Input row the child expressions are evaluated against
   * @return Updated buffer
   */
  def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer
  
  /**
   * Merge two aggregation buffers
   * @param buffer Target buffer
   * @param input Source buffer to merge
   * @return Merged buffer
   */
  def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer
  
  /**
   * Get final result from aggregation buffer
   * @param buffer Final aggregation buffer
   * @return Aggregation result; presumably a Catalyst value of type `dataType` — confirm
   */
  def eval(buffer: HiveUDAFBuffer): Any
  
  /**
   * Serialize aggregation buffer
   * @param buffer Buffer to serialize
   * @return Serialized buffer data
   */
  def serialize(buffer: HiveUDAFBuffer): Array[Byte]
  
  /**
   * Deserialize aggregation buffer (inverse of `serialize`)
   * @param bytes Serialized buffer data
   * @return Deserialized buffer
   */
  def deserialize(bytes: Array[Byte]): HiveUDAFBuffer
  
  /**
   * Result type of the aggregation. Declared `lazy`, so it is only computed on first access.
   */
  lazy val dataType: DataType
  
  /**
   * Pretty name for display
   */
  def prettyName: String
  
  /**
   * SQL representation of the UDAF call
   */
  def sql: String
}

Usage Example:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.IntegerType

// Custom aggregation function
val udafWrapper = HiveFunctionWrapper("com.example.MyHiveUDAF")

// `children` must be Catalyst Expressions, not DataFrame Columns such as col("value").
// BoundReference(0, ...) reads column 0 of each row passed to `update`.
// NOTE(review): IntegerType is an assumed input type; match it to MyHiveUDAF's signature.
val udaf = HiveUDAFFunction(
  name = "my_aggregate",
  funcWrapper = udafWrapper,
  children = Seq(BoundReference(0, IntegerType, nullable = true)),
  // presumably `true` for classes written against Hive's legacy UDAF API — confirm
  isUDAFBridgeRequired = false,
  mutableAggBufferOffset = 0,
  inputAggBufferOffset = 0
)

// Use in aggregation context: a single-column input row holding the value to aggregate
val inputRow = InternalRow(42)
val buffer = udaf.createAggregationBuffer()
val updatedBuffer = udaf.update(buffer, inputRow)
val result = udaf.eval(updatedBuffer)

UDAF Buffer

Buffer type for managing UDAF aggregation state.

/**
 * Aggregation state used by [[HiveUDAFFunction]]: a Hive aggregation buffer paired
 * with a flag that says whether it may be merged with other buffers.
 * @param buf Hive aggregation buffer (a GenericUDAFEvaluator.AggregationBuffer, per the
 *            import in the usage example)
 * @param canDoMerge Whether buffer supports merge operations
 */
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)

Usage Example:

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer

// Hive aggregation buffers are created by the Hive UDAF; here one is taken from a
// buffer produced by `udaf` (defined in the HiveUDAFFunction example).
val hiveBuffer: AggregationBuffer = udaf.createAggregationBuffer().buf
val buffer = HiveUDAFBuffer(hiveBuffer, canDoMerge = true)

// Use in UDAF operations: round-trip the buffer through its serialized form
val serialized = udaf.serialize(buffer)
val deserialized = udaf.deserialize(serialized)

Function Wrapper

Wrapper class for Hive function classes.

/**
 * Holds the fully qualified class name of a Hive function and creates instances of it.
 *
 * Signature sketch: member bodies are omitted; only names and types are shown.
 * NOTE(review): in Spark this class is believed to be nested in `object HiveShim`
 * (`org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper`, `private[hive]`), to declare
 * `createFunction[UDFType <: AnyRef](): UDFType` with an empty parameter list, and it may
 * not expose `functionClass` publicly — verify against the Spark version in use.
 *
 * @param functionClassName Fully qualified class name of the Hive function
 */
case class HiveFunctionWrapper(functionClassName: String) {
  
  /**
   * Create instance of the wrapped function
   * @tparam T Type the instance is returned as, e.g. `UDF` for a simple Hive UDF
   * @return Instance of the Hive function
   */
  def createFunction[T]: T
  
  /**
   * Get the function class
   * @return Class object for the function
   */
  def functionClass: Class[_]
}

Usage Example:

import org.apache.hadoop.hive.ql.exec.UDF

// Create wrapper
val wrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.UDFLength")

// Create function instance, typed as the simple-UDF base class
val udfInstance = wrapper.createFunction[UDF]

// Get function class for reflection
val functionClass = wrapper.functionClass
println(s"Function class: ${functionClass.getName}")

UDF Registration and Usage

Registering a Hive function class with a Spark session under a name that Spark SQL queries and the DataFrame API can call.

/**
 * Registers a Hive function class (simple UDF, GenericUDF, UDAF or UDTF) as a
 * session-scoped temporary function, so it can be called by `name` from SQL and
 * from the DataFrame API (`call_udf`).
 *
 * Registration goes through Spark SQL's `CREATE TEMPORARY FUNCTION` instead of
 * `spark.udf.register`. A Scala UDF of type `Seq[Any] => Any` cannot be registered,
 * because Spark has no schema for `Any`. Such a UDF would also receive each SQL
 * argument directly rather than as a `Seq`, and would rebuild the Hive expression
 * for every row.
 *
 * @param spark     session to register the function in; expected to be built with
 *                  `enableHiveSupport()`, as in the usage example
 * @param name      SQL name of the function; backtick-quoted, so any string is safe
 * @param className fully qualified Java class name of the Hive function
 * @throws IllegalArgumentException if `className` is not a dotted Java class name
 */
def registerHiveUDF(
  spark: SparkSession,
  name: String,
  className: String
): Unit = {
  // Only dotted Java identifiers are accepted, so the class name can be embedded in a
  // SQL string literal without any escaping (no quotes or backslashes can appear).
  val javaClassName =
    """\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*(\.\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*)*"""
  require(className.matches(javaClassName), s"Not a valid Java class name: '$className'")

  // Backtick-quote the function name, doubling embedded backticks, so it is always parsed
  // as a single identifier.
  val quotedName = "`" + name.replace("`", "``") + "`"

  spark.sql(s"CREATE TEMPORARY FUNCTION $quotedName AS '$className'")
}

Usage Example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.call_udf

val spark = SparkSession.builder()
  .appName("HiveUDFExample")
  .enableHiveSupport()
  .getOrCreate()

// Register custom Hive UDF
registerHiveUDF(spark, "my_length", "com.example.MyLengthUDF")

// Use in SQL (assumes a `users` table with a `name` column exists)
spark.sql("SELECT my_length(name) FROM users").show()

// Use in DataFrame API; call_udf (Spark 3.2+) replaces the deprecated callUDF
import spark.implicits._
val df = Seq("hello", "world").toDF("text")
df.select(call_udf("my_length", $"text")).show()

Advanced UDF Features

Support for complex UDF scenarios.

/**
 * Builds a [[HiveGenericUDF]] whose arguments are read positionally from the input row.
 * Argument `i` is a nullable `BoundReference` to column `i`, typed as `inputTypes(i)`.
 * Suitable for UDFs with complex (e.g. struct) input/output types.
 *
 * @param name       display name of the UDF
 * @param className  fully qualified class name of the Hive GenericUDF
 * @param inputTypes Catalyst types of the row columns passed to the UDF, in order
 * @param outputType expected result type. NOTE: this helper does not consult it; the
 *                   returned expression reports its own result type via `dataType`
 * @return the (not yet evaluated) UDF expression
 */
def createComplexUDF(
  name: String,
  className: String,
  inputTypes: Seq[DataType],
  outputType: DataType
): HiveGenericUDF = {
  val boundArgs =
    for ((argType, ordinal) <- inputTypes.zipWithIndex)
      yield BoundReference(ordinal, argType, nullable = true)

  HiveGenericUDF(name, HiveFunctionWrapper(className), boundArgs)
}

/**
 * Wraps a Hive GenericUDTF class in a [[HiveGenericUDTF]] generator that takes a single
 * argument. Intended for UDTFs that emit multiple output columns.
 *
 * @param name            display name of the UDTF
 * @param className       fully qualified class name of the Hive GenericUDTF
 * @param inputExpression the UDTF's only argument
 * @param outputSchema    expected output columns. NOTE: this helper does not consult it;
 *                        the returned generator reports its own schema via `elementSchema`
 * @return the (not yet evaluated) UDTF expression
 */
def createMultiColumnUDTF(
  name: String,
  className: String,
  inputExpression: Expression,
  outputSchema: StructType
): HiveGenericUDTF =
  HiveGenericUDTF(
    name = name,
    funcWrapper = HiveFunctionWrapper(className),
    children = inputExpression :: Nil
  )

Usage Example:

import org.apache.spark.sql.types._

// Complex UDF taking a single struct column (id INT NOT NULL, name STRING) and returning a string
val complexUDF = createComplexUDF(
  name = "process_struct",
  className = "com.example.StructProcessorUDF",
  inputTypes = Seq(
    new StructType()
      .add("id", IntegerType, nullable = false)
      .add("name", StringType, nullable = true)
  ),
  outputType = StringType
)

// Multi-column UDTF emitting (key STRING NOT NULL, value STRING) rows
val outputSchema = new StructType()
  .add("key", StringType, nullable = false)
  .add("value", StringType, nullable = true)

val multiColUDTF = createMultiColumnUDTF(
  name = "split_pairs",
  className = "com.example.SplitPairsUDTF",
  inputExpression = Literal("key1:value1,key2:value2"),
  outputSchema = outputSchema
)

Error Handling

Handling common errors raised during UDF execution.

/**
 * Evaluates a simple Hive UDF against one row, turning non-fatal failures into `None`.
 *
 * Each failure is reported on stdout together with the exception message. If the UDF
 * itself returns null, the result is `Some(null)`; `None` always means evaluation threw.
 * Fatal errors and interruptions are not caught (see `NonFatal`), so task cancellation
 * and VM errors still propagate.
 *
 * @param udf   UDF expression to evaluate
 * @param input row the UDF's child expressions are evaluated against
 * @return the UDF result, or `None` if evaluation threw a non-fatal exception
 */
def safeEvaluateUDF(udf: HiveSimpleUDF, input: InternalRow): Option[Any] = {
  import scala.util.control.NonFatal

  try {
    Some(udf.eval(input))
  } catch {
    // UDFArgumentException is a subclass of HiveException, so it must be matched first.
    case e: UDFArgumentException =>
      println(s"Invalid arguments for UDF ${udf.name}: ${e.getMessage}")
      None
    case e: HiveException =>
      println(s"Hive execution error in UDF ${udf.name}: ${e.getMessage}")
      None
    // NonFatal rather than Exception: an InterruptedException (e.g. task cancellation)
    // must propagate instead of being converted into None.
    case NonFatal(e) =>
      println(s"Unexpected error in UDF ${udf.name}: ${e.getMessage}")
      None
  }
}

Performance Considerations

Optimization tips for UDF usage.

/**
 * Cache of [[HiveFunctionWrapper]]s keyed by class name, so repeated lookups for the
 * same class return the same wrapper object.
 *
 * This caches wrappers, not UDF instances; instances are created through the wrapper's
 * `createFunction`. NOTE(review): sharing one wrapper between several expressions may
 * also share any state the wrapper keeps internally — confirm that is safe for the
 * Hive function in question.
 *
 * Not thread-safe: the backing map is an unsynchronized `mutable.HashMap`.
 */
class UDFCache {
  // Fully qualified so the class compiles without a separate `mutable` import.
  private val cache = scala.collection.mutable.HashMap.empty[String, HiveFunctionWrapper]

  /**
   * Returns the cached wrapper for `className`, creating and caching it on first use.
   *
   * @param className fully qualified class name of the Hive function
   * @return the wrapper associated with `className`
   */
  def getOrCreateWrapper(className: String): HiveFunctionWrapper =
    cache.getOrElseUpdate(className, HiveFunctionWrapper(className))
}

/**
 * Evaluates one simple Hive UDF against every row of a batch.
 *
 * The same expression instance is reused for all rows and no explicit preparation step
 * is needed here: UDF setup happens lazily, presumably on the first `eval`.
 *
 * @param udf    UDF expression to evaluate
 * @param inputs rows to evaluate the UDF against
 * @return one result per input row, in input order
 */
def batchEvaluateUDF(
  udf: HiveSimpleUDF,
  inputs: Seq[InternalRow]
): Seq[Any] =
  inputs.map(row => udf.eval(row))

Usage Example:

val cache = new UDFCache()

// Two lookups for the same class name hand back the very same wrapper object
val wrapper1 = cache.getOrCreateWrapper("com.example.MyUDF")
val wrapper2 = cache.getOrCreateWrapper("com.example.MyUDF")
assert(wrapper1 eq wrapper2) // Same instance

// Batch evaluation over single-column rows, one per input string
val inputs = Seq("hello", "world").map(word => InternalRow(UTF8String.fromString(word)))

// Column 0 of each row is bound as the UDF's only (nullable) argument
val udf = HiveSimpleUDF(
  "length",
  wrapper1,
  Seq(BoundReference(0, StringType, nullable = true))
)
val results = batchEvaluateUDF(udf, inputs)
println(s"Batch results: ${results.mkString(", ")}")