docs/udf-support.md

(Related docs: configuration.md, data-conversion.md, external-catalog.md, hive-client.md, index.md, udf-support.md)

# User-Defined Function Support

Complete support for Hive UDFs, UDAFs, and UDTFs with automatic registration and execution within Spark queries. This module provides seamless integration allowing Hive user-defined functions to work natively within Spark SQL expressions.

## Capabilities

### Hive Simple UDF

Support for simple Hive UDFs that extend `org.apache.hadoop.hive.ql.exec.UDF`.

```scala { .api }
/**
 * Expression wrapper for simple Hive UDFs
 * @param name UDF name for display purposes
 * @param funcWrapper Wrapper containing the UDF class
 * @param children Input expressions to the UDF
 */
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {

  /**
   * Return data type of the UDF
   */
  lazy val dataType: DataType

  /**
   * Evaluate the UDF with given input
   * @param input Input row
   * @return UDF result
   */
  def eval(input: InternalRow): Any

  /**
   * Pretty name for display
   */
  def prettyName: String

  /**
   * SQL representation of the UDF call
   */
  def sql: String
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.HiveFunctionWrapper
import org.apache.spark.sql.catalyst.expressions._

// Create wrapper for a Hive UDF class
val wrapper = HiveFunctionWrapper("com.example.MyHiveUDF")

// Create expression for the UDF
val udfExpr = HiveSimpleUDF(
  name = "my_udf",
  funcWrapper = wrapper,
  children = Seq(Literal("input_string"))
)

// Use in expression evaluation
val result = udfExpr.eval(EmptyRow)
println(s"UDF result: $result")
```

### Hive Generic UDF

Support for generic Hive UDFs that extend `org.apache.hadoop.hive.ql.udf.generic.GenericUDF`.

```scala { .api }
/**
 * Expression wrapper for generic Hive UDFs
 * @param name UDF name for display purposes
 * @param funcWrapper Wrapper containing the UDF class
 * @param children Input expressions to the UDF
 */
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression {

  /**
   * Return data type of the UDF
   */
  lazy val dataType: DataType

  /**
   * Evaluate the UDF with given input
   * @param input Input row
   * @return UDF result
   */
  def eval(input: InternalRow): Any

  /**
   * Pretty name for display
   */
  def prettyName: String

  /**
   * SQL representation of the UDF call
   */
  def sql: String
}
```

**Usage Example:**

```scala
// Generic UDF for complex type handling
val genericWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase")

val genericUDF = HiveGenericUDF(
  name = "case_when",
  funcWrapper = genericWrapper,
  children = Seq(
    Literal(true),
    Literal("true_value"),
    Literal("false_value")
  )
)

val result = genericUDF.eval(EmptyRow)
```

### Hive User-Defined Table Function (UDTF)

Support for Hive UDTFs that generate multiple output rows from a single input row.

```scala { .api }
/**
 * Expression wrapper for Hive UDTFs
 * @param name UDTF name for display purposes
 * @param funcWrapper Wrapper containing the UDTF class
 * @param children Input expressions to the UDTF
 */
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression {

  /**
   * Output schema of the UDTF
   */
  lazy val elementSchema: StructType

  /**
   * Evaluate the UDTF and generate output rows
   * @param input Input row
   * @return Iterator of output rows
   */
  def eval(input: InternalRow): IterableOnce[InternalRow]

  /**
   * Terminate the UDTF and return any final rows
   * @return Iterator of final output rows
   */
  def terminate(): IterableOnce[InternalRow]

  /**
   * Pretty name for display
   */
  def prettyName: String

  /**
   * SQL representation of the UDTF call
   */
  def sql: String
}
```

**Usage Example:**

```scala
// UDTF that splits strings into multiple rows
val udtfWrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode")

val udtf = HiveGenericUDTF(
  name = "explode",
  funcWrapper = udtfWrapper,
  children = Seq(Literal.create(Array("a", "b", "c"), ArrayType(StringType)))
)

// Evaluate UDTF
val outputRows = udtf.eval(EmptyRow).toSeq
println(s"UDTF generated ${outputRows.length} rows")

// Terminate to get any final rows
val finalRows = udtf.terminate().toSeq
```

### Hive User-Defined Aggregate Function (UDAF)

Support for Hive UDAFs that perform custom aggregation operations.

