Apache Spark Hive integration module that provides support for Hive tables, queries, and SerDes
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive_2-11@2.4.0
```

Apache Spark Hive integration module that provides comprehensive support for accessing and manipulating Hive tables, executing HiveQL queries, and leveraging Hive SerDes. This module serves as the bridge between Apache Spark SQL and Apache Hive, enabling Spark applications to work seamlessly with existing Hive infrastructure, metastore, and data formats while maintaining compatibility with Hive features such as UDFs, partitioning, and complex data types.

For Maven projects, add the dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```
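For sbt builds, the equivalent coordinates are sketched below, assuming Scala 2.11 and the usual `provided` scope for Spark modules; adjust to your build:

```scala
// sbt equivalent of the Maven dependency above; "provided" assumes the
// Spark runtime supplies the jars at execution time.
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.4.8" % "provided"
```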
For Scala applications:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext // Deprecated - use SparkSession
```

For enabling Hive support (modern approach):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Integration App")
  .enableHiveSupport()
  .getOrCreate()
```

A complete usage example:

```scala
import org.apache.spark.sql.SparkSession
// Create SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .enableHiveSupport()
  .getOrCreate()
// Execute HiveQL queries
spark.sql("CREATE TABLE IF NOT EXISTS users (id INT, name STRING, age INT)")
spark.sql("INSERT INTO users VALUES (1, 'Alice', 25), (2, 'Bob', 30)")
// Query Hive tables
val result = spark.sql("SELECT * FROM users WHERE age > 25")
result.show()
// Access Hive metastore
spark.catalog.listTables().show()
spark.catalog.listDatabases().show()
// Work with Hive partitioned tables
spark.sql("""
CREATE TABLE IF NOT EXISTS partitioned_sales (
product STRING,
amount DOUBLE
) PARTITIONED BY (year INT, month INT)
""")
// Load data into partitioned table
spark.sql("INSERT INTO partitioned_sales PARTITION(year=2023, month=12) VALUES ('laptop', 999.99)")The Spark Hive integration is built around several key components:
The Spark Hive integration is built around several key components.

Core functionality for creating and managing Spark sessions with Hive integration, including legacy HiveContext support and modern SparkSession configuration:
```scala
// Modern approach (recommended)
def enableHiveSupport(): SparkSession.Builder

// Legacy approach (deprecated since 2.0.0)
@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0")
class HiveContext(sc: SparkContext) extends SQLContext
```
Direct access to the Hive metastore for programmatic database, table, partition, and function management is provided through the HiveClient interface:

```scala
trait HiveClient {
  def listDatabases(pattern: String): Seq[String]
  def getDatabase(name: String): CatalogDatabase
  def listTables(dbName: String): Seq[String]
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def getPartitions(
      catalogTable: CatalogTable,
      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
  def runSqlHive(sql: String): Seq[String]
}
```
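HiveClient itself is internal, but most of these operations are mirrored by the public Catalog API; a sketch of the public route, reusing the tables from the example above:

```scala
// Day-to-day metastore operations through the public Catalog API
spark.catalog.listDatabases().show()
spark.catalog.listTables("default").show()
spark.catalog.listColumns("default", "users").show()

// Partition-level inspection can go through HiveQL directly
spark.sql("SHOW PARTITIONS partitioned_sales").show()
```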
Comprehensive support for Hive user-defined functions covers simple UDFs, generic UDFs, table-generating functions (UDTFs), and aggregate functions (UDAFs):

```scala
case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression
case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression
case class HiveGenericUDTF(funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Generator
case class HiveUDAFFunction(funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends TypedImperativeAggregate[Any]
```
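These expression classes are wired in automatically when a Hive function is registered through HiveQL. A sketch; the jar path and `com.example.MyUpperUDF` class are hypothetical placeholders for your own UDF implementation:

```scala
// Register a Hive UDF by class name (placeholders: jar path and class)
spark.sql("ADD JAR /path/to/my-udfs.jar")
spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpperUDF'")

// Once registered, the function is usable in any HiveQL query
spark.sql("SELECT my_upper(name) FROM users").show()
```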
Configuration options, utilities, and constants for customizing Hive integration behavior, including metastore settings, file format conversion, and compatibility options:

```scala
object HiveUtils {
  val builtinHiveVersion: String = "1.2.1"
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
}
```
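These entries correspond to `spark.sql.hive.*` configuration keys set at session build time; a sketch with illustrative values:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values: pin the metastore version to the built-in Hive
// and let Spark's native readers handle Parquet/ORC-backed Hive tables.
val configured = SparkSession.builder()
  .appName("Hive Config Example")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .config("spark.sql.hive.metastore.jars", "builtin")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .enableHiveSupport()
  .getOrCreate()
```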
Utilities for converting between Hive and Catalyst data types, handling ObjectInspectors, and managing SerDe operations:

```scala
trait HiveInspectors {
  def javaTypeToDataType(clz: Type): DataType
  def toInspector(dataType: DataType): ObjectInspector
  def inspectorToDataType(inspector: ObjectInspector): DataType
  def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
  def unwrapperFor(objectInspector: ObjectInspector): Any => Any
}
```
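A round-trip sketch using the conversion methods declared above. HiveInspectors is internal to the `org.apache.spark.sql.hive` package, so the sketch declares itself in that package purely for illustration:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.types._

// Illustration only: Catalyst type -> Hive ObjectInspector -> Catalyst type
object InspectorRoundTrip extends HiveInspectors {
  val inspector = toInspector(ArrayType(StringType))
  val roundTripped: DataType = inspectorToDataType(inspector) // ArrayType(StringType)
}
```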
Native support for Hive file formats covers traditional Hive tables and ORC files with Hive compatibility:

```scala
class HiveFileFormat extends FileFormat with DataSourceRegister {
  override def shortName(): String = "hive"
}

class OrcFileFormat extends FileFormat with DataSourceRegister {
  override def shortName(): String = "orc"
}
```
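The registered short names plug into the standard reader/writer API; a sketch (the ORC path is a placeholder):

```scala
// Persist a DataFrame as a Hive SerDe table via the "hive" short name
spark.range(10).toDF("id")
  .write.format("hive").saveAsTable("hive_format_demo")

// Read ORC files directly via the "orc" short name
val orcDf = spark.read.format("orc").load("/path/to/orc/files")
```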
These APIs build on catalog entity types from Spark SQL Catalyst, which are used throughout the Hive integration:

```scala
case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  properties: Map[String, String] = Map.empty
)

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty
)

case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource]
)

// Hive version support
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
)
// Configuration for Hive data sources
class HiveOptions(parameters: Map[String, String]) {
  val fileFormat: Option[String]
  val inputFormat: Option[String]
  val outputFormat: Option[String]
  val serde: Option[String]
  def serdeProperties: Map[String, String]
}
```
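These options surface as `OPTIONS` when creating Hive-format tables through the data source API; a sketch:

```scala
// HiveOptions keys map to OPTIONS on a table created with USING hive
spark.sql("""
  CREATE TABLE hive_orc_demo (id INT, label STRING)
  USING hive
  OPTIONS (fileFormat 'orc')
""")

// The table reads back like any other Hive table
spark.table("hive_orc_demo").printSchema()
```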