# Hive UDF Support

Integration support for Hive User Defined Functions (UDFs), User Defined Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs) within Spark SQL.

## Core Imports

```scala
import org.apache.spark.sql.hive.HiveSimpleUDF
import org.apache.spark.sql.hive.HiveGenericUDF
import org.apache.spark.sql.hive.HiveUDAFFunction
import org.apache.spark.sql.hive.HiveGenericUDTF
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
```
## Capabilities

### Simple UDF Support

Wrapper for Hive Simple UDFs that operate on basic data types.

```scala { .api }
/**
 * Wrapper for Hive Simple UDF that operates on basic types
 * @param name Function name
 * @param funcWrapper Hive UDF instance wrapper
 * @param children Input expressions
 */
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors {

  /** Evaluate UDF with input values */
  override def eval(input: InternalRow): Any

  /** Generate code for UDF evaluation */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode

  /** Data type returned by this UDF */
  override def dataType: DataType

  /** Whether this UDF is deterministic */
  override def deterministic: Boolean

  /** String representation of UDF call */
  override def prettyName: String
}
```
### Generic UDF Support

Wrapper for Hive Generic UDFs with complex type support and object inspectors.

```scala { .api }
/**
 * Wrapper for Hive Generic UDF with complex type support
 * @param name Function name
 * @param funcWrapper Hive Generic UDF wrapper
 * @param children Input expressions
 */
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors {

  /** Initialize UDF with object inspectors */
  def initialize(objectInspectors: Array[ObjectInspector]): ObjectInspector

  /** Evaluate UDF with Hive objects */
  override def eval(input: InternalRow): Any

  /** UDF return data type */
  override def dataType: DataType

  /** Whether UDF supports code generation */
  override def supportCodegen: Boolean

  /** Get UDF usage information */
  def getDisplayString(children: Array[String]): String
}
```
### User Defined Aggregate Function (UDAF) Support

Support for Hive UDAFs that perform aggregation operations.

```scala { .api }
/**
 * Wrapper for Hive User Defined Aggregate Functions
 * @param name Function name
 * @param funcWrapper Hive UDAF wrapper
 * @param children Input expressions
 * @param isDistinct Whether aggregation is distinct
 */
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isDistinct: Boolean
) extends AggregateFunction with HiveInspectors {

  /** Initialize aggregation buffer */
  def createAggregationBuffer(): AggregationBuffer

  /** Update aggregation buffer with new value */
  def update(buffer: AggregationBuffer, input: InternalRow): Unit

  /** Merge two aggregation buffers */
  def merge(buffer1: AggregationBuffer, buffer2: AggregationBuffer): Unit

  /** Get final aggregation result */
  def evaluate(buffer: AggregationBuffer): Any

  /** Aggregation buffer schema */
  override def aggBufferSchema: StructType

  /** Input aggregation buffer attributes */
  override def inputAggBufferAttributes: Seq[AttributeReference]
}
```
### User Defined Table Function (UDTF) Support

Support for Hive UDTFs that generate multiple output rows from a single input row.

