UDF Integration

Comprehensive support for Hive User-Defined Functions, including simple UDFs, generic UDFs, table-generating functions (UDTFs), and aggregate functions (UDAFs). This enables seamless use of existing Hive UDFs within Spark SQL queries.

Capabilities

Simple UDF Support

Wrapper for simple Hive UDFs that work with basic data types.

/**
 * Expression wrapper for simple Hive UDFs
 * Handles basic data type conversion between Hive and Catalyst
 */
case class HiveSimpleUDF(
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {
  
  override def dataType: DataType
  override def nullable: Boolean = true
  override def eval(input: InternalRow): Any
  override def prettyName: String
}
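
For context, here is what a simple UDF looks like on the Hive side. This is a minimal, hypothetical example class (not part of Hive or Spark):

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hypothetical simple UDF: extends the legacy UDF base class; Hive resolves
// the evaluate() method by reflection based on the argument types.
class MyUpperUDF extends UDF {
  def evaluate(input: Text): Text = {
    if (input == null) null
    else new Text(input.toString.toUpperCase)
  }
}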

Usage Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register and use simple Hive UDF
spark.sql("CREATE TEMPORARY FUNCTION simple_upper AS 'org.apache.hadoop.hive.ql.udf.UDFUpper'")
spark.sql("SELECT simple_upper('hello world') as result").show()
// Result: HELLO WORLD

// Use built-in Hive simple UDFs
spark.sql("SELECT substr('Apache Spark', 1, 6) as result").show()
// Result: Apache

Generic UDF Support

Wrapper for generic Hive UDFs that can handle complex data types and advanced operations.

/**
 * Expression wrapper for generic Hive UDFs
 * Supports complex data types and provides full ObjectInspector integration
 */
case class HiveGenericUDF(
  funcWrapper: HiveFunctionWrapper, 
  children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {
  
  override def dataType: DataType
  override def nullable: Boolean = true
  override def eval(input: InternalRow): Any
  override def prettyName: String
}
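
For comparison, a custom generic UDF negotiates ObjectInspectors once in initialize() and then evaluates lazily materialized arguments. A minimal, hypothetical sketch with simplified error handling:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Hypothetical generic UDF returning the length of its string argument
class MyStrLenUDF extends GenericUDF {
  private var inputOI: PrimitiveObjectInspector = _

  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = {
    if (arguments.length != 1)
      throw new UDFArgumentException("my_strlen takes exactly one argument")
    inputOI = arguments(0).asInstanceOf[PrimitiveObjectInspector]
    PrimitiveObjectInspectorFactory.javaIntObjectInspector
  }

  override def evaluate(arguments: Array[DeferredObject]): AnyRef = {
    val value = arguments(0).get()
    if (value == null) null
    else Int.box(inputOI.getPrimitiveJavaObject(value).toString.length)
  }

  override def getDisplayString(children: Array[String]): String =
    s"my_strlen(${children.mkString(", ")})"
}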

Usage Examples:

// Register a generic UDF for map operations
// (GenericUDFMapKeys backs Hive's built-in map_keys function)
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_map_keys
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMapKeys'
""")

// Use the generic UDF on a complex (map) type
spark.sql("""
  SELECT hive_map_keys(map('name', 'Alice', 'city', 'NYC')) as keys
""").show()
// Result: [name, city]

// Register a generic UDF for array operations
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_array_contains
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayContains'
""")

spark.sql("""
  SELECT hive_array_contains(array(1, 2, 3, 4), 3) as contains_three
""").show()
// Result: true

Table-Generating Functions (UDTF)

Support for Hive UDTFs that generate multiple rows from a single input row.

/**
 * Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)
 * Converts single input rows into multiple output rows
 */  
case class HiveGenericUDTF(
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with Logging {
  
  override def elementSchema: StructType
  override def eval(input: InternalRow): TraversableOnce[InternalRow]
  override def terminate(): TraversableOnce[InternalRow]
  override def close(): Unit
  override def prettyName: String
}
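
On the Hive side, a UDTF declares its output schema in initialize() and emits rows through forward(). A minimal, hypothetical sketch (using the legacy array-based initialize() signature for brevity):

import java.util.Arrays
import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Hypothetical UDTF: emits one row per comma-separated token
class SplitToRowsUDTF extends GenericUDTF {
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    if (argOIs.length != 1)
      throw new UDFArgumentException("split_to_rows takes exactly one argument")
    ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("token"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaStringObjectInspector))
  }

  override def process(args: Array[AnyRef]): Unit =
    if (args(0) != null)
      args(0).toString.split(",").foreach(token => forward(Array[AnyRef](token)))

  override def close(): Unit = () // no buffered state to flush
}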

Usage Examples:

// Register explode UDTF for array expansion
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_explode
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'
""")

// Use UDTF to expand arrays
spark.sql("""
  SELECT hive_explode(array('a', 'b', 'c')) as item
""").show()
// Results:
// +----+
// |item|
// +----+
// |   a|
// |   b|
// |   c|
// +----+

// Register stack UDTF for pivoting data
spark.sql("""
  CREATE TEMPORARY FUNCTION stack
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack'
""")

spark.sql("""
  SELECT stack(2, 'A', 1, 'B', 2) as (letter, number)
""").show()
// Results:
// +------+------+
// |letter|number|
// +------+------+
// |     A|     1|
// |     B|     2|
// +------+------+

Aggregate Functions (UDAF)

Support for Hive UDAFs that perform custom aggregation operations.

/**
 * Aggregate function wrapper for Hive UDAFs
 * Provides custom aggregation logic with proper state management
 */
case class HiveUDAFFunction(
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {
  
  override def nullable: Boolean = true
  override def dataType: DataType
  override def prettyName: String
  override def createAggregationBuffer(): Any
  override def update(buffer: Any, input: InternalRow): Any
  override def merge(buffer: Any, input: Any): Any
  override def eval(buffer: Any): Any
  override def serialize(buffer: Any): Array[Byte]
  override def deserialize(storageFormat: Array[Byte]): Any
}
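
A full custom UDAF pairs a resolver with an evaluator that manages the aggregation buffer. A deliberately simplified, hypothetical sketch that counts non-null inputs (a production evaluator would also resolve the mode-specific partial-result ObjectInspector in init()):

import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo

// Hypothetical UDAF: counts non-null input values
class CountNonNullUDAF extends AbstractGenericUDAFResolver {
  override def getEvaluator(info: Array[TypeInfo]): GenericUDAFEvaluator =
    new CountNonNullEvaluator
}

class CountNonNullEvaluator extends GenericUDAFEvaluator {
  private class CountBuffer extends AggregationBuffer { var count: Long = 0L }

  override def init(mode: Mode, parameters: Array[ObjectInspector]): ObjectInspector = {
    super.init(mode, parameters)
    PrimitiveObjectInspectorFactory.javaLongObjectInspector
  }

  override def getNewAggregationBuffer(): AggregationBuffer = new CountBuffer

  override def reset(agg: AggregationBuffer): Unit =
    agg.asInstanceOf[CountBuffer].count = 0L

  override def iterate(agg: AggregationBuffer, parameters: Array[AnyRef]): Unit =
    if (parameters(0) != null) agg.asInstanceOf[CountBuffer].count += 1

  override def terminatePartial(agg: AggregationBuffer): AnyRef = terminate(agg)

  // Simplified: reads the partial result via toString rather than through the
  // ObjectInspector a production evaluator would keep from init()
  override def merge(agg: AggregationBuffer, partial: AnyRef): Unit =
    if (partial != null) agg.asInstanceOf[CountBuffer].count += partial.toString.toLong

  override def terminate(agg: AggregationBuffer): AnyRef =
    Long.box(agg.asInstanceOf[CountBuffer].count)
}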

Usage Examples:

// Register custom UDAF for advanced statistics
spark.sql("""
  CREATE TEMPORARY FUNCTION variance 
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance'
""")

// Use UDAF in aggregation query
spark.sql("""
  SELECT 
    department,
    variance(salary) as salary_variance
  FROM employees 
  GROUP BY department
""").show()

// Register percentile UDAF
spark.sql("""
  CREATE TEMPORARY FUNCTION percentile_approx
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox'
""")

spark.sql("""
  SELECT 
    percentile_approx(age, 0.5) as median_age,
    percentile_approx(age, 0.95) as p95_age
  FROM users
""").show()

Using UDTFs with LATERAL VIEW

Beyond the SELECT-clause usage shown above, UDTFs can be applied with LATERAL VIEW to join the generated rows back to each source row:

// Register custom UDTF for data expansion
spark.sql("""
  CREATE TEMPORARY FUNCTION my_explode_json
  AS 'com.example.JsonExplodeUDTF'
""")

// Use UDTF in LATERAL VIEW
spark.sql("""
  SELECT t.id, exploded.key, exploded.value
  FROM my_table t
  LATERAL VIEW my_explode_json(t.json_data) exploded AS key, value
""").show()

// Built-in UDTF examples
spark.sql("""
  SELECT explode(array('a', 'b', 'c')) as item
""").show()

spark.sql("""
  SELECT stack(3, 'col1', 1, 'col2', 2, 'col3', 3) as (name, value)
""").show()

Custom UDAF Classes and Window Usage

Custom UDAF classes are registered like any other Hive function, and Hive UDAFs can also be evaluated as window functions:

// Register custom UDAF for advanced analytics
spark.sql("""
  CREATE TEMPORARY FUNCTION geometric_mean
  AS 'com.example.GeometricMeanUDAF'
""")

// Use UDAF in aggregation queries
spark.sql("""
  SELECT 
    department,
    geometric_mean(salary) as geo_mean_salary,
    percentile_approx(salary, 0.5) as median_salary
  FROM employees 
  GROUP BY department
""").show()

// Window function usage
spark.sql("""
  SELECT 
    name,
    salary,
    variance(salary) OVER (PARTITION BY department) as dept_variance
  FROM employees
""").show()

Function Registration and Management

Utilities for registering and managing Hive UDFs in Spark sessions.

/**
 * Function wrapper for Hive functions with class loading support
 */
case class HiveFunctionWrapper(functionClassName: String) {
  def createFunction[UDFType](): UDFType
}
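
A minimal sketch of what createFunction might do internally, shown here as a hypothetical standalone helper (Spark's actual implementation also deals with session class loaders and serialization across executors):

// Hypothetical helper: instantiate a function class by name via reflection
def instantiateUDF[T](functionClassName: String): T = {
  val clazz = Thread.currentThread().getContextClassLoader.loadClass(functionClassName)
  clazz.getDeclaredConstructor().newInstance().asInstanceOf[T]
}

// e.g. val udf = instantiateUDF[org.apache.hadoop.hive.ql.exec.UDF]("com.example.MyCustomUDF")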

Registration Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register UDF from JAR
spark.sql("ADD JAR /path/to/custom-udfs.jar")
spark.sql("""
  CREATE TEMPORARY FUNCTION my_custom_function
  AS 'com.example.MyCustomUDF'
""")

// Register permanent function in metastore
spark.sql("""
  CREATE FUNCTION my_db.my_permanent_function
  AS 'com.example.MyPermanentUDF'
  USING JAR '/path/to/custom-udfs.jar'
""")

// List available functions
spark.sql("SHOW FUNCTIONS LIKE '*custom*'").show()

// Get function information
spark.sql("DESCRIBE FUNCTION my_custom_function").show(truncate = false)

UDF Type Integration

Support for complex Hive data types in UDF operations.

/**
 * HiveInspectors trait provides conversion utilities between
 * Hive ObjectInspectors and Catalyst data types
 */
trait HiveInspectors {
  /** Convert Java type to Catalyst DataType */
  def javaTypeToDataType(clz: Type): DataType
  
  /** Create wrapper for converting Catalyst data to Hive format */
  def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
  
  /** Create unwrapper for converting Hive data to Catalyst format */  
  def unwrapperFor(objectInspector: ObjectInspector): Any => Any
  
  /** Convert Catalyst DataType to Hive ObjectInspector */
  def toInspector(dataType: DataType): ObjectInspector
  
  /** Convert Hive ObjectInspector to Catalyst DataType */
  def inspectorToDataType(inspector: ObjectInspector): DataType
}
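
To make these conversions concrete, here is a small standalone sketch using Hive's ObjectInspector API directly; the listOI value below is the Hive-side analogue of Catalyst's ArrayType(StringType):

import java.util.Arrays
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// ObjectInspector describing a list<string> value in Java-native format
val listOI = ObjectInspectorFactory.getStandardListObjectInspector(
  PrimitiveObjectInspectorFactory.javaStringObjectInspector)

// Inspect a Hive-format value (a java.util.List) through the inspector
val hiveValue: AnyRef = Arrays.asList("a", "b", "c")
val length = listOI.getListLength(hiveValue)      // 3
val first  = listOI.getListElement(hiveValue, 0)  // "a"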

Complex Type Examples:

// Working with array types in UDFs
spark.sql("""
  SELECT 
    collect_list(name) as names,
    size(collect_list(name)) as count
  FROM users 
  GROUP BY department
""").show()

// Working with map types
spark.sql("""
  SELECT 
    str_to_map('key1:value1,key2:value2', ',', ':') as parsed_map
""").show()

// Working with struct types  
spark.sql("""
  SELECT 
    named_struct('name', 'Alice', 'age', 25) as person_info
""").show()

Built-in Hive Function Integration

Access to an extensive set of built-in Hive functions.

String Functions:

// String manipulation functions
spark.sql("SELECT concat('Hello', ' ', 'World') as greeting").show()
spark.sql("SELECT upper('apache spark') as upper_case").show()
spark.sql("SELECT regexp_replace('abc123def', '[0-9]+', 'XXX') as replaced").show()

Date/Time Functions:

// Date and time functions
spark.sql("SELECT from_unixtime(unix_timestamp()) as current_time").show()
spark.sql("SELECT date_add('2023-01-01', 30) as future_date").show()
spark.sql("SELECT datediff('2023-12-31', '2023-01-01') as days_diff").show()

Mathematical Functions:

// Mathematical functions
spark.sql("SELECT round(3.14159, 2) as rounded").show()
spark.sql("SELECT pow(2, 3) as power").show()
spark.sql("SELECT greatest(1, 5, 3, 2) as max_value").show()

Conditional Functions:

// Conditional functions
spark.sql("""
  SELECT 
    name,
    CASE 
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult' 
      ELSE 'Senior'
    END as age_group
  FROM users
""").show()

spark.sql("SELECT nvl(null_column, 'default_value') as coalesced").show()

Error Handling

Common error patterns and exception handling for UDF operations:

import org.apache.spark.sql.AnalysisException

try {
  // Attempt to use non-existent UDF
  spark.sql("SELECT non_existent_udf('test')").show()
} catch {
  case e: AnalysisException if e.getMessage.toLowerCase.contains("undefined function") =>
    println("UDF not found - check function registration")
  case e: Exception =>
    println(s"UDF execution error: ${e.getMessage}")
}

// Handle UDF registration errors; class-loading failures typically surface
// as an AnalysisException rather than a raw ClassNotFoundException
try {
  spark.sql("CREATE TEMPORARY FUNCTION bad_udf AS 'invalid.class.name'")
} catch {
  case e: AnalysisException =>
    println("UDF class not found - check that the JAR is on the classpath")
  case e: Exception =>
    println(s"UDF registration failed: ${e.getMessage}")
}

Performance Considerations

Best practices for UDF performance:

// Prefer built-in functions over custom UDFs when possible
// GOOD: Use built-in functions
spark.sql("SELECT upper(name) FROM users")

// LESS OPTIMAL: Custom UDF for same functionality  
// spark.sql("SELECT my_upper_udf(name) FROM users")

// Note: Hive UDFs run through CodegenFallback (interpreted evaluation) and
// convert each row between Catalyst and Hive representations, so built-in
// functions that participate in whole-stage code generation are usually faster

// Register UDFs once per session to avoid repeated registration overhead
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUDF'")
// Reuse throughout session