or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, data-type-conversion.md, file-formats.md, index.md, metastore-operations.md, session-management.md, udf-integration.md

docs/udf-integration.md

0

# UDF Integration

1

2

Comprehensive support for Hive User-Defined Functions including simple UDFs, generic UDFs, table-generating functions (UDTFs), and aggregate functions (UDAFs). This enables seamless integration of existing Hive UDFs within Spark SQL queries.

3

4

## Capabilities

5

6

### Simple UDF Support

7

8

Wrapper for simple Hive UDFs that work with basic data types.

9

10

```scala { .api }

11

/**

12

* Expression wrapper for simple Hive UDFs

13

* Handles basic data type conversion between Hive and Catalyst

14

*/

15

case class HiveSimpleUDF(

16

funcWrapper: HiveFunctionWrapper,

17

children: Seq[Expression]

18

) extends Expression with HiveInspectors with CodegenFallback with Logging {

19

20

override def dataType: DataType

21

override def nullable: Boolean = true

22

override def eval(input: InternalRow): Any

23

override def prettyName: String

24

}

25

```

26

27

**Usage Examples:**

28

29

```scala

30

import org.apache.spark.sql.SparkSession

31

32

val spark = SparkSession.builder()

33

.enableHiveSupport()

34

.getOrCreate()

35

36

// Register and use simple Hive UDF

37

spark.sql("CREATE TEMPORARY FUNCTION simple_upper AS 'org.apache.hadoop.hive.ql.udf.UDFUpper'")

38

spark.sql("SELECT simple_upper('hello world') as result").show()

39

// Result: HELLO WORLD

40

41

// Use built-in Hive simple UDFs

42

spark.sql("SELECT substr('Apache Spark', 1, 6) as result").show()

43

// Result: Apache

44

```

45

46

### Generic UDF Support

47

48

Wrapper for generic Hive UDFs that can handle complex data types and advanced operations.

49

50

```scala { .api }

51

/**

52

* Expression wrapper for generic Hive UDFs

53

* Supports complex data types and provides full ObjectInspector integration

54

*/

55

case class HiveGenericUDF(

56

funcWrapper: HiveFunctionWrapper,

57

children: Seq[Expression]

58

) extends Expression with HiveInspectors with CodegenFallback with Logging {

59

60

override def dataType: DataType

61

override def nullable: Boolean = true

62

override def eval(input: InternalRow): Any

63

override def prettyName: String

64

}

65

```

66

67

**Usage Examples:**

68

69

```scala

70

// Register generic UDF for JSON processing

71

spark.sql("""

72

CREATE TEMPORARY FUNCTION get_json_object

73

AS 'org.apache.hadoop.hive.ql.udf.UDFJson'

74

""")

75

76

// Use generic UDF

77

val jsonData = """{"name": "Alice", "age": 25, "city": "NYC"}"""

78

spark.sql(s"""

79

SELECT

80

get_json_object('$jsonData', '$$.name') as name,

81

get_json_object('$jsonData', '$$.age') as age

82

""").show()

83

84

// Register custom generic UDF for array operations

85

spark.sql("""

86

CREATE TEMPORARY FUNCTION array_contains

87

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArrayContains'

88

""")

89

90

spark.sql("""

91

SELECT array_contains(array(1,2,3,4), 3) as contains_three

92

""").show()

93

// Result: true

94

```

95

96

### Table-Generating Functions (UDTF)

97

98

Support for Hive UDTFs that generate multiple rows from a single input row.

99

100

```scala { .api }

101

/**

102

* Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)

103

* Converts single input rows into multiple output rows

104

*/

105

case class HiveGenericUDTF(

106

funcWrapper: HiveFunctionWrapper,

107

children: Seq[Expression]

108

) extends Generator with HiveInspectors with CodegenFallback with Logging {

109

110

override def elementSchema: StructType

111

override def eval(input: InternalRow): TraversableOnce[InternalRow]

112

override def prettyName: String

113

}

114

```

115

116

**Usage Examples:**

117

118

