tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0

Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility.
Apache Spark Hive integration provides comprehensive support for User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table Functions (UDTFs). This includes both Hive-native functions and Spark SQL function integration.
import org.apache.spark.sql.hive.HiveFunctionRegistry
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.hadoop.hive.ql.exec.FunctionInfo
import org.apache.hadoop.hive.ql.udf.generic._

class HiveFunctionRegistry(
underlying: FunctionRegistry,
executionHive: ClientWrapper
) extends FunctionRegistry with HiveInspectors {
def getFunctionInfo(name: String): FunctionInfo
def lookupFunction(name: String, children: Seq[Expression]): Expression
def registerFunction(name: FunctionIdentifier, builder: Seq[Expression] => Expression): Unit
}

HiveFunctionRegistry - Registry for both Hive and Spark SQL functions
getFunctionInfo - Retrieves Hive function metadata and signature information
lookupFunction - Resolves a function name and argument expressions to an executable expression, with type checking
registerFunction - Registers a new function under the given name with an expression builder
Usage Examples:
val hiveContext = new HiveContext(sc)
val registry = hiveContext.sessionState.functionRegistry

// Get function information
val funcInfo = registry.getFunctionInfo("concat")
println(s"Function class: ${funcInfo.getFunctionClass}")

// Look up function with literal arguments
import org.apache.spark.sql.catalyst.expressions.Literal
val stringLit1 = Literal("Hello")
val stringLit2 = Literal("World")
val concatExpr = registry.lookupFunction("concat", Seq(stringLit1, stringLit2))

// Register a custom function (MyCustomFunction is a hypothetical Expression subclass)
registry.registerFunction(
  FunctionIdentifier("my_custom_func"),
  (args: Seq[Expression]) => new MyCustomFunction(args)
)

The integration supports all standard Hive built-in functions:
String Functions:
concat, concat_ws, substr, substring, length, trim, ltrim, rtrim, upper, lower, reverse, split, regexp_replace, regexp_extract, like, rlike, instr, locate, ascii, base64, unbase64

Mathematical Functions:
abs, acos, asin, atan, atan2, ceil, ceiling, cos, cosh, exp, floor, ln, log, log10, log2, negative, pi, positive, pow, power, rand, round, sign, sin, sinh, sqrt, tan, tanh

Date/Time Functions:
year, month, day, hour, minute, second, weekofyear, date_add, date_sub, datediff, date_format, from_unixtime, unix_timestamp, to_date, trunc, months_between, add_months, last_day, next_day

Aggregate Functions:
count, sum, avg, min, max, variance, var_pop, var_samp, stddev, stddev_pop, stddev_samp, collect_list, collect_set, percentile, percentile_approx, histogram_numeric

Usage Examples:
// Use built-in functions in SQL
val result = hiveContext.sql("""
SELECT
concat(first_name, ' ', last_name) as full_name,
year(birth_date) as birth_year,
round(salary * 1.1, 2) as adjusted_salary
FROM employees
WHERE length(first_name) > 3
""")
// Use built-in functions in DataFrame API
import org.apache.spark.sql.functions._
val df = hiveContext.table("employees")
.select(
concat(col("first_name"), lit(" "), col("last_name")).as("full_name"),
year(col("birth_date")).as("birth_year")
)
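Hive built-ins without a dedicated DataFrame wrapper can still be invoked by name via callUDF (covered by the functions._ import above); a short sketch, assuming the employees table has an email column:

// Call a registry function by name from the DataFrame API
val domains = hiveContext.table("employees")
  .select(callUDF("regexp_extract", col("email"), lit("@(.+)$"), lit(1)).as("email_domain"))
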
// UDF base classes supported - extend one of these in your own code:
org.apache.hadoop.hive.ql.exec.UDF                                // simple UDFs (evaluate methods found by reflection)
org.apache.hadoop.hive.ql.udf.generic.GenericUDF                  // type-aware UDFs built on ObjectInspectors
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF                 // table-generating functions
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver // entry point for aggregate functions (UDAFs)
// Evaluator for UDAFs, returned by the resolver's getEvaluator
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
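As a minimal sketch, a simple UDF like the hypothetical com.company.hive.MyStringUDF registered below might look like this (class and package names are placeholders, not part of this module):

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hypothetical simple UDF: Hive locates evaluate() by reflection
class MyStringUDF extends UDF {
  def evaluate(input: Text): Text = {
    if (input == null) null
    else new Text(input.toString.trim.toUpperCase)
  }
}

Usage Examples: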
// Register Hive UDF class
hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
hiveContext.sql("""
CREATE TEMPORARY FUNCTION my_string_func
AS 'com.company.hive.MyStringUDF'
""")
// Use registered UDF
val result = hiveContext.sql("""
SELECT customer_id, my_string_func(customer_name) as processed_name
FROM customers
""")
// Register UDAF
hiveContext.sql("""
CREATE TEMPORARY FUNCTION my_aggregate_func
AS 'com.company.hive.MyAggregateUDAF'
""")
val aggregateResult = hiveContext.sql("""
SELECT
department,
my_aggregate_func(salary) as custom_aggregate
FROM employees
GROUP BY department
""")case class HiveGenericUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with CodegenFallback

HiveGenericUDF - Wrapper for Hive GenericUDF implementations
Usage in query planning:
// This is typically handled internally by the query planner
// when Hive UDFs are encountered in SQL queries

case class HiveGenericUDTF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Generator with HiveInspectors with CodegenFallback

HiveGenericUDTF - Wrapper for Hive table-generating functions
Usage Examples:
// UDTF usage with LATERAL VIEW
val result = hiveContext.sql("""
SELECT
customer_id,
tag
FROM customers
LATERAL VIEW explode(tags) exploded_table AS tag
""")
// Custom UDTF
hiveContext.sql("""
CREATE TEMPORARY FUNCTION my_explode_func
AS 'com.company.hive.MyExplodeUDTF'
""")
val customResult = hiveContext.sql("""
SELECT
id,
value
FROM input_table
LATERAL VIEW my_explode_func(data_column) t AS value
""")// Add JAR containing UDFs
def addJar(path: String): Unit
// Create temporary function
def createFunction(
name: String,
className: String,
resources: Seq[String] = Seq.empty
): Unit
// Drop temporary function
def dropFunction(name: String): Unit

Usage Examples:
// Add UDF JAR to classpath
hiveContext.sql("ADD JAR hdfs://namenode:port/path/to/udf.jar")
// Create temporary function
hiveContext.sql("""
CREATE TEMPORARY FUNCTION advanced_analytics
AS 'com.analytics.AdvancedAnalyticsUDF'
USING JAR 'hdfs://namenode:port/path/to/analytics.jar'
""")
// Use in queries
val insights = hiveContext.sql("""
SELECT
customer_segment,
advanced_analytics(purchase_history, demographic_data) as insights
FROM customer_data
GROUP BY customer_segment
""")
// Clean up function
hiveContext.sql("DROP TEMPORARY FUNCTION advanced_analytics")

// List available functions
def listFunctions(): Array[String]
def listFunctions(pattern: String): Array[String]
// Describe function
def describeFunction(name: String): String
def describeFunction(name: String, extended: Boolean): String

Usage Examples:
// List all functions
val allFunctions = hiveContext.sql("SHOW FUNCTIONS").collect()
// List functions matching pattern
val stringFunctions = hiveContext.sql("SHOW FUNCTIONS 'str*'").collect()
// Get function description
val description = hiveContext.sql("DESCRIBE FUNCTION concat").collect()
val extendedDesc = hiveContext.sql("DESCRIBE FUNCTION EXTENDED concat").collect()

Common UDF-related issues:
ClassNotFoundException - UDF JAR not in classpath
// Solution: Add JAR before creating function
hiveContext.sql("ADD JAR /path/to/udf.jar")

UDFArgumentException - Incorrect argument types or count
// Check function signature and argument types
// Verify UDF implementation matches expected signature

SerializationException - UDF not serializable
// Ensure UDF classes implement Serializable if needed
// Use transient fields for non-serializable components
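A sketch of the transient-field pattern, with placeholder names; the compiled Pattern below stands in for any member you do not want shipped with the serialized UDF instance, and lazy val rebuilds it on each executor:

import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical UDF demonstrating the transient-field pattern
class MyPatternUDF extends UDF with Serializable {
  // Rebuilt lazily on each executor instead of being serialized with the instance
  @transient private lazy val pattern = java.util.regex.Pattern.compile("[0-9]+")

  def evaluate(input: String): Boolean =
    input != null && pattern.matcher(input).find()
}

Optimization Tips: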