```scala { .api }
/**
 * Expression wrapper for Hive UDAFs
 * @param name UDAF name for display purposes
 * @param funcWrapper Wrapper containing the UDAF class
 * @param children Input expressions to the UDAF
 * @param isUDAFBridgeRequired Whether UDAF bridge is needed
 * @param mutableAggBufferOffset Offset in mutable aggregation buffer
 * @param inputAggBufferOffset Offset in input aggregation buffer
 */
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isUDAFBridgeRequired: Boolean,
  mutableAggBufferOffset: Int,
  inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer] with HiveInspectors with UserDefinedExpression {

  /**
   * Create a new aggregation buffer
   * @return New UDAF buffer
   */
  def createAggregationBuffer(): HiveUDAFBuffer

  /**
   * Update aggregation buffer with new input
   * @param buffer Current aggregation buffer
   * @param input Input row
   * @return Updated buffer
   */
  def update(buffer: HiveUDAFBuffer, input: InternalRow): HiveUDAFBuffer

  /**
   * Merge two aggregation buffers
   * @param buffer Target buffer
   * @param input Source buffer to merge
   * @return Merged buffer
   */
  def merge(buffer: HiveUDAFBuffer, input: HiveUDAFBuffer): HiveUDAFBuffer

  /**
   * Get final result from aggregation buffer
   * @param buffer Final aggregation buffer
   * @return Aggregation result
   */
  def eval(buffer: HiveUDAFBuffer): Any

  /**
   * Serialize aggregation buffer
   * @param buffer Buffer to serialize
   * @return Serialized buffer data
   */
  def serialize(buffer: HiveUDAFBuffer): Array[Byte]

  /**
   * Deserialize aggregation buffer
   * @param bytes Serialized buffer data
   * @return Deserialized buffer
   */
  def deserialize(bytes: Array[Byte]): HiveUDAFBuffer

  /**
   * Return data type of the aggregation result
   */
  lazy val dataType: DataType

  /**
   * Pretty name for display
   */
  def prettyName: String

  /**
   * SQL representation of the UDAF call
   */
  def sql: String
}
```

**Usage Example:**

```scala
// Custom aggregation function
val udafWrapper = HiveFunctionWrapper("com.example.MyHiveUDAF")

val udaf = HiveUDAFFunction(
  name = "my_aggregate",
  funcWrapper = udafWrapper,
  children = Seq(col("value")),
  isUDAFBridgeRequired = false,
  mutableAggBufferOffset = 0,
  inputAggBufferOffset = 0
)

// Use in aggregation context
val buffer = udaf.createAggregationBuffer()
val updatedBuffer = udaf.update(buffer, inputRow)
val result = udaf.eval(updatedBuffer)
```

### UDAF Buffer

Buffer type for managing UDAF aggregation state.

```scala { .api }
/**
 * Buffer for Hive UDAF operations
 * @param buf Hive aggregation buffer
 * @param canDoMerge Whether buffer supports merge operations
 */
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)
```

**Usage Example:**

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer

// Create UDAF buffer
val hiveBuffer: AggregationBuffer = ??? // created by Hive UDAF
val buffer = HiveUDAFBuffer(hiveBuffer, canDoMerge = true)

// Use in UDAF operations
val serialized = udaf.serialize(buffer)
val deserialized = udaf.deserialize(serialized)
```

### Function Wrapper

Wrapper class for Hive function classes.

```scala { .api }
/**
 * Wrapper for Hive function classes
 * @param functionClassName Fully qualified class name of the Hive function
 */
case class HiveFunctionWrapper(functionClassName: String) {

  /**
   * Create instance of the wrapped function
   * @return Instance of the Hive function
   */
  def createFunction[T]: T

  /**
   * Get the function class
   * @return Class object for the function
   */
  def functionClass: Class[_]
}
```

**Usage Example:**

```scala
// Create wrapper
val wrapper = HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.UDFLength")

// Create function instance
val udfInstance = wrapper.createFunction[UDF]

