or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-utilities.md file-formats.md hive-client.md index.md metastore-integration.md query-execution.md session-configuration.md udf-support.md

docs/udf-support.md

0

# Hive UDF Support

1

2

Integration support for Hive User Defined Functions (UDFs), User Defined Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs) within Spark SQL.

3

4

## Core Imports

5

6

```scala

7

import org.apache.spark.sql.hive.HiveSimpleUDF

8

import org.apache.spark.sql.hive.HiveGenericUDF

9

import org.apache.spark.sql.hive.HiveUDAFFunction

10

import org.apache.spark.sql.hive.HiveGenericUDTF

11

import org.apache.spark.sql.catalyst.expressions.Expression

12

import org.apache.spark.sql.hive.HiveUDFExpressionBuilder

13

```

14

15

## Capabilities

16

17

### Simple UDF Support

18

19

Wrapper for Hive Simple UDFs that operate on basic data types.

20

21

```scala { .api }

22

/**

23

* Wrapper for Hive Simple UDF that operates on basic types

24

* @param name Function name

25

* @param funcWrapper Hive UDF instance wrapper

26

* @param children Input expressions

27

*/

28

case class HiveSimpleUDF(

29

name: String,

30

funcWrapper: HiveFunctionWrapper,

31

children: Seq[Expression]

32

) extends Expression with HiveInspectors {

33

34

/** Evaluate UDF with input values */

35

override def eval(input: InternalRow): Any

36

37

/** Generate code for UDF evaluation */

38

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode

39

40

/** Data type returned by this UDF */

41

override def dataType: DataType

42

43

/** Whether this UDF is deterministic */

44

override def deterministic: Boolean

45

46

/** String representation of UDF call */

47

override def prettyName: String

48

}

49

```

50

51

### Generic UDF Support

52

53

Wrapper for Hive Generic UDFs with complex type support and object inspectors.

54

55

```scala { .api }

56

/**

57

* Wrapper for Hive Generic UDF with complex type support

58

* @param name Function name

59

* @param funcWrapper Hive Generic UDF wrapper

60

* @param children Input expressions

61

*/

62

case class HiveGenericUDF(

63

name: String,

64

funcWrapper: HiveFunctionWrapper,

65

children: Seq[Expression]

66

) extends Expression with HiveInspectors {

67

68

/** Initialize UDF with object inspectors */

69

def initialize(objectInspectors: Array[ObjectInspector]): ObjectInspector

70

71

/** Evaluate UDF with Hive objects */

72

override def eval(input: InternalRow): Any

73

74

/** UDF return data type */

75

override def dataType: DataType

76

77

/** Whether UDF supports code generation */

78

override def supportCodegen: Boolean

79

80

/** Get UDF usage information */

81

def getDisplayString(children: Array[String]): String

82

}

83

```

84

85

### User Defined Aggregate Function (UDAF) Support

86

87

Support for Hive UDAFs that perform aggregation operations.

88

89

```scala { .api }

90

/**

91

* Wrapper for Hive User Defined Aggregate Functions

92

* @param name Function name

93

* @param funcWrapper Hive UDAF wrapper

94

* @param children Input expressions

95

* @param isDistinct Whether aggregation is distinct

96

*/

97

case class HiveUDAFFunction(

98

name: String,

99

funcWrapper: HiveFunctionWrapper,

100

children: Seq[Expression],

101

isDistinct: Boolean

102

) extends AggregateFunction with HiveInspectors {

103

104

/** Initialize aggregation buffer */

105

def createAggregationBuffer(): AggregationBuffer

106

107

/** Update aggregation buffer with new value */

108

def update(buffer: AggregationBuffer, input: InternalRow): Unit

109

110

/** Merge two aggregation buffers */

111

def merge(buffer1: AggregationBuffer, buffer2: AggregationBuffer): Unit

112

113

/** Get final aggregation result */

114

def evaluate(buffer: AggregationBuffer): Any

115

116

/** Aggregation buffer schema */

117

override def aggBufferSchema: StructType

118

119

/** Input aggregation buffer attributes */

120

override def inputAggBufferAttributes: Seq[AttributeReference]

121

}

122

```

123

124

### User Defined Table Function (UDTF) Support

125

126

Support for Hive UDTFs that generate multiple output rows from a single input row.

127

128

