# UDF Integration

Comprehensive support for Hive User-Defined Functions, including simple UDFs, generic UDFs, table-generating functions (UDTFs), and aggregate functions (UDAFs). This enables seamless integration of existing Hive UDFs within Spark SQL queries.

## Capabilities

### Simple UDF Support

Wrapper for simple Hive UDFs that work with basic data types.

```scala { .api }
/**
 * Expression wrapper for simple Hive UDFs
 * Handles basic data type conversion between Hive and Catalyst
 */
case class HiveSimpleUDF(
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {

  override def dataType: DataType
  override def nullable: Boolean = true
  override def eval(input: InternalRow): Any
  override def prettyName: String
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register and use a simple Hive UDF
spark.sql("CREATE TEMPORARY FUNCTION simple_upper AS 'org.apache.hadoop.hive.ql.udf.UDFUpper'")
spark.sql("SELECT simple_upper('hello world') as result").show()
// Result: HELLO WORLD

// Use built-in Hive simple UDFs
spark.sql("SELECT substr('Apache Spark', 1, 6) as result").show()
// Result: Apache
```
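For reference, the class behind a simple UDF is ordinary JVM code: Hive locates a method named `evaluate` by reflection. The sketch below mimics that shape in pure Scala so it runs standalone; a real UDF would extend `org.apache.hadoop.hive.ql.exec.UDF` and typically use Hadoop writable types such as `Text`.

```scala
// Shape of a simple Hive UDF: a class whose `evaluate` method Hive
// discovers by reflection. A real UDF would extend
// org.apache.hadoop.hive.ql.exec.UDF; plain String is used here so
// the sketch runs without Hive on the classpath.
class MyUpperUDF /* extends UDF */ {
  // One value in, one value out; null-safe like most Hive UDFs
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase
}

val udf = new MyUpperUDF
println(udf.evaluate("hello world")) // HELLO WORLD
```

Hive picks the `evaluate` overload that best matches the call-site argument types, so a single UDF class may declare several `evaluate` methods.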
### Generic UDF Support

Wrapper for generic Hive UDFs that can handle complex data types and advanced operations.

```scala { .api }
/**
 * Expression wrapper for generic Hive UDFs
 * Supports complex data types and provides full ObjectInspector integration
 */
case class HiveGenericUDF(
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback with Logging {

  override def dataType: DataType
  override def nullable: Boolean = true
  override def eval(input: InternalRow): Any
  override def prettyName: String
}
```

**Usage Examples:**

```scala
// Register Hive's JSON UDF (UDFJson implements get_json_object) under a
// distinct name so it does not shadow Spark's built-in get_json_object
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_get_json
  AS 'org.apache.hadoop.hive.ql.udf.UDFJson'
""")

// Use the registered UDF
val jsonData = """{"name": "Alice", "age": 25, "city": "NYC"}"""
spark.sql(s"""
  SELECT
    hive_get_json('$jsonData', '$$.name') as name,
    hive_get_json('$jsonData', '$$.age') as age
""").show()

// Register a generic Hive UDF for array operations
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_array_contains
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayContains'
""")

spark.sql("""
  SELECT hive_array_contains(array(1, 2, 3, 4), 3) as contains_three
""").show()
// Result: true
```
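Unlike a simple UDF, a generic UDF is initialized once with its input types (as ObjectInspectors) and then evaluated per row on lazily resolved arguments. The plain-Scala sketch below mimics that two-phase contract; the class and type names are illustrative, not Hive's API:

```scala
// Two-phase contract of a generic UDF, mimicked without Hive classes:
// initialize() validates input types once and fixes the return type;
// evaluate() is then called per row with deferred (lazily computed) arguments.
class ArrayContainsSketch {
  private var initialized = false

  // In Hive this receives ObjectInspector[] and returns the output inspector
  def initialize(argTypes: Seq[String]): String = {
    require(argTypes == Seq("array<int>", "int"), s"unexpected argument types: $argTypes")
    initialized = true
    "boolean"
  }

  // In Hive the arguments arrive as DeferredObject[] and are resolved on demand
  def evaluate(arr: () => Seq[Int], elem: () => Int): Boolean = {
    require(initialized, "initialize() must be called first")
    arr().contains(elem())
  }
}

val f = new ArrayContainsSketch
f.initialize(Seq("array<int>", "int"))
println(f.evaluate(() => Seq(1, 2, 3, 4), () => 3)) // true
```

The one-time `initialize` step is what lets generic UDFs handle arbitrary complex types: type checking happens once per query, not once per row.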
### Table-Generating Functions (UDTF)

Support for Hive UDTFs that generate multiple rows from a single input row.

```scala { .api }
/**
 * Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)
 * Converts single input rows into multiple output rows
 */
case class HiveGenericUDTF(
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with Logging {

  override def elementSchema: StructType
  override def eval(input: InternalRow): TraversableOnce[InternalRow]
  override def prettyName: String
  override def terminate(): TraversableOnce[InternalRow]
}
```

**Usage Examples:**

```scala
// Register the explode UDTF for array expansion
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_explode
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'
""")

// Use the UDTF to expand arrays
spark.sql("""
  SELECT hive_explode(array('a', 'b', 'c')) as item
""").show()
// Results:
// +----+
// |item|
// +----+
// |   a|
// |   b|
// |   c|
// +----+

// Register the stack UDTF for pivoting data, under a distinct name so it
// does not shadow Spark's built-in stack
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_stack
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack'
""")

spark.sql("""
  SELECT hive_stack(2, 'A', 1, 'B', 2) as (letter, number)
""").show()
// Results:
// +------+------+
// |letter|number|
// +------+------+
// |     A|     1|
// |     B|     2|
// +------+------+
```
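Conceptually, a UDTF is a flatMap: each input row yields zero or more output rows. The plain-Scala sketch below (illustrative helper names, no Spark dependency) mirrors the `explode` and `stack` semantics shown above:

```scala
// A UDTF maps one input row to zero or more output rows -- conceptually a flatMap.

// explode: each array element becomes its own one-column row
def explodeRows(arr: Seq[String]): Seq[Seq[String]] = arr.map(Seq(_))

// stack(n, v1, ..., vk): lay k values out as n rows of k/n columns
def stackRows(n: Int, values: Seq[Any]): Seq[Seq[Any]] = {
  require(values.size % n == 0, "number of values must be a multiple of n")
  values.grouped(values.size / n).toSeq
}

println(explodeRows(Seq("a", "b", "c")))   // List(List(a), List(b), List(c))
println(stackRows(2, Seq("A", 1, "B", 2))) // List(List(A, 1), List(B, 2))
```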
### Aggregate Functions (UDAF)

Support for Hive UDAFs that perform custom aggregation operations.

```scala { .api }
/**
 * Aggregate function wrapper for Hive UDAFs
 * Provides custom aggregation logic with proper state management
 */
case class HiveUDAFFunction(
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {

  override def nullable: Boolean = true
  override def dataType: DataType
  override def prettyName: String
  override def createAggregationBuffer(): Any
  override def update(buffer: Any, input: InternalRow): Any
  override def merge(buffer: Any, input: Any): Any
  override def eval(buffer: Any): Any
  override def serialize(buffer: Any): Array[Byte]
  override def deserialize(storageFormat: Array[Byte]): Any
}
```

**Usage Examples:**

```scala
// Register Hive's variance UDAF under a distinct name so it does not
// shadow Spark's built-in variance
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_variance
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance'
""")

// Use the UDAF in an aggregation query
spark.sql("""
  SELECT
    department,
    hive_variance(salary) as salary_variance
  FROM employees
  GROUP BY department
""").show()

// Register Hive's approximate percentile UDAF
spark.sql("""
  CREATE TEMPORARY FUNCTION hive_percentile_approx
  AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox'
""")

spark.sql("""
  SELECT
    hive_percentile_approx(age, 0.5) as median_age,
    hive_percentile_approx(age, 0.95) as p95_age
  FROM users
""").show()
```
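The buffer lifecycle that `HiveUDAFFunction` inherits from `TypedImperativeAggregate` — create a buffer, update it per row, merge partial buffers across partitions (serializing them over the shuffle), then evaluate — can be illustrated with a self-contained population-variance aggregator. This mirrors the contract only, not Spark's actual implementation:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Plain-Scala illustration of the TypedImperativeAggregate buffer lifecycle.
// The buffer keeps (count, sum, sum of squares) for a population variance.
case class VarBuffer(var n: Long = 0L, var sum: Double = 0.0, var sumSq: Double = 0.0)

def createAggregationBuffer(): VarBuffer = VarBuffer()

// update folds one input row into the buffer
def update(b: VarBuffer, x: Double): VarBuffer = {
  b.n += 1; b.sum += x; b.sumSq += x * x; b
}

// merge combines partial buffers computed on different partitions
def merge(b1: VarBuffer, b2: VarBuffer): VarBuffer = {
  b1.n += b2.n; b1.sum += b2.sum; b1.sumSq += b2.sumSq; b1
}

// eval produces the final result: E[X^2] - E[X]^2
def eval(b: VarBuffer): Double = {
  val mean = b.sum / b.n
  b.sumSq / b.n - mean * mean
}

// Buffers are serialized when they must cross a shuffle boundary
def serialize(b: VarBuffer): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(b); oos.close()
  bos.toByteArray
}
def deserialize(bytes: Array[Byte]): VarBuffer =
  new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[VarBuffer]

// Simulate two partitions: [1.0, 2.0] and [3.0, 4.0]
val left   = update(update(createAggregationBuffer(), 1.0), 2.0)
val right  = update(update(createAggregationBuffer(), 3.0), 4.0)
val merged = merge(deserialize(serialize(left)), right)
println(eval(merged)) // 1.25 (population variance of 1, 2, 3, 4)
```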
### Using UDTFs with LATERAL VIEW

Custom Hive UDTFs are most often applied through a `LATERAL VIEW` clause, which joins the generated rows back to the columns of the source table:

```scala
// Register a custom UDTF for data expansion
// (com.example.JsonExplodeUDTF is a placeholder for your own class)
spark.sql("""
  CREATE TEMPORARY FUNCTION my_explode_json
  AS 'com.example.JsonExplodeUDTF'
""")

// Use the UDTF in a LATERAL VIEW
spark.sql("""
  SELECT t.id, exploded.key, exploded.value
  FROM my_table t
  LATERAL VIEW my_explode_json(t.json_data) exploded AS key, value
""").show()

// Spark's built-in generators work the same way
spark.sql("""
  SELECT explode(array('a', 'b', 'c')) as item
""").show()

spark.sql("""
  SELECT stack(3, 'col1', 1, 'col2', 2, 'col3', 3) as (name, value)
""").show()
```

### Custom UDAFs in Aggregations and Windows

Custom Hive UDAFs can be used both in `GROUP BY` aggregations and as window functions:

```scala
// Register a custom UDAF for advanced analytics
// (com.example.GeometricMeanUDAF is a placeholder for your own class)
spark.sql("""
  CREATE TEMPORARY FUNCTION geometric_mean
  AS 'com.example.GeometricMeanUDAF'
""")

// Use the UDAF in an aggregation query
spark.sql("""
  SELECT
    department,
    geometric_mean(salary) as geo_mean_salary
  FROM employees
  GROUP BY department
""").show()

// Window function usage
spark.sql("""
  SELECT
    name,
    salary,
    geometric_mean(salary) OVER (PARTITION BY department) as dept_geo_mean
  FROM employees
""").show()
```
### Function Registration and Management

Utilities for registering and managing Hive UDFs in Spark sessions.

```scala { .api }
/**
 * Function wrapper for Hive functions with class loading support
 */
case class HiveFunctionWrapper(functionClassName: String) {
  def createFunction[UDFType](): UDFType
}
```

**Registration Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register a UDF from a JAR
spark.sql("ADD JAR /path/to/custom-udfs.jar")
spark.sql("""
  CREATE TEMPORARY FUNCTION my_custom_function
  AS 'com.example.MyCustomUDF'
""")

// Register a permanent function in the metastore
spark.sql("""
  CREATE FUNCTION my_db.my_permanent_function
  AS 'com.example.MyPermanentUDF'
  USING JAR '/path/to/custom-udfs.jar'
""")

// List available functions
spark.sql("SHOW FUNCTIONS LIKE '*custom*'").show()

// Get function information
spark.sql("DESCRIBE FUNCTION my_custom_function").show(truncate = false)
```
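Behind `CREATE FUNCTION ... AS 'class'`, the wrapper resolves the named class on the session classloader and instantiates it reflectively, roughly as in this hedged sketch (illustrative, not Spark's actual code):

```scala
// Rough sketch of what a function wrapper's createFunction does:
// resolve the class by name and call its no-arg constructor.
def createFunction[T](functionClassName: String): T =
  Class.forName(functionClassName)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[T]

// Works for any class with a public no-arg constructor:
val sb = createFunction[java.lang.StringBuilder]("java.lang.StringBuilder")
sb.append("loaded reflectively")
println(sb.toString) // loaded reflectively
```

This is why `ADD JAR` must run before registration resolves the class: the JAR has to be on the session classloader for `Class.forName` to find it.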
### UDF Type Integration

Support for complex Hive data types in UDF operations.

```scala { .api }
/**
 * HiveInspectors trait provides conversion utilities between
 * Hive ObjectInspectors and Catalyst data types
 */
trait HiveInspectors {

  /** Convert Java type to Catalyst DataType */
  def javaTypeToDataType(clz: Type): DataType

  /** Create wrapper for converting Catalyst data to Hive format */
  def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any

  /** Create unwrapper for converting Hive data to Catalyst format */
  def unwrapperFor(objectInspector: ObjectInspector): Any => Any

  /** Convert Catalyst DataType to Hive ObjectInspector */
  def toInspector(dataType: DataType): ObjectInspector

  /** Convert Hive ObjectInspector to Catalyst DataType */
  def inspectorToDataType(inspector: ObjectInspector): DataType
}
```

**Complex Type Examples:**

```scala
// Working with array types in UDFs
spark.sql("""
  SELECT
    collect_list(name) as names,
    size(collect_list(name)) as count
  FROM users
  GROUP BY department
""").show()

// Working with map types
spark.sql("""
  SELECT
    str_to_map('key1:value1,key2:value2', ',', ':') as parsed_map
""").show()

// Working with struct types
spark.sql("""
  SELECT
    named_struct('name', 'Alice', 'age', 25) as person_info
""").show()
```
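To make the map example concrete, `str_to_map`'s behavior can be reproduced in a few lines of plain Scala (an illustrative re-implementation, not Hive's code):

```scala
// Plain-Scala equivalent of str_to_map('key1:value1,key2:value2', ',', ':'):
// split on the pair delimiter, then split each pair once on the
// key-value delimiter.
def strToMap(s: String, pairDelim: String, kvDelim: String): Map[String, String] =
  s.split(pairDelim).map { pair =>
    val Array(k, v) = pair.split(kvDelim, 2)
    k -> v
  }.toMap

println(strToMap("key1:value1,key2:value2", ",", ":"))
// Map(key1 -> value1, key2 -> value2)
```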
### Built-in Hive Function Integration

Access to an extensive set of built-in Hive functions.

**String Functions:**
```scala
// String manipulation functions
spark.sql("SELECT concat('Hello', ' ', 'World') as greeting").show()
spark.sql("SELECT upper('apache spark') as upper_case").show()
spark.sql("SELECT regexp_replace('abc123def', '[0-9]+', 'XXX') as replaced").show()
```

**Date/Time Functions:**
```scala
// Date and time functions
spark.sql("SELECT from_unixtime(unix_timestamp()) as current_time").show()
spark.sql("SELECT date_add('2023-01-01', 30) as future_date").show()
spark.sql("SELECT datediff('2023-12-31', '2023-01-01') as days_diff").show()
```

**Mathematical Functions:**
```scala
// Mathematical functions
spark.sql("SELECT round(3.14159, 2) as rounded").show()
spark.sql("SELECT pow(2, 3) as power").show()
spark.sql("SELECT greatest(1, 5, 3, 2) as max_value").show()
```

**Conditional Functions:**
```scala
// Conditional functions
spark.sql("""
  SELECT
    name,
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_group
  FROM users
""").show()

spark.sql("SELECT nvl(NULL, 'default_value') as coalesced").show()
```
## Error Handling

Common error patterns and exception handling for UDF operations:

```scala
import org.apache.spark.sql.AnalysisException

try {
  // Attempt to use a non-existent UDF
  spark.sql("SELECT non_existent_udf('test')").show()
} catch {
  case e: AnalysisException if e.getMessage.toLowerCase.contains("undefined function") =>
    println("UDF not found - check function registration")
  case e: Exception =>
    println(s"UDF execution error: ${e.getMessage}")
}

// Handle UDF registration errors; Spark generally surfaces a missing class
// as an AnalysisException rather than a raw ClassNotFoundException
try {
  spark.sql("CREATE TEMPORARY FUNCTION bad_udf AS 'invalid.class.name'")
  spark.sql("SELECT bad_udf('test')").show()
} catch {
  case e: AnalysisException =>
    println("UDF class not found - check the classpath")
  case e: Exception =>
    println(s"UDF registration failed: ${e.getMessage}")
}
```
## Performance Considerations

Best practices for UDF performance:

```scala
// Prefer built-in functions over custom UDFs when possible.
// Hive UDFs fall back to interpreted evaluation (CodegenFallback) and pay
// a per-row conversion cost between Catalyst and Hive data representations,
// so they cannot benefit from whole-stage code generation.

// GOOD: use built-in functions
spark.sql("SELECT upper(name) FROM users")

// LESS OPTIMAL: custom UDF for the same functionality
// spark.sql("SELECT my_upper_udf(name) FROM users")

// Register UDFs once per session to avoid repeated registration overhead
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUDF'")
// Reuse throughout the session
```
```