```scala { .api }
/**
 * Wrapper for Hive User Defined Table Functions
 * @param name Function name
 * @param funcWrapper Hive UDTF wrapper
 * @param children Input expressions
 */
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with HiveInspectors {

  /** Initialize UDTF with object inspectors */
  def initialize(objectInspectors: Array[ObjectInspector]): StructObjectInspector

  /** Process input row and generate output rows */
  def process(args: Array[AnyRef]): Unit

  /** Signal end of input and flush any remaining output */
  def close(): Unit

  /** Generate output rows from input */
  override def eval(input: InternalRow): TraversableOnce[InternalRow]

  /** Output schema for generated rows */
  override def outputSchema: StructType

  /** Whether UDTF terminates on null input */
  override def terminate: Boolean
}
```
160
161
### UDF Expression Builder
162
163
Factory for creating UDF expressions from Hive function classes.
164
165
```scala { .api }
166
/**
167
* Builder for creating Hive UDF expressions
168
*/
169
object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder {
170
171
/**
172
* Create UDF expression from Hive function class
173
* @param name Function name
174
* @param clazz Hive UDF class
175
* @param input Input expressions
176
* @returns Appropriate UDF expression wrapper
177
*/
178
override def makeExpression(
179
name: String,
180
clazz: Class[_],
181
input: Seq[Expression]
182
): Expression
183
184
/**
185
* Check if class is a supported Hive UDF type
186
* @param clazz Class to check
187
* @returns true if supported UDF type
188
*/
189
def isHiveUDF(clazz: Class[_]): Boolean
190
191
/**
192
* Get UDF type from class
193
* @param clazz UDF class
194
* @returns UDF type string
195
*/
196
def getUDFType(clazz: Class[_]): String
197
}
198
```
199
200
## Function Registration and Usage
201
202
### Register Hive UDFs
203
204
```scala { .api }
205
/**
206
* Register Hive UDF in Spark session
207
* @param name Function name to register
208
* @param className Fully qualified UDF class name
209
* @param database Optional database name
210
*/
211
def registerHiveUDF(
212
sparkSession: SparkSession,
213
name: String,
214
className: String,
215
database: Option[String] = None
216
): Unit
217
```
218
219
### UDF Discovery and Loading
220
221
```scala { .api }
222
/**
223
* Discover and load Hive UDFs from classpath
224
* @param sparkSession Active Spark session
225
* @param packagePrefix Package prefix to scan
226
* @returns Map of discovered UDF names to classes
227
*/
228
def discoverHiveUDFs(
229
sparkSession: SparkSession,
230
packagePrefix: String
231
): Map[String, Class[_]]
232
```
233
234
## Usage Examples
235
236
### Using Built-in Hive UDFs
237
238
```scala
239
import org.apache.spark.sql.SparkSession
240
241
val spark = SparkSession.builder()
242
.enableHiveSupport()
243
.getOrCreate()
244
245
// Use Hive built-in functions directly in SQL
246
val result = spark.sql("""
247
SELECT
248
reflect('java.lang.Math', 'abs', -5) as abs_value,
249
reflect('java.lang.String', 'valueOf', 42) as string_value,
250
java_method('java.lang.Math', 'sqrt', 16.0) as sqrt_value
251
FROM VALUES (1) as t(dummy)
252
""")
253
254
result.show()
255
```
256
257
### Registering Custom Hive UDF
258
259
```scala
260
// Register custom Hive UDF class
261
spark.sql("""
262
CREATE FUNCTION my_upper
263
AS 'com.example.MyUpperUDF'
264
USING JAR '/path/to/my-udfs.jar'
265
""")
266
267
// Use the registered UDF
268
val df = spark.sql("""
269
SELECT my_upper(name) as upper_name
270
FROM users
271
""")
272
273
df.show()
274
```
275
276
### Using Hive UDAF (Aggregate Function)
277
278
```scala
279
// Register custom UDAF
280
spark.sql("""
281
CREATE FUNCTION my_collect_set
282
AS 'com.example.MyCollectSetUDAF'
283
USING JAR '/path/to/my-udafs.jar'
284
""")
285
286
// Use UDAF in aggregation
287
val aggregated = spark.sql("""
288
SELECT
289
category,
290
my_collect_set(product_name) as unique_products
291
FROM products
292
GROUP BY category
293
""")
294
295
aggregated.show()
296
```
297
298
### Using Hive UDTF (Table Function)
299
300
```scala
301
// Register custom UDTF
302
spark.sql("""
303
CREATE FUNCTION explode_json
304
AS 'com.example.ExplodeJsonUDTF'
305
USING JAR '/path/to/my-udtfs.jar'
306
""")
307
308
// Use UDTF to generate multiple rows
309
val exploded = spark.sql("""
310
SELECT
311
id,
312
exploded_col
313
FROM events
314
LATERAL VIEW explode_json(json_data) exploded_table AS exploded_col
315
""")
316
317
exploded.show()
318
```
319
320
### Programmatic UDF Registration
321
322
```scala
323
import org.apache.spark.sql.hive.HiveUtils
324
import org.apache.spark.sql.catalyst.FunctionIdentifier
325
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
326
327
// Create function definition
328
val functionDefinition = CatalogFunction(
329
identifier = FunctionIdentifier("custom_concat", Some("default")),
330
className = "com.example.CustomConcatUDF",
331
resources = Seq(FunctionResource(JarResource, "/path/to/udf.jar"))
332
)
333
334
// Register through catalog
335
spark.sessionState.catalog.createFunction(
336
functionDefinition,
337
ignoreIfExists = true
338
)
339
340
// Verify registration
341
val functions = spark.catalog.listFunctions("default")
342
functions.filter(_.name == "custom_concat").show()
343
```
344
345
### Advanced UDF Usage with Complex Types
346
347
```scala
348
// UDF that works with complex types (arrays, maps, structs)
349
spark.sql("""
350
CREATE FUNCTION array_intersect
351
AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayIntersect'
352
""")
353
354
val complexQuery = spark.sql("""
355
SELECT
356
user_id,
357
array_intersect(user_interests, recommended_items) as matching_interests
358
FROM (
359
SELECT
360
user_id,
361
split(interests, ',') as user_interests,
362
split(recommendations, ',') as recommended_items
363
FROM user_profiles
364
)
365
""")
366
367
complexQuery.show()
368
```
369
370
### Using Reflection-based UDFs
371
372
```scala
373
// Use Java reflection for dynamic function calls
374
val reflectionQuery = spark.sql("""
375
SELECT
376
user_name,
377
reflect('java.lang.String', 'toLowerCase', user_name) as lower_name,
378
reflect('java.util.UUID', 'randomUUID') as random_id,
379
java_method('java.lang.System', 'currentTimeMillis') as current_time
380
FROM users
381
""")
382
383
reflectionQuery.show()
384
```
385
386
### Error Handling with UDFs
387
388
```scala
389
import org.apache.spark.sql.AnalysisException
390
391
try {
392
// Attempt to use non-existent UDF
393
val result = spark.sql("SELECT non_existent_udf(name) FROM users")
394
result.collect()
395
} catch {
396
case e: AnalysisException if e.getMessage.contains("Undefined function") =>
397
println("UDF not found or not registered")
398
case e: ClassNotFoundException =>
399
println("UDF class not found in classpath")
400
case e: Exception =>
401
println(s"UDF execution error: ${e.getMessage}")
402
}
403
404
// Safe UDF usage with null handling
405
val safeQuery = spark.sql("""
406
SELECT
407
user_id,
408
CASE
409
WHEN user_data IS NOT NULL
410
THEN my_custom_udf(user_data)
411
ELSE NULL
412
END as processed_data
413
FROM users
414
""")
415
```
## Types

