Hive UDF Support

Integration support for Hive User Defined Functions (UDFs), User Defined Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs) within Spark SQL.

Core Imports

import org.apache.spark.sql.hive.HiveSimpleUDF
import org.apache.spark.sql.hive.HiveGenericUDF
import org.apache.spark.sql.hive.HiveUDAFFunction
import org.apache.spark.sql.hive.HiveGenericUDTF
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder

Capabilities

Simple UDF Support

Wrapper for Hive Simple UDFs that operate on basic data types.

/**
 * Wrapper for Hive Simple UDF that operates on basic types
 * @param name Function name
 * @param funcWrapper Hive UDF instance wrapper
 * @param children Input expressions
 */
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors {
  
  /** Evaluate UDF with input values */
  override def eval(input: InternalRow): Any
  
  /** Generate Java source for UDF evaluation (codegen path) */
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode
  
  /** Data type returned by this UDF */
  override def dataType: DataType
  
  /** Whether this UDF is deterministic */
  override def deterministic: Boolean
  
  /** User-facing name of this UDF in plans and error messages */
  override def prettyName: String
}
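
For reference, the functions this wrapper targets are plain classes extending Hive's UDF base class, with one or more evaluate methods that Hive resolves by reflection. A minimal sketch (class and method bodies are illustrative, not from this codebase):

import org.apache.hadoop.hive.ql.exec.UDF

// Illustrative simple UDF: upper-cases a string, passing nulls through.
// Hive locates the evaluate method by reflection on the argument types.
class MyUpperUDF extends UDF {
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase
}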

Generic UDF Support

Wrapper for Hive Generic UDFs with complex type support and object inspectors.

/**
 * Wrapper for Hive Generic UDF with complex type support
 * @param name Function name
 * @param funcWrapper Hive Generic UDF wrapper
 * @param children Input expressions
 */
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors {
  
  /** Initialize UDF with object inspectors */
  def initialize(objectInspectors: Array[ObjectInspector]): ObjectInspector
  
  /** Evaluate UDF with Hive objects */
  override def eval(input: InternalRow): Any
  
  /** UDF return data type */
  override def dataType: DataType
  
  /** Whether UDF supports code generation */
  override def supportCodegen: Boolean
  
  /** Get UDF usage information */
  def getDisplayString(children: Array[String]): String
}
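
By contrast, a generic UDF is written against object inspectors rather than plain Java types, implementing initialize/evaluate/getDisplayString. A minimal sketch (class name illustrative):

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Illustrative generic UDF: trims a string, handling nulls.
class MyTrimUDF extends GenericUDF {
  // Declare the return type by handing back an object inspector
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector =
    PrimitiveObjectInspectorFactory.javaStringObjectInspector

  // Arguments arrive lazily as DeferredObjects
  override def evaluate(arguments: Array[DeferredObject]): AnyRef = {
    val v = arguments(0).get()
    if (v == null) null else v.toString.trim
  }

  override def getDisplayString(children: Array[String]): String =
    s"my_trim(${children.mkString(", ")})"
}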

User Defined Aggregate Function (UDAF) Support

Support for Hive UDAFs that perform aggregation operations.

/**
 * Wrapper for Hive User Defined Aggregate Functions
 * @param name Function name
 * @param funcWrapper Hive UDAF wrapper
 * @param children Input expressions
 * @param isDistinct Whether aggregation is distinct
 */
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isDistinct: Boolean
) extends AggregateFunction with HiveInspectors {
  
  /** Initialize aggregation buffer */
  def createAggregationBuffer(): AggregationBuffer
  
  /** Update aggregation buffer with new value */
  def update(buffer: AggregationBuffer, input: InternalRow): Unit
  
  /** Merge two aggregation buffers */
  def merge(buffer1: AggregationBuffer, buffer2: AggregationBuffer): Unit
  
  /** Get final aggregation result */
  def evaluate(buffer: AggregationBuffer): Any
  
  /** Aggregation buffer schema */
  override def aggBufferSchema: StructType
  
  /** Input aggregation buffer attributes */
  override def inputAggBufferAttributes: Seq[AttributeReference]
}
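
On the Hive side, such an aggregate is typically written against the GenericUDAFEvaluator contract. A minimal sketch that counts non-null values (class names illustrative; merge assumes the partial arrives as a java.lang.Long, matching the inspector declared in init):

import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Illustrative UDAF: counts non-null input values.
class CountNonNull extends AbstractGenericUDAFResolver {
  override def getEvaluator(parameters: Array[TypeInfo]): GenericUDAFEvaluator =
    new CountNonNullEvaluator
}

class CountNonNullEvaluator extends GenericUDAFEvaluator {
  class CountBuffer extends AggregationBuffer { var count = 0L }

  override def init(mode: Mode, parameters: Array[ObjectInspector]): ObjectInspector = {
    super.init(mode, parameters)
    // Both the partial and the final result are longs
    PrimitiveObjectInspectorFactory.javaLongObjectInspector
  }

  override def getNewAggregationBuffer(): AggregationBuffer = new CountBuffer
  override def reset(agg: AggregationBuffer): Unit =
    agg.asInstanceOf[CountBuffer].count = 0L
  override def iterate(agg: AggregationBuffer, parameters: Array[AnyRef]): Unit =
    if (parameters(0) != null) agg.asInstanceOf[CountBuffer].count += 1
  override def terminatePartial(agg: AggregationBuffer): AnyRef =
    java.lang.Long.valueOf(agg.asInstanceOf[CountBuffer].count)
  override def merge(agg: AggregationBuffer, partial: AnyRef): Unit =
    if (partial != null)
      agg.asInstanceOf[CountBuffer].count += partial.asInstanceOf[java.lang.Long].longValue
  override def terminate(agg: AggregationBuffer): AnyRef =
    java.lang.Long.valueOf(agg.asInstanceOf[CountBuffer].count)
}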

User Defined Table Function (UDTF) Support

Support for Hive UDTFs that generate multiple output rows from a single input row.

/**
 * Wrapper for Hive User Defined Table Functions
 * @param name Function name
 * @param funcWrapper Hive UDTF wrapper
 * @param children Input expressions
 */
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with HiveInspectors {
  
  /** Initialize UDTF with object inspectors */
  def initialize(objectInspectors: Array[ObjectInspector]): StructObjectInspector
  
  /** Process input row and generate output rows */
  def process(args: Array[AnyRef]): Unit
  
  /** Signal end of input and flush any remaining output */
  def close(): Unit
  
  /** Generate output rows from input */
  override def eval(input: InternalRow): TraversableOnce[InternalRow]
  
  /** Schema of the generated output rows */
  override def elementSchema: StructType
  
  /** Emit any remaining output rows after all input has been consumed */
  override def terminate(): TraversableOnce[InternalRow]
}
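
The wrapped function itself implements Hive's GenericUDTF contract: declare an output struct in initialize, emit rows through forward in process, and flush in close. A minimal sketch (class name illustrative):

import java.util.Arrays
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Illustrative UDTF: splits a comma-separated string into one row per token.
class SplitToRows extends GenericUDTF {
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector =
    ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("token"),
      Arrays.asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector: ObjectInspector))

  override def process(args: Array[AnyRef]): Unit =
    if (args(0) != null)
      args(0).toString.split(",").foreach(t => forward(Array[AnyRef](t)))

  override def close(): Unit = () // nothing buffered, nothing to flush
}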

UDF Expression Builder

Factory for creating UDF expressions from Hive function classes.

/**
 * Builder for creating Hive UDF expressions
 */
object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder {
  
  /**
   * Create UDF expression from Hive function class
   * @param name Function name
   * @param clazz Hive UDF class
   * @param input Input expressions
   * @return Appropriate UDF expression wrapper
   */
  override def makeExpression(
    name: String,
    clazz: Class[_],
    input: Seq[Expression]
  ): Expression
  
  /**
   * Check if class is a supported Hive UDF type
   * @param clazz Class to check
   * @return true if supported UDF type
   */
  def isHiveUDF(clazz: Class[_]): Boolean
  
  /**
   * Get UDF type from class
   * @param clazz UDF class
   * @return UDF type string
   */
  def getUDFType(clazz: Class[_]): String
}
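
Conceptually, the builder inspects the function class and picks the matching wrapper expression. A rough sketch of that dispatch, written against the wrapper types above (simplified; the real builder also validates arguments and handles bridge cases):

import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFResolver2, GenericUDF, GenericUDTF}
import org.apache.spark.sql.catalyst.expressions.Expression

// Simplified dispatch: choose a wrapper expression based on the UDF class.
def makeExpressionSketch(name: String, clazz: Class[_], input: Seq[Expression]): Expression = {
  val wrapper = HiveFunctionWrapper(clazz.getName, functionInstance = null)
  if (classOf[UDF].isAssignableFrom(clazz)) HiveSimpleUDF(name, wrapper, input)
  else if (classOf[GenericUDF].isAssignableFrom(clazz)) HiveGenericUDF(name, wrapper, input)
  else if (classOf[GenericUDTF].isAssignableFrom(clazz)) HiveGenericUDTF(name, wrapper, input)
  else if (classOf[UDAF].isAssignableFrom(clazz) ||
           classOf[GenericUDAFResolver2].isAssignableFrom(clazz))
    HiveUDAFFunction(name, wrapper, input, isDistinct = false)
  else throw new IllegalArgumentException(s"$name: unsupported UDF class ${clazz.getName}")
}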

Function Registration and Usage

Register Hive UDFs

/**
 * Register Hive UDF in Spark session
 * @param name Function name to register
 * @param className Fully qualified UDF class name
 * @param database Optional database name
 */
def registerHiveUDF(
  sparkSession: SparkSession,
  name: String,
  className: String,
  database: Option[String] = None
): Unit
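
A helper with this shape can be implemented on top of the SQL CREATE FUNCTION path; a minimal sketch (naive quoting, no error handling):

import org.apache.spark.sql.SparkSession

// Sketch: register a persistent Hive UDF by issuing CREATE FUNCTION.
def registerHiveUDF(
    sparkSession: SparkSession,
    name: String,
    className: String,
    database: Option[String] = None): Unit = {
  val qualifiedName = database.map(db => s"$db.$name").getOrElse(name)
  sparkSession.sql(s"CREATE FUNCTION $qualifiedName AS '$className'")
}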

UDF Discovery and Loading

/**
 * Discover and load Hive UDFs from classpath
 * @param sparkSession Active Spark session
 * @param packagePrefix Package prefix to scan
 * @return Map of discovered UDF names to classes
 */
def discoverHiveUDFs(
  sparkSession: SparkSession,
  packagePrefix: String
): Map[String, Class[_]]
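
Taken together, the two helpers support a register-everything-found workflow. Hypothetical usage (both helpers are declared in this document, not in Spark itself):

// Discover UDF classes under a package prefix, then register each by name.
val discovered: Map[String, Class[_]] = discoverHiveUDFs(spark, "com.example.udfs")
discovered.foreach { case (name, clazz) =>
  registerHiveUDF(spark, name, clazz.getName)
}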

Usage Examples

Using Built-in Hive UDFs

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Use Hive built-in functions directly in SQL
val result = spark.sql("""
  SELECT 
    reflect('java.lang.Math', 'abs', -5) as abs_value,
    reflect('java.lang.String', 'valueOf', 42) as string_value,
    java_method('java.lang.Math', 'sqrt', 16.0) as sqrt_value
  FROM VALUES (1) as t(dummy)
""")

result.show()

Registering Custom Hive UDF

// Register custom Hive UDF class
spark.sql("""
  CREATE FUNCTION my_upper 
  AS 'com.example.MyUpperUDF'
  USING JAR '/path/to/my-udfs.jar'
""")

// Use the registered UDF
val df = spark.sql("""
  SELECT my_upper(name) as upper_name
  FROM users
""")

df.show()

Using Hive UDAF (Aggregate Function)

// Register custom UDAF
spark.sql("""
  CREATE FUNCTION my_collect_set
  AS 'com.example.MyCollectSetUDAF'
  USING JAR '/path/to/my-udafs.jar'
""")

// Use UDAF in aggregation
val aggregated = spark.sql("""
  SELECT 
    category,
    my_collect_set(product_name) as unique_products
  FROM products
  GROUP BY category
""")

aggregated.show()

Using Hive UDTF (Table Function)

// Register custom UDTF
spark.sql("""
  CREATE FUNCTION explode_json
  AS 'com.example.ExplodeJsonUDTF'
  USING JAR '/path/to/my-udtfs.jar'
""")

// Use UDTF to generate multiple rows
val exploded = spark.sql("""
  SELECT 
    id,
    exploded_col
  FROM events
  LATERAL VIEW explode_json(json_data) exploded_table AS exploded_col
""")

exploded.show()

Programmatic UDF Registration

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// Create function definition
val functionDefinition = CatalogFunction(
  identifier = FunctionIdentifier("custom_concat", Some("default")),
  className = "com.example.CustomConcatUDF",
  resources = Seq(FunctionResource(JarResource, "/path/to/udf.jar"))
)

// Register through catalog
spark.sessionState.catalog.createFunction(
  functionDefinition,
  ignoreIfExists = true
)

// Verify registration
val functions = spark.catalog.listFunctions("default")
functions.filter(_.name == "custom_concat").show()

Advanced UDF Usage with Complex Types

// UDF that works with complex types (arrays, maps, structs).
// Note: Spark 2.4+ ships a built-in array_intersect that takes precedence,
// so this example registers the Hive UDF under a distinct name.
spark.sql("""
  CREATE FUNCTION my_array_intersect
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayIntersect'
""")

val complexQuery = spark.sql("""
  SELECT 
    user_id,
    my_array_intersect(user_interests, recommended_items) as matching_interests
  FROM (
    SELECT 
      user_id,
      split(interests, ',') as user_interests,
      split(recommendations, ',') as recommended_items
    FROM user_profiles
  ) profiles
""")

complexQuery.show()

Using Reflection-based UDFs

// Use Java reflection for dynamic function calls.
// Note: reflect and java_method can only invoke public static methods.
val reflectionQuery = spark.sql("""
  SELECT 
    user_name,
    reflect('java.net.URLDecoder', 'decode', user_name, 'UTF-8') as decoded_name,
    reflect('java.util.UUID', 'randomUUID') as random_id,
    java_method('java.lang.System', 'currentTimeMillis') as current_ms
  FROM users
""")

reflectionQuery.show()

Error Handling with UDFs

import org.apache.spark.sql.AnalysisException

try {
  // Attempt to use non-existent UDF
  val result = spark.sql("SELECT non_existent_udf(name) FROM users")
  result.collect()
} catch {
  case e: AnalysisException if e.getMessage.contains("Undefined function") =>
    println("UDF not found or not registered")
  case e: ClassNotFoundException =>
    println("UDF class not found in classpath")
  case e: Exception =>
    println(s"UDF execution error: ${e.getMessage}")
}

// Safe UDF usage with null handling
val safeQuery = spark.sql("""
  SELECT 
    user_id,
    CASE 
      WHEN user_data IS NOT NULL 
      THEN my_custom_udf(user_data)
      ELSE NULL
    END as processed_data
  FROM users
""")

Types

UDF Wrapper Types

/**
 * Wrapper for Hive function instances with proper class loading
 */
case class HiveFunctionWrapper(
  functionClassName: String,
  functionInstance: AnyRef
) {
  def createFunction[T](): T
  def getMethodInfo(): Array[Method]
}
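
Hypothetical usage of the wrapper as declared here (the real Spark wrapper also handles serialization of the function across executors):

// Instantiate the wrapped Hive function from its class name.
val wrapper = HiveFunctionWrapper("com.example.MyUpperUDF", functionInstance = null)
val instance = wrapper.createFunction[org.apache.hadoop.hive.ql.exec.UDF]()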

/**
 * Resource specification for UDF dependencies
 */
case class FunctionResource(
  resourceType: FunctionResourceType,
  uri: String
)

sealed trait FunctionResourceType
case object JarResource extends FunctionResourceType
case object FileResource extends FunctionResourceType
case object ArchiveResource extends FunctionResourceType

Inspector Types

/**
 * Trait for working with Hive object inspectors
 */
trait HiveInspectors {
  def toInspector(dataType: DataType): ObjectInspector
  def wrapperFor(inspector: ObjectInspector, dataType: DataType): (Any) => Any
  def unwrap(data: Any, inspector: ObjectInspector): AnyRef
}

/**
 * Hive object inspector categories
 */
sealed trait ObjectInspectorCategory
case object PrimitiveObjectInspector extends ObjectInspectorCategory  
case object ListObjectInspector extends ObjectInspectorCategory
case object MapObjectInspector extends ObjectInspectorCategory
case object StructObjectInspector extends ObjectInspectorCategory