```scala

119

// Register explode UDTF for array expansion

120

spark.sql("""

121

CREATE TEMPORARY FUNCTION hive_explode

122

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'

123

""")

124

125

// Use UDTF to expand arrays

126

spark.sql("""

127

SELECT hive_explode(array('a', 'b', 'c')) as item

128

""").show()

129

// Results:

130

// +----+

131

// |item|

132

// +----+

133

// | a |

134

// | b |

135

// | c |

136

// +----+

137

138

// Register stack UDTF for pivoting data

139

spark.sql("""

140

CREATE TEMPORARY FUNCTION stack

141

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack'

142

""")

143

144

spark.sql("""

145

SELECT stack(2, 'A', 1, 'B', 2) as (letter, number)

146

""").show()

147

// Results:

148

// +------+------+

149

// |letter|number|

150

// +------+------+

151

// | A| 1|

152

// | B| 2|

153

// +------+------+

154

```

155

156

### Aggregate Functions (UDAF)

157

158

Support for Hive UDAFs that perform custom aggregation operations.

159

160

```scala { .api }

161

/**

162

* Aggregate function wrapper for Hive UDAFs

163

* Provides custom aggregation logic with proper state management

164

*/

165

case class HiveUDAFFunction(

166

funcWrapper: HiveFunctionWrapper,

167

children: Seq[Expression]

168

) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {

169

170

override def nullable: Boolean = true

171

override def dataType: DataType

172

override def prettyName: String

173

override def createAggregationBuffer(): Any

174

override def update(buffer: Any, input: InternalRow): Any

175

override def merge(buffer: Any, input: Any): Any

176

override def eval(buffer: Any): Any

177

override def serialize(buffer: Any): Array[Byte]

178

override def deserialize(storageFormat: Array[Byte]): Any

179

}

180

```

181

182

**Usage Examples:**

183

184

```scala

185

// Register custom UDAF for advanced statistics

186

spark.sql("""

187

CREATE TEMPORARY FUNCTION variance

188

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance'

189

""")

190

191

// Use UDAF in aggregation query

192

spark.sql("""

193

SELECT

194

department,

195

variance(salary) as salary_variance

196

FROM employees

197

GROUP BY department

198

""").show()

199

200

// Register percentile UDAF

201

spark.sql("""

202

CREATE TEMPORARY FUNCTION percentile_approx

203

AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox'

204

""")

205

206

spark.sql("""

207

SELECT

208

percentile_approx(age, 0.5) as median_age,

209

percentile_approx(age, 0.95) as p95_age

210

FROM users

211

""").show()

212

```

213

214

### HiveGenericUDTF (Table-Generating Functions)

215

216

Wrapper for Hive UDTFs that generate multiple output rows from a single input row, extending the Generator interface.

217

218

```scala { .api }

219

/**

220

* Generator wrapper for Hive UDTFs (User-Defined Table-Generating Functions)

221

* Converts single input rows into multiple output rows with full schema support

222

*/

223

case class HiveGenericUDTF(

224

funcWrapper: HiveFunctionWrapper,

225

children: Seq[Expression]

226

) extends Generator with HiveInspectors with CodegenFallback with Logging {

227

228

override def elementSchema: StructType

229

override def eval(input: InternalRow): TraversableOnce[InternalRow]

230

override def prettyName: String

231

override def terminate(): TraversableOnce[InternalRow]

232

override def close(): Unit

233

}

234

```

235

236

**Usage Examples:**

237

238

```scala

239

// Register custom UDTF for data expansion

240

spark.sql("""

241

CREATE TEMPORARY FUNCTION my_explode_json

242

AS 'com.example.JsonExplodeUDTF'

243

""")

244

245

// Use UDTF in LATERAL VIEW

246

spark.sql("""

247

SELECT t.id, exploded.key, exploded.value

248

FROM my_table t

249

LATERAL VIEW my_explode_json(t.json_data) exploded AS key, value

250

""").show()

251

252

// Built-in UDTF examples

253

spark.sql("""

254

SELECT explode(array('a', 'b', 'c')) as item

255

""").show()

256

257

spark.sql("""

258

SELECT stack(3, 'col1', 1, 'col2', 2, 'col3', 3) as (name, value)

259

""").show()

260

```

