# User-Defined Function Support
1
2
Complete support for Hive UDFs, UDAFs, and UDTFs with automatic registration and execution within Spark queries. This module provides seamless integration allowing Hive user-defined functions to work natively within Spark SQL expressions.
3
4
## Capabilities
5
6
### Hive Simple UDF
7
8
Support for simple Hive UDFs that extend `org.apache.hadoop.hive.ql.exec.UDF`.
9
10
```scala { .api }
11
/**
12
* Expression wrapper for simple Hive UDFs
13
* @param name UDF name for display purposes
14
* @param funcWrapper Wrapper containing the UDF class
15
* @param children Input expressions to the UDF
16
*/
17
case class HiveSimpleUDF(
18
name: String,
19
funcWrapper: HiveFunctionWrapper,
20
children: Seq[Expression]
21
) extends Expression with HiveInspectors with UserDefinedExpression {
22
23
/**
24
* Return data type of the UDF
25
*/
26
lazy val dataType: DataType
27
28
/**
29
* Evaluate the UDF with given input
30
* @param input Input row
31
* @return UDF result
32
*/
33
def eval(input: InternalRow): Any
34
35
/**
36
* Pretty name for display
37
*/
38
def prettyName: String
39
40
/**
41
* SQL representation of the UDF call
42
*/
43
def sql: String
44
}
45
```
46
47
**Usage Example:**
48
49
```scala
50
import org.apache.spark.sql.hive.HiveFunctionWrapper
51
import org.apache.spark.sql.catalyst.expressions._
52
53
// Create wrapper for a Hive UDF class
54
val wrapper = HiveFunctionWrapper("com.example.MyHiveUDF")
55
56
// Create expression for the UDF
57
val udfExpr = HiveSimpleUDF(
58
name = "my_udf",
59
funcWrapper = wrapper,
60
children = Seq(Literal("input_string"))
61
)
62
63
// Use in expression evaluation
64
val result = udfExpr.eval(EmptyRow)
65
println(s"UDF result: $result")
66
```
67
68
### Hive Generic UDF
69
70
Support for generic Hive UDFs that extend `org.apache.hadoop.hive.ql.udf.generic.GenericUDF`.
71
72
```scala { .api }
73
/**
74
* Expression wrapper for generic Hive UDFs
75
* @param name UDF name for display purposes
76
* @param funcWrapper Wrapper containing the UDF class
77
* @param children Input expressions to the UDF
78
*/
79
case class HiveGenericUDF(
80
name: String,
81
funcWrapper: HiveFunctionWrapper,
82
children: Seq[Expression]
83
) extends Expression with HiveInspectors with UserDefinedExpression {
84
85
/**
86
* Return data type of the UDF
87
*/
88
lazy val dataType: DataType
89
90
/**
91
* Evaluate the UDF with given input
92
* @param input Input row
93
* @return UDF result
94
*/
95
def eval(input: InternalRow): Any
96
97
/**
98
* Pretty name for display
99
*/
100
def prettyName: String
101
102
/**
103
* SQL representation of the UDF call
104
*/
105
def sql: String
106
}
107
```
108
109
**Usage Example:**
110
111
```scala
112
// Generic UDF for complex type handling
113
val genericWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase")
114
115
val genericUDF = HiveGenericUDF(
116
name = "case_when",
117
funcWrapper = genericWrapper,
118
children = Seq(
119
Literal(true),
120
Literal("true_value"),
121
Literal("false_value")
122
)
123
)
124
125
val result = genericUDF.eval(EmptyRow)
126
```
127
128
### Hive User-Defined Table Function (UDTF)
129
130
Support for Hive UDTFs that generate multiple output rows from a single input row.
131
132
```scala { .api }
133
/**
134
* Expression wrapper for Hive UDTFs
135
* @param name UDTF name for display purposes
136
* @param funcWrapper Wrapper containing the UDTF class
137
* @param children Input expressions to the UDTF
138
*/
139
case class HiveGenericUDTF(
140
name: String,
141
funcWrapper: HiveFunctionWrapper,
142
children: Seq[Expression]
143
) extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression {
144
145
/**
146
* Output schema of the UDTF
147
*/
148
lazy val elementSchema: StructType
149
150
/**
151
* Evaluate the UDTF and generate output rows
152
* @param input Input row
153
* @return Iterator of output rows
154
*/
155
def eval(input: InternalRow): IterableOnce[InternalRow]
156
157
/**
158
* Terminate the UDTF and return any final rows
159
* @return Iterator of final output rows
160
*/
161
def terminate(): IterableOnce[InternalRow]
162
163
/**
164
* Pretty name for display
165
*/
166
def prettyName: String
167
168
/**
169
* SQL representation of the UDTF call
170
*/
171
def sql: String
172
}
173
```
174
175
**Usage Example:**
176
177
```scala
178
// UDTF that splits strings into multiple rows
179
val udtfWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode")
180
181
val udtf = HiveGenericUDTF(
182
name = "explode",
183
funcWrapper = udtfWrapper,
184
children = Seq(Literal.create(Array("a", "b", "c"), ArrayType(StringType)))
185
)
186
187
// Evaluate UDTF
188
val outputRows = udtf.eval(EmptyRow).toSeq
189
println(s"UDTF generated ${outputRows.length} rows")
190
191
// Terminate to get any final rows
192
val finalRows = udtf.terminate().toSeq
193
```
194
195
### Hive User-Defined Aggregate Function (UDAF)
196
197
Support for Hive UDAFs that perform custom aggregation operations.
198
199
```scala { .api }
200
/**
201
* Expression wrapper for Hive UDAFs
202
* @param name UDAF name for display purposes
203
* @param funcWrapper Wrapper containing the UDAF class
204
* @param children Input expressions to the UDAF
205
* @param isUDAFBridgeRequired Whether UDAF bridge is needed
206
* @param mutableAggBufferOffset Offset in mutable aggregation buffer
207
* @param inputAggBufferOffset Offset in input aggregation buffer
208
*/
209
case class HiveUDAFFunction(
210
name: String,
211
funcWrapper: HiveFunctionWrapper,
212
children: Seq[Expression],
213
isUDAFBridgeRequired: Boolean,
214
mutableAggBufferOffset: Int,
215
inputAggBufferOffset: Int
216
) extends TypedImperativeAggregate[HiveUDAFBuffer] with HiveInspectors with UserDefinedExpression {
217
218
/**
219
* Create a new aggregation buffer
220
* @return New UDAF buffer
221
*/
222
def createAggregationBuffer(): HiveUDAFBuffer
223
224
/**
225
* Update aggregation buffer with new input
226
* @param buffer Current aggregation buffer
227
* @param input Input row
228
* @return Updated buffer
229
*/
230
def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer
231
232
/**
233
* Merge two aggregation buffers
234
* @param buffer Target buffer
235
* @param input Source buffer to merge
236
* @return Merged buffer
237
*/
238
def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer
239
240
/**
241
* Get final result from aggregation buffer
242
* @param buffer Final aggregation buffer
243
* @return Aggregation result
244
*/
245
def eval(buffer: HiveUDAFBuffer): Any
246
247
/**
248
* Serialize aggregation buffer
249
* @param buffer Buffer to serialize
250
* @return Serialized buffer data
251
*/
252
def serialize(buffer: HiveUDAFBuffer): Array[Byte]
253
254
/**
255
* Deserialize aggregation buffer
256
* @param bytes Serialized buffer data
257
* @return Deserialized buffer
258
*/
259
def deserialize(bytes: Array[Byte]): HiveUDAFBuffer
260
261
/**
262
* Return data type of the aggregation result
263
*/
264
lazy val dataType: DataType
265
266
/**
267
* Pretty name for display
268
*/
269
def prettyName: String
270
271
/**
272
* SQL representation of the UDAF call
273
*/
274
def sql: String
275
}
276
```
277
278
**Usage Example:**
279
280
```scala
281
// Custom aggregation function
282
val udafWrapper = HiveFunctionWrapper("com.example.MyHiveUDAF")
283
284
val udaf = HiveUDAFFunction(
285
name = "my_aggregate",
286
funcWrapper = udafWrapper,
287
children = Seq(col("value")),
288
isUDAFBridgeRequired = false,
289
mutableAggBufferOffset = 0,
290
inputAggBufferOffset = 0
291
)
292
293
// Use in aggregation context
294
val buffer = udaf.createAggregationBuffer()
295
val updatedBuffer = udaf.update(buffer, inputRow)
296
val result = udaf.eval(updatedBuffer)
297
```
298
299
### UDAF Buffer
300
301
Buffer type for managing UDAF aggregation state.
302
303
```scala { .api }
304
/**
305
* Buffer for Hive UDAF operations
306
* @param buf Hive aggregation buffer
307
* @param canDoMerge Whether buffer supports merge operations
308
*/
309
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)
310
```
311
312
**Usage Example:**
313
314
```scala
315
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
316
317
// Create UDAF buffer
318
val hiveBuffer: AggregationBuffer = ??? // placeholder: obtained from the Hive UDAF evaluator
319
val buffer = HiveUDAFBuffer(hiveBuffer, canDoMerge = true)
320
321
// Use in UDAF operations
322
val serialized = udaf.serialize(buffer)
323
val deserialized = udaf.deserialize(serialized)
324
```
325
326
### Function Wrapper
327
328
Wrapper class for Hive function classes.
329
330
```scala { .api }
331
/**
332
* Wrapper for Hive function classes
333
* @param functionClassName Fully qualified class name of the Hive function
334
*/
335
case class HiveFunctionWrapper(functionClassName: String) {
336
337
/**
338
* Create instance of the wrapped function
339
* @return Instance of the Hive function
340
*/
341
def createFunction[T]: T
342
343
/**
344
* Get the function class
345
* @return Class object for the function
346
*/
347
def functionClass: Class[_]
348
}
349
```
350
351
**Usage Example:**
352
353
```scala
354
// Create wrapper
355
val wrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.UDFLength")
356
357
// Create function instance
358
val udfInstance = wrapper.createFunction[UDF]
359
360
// Get function class for reflection
361
val functionClass = wrapper.functionClass
362
println(s"Function class: ${functionClass.getName}")
363
```
364
365
### UDF Registration and Usage
366
367
Integration with Spark SQL for automatic UDF registration.
368
369
```scala { .api }
370
// Example of registering Hive UDF in Spark session
371
def registerHiveUDF(
372
spark: SparkSession,
373
name: String,
374
className: String
375
): Unit = {
376
377
val wrapper = HiveFunctionWrapper(className)
378
379
// Register as Spark UDF
380
spark.udf.register(name, (inputs: Seq[Any]) => {
381
val udf = HiveSimpleUDF(name, wrapper, inputs.map(Literal(_)))
382
udf.eval(EmptyRow)
383
})
384
}
385
```
386
387
**Usage Example:**
388
389
```scala
390
import org.apache.spark.sql.SparkSession
391
392
val spark = SparkSession.builder()
393
.appName("HiveUDFExample")
394
.enableHiveSupport()
395
.getOrCreate()
396
397
// Register custom Hive UDF
398
registerHiveUDF(spark, "my_length", "com.example.MyLengthUDF")
399
400
// Use in SQL
401
spark.sql("SELECT my_length(name) FROM users").show()
402
403
// Use in DataFrame API
404
import spark.implicits._
405
val df = Seq("hello", "world").toDF("text")
406
df.select(callUDF("my_length", $"text")).show() // NOTE: callUDF is deprecated since Spark 3.2; prefer call_udf
407
```
408
409
### Advanced UDF Features
410
411
Support for complex UDF scenarios.
412
413
```scala { .api }
414
// UDF with complex input/output types
415
def createComplexUDF(
416
name: String,
417
className: String,
418
inputTypes: Seq[DataType],
419
outputType: DataType
420
): HiveGenericUDF = {
421
422
val wrapper = HiveFunctionWrapper(className)
423
val children = inputTypes.zipWithIndex.map { case (dataType, index) =>
424
BoundReference(index, dataType, nullable = true)
425
}
426
427
HiveGenericUDF(name, wrapper, children)
428
}
429
430
// UDTF with multiple output columns
431
def createMultiColumnUDTF(
432
name: String,
433
className: String,
434
inputExpression: Expression,
435
outputSchema: StructType
436
): HiveGenericUDTF = {
437
438
val wrapper = HiveFunctionWrapper(className)
439
440
HiveGenericUDTF(name, wrapper, Seq(inputExpression))
441
}
442
```
443
444
**Usage Example:**
445
446
```scala
447
import org.apache.spark.sql.types._
448
449
// Complex UDF with struct input/output
450
val complexUDF = createComplexUDF(
451
name = "process_struct",
452
className = "com.example.StructProcessorUDF",
453
inputTypes = Seq(StructType(Seq(
454
StructField("id", IntegerType, false),
455
StructField("name", StringType, true)
456
))),
457
outputType = StringType
458
)
459
460
// Multi-column UDTF
461
val outputSchema = StructType(Seq(
462
StructField("key", StringType, false),
463
StructField("value", StringType, true)
464
))
465
466
val multiColUDTF = createMultiColumnUDTF(
467
name = "split_pairs",
468
className = "com.example.SplitPairsUDTF",
469
inputExpression = Literal("key1:value1,key2:value2"),
470
outputSchema = outputSchema
471
)
472
```
473
474
### Error Handling
475
476
Common error patterns in UDF execution.
477
478
```scala { .api }
479
// Handle UDF execution errors
480
def safeEvaluateUDF(udf: HiveSimpleUDF, input: InternalRow): Option[Any] = {
481
try {
482
Some(udf.eval(input))
483
} catch {
484
case _: UDFArgumentException =>
485
println(s"Invalid arguments for UDF ${udf.name}")
486
None
487
case _: HiveException =>
488
println(s"Hive execution error in UDF ${udf.name}")
489
None
490
case e: Exception =>
491
println(s"Unexpected error in UDF ${udf.name}: ${e.getMessage}")
492
None
493
}
494
}
495
```
496
497
### Performance Considerations
498
499
Optimization tips for UDF usage.
500
501
```scala { .api }
502
// Cache UDF instances for repeated use
503
class UDFCache {
  import scala.collection.mutable
  private val cache = mutable.Map[String, HiveFunctionWrapper]()
505
506
def getOrCreateWrapper(className: String): HiveFunctionWrapper = {
507
cache.getOrElseUpdate(className, HiveFunctionWrapper(className))
508
}
509
}
510
511
// Batch UDF evaluation
512
def batchEvaluateUDF(
513
udf: HiveSimpleUDF,
514
inputs: Seq[InternalRow]
515
): Seq[Any] = {
516
517
// Prepare UDF once
518
val preparedUDF = udf // UDF preparation happens lazily
519
520
// Evaluate for all inputs
521
inputs.map(preparedUDF.eval)
522
}
523
```
524
525
**Usage Example:**
526
527
```scala
528
val cache = new UDFCache()
529
530
// Reuse wrapper for multiple UDFs of same class
531
val wrapper1 = cache.getOrCreateWrapper("com.example.MyUDF")
532
val wrapper2 = cache.getOrCreateWrapper("com.example.MyUDF")
533
assert(wrapper1 eq wrapper2) // Same instance
534
535
// Batch evaluation
536
val inputs = Seq(
537
InternalRow(UTF8String.fromString("hello")),
538
InternalRow(UTF8String.fromString("world"))
539
)
540
541
val udf = HiveSimpleUDF("length", wrapper1, Seq(BoundReference(0, StringType, true)))
542
val results = batchEvaluateUDF(udf, inputs)
543
println(s"Batch results: ${results.mkString(", ")}")
544
```