// Get function class for reflection
val functionClass = wrapper.functionClass
println(s"Function class: ${functionClass.getName}")
```

### UDF Registration and Usage

Integration with Spark SQL for automatic UDF registration.

```scala { .api }
// Example of registering Hive UDF in Spark session
def registerHiveUDF(
  spark: SparkSession,
  name: String,
  className: String
): Unit = {

  val wrapper = HiveFunctionWrapper(className)

  // Register as Spark UDF
  spark.udf.register(name, (inputs: Seq[Any]) => {
    val udf = HiveSimpleUDF(name, wrapper, inputs.map(Literal(_)))
    udf.eval(EmptyRow)
  })
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HiveUDFExample")
  .enableHiveSupport()
  .getOrCreate()

// Register custom Hive UDF
registerHiveUDF(spark, "my_length", "com.example.MyLengthUDF")

// Use in SQL
spark.sql("SELECT my_length(name) FROM users").show()

// Use in DataFrame API
import spark.implicits._
val df = Seq("hello", "world").toDF("text")
df.select(callUDF("my_length", $"text")).show()
```

### Advanced UDF Features

Support for complex UDF scenarios.

```scala { .api }
// UDF with complex input/output types
def createComplexUDF(
  name: String,
  className: String,
  inputTypes: Seq[DataType],
  outputType: DataType
): HiveGenericUDF = {

  val wrapper = HiveFunctionWrapper(className)
  val children = inputTypes.zipWithIndex.map { case (dataType, index) =>
    BoundReference(index, dataType, nullable = true)
  }

  HiveGenericUDF(name, wrapper, children)
}

// UDTF with multiple output columns
def createMultiColumnUDTF(
  name: String,
  className: String,
  inputExpression: Expression,
  outputSchema: StructType
): HiveGenericUDTF = {

  val wrapper = HiveFunctionWrapper(className)

  HiveGenericUDTF(name, wrapper, Seq(inputExpression))
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.types._

// Complex UDF with struct input/output
val complexUDF = createComplexUDF(
  name = "process_struct",
  className = "com.example.StructProcessorUDF",
  inputTypes = Seq(StructType(Seq(
    StructField("id", IntegerType, false),
    StructField("name", StringType, true)
  ))),
  outputType = StringType
)

// Multi-column UDTF
val outputSchema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", StringType, true)
))

val multiColUDTF = createMultiColumnUDTF(
  name = "split_pairs",
  className = "com.example.SplitPairsUDTF",
  inputExpression = Literal("key1:value1,key2:value2"),
  outputSchema = outputSchema
)
```

### Error Handling

Common error patterns in UDF execution.

```scala { .api }
// Handle UDF execution errors
def safeEvaluateUDF(udf: HiveSimpleUDF, input: InternalRow): Option[Any] = {
  try {
    Some(udf.eval(input))
  } catch {
    case _: UDFArgumentException =>
      println(s"Invalid arguments for UDF ${udf.name}")
      None
    case _: HiveException =>
      println(s"Hive execution error in UDF ${udf.name}")
      None
    case e: Exception =>
      println(s"Unexpected error in UDF ${udf.name}: ${e.getMessage}")
      None
  }
}
```

### Performance Considerations

Optimization tips for UDF usage.

```scala { .api }
// Cache UDF instances for repeated use
class UDFCache {
  private val cache = mutable.Map[String, HiveFunctionWrapper]()

  def getOrCreateWrapper(className: String): HiveFunctionWrapper = {
    cache.getOrElseUpdate(className, HiveFunctionWrapper(className))
  }
}

// Batch UDF evaluation
def batchEvaluateUDF(
  udf: HiveSimpleUDF,
  inputs: Seq[InternalRow]
): Seq[Any] = {

  // Prepare UDF once
  val preparedUDF = udf // UDF preparation happens lazily

  // Evaluate for all inputs
  inputs.map(preparedUDF.eval)
}
```

**Usage Example:**

```scala
val cache = new UDFCache()

// Reuse wrapper for multiple UDFs of same class
val wrapper1 = cache.getOrCreateWrapper("com.example.MyUDF")
val wrapper2 = cache.getOrCreateWrapper("com.example.MyUDF")
assert(wrapper1 eq wrapper2) // Same instance

// Batch evaluation
val inputs = Seq(
  InternalRow(UTF8String.fromString("hello")),
  InternalRow(UTF8String.fromString("world"))
)

val udf = HiveSimpleUDF("length", wrapper1, Seq(BoundReference(0, StringType, true)))
val results = batchEvaluateUDF(udf, inputs)
println(s"Batch results: ${results.mkString(", ")}")
```