261

262

### HiveUDAFFunction (Aggregate Functions)

263

264

Wrapper for Hive UDAFs providing custom aggregation with proper state management and distributed execution support.

265

266

```scala { .api }

267

/**

268

* Aggregate function wrapper for Hive UDAFs

269

* Provides custom aggregation logic with proper state management for distributed execution

270

*/

271

case class HiveUDAFFunction(

272

funcWrapper: HiveFunctionWrapper,

273

children: Seq[Expression]

274

) extends TypedImperativeAggregate[Any] with HiveInspectors with Logging {

275

276

override def nullable: Boolean = true

277

override def dataType: DataType

278

override def prettyName: String

279

override def createAggregationBuffer(): Any

280

override def update(buffer: Any, input: InternalRow): Any

281

override def merge(buffer: Any, input: Any): Any

282

override def eval(buffer: Any): Any

283

override def serialize(buffer: Any): Array[Byte]

284

override def deserialize(storageFormat: Array[Byte]): Any

285

}

286

```

287

288

**Usage Examples:**

289

290

```scala

291

// Register custom UDAF for advanced analytics

292

spark.sql("""

293

CREATE TEMPORARY FUNCTION geometric_mean

294

AS 'com.example.GeometricMeanUDAF'

295

""")

296

297

// Use UDAF in aggregation queries

298

spark.sql("""

299

SELECT

300

department,

301

geometric_mean(salary) as geo_mean_salary,

302

percentile_approx(salary, 0.5) as median_salary

303

FROM employees

304

GROUP BY department

305

""").show()

306

307

// Window function usage

308

spark.sql("""

309

SELECT

310

name,

311

salary,

312

variance(salary) OVER (PARTITION BY department) as dept_variance

313

FROM employees

314

""").show()

315

```

316

317

### Function Registration and Management

318

319

Utilities for registering and managing Hive UDFs in Spark sessions.

320

321

```scala { .api }

322

/**

323

* Function wrapper for Hive functions with class loading support

324

*/

325

case class HiveFunctionWrapper(functionClassName: String) {

326

def createFunction[UDFType](): UDFType

327

}

328

```

329

330

**Registration Examples:**

331

332

```scala

333

import org.apache.spark.sql.SparkSession

334

335

val spark = SparkSession.builder()

336

.enableHiveSupport()

337

.getOrCreate()

338

339

// Register UDF from JAR

340

spark.sql("ADD JAR /path/to/custom-udfs.jar")

341

spark.sql("""

342

CREATE TEMPORARY FUNCTION my_custom_function

343

AS 'com.example.MyCustomUDF'

344

""")

345

346

// Register permanent function in metastore

347

spark.sql("""

348

CREATE FUNCTION my_db.my_permanent_function

349

AS 'com.example.MyPermanentUDF'

350

USING JAR '/path/to/custom-udfs.jar'

351

""")

352

353

// List available functions

354

spark.sql("SHOW FUNCTIONS LIKE '*custom*'").show()

355

356

// Get function information

357

spark.sql("DESCRIBE FUNCTION my_custom_function").show(truncate = false)

358

```

359

360

### UDF Type Integration

361

362

Support for complex Hive data types in UDF operations.

363

364

```scala { .api }

365

/**

366

* HiveInspectors trait provides conversion utilities between

367

* Hive ObjectInspectors and Catalyst data types

368

*/

369

trait HiveInspectors {

370

/** Convert Java type to Catalyst DataType */

371

def javaTypeToDataType(clz: Type): DataType

372

373

/** Create wrapper for converting Catalyst data to Hive format */

374

def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any

375

376

/** Create unwrapper for converting Hive data to Catalyst format */

377

def unwrapperFor(objectInspector: ObjectInspector): Any => Any

378

379

/** Convert Catalyst DataType to Hive ObjectInspector */

380

def toInspector(dataType: DataType): ObjectInspector

381

382

/** Convert Hive ObjectInspector to Catalyst DataType */

383

def inspectorToDataType(inspector: ObjectInspector): DataType

384

}

385

```