### UDF Wrapper Types

```scala { .api }
/**
 * Wrapper for Hive function instances with proper class loading
 */
case class HiveFunctionWrapper(
  functionClassName: String,
  functionInstance: AnyRef
) {
  def createFunction[T](): T
  def getMethodInfo(): Array[Method]
}

/**
 * Resource specification for UDF dependencies
 */
case class FunctionResource(
  resourceType: FunctionResourceType,
  uri: String
)

sealed trait FunctionResourceType
case object JarResource extends FunctionResourceType
case object FileResource extends FunctionResourceType
case object ArchiveResource extends FunctionResourceType
```

### Inspector Types

```scala { .api }
/**
 * Trait for working with Hive object inspectors
 */
trait HiveInspectors {
  def toInspector(dataType: DataType): ObjectInspector
  def wrapperFor(inspector: ObjectInspector, dataType: DataType): (Any) => Any
  def unwrap(data: Any, inspector: ObjectInspector): AnyRef
}

/**
 * Hive object inspector categories
 */
sealed trait ObjectInspectorCategory
case object PrimitiveObjectInspector extends ObjectInspectorCategory
case object ListObjectInspector extends ObjectInspectorCategory
case object MapObjectInspector extends ObjectInspectorCategory
case object StructObjectInspector extends ObjectInspectorCategory
```