```scala { .api }

129

/**

130

* Wrapper for Hive User Defined Table Functions

131

* @param name Function name

132

* @param funcWrapper Hive UDTF wrapper

133

* @param children Input expressions

134

*/

135

case class HiveGenericUDTF(

136

name: String,

137

funcWrapper: HiveFunctionWrapper,

138

children: Seq[Expression]

139

) extends Generator with HiveInspectors {

140

141

/** Initialize UDTF with object inspectors */

142

def initialize(objectInspectors: Array[ObjectInspector]): StructObjectInspector

143

144

/** Process input row and generate output rows */

145

def process(args: Array[AnyRef]): Unit

146

147

/** Signal end of input and flush any remaining output */

148

def close(): Unit

149

150

/** Generate output rows from input */

151

override def eval(input: InternalRow): TraversableOnce[InternalRow]

152

153

/** Output schema for generated rows */

154

override def outputSchema: StructType

155

156

/** Whether UDTF terminates on null input */

157

override def terminate: Boolean

158

}

159

```

160

161

### UDF Expression Builder

162

163

Factory for creating UDF expressions from Hive function classes.

164

165

```scala { .api }

166

/**

167

* Builder for creating Hive UDF expressions

168

*/

169

object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder {

170

171

/**

172

* Create UDF expression from Hive function class

173

* @param name Function name

174

* @param clazz Hive UDF class

175

* @param input Input expressions

176

* @return Appropriate UDF expression wrapper

177

*/

178

override def makeExpression(

179

name: String,

180

clazz: Class[_],

181

input: Seq[Expression]

182

): Expression

183

184

/**

185

* Check if class is a supported Hive UDF type

186

* @param clazz Class to check

187

* @return true if supported UDF type

188

*/

189

def isHiveUDF(clazz: Class[_]): Boolean

190

191

/**

192

* Get UDF type from class

193

* @param clazz UDF class

194

* @return UDF type string

195

*/

196

def getUDFType(clazz: Class[_]): String

197

}

198

```

199

200

## Function Registration and Usage

201

202

### Register Hive UDFs

203

204

```scala { .api }

205

/**

206

* Register Hive UDF in Spark session

207

* @param name Function name to register

208

* @param className Fully qualified UDF class name

209

* @param database Optional database name

210

*/

211

def registerHiveUDF(

212

sparkSession: SparkSession,

213

name: String,

214

className: String,

215

database: Option[String] = None

216

): Unit

217

```

218

219

### UDF Discovery and Loading

220

221

```scala { .api }

222

/**

223

* Discover and load Hive UDFs from classpath

224

* @param sparkSession Active Spark session

225

* @param packagePrefix Package prefix to scan

226

* @return Map of discovered UDF names to classes

227

*/

228

def discoverHiveUDFs(

229

sparkSession: SparkSession,

230

packagePrefix: String

231

): Map[String, Class[_]]

232

```

233

234

## Usage Examples

235

236

### Using Built-in Hive UDFs

237

238

```scala

239

import org.apache.spark.sql.SparkSession

240

241

val spark = SparkSession.builder()

242

.enableHiveSupport()

243

.getOrCreate()

244

245

// Use Hive built-in functions directly in SQL

246

val result = spark.sql("""

247

SELECT

248

reflect('java.lang.Math', 'abs', -5) as abs_value,

249

reflect('java.lang.String', 'valueOf', 42) as string_value,

250

java_method('java.lang.Math', 'sqrt', 16.0) as sqrt_value

251

FROM VALUES (1) as t(dummy)

252

""")

253

254

result.show()

255

```

256

257

### Registering Custom Hive UDF

258

259

```scala

260

// Register custom Hive UDF class

261

spark.sql("""

262

CREATE FUNCTION my_upper

263

AS 'com.example.MyUpperUDF'

264

USING JAR '/path/to/my-udfs.jar'

265

""")

266

267

// Use the registered UDF

268

val df = spark.sql("""

269

SELECT my_upper(name) as upper_name

270

FROM users

271

""")

272

273

df.show()

274

```

275

276

### Using Hive UDAF (Aggregate Function)

277

278

```scala

279

// Register custom UDAF

280

spark.sql("""

281

CREATE FUNCTION my_collect_set

282

AS 'com.example.MyCollectSetUDAF'

283

USING JAR '/path/to/my-udafs.jar'

284

""")

285

286

// Use UDAF in aggregation

287

val aggregated = spark.sql("""

288

SELECT

289

category,

290

my_collect_set(product_name) as unique_products

291

FROM products

292

GROUP BY category

293

""")

294

295

aggregated.show()

296

```

297

298

### Using Hive UDTF (Table Function)

299

300

```scala

301

// Register custom UDTF

302

spark.sql("""

303

CREATE FUNCTION explode_json

304

AS 'com.example.ExplodeJsonUDTF'

305

USING JAR '/path/to/my-udtfs.jar'

306

""")

307

308

// Use UDTF to generate multiple rows

309

val exploded = spark.sql("""

310

SELECT

311

id,

312

exploded_col

313

FROM events

314

LATERAL VIEW explode_json(json_data) exploded_table AS exploded_col

315

""")

316

317

exploded.show()

318

```