386

387

**Complex Type Examples:**

388

389

```scala

390

// Working with array types in UDFs

391

spark.sql("""

392

SELECT

393

collect_list(name) as names,

394

size(collect_list(name)) as count

395

FROM users

396

GROUP BY department

397

""").show()

398

399

// Working with map types

400

spark.sql("""

401

SELECT

402

str_to_map('key1:value1,key2:value2', ',', ':') as parsed_map

403

""").show()

404

405

// Working with struct types

406

spark.sql("""

407

SELECT

408

named_struct('name', 'Alice', 'age', 25) as person_info

409

""").show()

410

```

411

412

### Built-in Hive Function Integration

413

414

Access to extensive set of built-in Hive functions.

415

416

**String Functions:**

417

```scala

418

// String manipulation functions

419

spark.sql("SELECT concat('Hello', ' ', 'World') as greeting").show()

420

spark.sql("SELECT upper('apache spark') as upper_case").show()

421

spark.sql("SELECT regexp_replace('abc123def', '[0-9]+', 'XXX') as replaced").show()

422

```

423

424

**Date/Time Functions:**

425

```scala

426

// Date and time functions

427

spark.sql("SELECT from_unixtime(unix_timestamp()) as current_time").show()

428

spark.sql("SELECT date_add('2023-01-01', 30) as future_date").show()

429

spark.sql("SELECT datediff('2023-12-31', '2023-01-01') as days_diff").show()

430

```

431

432

**Mathematical Functions:**

433

```scala

434

// Mathematical functions

435

spark.sql("SELECT round(3.14159, 2) as rounded").show()

436

spark.sql("SELECT pow(2, 3) as power").show()

437

spark.sql("SELECT greatest(1, 5, 3, 2) as max_value").show()

438

```

439

440

**Conditional Functions:**

441

```scala

442

// Conditional functions

443

spark.sql("""

444

SELECT

445

name,

446

CASE

447

WHEN age < 18 THEN 'Minor'

448

WHEN age < 65 THEN 'Adult'

449

ELSE 'Senior'

450

END as age_group

451

FROM users

452

""").show()

453

454

spark.sql("SELECT nvl(null_column, 'default_value') as coalesced").show()

455

```

456

457

## Error Handling

458

459

Common error patterns and exception handling for UDF operations:

460

461

```scala

462

import org.apache.spark.sql.AnalysisException

463

464

try {

465

// Attempt to use non-existent UDF

466

spark.sql("SELECT non_existent_udf('test')").show()

467

} catch {

468

case e: AnalysisException if e.getMessage.contains("undefined function") =>

469

println("UDF not found - check function registration")

470

case e: Exception =>

471

println(s"UDF execution error: ${e.getMessage}")

472

}

473

474

// Handle UDF registration errors

475

try {

476

spark.sql("CREATE TEMPORARY FUNCTION bad_udf AS 'invalid.class.name'")

477

} catch {

478

case e: AnalysisException => // Spark wraps missing UDF classes in AnalysisException

479

println("UDF class not found - check classpath")

480

case e: Exception =>

481

println(s"UDF registration failed: ${e.getMessage}")

482

}

483

```

484

485

## Performance Considerations

486

487

Best practices for UDF performance:

488

489

```scala

490

// Prefer built-in functions over custom UDFs when possible

491

// GOOD: Use built-in functions

492

spark.sql("SELECT upper(name) FROM users")

493

494

// LESS OPTIMAL: Custom UDF for same functionality

495

// spark.sql("SELECT my_upper_udf(name) FROM users")

496

497

// Use vectorized operations when available

498

// Note: the Arrow setting below accelerates PySpark (Python) UDFs only — it does not vectorize Hive UDFs

499

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

500

501

// Register UDFs once per session to avoid repeated registration overhead

502

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

503

spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUDF'")

504

// Reuse throughout session

505

```