# UDF Integration

Apache Spark Hive integration provides comprehensive support for executing Hive User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table-Generating Functions (UDTFs) within Spark SQL queries.

## Overview

The UDF integration system allows Spark to execute existing Hive UDFs without modification, providing seamless compatibility with existing Hive-based data processing pipelines. Spark wraps Hive UDFs in specialized expression classes that translate between Spark's internal row format and Hive's object format.
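The wrap-and-translate idea can be sketched as a toy adapter in plain Java (hypothetical names; this is not Spark's actual `HiveSimpleUDF`): the wrapper holds the function, unwraps the engine's internal value, invokes the function, and wraps the result back.

```java
import java.util.function.Function;

// Toy sketch of the wrapper pattern. Here the "engine" stores strings as
// raw bytes, while the wrapped "Hive UDF" works on java.lang.String.
public class UdfWrapperSketch {
    // Stand-in for a Hive UDF: uppercases a String.
    static final Function<String, String> HIVE_UDF = s -> s.toUpperCase();

    // Convert internal format -> UDF input, call the UDF, convert back.
    static byte[] eval(byte[] internalValue) {
        if (internalValue == null) return null;          // null passthrough
        String hiveInput = new String(internalValue);    // unwrap
        String hiveOutput = HIVE_UDF.apply(hiveInput);   // call the UDF
        return hiveOutput.getBytes();                    // wrap back
    }

    public static void main(String[] args) {
        System.out.println(new String(eval("hello".getBytes()))); // HELLO
    }
}
```

The real wrappers do the same dance with Spark's `InternalRow` values and Hive's object inspectors instead of bytes and strings.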

## HiveSimpleUDF

Wrapper for Hive simple UDFs that extend `org.apache.hadoop.hive.ql.exec.UDF`.

```scala { .api }
case class HiveSimpleUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
  ) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}
```

### Usage Example

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Register a simple Hive UDF
spark.sql("""
  CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Use the UDF in queries
val result = spark.sql("""
  SELECT my_upper(name) AS upper_name
  FROM employee
""")

result.show()
```

### Creating Custom Simple UDFs

To create a Hive UDF that works with Spark:

```java
package com.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UpperCaseUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) return null;
        return new Text(input.toString().toUpperCase());
    }
}
```

## HiveGenericUDF

Wrapper for Hive generic UDFs that extend `org.apache.hadoop.hive.ql.udf.generic.GenericUDF`.

```scala { .api }
case class HiveGenericUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
  ) extends Expression with CodegenFallback with Logging {

  def eval(input: InternalRow): Any
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  def foldable: Boolean
  override def toString: String
}
```

### Usage Example

```scala
// Register a generic UDF
spark.sql("""
  CREATE TEMPORARY FUNCTION json_extract AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFJsonExtract'
""")

// Use the generic UDF
val result = spark.sql("""
  SELECT json_extract(json_column, '$.name') AS extracted_name
  FROM json_table
""")
```

### Generic UDF Advantages

Generic UDFs provide more flexibility than simple UDFs:

- Support for complex data types (arrays, maps, structs)
- Runtime type checking and conversion
- Better performance through the object inspector framework
- Support for variable arguments
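The variable-argument capability can be illustrated with a plain-Java varargs sketch (a hypothetical `concatWs` helper, not Hive's `GenericUDF` API): the function accepts any number of inputs and tolerates nulls, which is what a generic UDF's `DeferredObject[]` signature enables.

```java
// Toy sketch of a variable-argument function, mimicking how a generic
// UDF can accept any number of inputs (null arguments are skipped).
public class VarargsUdfSketch {
    static String concatWs(String sep, Object... args) {
        StringBuilder sb = new StringBuilder();
        for (Object a : args) {
            if (a == null) continue;           // skip nulls
            if (sb.length() > 0) sb.append(sep);
            sb.append(a);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(concatWs("-", "a", null, "b", 3)); // a-b-3
    }
}
```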

## HiveGenericUDTF

Wrapper for Hive User-Defined Table-Generating Functions that produce multiple rows from a single input row.

```scala { .api }
case class HiveGenericUDTF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]
  ) extends Generator with CodegenFallback with Logging {

  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
  def dataType: DataType
  def nullable: Boolean
  def prettyName: String = name
  override def toString: String
}
```

### Usage Example

```scala
// Register a UDTF (e.g., an explode-like function)
spark.sql("""
  CREATE TEMPORARY FUNCTION split_words AS 'com.example.SplitWordsUDTF'
""")

// Use the UDTF in a LATERAL VIEW
val result = spark.sql("""
  SELECT word
  FROM sentences
  LATERAL VIEW split_words(text) exploded_table AS word
""")
```

### UDTF Lifecycle

UDTFs follow a specific lifecycle:

1. **Initialize**: Setup phase with input object inspectors
2. **Process**: Called for each input row; may produce zero or more output rows
3. **Terminate**: Called at the end; may produce final output rows
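The three lifecycle steps above can be sketched in plain Java (a hypothetical `UdtfLifecycleSketch`, not Hive's `GenericUDTF` API): each `process` call may emit several word rows, and `terminate` emits a final summary row.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the UDTF lifecycle: initialize / process / terminate.
// Splits each input line into one output row per word.
public class UdtfLifecycleSketch {
    private List<String> out;
    private int rowsSeen;

    public void initialize() {           // 1. setup phase
        out = new ArrayList<>();
        rowsSeen = 0;
    }

    public void process(String line) {   // 2. zero or more output rows per input
        rowsSeen++;
        for (String w : line.split("\\s+")) {
            if (!w.isEmpty()) out.add(w);
        }
    }

    public List<String> terminate() {    // 3. optional final rows at the end
        out.add("rows_processed=" + rowsSeen);
        return out;
    }

    public static void main(String[] args) {
        UdtfLifecycleSketch udtf = new UdtfLifecycleSketch();
        udtf.initialize();
        udtf.process("hello world");
        udtf.process("spark");
        System.out.println(udtf.terminate()); // [hello, world, spark, rows_processed=2]
    }
}
```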

## HiveUDAFFunction

Wrapper for Hive User-Defined Aggregate Functions that perform aggregation operations.

```scala { .api }
case class HiveUDAFFunction(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression],
    isUDAFBridgeRequired: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0
  ) extends TypedImperativeAggregate[GenericUDAFEvaluator.AggregationBuffer] with Logging {

  def createAggregationBuffer(): AggregationBuffer
  def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer
  def merge(buffer: AggregationBuffer, input: AggregationBuffer): AggregationBuffer
  def eval(buffer: AggregationBuffer): Any
  def serialize(buffer: AggregationBuffer): Array[Byte]
  def deserialize(storageFormat: Array[Byte]): AggregationBuffer
  def prettyName: String = name
}
```

177

178

### Usage Example

179

180

```scala

181

// Register a UDAF

182

spark.sql("""

183

CREATE TEMPORARY FUNCTION my_avg AS 'com.example.AverageUDAF'

184

""")

185

186

// Use UDAF in aggregation query

187

val result = spark.sql("""

188

SELECT department, my_avg(salary) as avg_salary

189

FROM employee

190

GROUP BY department

191

""")

192

```

### UDAF Aggregation Process

UDAFs implement a distributed aggregation process:

1. **Partial aggregation**: Each partition computes partial results
2. **Merge**: Partial results are combined across partitions
3. **Final evaluation**: The final result is computed from the merged state
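The three phases above can be sketched in plain Java for an average aggregate (hypothetical names; this is not Hive's `GenericUDAFEvaluator` API): each partition updates its own buffer, buffers are merged, and the final value is computed once from the merged state.

```java
// Toy sketch of distributed aggregation: partial update / merge / eval.
public class UdafAverageSketch {
    public static final class Buffer { double sum; long count; }

    public static Buffer create() { return new Buffer(); }

    // Phase 1: each partition updates its own partial buffer.
    public static void update(Buffer b, double value) { b.sum += value; b.count++; }

    // Phase 2: partial buffers are merged across partitions.
    public static void merge(Buffer into, Buffer from) {
        into.sum += from.sum;
        into.count += from.count;
    }

    // Phase 3: the final result is computed from the merged state.
    public static Double eval(Buffer b) { return b.count == 0 ? null : b.sum / b.count; }

    public static void main(String[] args) {
        Buffer p1 = create(); update(p1, 10); update(p1, 20);  // partition 1
        Buffer p2 = create(); update(p2, 30);                  // partition 2
        merge(p1, p2);
        System.out.println(eval(p1)); // average of 10, 20, 30
    }
}
```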

## HiveFunctionWrapper

Core wrapper class that loads and manages Hive function instances.

```scala { .api }
case class HiveFunctionWrapper(
    className: String,
    instance: AnyRef
  ) extends Serializable {

  def createFunction[T](): T
  def getMethodName(): String
  def getParameterTypes(): Array[Class[_]]
}
```

### Function Loading

```scala
// Create a function wrapper (no pre-built instance, so pass null)
val wrapper = HiveFunctionWrapper("com.example.MyUDF", null)

// Instantiate the function
val udfInstance = wrapper.createFunction[UDF]()
```
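The core of `createFunction` is reflective instantiation by class name. A minimal plain-Java sketch of that idea (a hypothetical `FunctionLoaderSketch`, using a JDK class as a stand-in for a UDF class name):

```java
import java.util.List;

// Minimal sketch of reflective function loading: instantiate a class
// by its fully qualified name via its no-arg constructor.
public class FunctionLoaderSketch {
    @SuppressWarnings("unchecked")
    public static <T> T createFunction(String className) {
        try {
            return (T) Class.forName(className)
                            .getDeclaredConstructor()
                            .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot load function " + className, e);
        }
    }

    public static void main(String[] args) {
        // A JDK class stands in for a UDF class name here.
        List<String> list = createFunction("java.util.ArrayList");
        list.add("loaded");
        System.out.println(list); // [loaded]
    }
}
```

The real wrapper additionally caches the instance and handles Hive's `UDFBridge` cases, but reflection by class name is the essential mechanism.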

## Function Registration

### Temporary Functions

Register functions for the current session:

```scala
// Register a UDF whose JAR is already on the classpath
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
""")

// Register a temporary function together with its JAR location
spark.sql("""
  CREATE TEMPORARY FUNCTION my_func AS 'com.example.MyFunction'
  USING JAR '/path/to/udf.jar'
""")
```

### Permanent Functions

Register functions in the Hive metastore:

```scala
// Create a permanent function
spark.sql("""
  CREATE FUNCTION my_database.my_func AS 'com.example.MyFunction'
  USING JAR 'hdfs://path/to/udf.jar'
""")

// Use the permanent function
val result = spark.sql("""
  SELECT my_database.my_func(column) FROM table
""")
```

### Function Discovery

List and inspect registered functions:

```scala
// List all functions
spark.sql("SHOW FUNCTIONS").show()

// List functions matching a pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// Describe a function
spark.sql("DESCRIBE FUNCTION my_func").show()

// Show extended info
spark.sql("DESCRIBE FUNCTION EXTENDED my_func").show()
```

## Data Type Mapping

Mapping between Hive and Spark data types in UDF integration:

### Primitive Types

- **BOOLEAN** ↔ BooleanType
- **TINYINT** ↔ ByteType
- **SMALLINT** ↔ ShortType
- **INT** ↔ IntegerType
- **BIGINT** ↔ LongType
- **FLOAT** ↔ FloatType
- **DOUBLE** ↔ DoubleType
- **STRING** ↔ StringType
- **BINARY** ↔ BinaryType
- **TIMESTAMP** ↔ TimestampType
- **DATE** ↔ DateType

### Complex Types

- **ARRAY\<T\>** ↔ ArrayType(T)
- **MAP\<K,V\>** ↔ MapType(K,V)
- **STRUCT\<...\>** ↔ StructType(...)
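For illustration only, the primitive correspondence above can be expressed as a lookup table (a hypothetical helper; Spark's real conversion logic lives in its Hive object-inspector layer, not in a simple map):

```java
import java.util.Map;

// The Hive -> Spark primitive type correspondence, as a lookup table.
public class TypeMappingSketch {
    public static final Map<String, String> HIVE_TO_SPARK = Map.ofEntries(
        Map.entry("BOOLEAN", "BooleanType"),
        Map.entry("TINYINT", "ByteType"),
        Map.entry("SMALLINT", "ShortType"),
        Map.entry("INT", "IntegerType"),
        Map.entry("BIGINT", "LongType"),
        Map.entry("FLOAT", "FloatType"),
        Map.entry("DOUBLE", "DoubleType"),
        Map.entry("STRING", "StringType"),
        Map.entry("BINARY", "BinaryType"),
        Map.entry("TIMESTAMP", "TimestampType"),
        Map.entry("DATE", "DateType")
    );

    public static void main(String[] args) {
        System.out.println(HIVE_TO_SPARK.get("BIGINT")); // LongType
    }
}
```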

### Usage in UDFs

```java
// Java generic UDF handling complex types; the object inspector is
// obtained in initialize(), not cast from the runtime argument values
public class ComplexUDF extends GenericUDF {
    private ListObjectInspector listOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        listOI = (ListObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Handle array input via the captured inspector
        List<?> inputList = listOI.getList(arguments[0].get());
        // Process and return the result (here: the element count)
        return inputList == null ? null : inputList.size();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "complex_udf(" + String.join(", ", children) + ")";
    }
}
```

## Performance Considerations

### UDF Performance Tips

1. **Use Generic UDFs**: Better performance than simple UDFs for complex operations
2. **Minimize Object Creation**: Reuse objects where possible in UDF evaluation
3. **Leverage Vectorization**: Some UDFs can benefit from vectorized execution
4. **Consider Native Functions**: Use Spark's built-in functions when available

### Example Optimized UDF

```java
public class OptimizedUDF extends GenericUDF {
    private final Text result = new Text(); // reused across calls

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        if (value == null) return null;
        result.set(processString(value.toString())); // reuse the Text object
        return result;
    }
}
```

## Error Handling

### Common UDF Errors

**ClassNotFoundException**: UDF class not found

```scala
// Solution: add the JAR to the classpath
spark.sql("ADD JAR '/path/to/udf.jar'")
```

**Method Not Found**: Incorrect UDF method signature

```java
// Ensure the UDF implements the correct evaluate() method
public Object evaluate(DeferredObject[] args) throws HiveException
```

**Serialization Issues**: UDF not serializable for distributed execution

```java
// Make the UDF implement Serializable, or mark non-serializable fields transient
public class MyUDF extends GenericUDF implements Serializable
```

### Exception Handling in UDFs

```java
public class SafeUDF extends GenericUDF {
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        try {
            // UDF logic
            return processInput(arguments[0].get());
        } catch (Exception e) {
            // Handle errors gracefully
            LOG.warn("UDF error: " + e.getMessage());
            return null; // or an appropriate default value
        }
    }
}
```

## Testing UDFs

### Unit Testing

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession

class UDFIntegrationSuite extends SparkFunSuite with SharedSparkSession {
  test("custom UDF execution") {
    spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 'com.example.TestUDF'")

    val result = spark.sql("SELECT test_udf('input') AS output").collect()
    assert(result(0).getString(0) == "expected_output")
  }
}
```

### Integration Testing

```scala
// Test with actual Hive UDFs
class HiveUDFSuite extends SparkFunSuite with SharedSparkSession {
  test("hive builtin UDF") {
    val result = spark.sql("SELECT upper('hello') AS upper_hello").collect()
    assert(result(0).getString(0) == "HELLO")
  }
}
```

## Migration and Compatibility

### Migrating from Hive

Most Hive UDFs work without modification in Spark:

1. **Copy JAR files** to the Spark classpath
2. **Register functions** using CREATE FUNCTION
3. **Test functionality** with representative data
4. **Monitor performance** and optimize if needed

### Version Compatibility

UDF compatibility across Hive versions:

- **Simple UDFs**: Generally compatible across versions
- **Generic UDFs**: May require Hive version-specific compilation
- **Built-in UDFs**: Spark provides a compatibility layer

## Types

```scala { .api }
// Base expression interface
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generator for table-generating functions
trait Generator extends Expression {
  def eval(input: InternalRow): TraversableOnce[InternalRow]
  def terminate(): TraversableOnce[InternalRow]
}

// Aggregate function interface
trait TypedImperativeAggregate[T] extends ImperativeAggregate {
  def createAggregationBuffer(): T
  def update(buffer: T, input: InternalRow): T
  def merge(buffer: T, input: T): T
  def eval(buffer: T): Any
}

// Hive UDAF aggregation buffer
trait AggregationBuffer {
  def reset(): Unit
  def copy(): AggregationBuffer
}

// Object inspector for Hive's type system
trait ObjectInspector {
  def getCategory(): ObjectInspector.Category
  def getTypeName(): String
}

// Function wrapper for Hive functions
case class HiveFunctionWrapper(className: String, instance: AnyRef) extends Serializable {
  def createFunction[T](): T
}

// Internal row representation
trait InternalRow {
  def numFields: Int
  def get(ordinal: Int, dataType: DataType): Any
  def isNullAt(ordinal: Int): Boolean
}
```