319

320

### Programmatic UDF Registration

321

322

```scala

323

import org.apache.spark.sql.hive.HiveUtils

324

import org.apache.spark.sql.catalyst.FunctionIdentifier

325

import org.apache.spark.sql.catalyst.catalog.CatalogFunction

326

327

// Create function definition

328

val functionDefinition = CatalogFunction(

329

identifier = FunctionIdentifier("custom_concat", Some("default")),

330

className = "com.example.CustomConcatUDF",

331

resources = Seq(FunctionResource(JarResource, "/path/to/udf.jar"))

332

)

333

334

// Register through catalog

335

spark.sessionState.catalog.createFunction(

336

functionDefinition,

337

ignoreIfExists = true

338

)

339

340

// Verify registration

341

val functions = spark.catalog.listFunctions("default")

342

functions.filter(_.name == "custom_concat").show()

343

```

344

345

### Advanced UDF Usage with Complex Types

346

347

```scala

348

// UDF that works with complex types (arrays, maps, structs)

349

spark.sql("""

350

CREATE FUNCTION array_intersect

351

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayIntersect'

352

""")

353

354

val complexQuery = spark.sql("""

355

SELECT

356

user_id,

357

array_intersect(user_interests, recommended_items) as matching_interests

358

FROM (

359

SELECT

360

user_id,

361

split(interests, ',') as user_interests,

362

split(recommendations, ',') as recommended_items

363

FROM user_profiles

364

)

365

""")

366

367

complexQuery.show()

368

```

369

370

### Using Reflection-based UDFs

371

372

```scala

373

// Use Java reflection for dynamic function calls

374

val reflectionQuery = spark.sql("""

375

SELECT

376

user_name,

377

reflect('java.lang.String', 'toLowerCase', user_name) as lower_name,

378

reflect('java.util.UUID', 'randomUUID') as random_id,

379

java_method('java.lang.System', 'currentTimeMillis') as current_time

380

FROM users

381

""")

382

383

reflectionQuery.show()

384

```

385

386

### Error Handling with UDFs

387

388

```scala

389

import org.apache.spark.sql.AnalysisException

390

391

try {

392

// Attempt to use non-existent UDF

393

val result = spark.sql("SELECT non_existent_udf(name) FROM users")

394

result.collect()

395

} catch {

396

case e: AnalysisException if e.getMessage.contains("Undefined function") =>

397

println("UDF not found or not registered")

398

case e: ClassNotFoundException =>

399

println("UDF class not found in classpath")

400

case e: Exception =>

401

println(s"UDF execution error: ${e.getMessage}")

402

}

403

404

// Safe UDF usage with null handling

405

val safeQuery = spark.sql("""

406

SELECT

407

user_id,

408

CASE

409

WHEN user_data IS NOT NULL

410

THEN my_custom_udf(user_data)

411

ELSE NULL

412

END as processed_data

413

FROM users

414

""")

415

```

416

417

## Types

418

419

### UDF Wrapper Types

420

421

```scala { .api }

422

/**

423

* Wrapper for Hive function instances with proper class loading

424

*/

425

case class HiveFunctionWrapper(

426

functionClassName: String,

427

functionInstance: AnyRef

428

) {

429

def createFunction[T](): T

430

def getMethodInfo(): Array[Method]

431

}

432

433

/**

434

* Resource specification for UDF dependencies

435

*/

436

case class FunctionResource(

437

resourceType: FunctionResourceType,

438

uri: String

439

)

440

441

sealed trait FunctionResourceType

442

case object JarResource extends FunctionResourceType

443

case object FileResource extends FunctionResourceType

444

case object ArchiveResource extends FunctionResourceType

445

```

446

447

### Inspector Types

448

449

```scala { .api }

450

/**

451

* Trait for working with Hive object inspectors

452

*/

453

trait HiveInspectors {

454

def toInspector(dataType: DataType): ObjectInspector

455

def wrapperFor(inspector: ObjectInspector, dataType: DataType): (Any) => Any

456

def unwrap(data: Any, inspector: ObjectInspector): AnyRef

457

}

458

459

/**

460

* Hive object inspector categories

461

*/

462

sealed trait ObjectInspectorCategory

463

case object PrimitiveObjectInspector extends ObjectInspectorCategory

464

case object ListObjectInspector extends ObjectInspectorCategory

465

case object MapObjectInspector extends ObjectInspectorCategory

466

case object StructObjectInspector extends ObjectInspectorCategory

467

```