# UDF Integration

Apache Spark's Hive integration provides comprehensive support for executing Hive User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table-Generating Functions (UDTFs) within Spark SQL queries.

## Overview

The UDF integration system allows Spark to execute existing Hive UDFs without modification, providing seamless compatibility with existing Hive-based data processing pipelines. Spark wraps Hive UDFs in specialized expression classes that translate between Spark's internal row format and Hive's object format.

## HiveSimpleUDF

Wrapper for Hive simple UDFs that extend `org.apache.hadoop.hive.ql.exec.UDF`.

```scala { .api }
case class HiveSimpleUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}
```

### Usage Example

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register a simple Hive UDF
spark.sql("""
  CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Use the UDF in queries
val result = spark.sql("""
  SELECT my_upper(name) AS upper_name
  FROM employee
""")

result.show()
```

### Creating Custom Simple UDFs

To create a Hive UDF that works with Spark:

```java
package com.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UpperCaseUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) return null;
        return new Text(input.toString().toUpperCase());
    }
}
```

## HiveGenericUDF

Wrapper for Hive generic UDFs that extend `org.apache.hadoop.hive.ql.udf.generic.GenericUDF`.

```scala { .api }
case class HiveGenericUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  def foldable: Boolean
  override def toString: String
}
```

### Usage Example

```scala
// Register a generic UDF (the class name here is a placeholder for your own implementation)
spark.sql("""
  CREATE TEMPORARY FUNCTION json_extract AS 'com.example.JsonExtractUDF'
""")

// Use the generic UDF
val result = spark.sql("""
  SELECT json_extract(json_column, '$.name') AS extracted_name
  FROM json_table
""")
```

### Generic UDF Advantages

Generic UDFs provide more flexibility than simple UDFs:

- Support for complex data types (arrays, maps, structs)
- Runtime type checking and conversion
- Better performance, since the object inspector framework avoids the reflection used to invoke simple UDFs
- Support for variable numbers of arguments

## HiveGenericUDTF

Wrapper for Hive User-Defined Table-Generating Functions that produce multiple rows from a single input row.

```scala { .api }
case class HiveGenericUDTF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Generator with CodegenFallback with Logging {

  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}
```

### Usage Example

```scala
// Register a UDTF (e.g., an explode-like function)
spark.sql("""
  CREATE TEMPORARY FUNCTION split_words AS 'com.example.SplitWordsUDTF'
""")

// Use the UDTF in a LATERAL VIEW
val result = spark.sql("""
  SELECT word
  FROM sentences
  LATERAL VIEW split_words(text) exploded_table AS word
""")
```

### UDTF Lifecycle

UDTFs follow a specific lifecycle:

1. **Initialize**: Setup phase with input object inspectors
2. **Process**: Called for each input row; may produce zero or more output rows
3. **Terminate**: Called after the last input row; may produce final output rows
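
This lifecycle can be sketched without any Hive dependencies. The class below is a hypothetical stand-in (not the real `GenericUDTF` API) for a function like the `SplitWordsUDTF` used above: it mimics the initialize/process/terminate contract, collecting output rows in a list where a real UDTF would call `forward()`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a split-words UDTF; mimics the
// initialize/process/terminate lifecycle without Hive classes.
public class SplitWordsLifecycle {
    private final List<String> forwarded = new ArrayList<>();

    // 1. Initialize: set up state before any rows arrive
    public void initialize() {
        forwarded.clear();
    }

    // 2. Process: called once per input row; may emit 0..n output rows
    public void process(String sentence) {
        if (sentence == null) return; // emit nothing for null input
        for (String word : sentence.split("\\s+")) {
            if (!word.isEmpty()) forwarded.add(word); // stands in for forward()
        }
    }

    // 3. Terminate: called after the last row; may emit final rows
    public List<String> terminate() {
        return forwarded;
    }

    public static void main(String[] args) {
        SplitWordsLifecycle udtf = new SplitWordsLifecycle();
        udtf.initialize();
        udtf.process("hello spark world");
        udtf.process(null);
        System.out.println(udtf.terminate()); // [hello, spark, world]
    }
}
```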

## HiveUDAFFunction

Wrapper for Hive User-Defined Aggregate Functions that perform aggregation operations.

```scala { .api }
case class HiveUDAFFunction(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression],
    isUDAFBridgeRequired: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0
) extends TypedImperativeAggregate[GenericUDAFEvaluator.AggregationBuffer] with Logging {

  def createAggregationBuffer(): AggregationBuffer
  def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer
  def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer
  def eval(buffer: AggregationBuffer): Any
  def serialize(buffer: AggregationBuffer): Array[Byte]
  def deserialize(storageFormat: Array[Byte]): AggregationBuffer
  def prettyName: String = name
}
```

### Usage Example

```scala
// Register a UDAF
spark.sql("""
  CREATE TEMPORARY FUNCTION my_avg AS 'com.example.AverageUDAF'
""")

// Use the UDAF in an aggregation query
val result = spark.sql("""
  SELECT department, my_avg(salary) AS avg_salary
  FROM employee
  GROUP BY department
""")
```

### UDAF Aggregation Process

UDAFs implement a distributed aggregation process:

1. **Partial aggregation**: Each partition computes partial results
2. **Merge**: Partial results are combined across partitions
3. **Final evaluation**: The final result is computed from the merged state
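
The partial/merge/final pattern can be illustrated with a framework-free average. This is an illustrative sketch of the three phases, not the actual `GenericUDAFEvaluator` interface:

```java
// Illustrative average aggregation following the partial/merge/final
// pattern; not the real Hive GenericUDAFEvaluator API.
public class AvgBuffer {
    private double sum = 0.0;
    private long count = 0;

    // Partial aggregation: fold one input value into this partition's buffer
    public void update(double value) {
        sum += value;
        count++;
    }

    // Merge: combine a partial buffer from another partition
    public void merge(AvgBuffer other) {
        sum += other.sum;
        count += other.count;
    }

    // Final evaluation: compute the result from the merged state
    public Double eval() {
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        AvgBuffer p1 = new AvgBuffer(); // partition 1
        p1.update(10); p1.update(20);
        AvgBuffer p2 = new AvgBuffer(); // partition 2
        p2.update(30);
        p1.merge(p2);                   // combine partial results
        System.out.println(p1.eval()); // 20.0
    }
}
```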

## HiveFunctionWrapper

Core wrapper class that loads and manages Hive function instances.

```scala { .api }
case class HiveFunctionWrapper(
    className: String,
    instance: AnyRef
) extends Serializable {

  def createFunction[T](): T
  def getMethodName(): String
  def getParameterTypes(): Array[Class[_]]
}
```

### Function Loading

```scala
// Create a function wrapper (no pre-built instance, so pass null)
val wrapper = HiveFunctionWrapper("com.example.MyUDF", null)

// Instantiate the wrapped function class
val udfInstance = wrapper.createFunction[UDF]()
```

## Function Registration

### Temporary Functions

Register functions for the current session:

```scala
// Register a UDF already on the classpath
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
""")

// Register with an explicit JAR location
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
  USING JAR '/path/to/udf.jar'
""")
```

### Permanent Functions

Register functions in the Hive metastore:

```scala
// Create a permanent function
spark.sql("""
  CREATE FUNCTION my_database.my_func AS 'com.example.MyFunction'
  USING JAR 'hdfs://path/to/udf.jar'
""")

// Use the permanent function
val result = spark.sql("""
  SELECT my_database.my_func(column) FROM table
""")
```

### Function Discovery

List and inspect registered functions:

```scala
// List all functions
spark.sql("SHOW FUNCTIONS").show()

// List functions matching a pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// Describe a function
spark.sql("DESCRIBE FUNCTION my_func").show()

// Show extended info
spark.sql("DESCRIBE FUNCTION EXTENDED my_func").show()
```

## Data Type Mapping

Mapping between Hive and Spark data types in UDF integration:

### Primitive Types

- **BOOLEAN** ↔ BooleanType
- **TINYINT** ↔ ByteType
- **SMALLINT** ↔ ShortType
- **INT** ↔ IntegerType
- **BIGINT** ↔ LongType
- **FLOAT** ↔ FloatType
- **DOUBLE** ↔ DoubleType
- **STRING** ↔ StringType
- **BINARY** ↔ BinaryType
- **TIMESTAMP** ↔ TimestampType
- **DATE** ↔ DateType

### Complex Types

- **ARRAY\<T\>** ↔ ArrayType(T)
- **MAP\<K,V\>** ↔ MapType(K,V)
- **STRUCT\<...\>** ↔ StructType(...)
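
For reference, the primitive mapping above can be captured as a simple lookup table. This is purely illustrative; Spark performs the conversion internally through Hive object inspectors:

```java
import java.util.Map;

public class HiveSparkTypes {
    // Hive primitive type name -> corresponding Spark SQL DataType name
    public static final Map<String, String> PRIMITIVES = Map.ofEntries(
        Map.entry("BOOLEAN", "BooleanType"),
        Map.entry("TINYINT", "ByteType"),
        Map.entry("SMALLINT", "ShortType"),
        Map.entry("INT", "IntegerType"),
        Map.entry("BIGINT", "LongType"),
        Map.entry("FLOAT", "FloatType"),
        Map.entry("DOUBLE", "DoubleType"),
        Map.entry("STRING", "StringType"),
        Map.entry("BINARY", "BinaryType"),
        Map.entry("TIMESTAMP", "TimestampType"),
        Map.entry("DATE", "DateType")
    );

    public static void main(String[] args) {
        System.out.println(PRIMITIVES.get("BIGINT")); // LongType
    }
}
```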

### Usage in UDFs

```java
// Java generic UDF handling complex types. Object inspectors come from
// initialize(), not from the evaluate() arguments.
public class ComplexUDF extends GenericUDF {
    private ListObjectInspector listOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        listOI = (ListObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Handle array input via the inspector captured in initialize()
        List<?> inputList = listOI.getList(arguments[0].get());

        // Process and return the result (here: the array length)
        return inputList == null ? null : inputList.size();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "complex_udf(" + String.join(", ", children) + ")";
    }
}
```

## Performance Considerations

### UDF Performance Tips

1. **Use generic UDFs**: Better performance than simple UDFs for complex operations
2. **Minimize object creation**: Reuse objects where possible in UDF evaluation
3. **Leverage vectorization**: Some UDFs can benefit from vectorized execution
4. **Prefer native functions**: Use Spark's built-in functions when available

### Example Optimized UDF

```java
// initialize() and getDisplayString() omitted for brevity
public class OptimizedUDF extends GenericUDF {
    private final Text result = new Text(); // reused across evaluate() calls

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        if (value == null) return null; // guard against null input
        result.set(processString(value.toString())); // reuse the Text object
        return result;
    }
}
```

## Error Handling

### Common UDF Errors

**ClassNotFoundException**: UDF class not found
```scala
// Solution: add the JAR to the classpath
spark.sql("ADD JAR '/path/to/udf.jar'")
```

**Method not found**: Incorrect UDF method signature
```java
// Ensure the UDF implements the correct evaluate() method
public Object evaluate(DeferredObject[] args) throws HiveException
```

**Serialization issues**: UDF not serializable for distributed execution
```java
// Make the UDF serializable, or mark non-serializable fields transient
public class MyUDF extends GenericUDF implements Serializable
```
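
The transient-field pattern can be sketched without Hive classes. The class below is illustrative (names are hypothetical): a scratch helper is marked `transient`, so it is skipped during serialization and rebuilt lazily after deserialization on the executor:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative: a function object whose helper field is transient
// and rebuilt lazily after deserialization.
public class ReusableFn implements Serializable {
    private transient StringBuilder scratch; // stands in for a non-serializable helper

    private StringBuilder scratch() {
        if (scratch == null) scratch = new StringBuilder(); // rebuilt on the executor
        return scratch;
    }

    public String evaluate(String input) {
        StringBuilder sb = scratch();
        sb.setLength(0); // reuse the buffer across calls
        return sb.append(input).append("!").toString();
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization, as a distributed runtime would
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new ReusableFn());
        ReusableFn fn = (ReusableFn) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(fn.evaluate("spark")); // spark!
    }
}
```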

### Exception Handling in UDFs

```java
public class SafeUDF extends GenericUDF {
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        try {
            // UDF logic
            return processInput(arguments[0].get());
        } catch (Exception e) {
            // Handle errors gracefully
            LOG.warn("UDF error: " + e.getMessage());
            return null; // or an appropriate default value
        }
    }
}
```

## Testing UDFs

### Unit Testing

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession

class UDFIntegrationSuite extends SparkFunSuite with SharedSparkSession {
  test("custom UDF execution") {
    spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 'com.example.TestUDF'")

    val result = spark.sql("SELECT test_udf('input') AS output").collect()
    assert(result(0).getString(0) == "expected_output")
  }
}
```

### Integration Testing

```scala
// Test against a Hive built-in UDF
class HiveUDFSuite extends SparkFunSuite with SharedSparkSession {
  test("hive builtin UDF") {
    val result = spark.sql("SELECT upper('hello') AS upper_hello").collect()
    assert(result(0).getString(0) == "HELLO")
  }
}
```

## Migration and Compatibility

### Migrating from Hive

Most Hive UDFs work without modification in Spark:

1. **Copy JAR files** to the Spark classpath
2. **Register functions** using `CREATE FUNCTION`
3. **Test functionality** with representative data
4. **Monitor performance** and optimize if needed

### Version Compatibility

UDF compatibility across Hive versions:

- **Simple UDFs**: Generally compatible across versions
- **Generic UDFs**: May require Hive version-specific compilation
- **Built-in UDFs**: Spark provides a compatibility layer

## Types

```scala { .api }
// Base expression interface
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generator for table-generating functions
trait Generator extends Expression {
  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
}

// Aggregate function interface
trait TypedImperativeAggregate[T] extends ImperativeAggregate {
  def createAggregationBuffer(): T
  def update(buffer: T, input: InternalRow): T
  def merge(buffer: T, input: T): T
  def eval(buffer: T): Any
}

// Hive UDAF aggregation buffer
trait AggregationBuffer {
  def reset(): Unit
  def copy(): AggregationBuffer
}

// Object inspector for Hive's type system
trait ObjectInspector {
  def getCategory(): ObjectInspector.Category
  def getTypeName(): String
}

// Function wrapper for Hive functions
case class HiveFunctionWrapper(className: String, instance: AnyRef) extends Serializable {
  def createFunction[T](): T
}

// Internal row representation
trait InternalRow {
  def numFields: Int
  def get(ordinal: Int, dataType: DataType): Any
  def isNullAt(ordinal: Int): Boolean
}
```