Apache Spark Hive integration module that integrates Spark SQL with the Apache Hive data warehouse, enabling Spark SQL to work with Hive tables, the Hive metastore, and Hive SerDes.
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive_2-13@4.0.0
```

This module serves as a bridge that allows Spark applications to read from and write to Hive tables, use Hive's metastore for table metadata management, execute Hive UDFs within Spark queries, and maintain compatibility with existing Hive-based data pipelines.
"org.apache.spark" %% "spark-hive" % "4.0.0"import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SparkSession
// Create SparkSession with Hive support
val spark = SparkSession.builder()
.appName("HiveIntegrationExample")
.enableHiveSupport()
.getOrCreate()
// Use Hive tables directly
spark.sql("CREATE TABLE IF NOT EXISTS hive_table (id INT, name STRING)")
spark.sql("INSERT INTO hive_table VALUES (1, 'Alice'), (2, 'Bob')")
val df = spark.sql("SELECT * FROM hive_table")
df.show()
// Access the Hive metastore through the session catalog
val catalog = spark.sessionState.catalog
catalog.listTables("default").foreach(println)
```

The Spark Hive integration is built around several key components:
- `HiveExternalCatalog` provides complete Hive metastore integration for database, table, and partition operations
- `HiveClient` abstracts different Hive versions (2.0.x through 4.0.x) behind a unified API
- `HiveInspectors` handles bidirectional conversion between Spark and Hive data representations

This package is marked as a private internal API in the Spark codebase, and all classes are subject to change between minor releases. These APIs are nevertheless exposed and used by applications that integrate with Spark's Hive functionality.
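For example, an application can reach the Hive-backed catalog behind a session. A minimal sketch, assuming a Hive-enabled session and noting that `sharedState`, `unwrapped`, and the `client` field are internal, version-sensitive APIs:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// The listener-decorated external catalog wraps the Hive-backed implementation
spark.sharedState.externalCatalog.unwrapped match {
  case hiveCatalog: HiveExternalCatalog =>
    // The underlying versioned metastore client
    println(s"Hive client version: ${hiveCatalog.client.version.fullVersion}")
  case other =>
    println(s"Hive support not active: ${other.getClass.getName}")
}
```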
Complete Hive metastore integration providing database, table, partition, and function operations through the Spark catalog interface.
```scala
class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog {
lazy val client: HiveClient
// Database operations
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
def getDatabase(db: String): CatalogDatabase
def listDatabases(): Seq[String]
// Table operations
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def getTable(db: String, table: String): CatalogTable
def listTables(db: String): Seq[String]
}
```
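As a usage sketch, the same operations can be driven through the session's external catalog; the database name and location below are illustrative:

```scala
import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

// ExternalCatalogWithListener delegates these calls to HiveExternalCatalog
val externalCatalog = spark.sharedState.externalCatalog

externalCatalog.createDatabase(
  CatalogDatabase(
    name = "reporting",                          // hypothetical database
    description = "demo database",
    locationUri = new URI("file:/tmp/reporting.db"),
    properties = Map.empty),
  ignoreIfExists = true)

println(externalCatalog.listDatabases())
println(externalCatalog.getDatabase("reporting"))
```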
Low-level interface to the Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations.

```scala
trait HiveClient {
def version: HiveVersion
def runSqlHive(sql: String): Seq[String]
// Database operations
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
def getDatabase(name: String): CatalogDatabase
def listDatabases(pattern: String): Seq[String]
// Table operations
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
def getTable(dbName: String, tableName: String): CatalogTable
def getRawHiveTable(dbName: String, tableName: String): RawHiveTable
}
```
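Reusing the unwrapping approach from the earlier sketch, the client can run statements directly against Hive; `hive_table` is the table created in the quick-start above:

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog

// Internal API: obtain the versioned client from the Hive-backed catalog
val client = spark.sharedState.externalCatalog.unwrapped
  .asInstanceOf[HiveExternalCatalog].client

// Execute a statement through Hive and inspect the raw result lines
client.runSqlHive("SHOW DATABASES").foreach(println)

// Fetch table metadata through the version-abstracted client
val table = client.getTable("default", "hive_table")
println(table.schema.treeString)
```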
Bidirectional conversion system between Spark and Hive data representations, handling complex nested types and Hive SerDe integration.

```scala
trait HiveInspectors {
def inspectorToDataType(inspector: ObjectInspector): DataType
def toInspector(dataType: DataType): ObjectInspector
def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
def unwrapperFor(objectInspector: ObjectInspector): Any => Any
implicit class typeInfoConversions(dt: DataType) {
def toTypeInfo: TypeInfo
}
}
```
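A sketch of round-tripping Spark types through Hive ObjectInspectors. HiveInspectors is a mixin trait (and package-private in the Spark sources), so the hypothetical demo object below declares itself inside the org.apache.spark.sql.hive package to gain access:

```scala
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.spark.sql.types._

// Hypothetical helper; placed in the hive package purely for trait visibility
object InspectorDemo extends HiveInspectors {
  def describe(dt: DataType): Unit = {
    val oi: ObjectInspector = toInspector(dt) // Spark type -> Hive inspector
    println(s"$dt -> ${oi.getTypeName}")
    println(inspectorToDataType(oi))          // Hive inspector -> Spark type
  }

  def main(args: Array[String]): Unit = {
    describe(ArrayType(StringType))
    describe(MapType(StringType, IntegerType))
  }
}
```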
Complete support for Hive UDFs, UDAFs, and UDTFs with automatic registration and execution within Spark queries.

```scala
case class HiveSimpleUDF(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression
case class HiveUDAFFunction(
name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression],
isUDAFBridgeRequired: Boolean,
mutableAggBufferOffset: Int,
inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer]
```
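These expression classes back functions registered through SQL; Spark selects the appropriate wrapper based on the Hive class's interface. A usage sketch with a UDF class from Hive's own library:

```scala
// Register a Hive UDF by class name; Spark wraps it in the expression
// classes above when the query is planned
spark.sql(
  "CREATE TEMPORARY FUNCTION hive_upper AS " +
  "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper'")

spark.sql("SELECT hive_upper(name) FROM hive_table").show()
```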
Comprehensive configuration system for Hive integration with metastore connection settings, format conversions, and JAR management.

```scala
object HiveUtils {
val builtinHiveVersion: String
// Configuration entries
val HIVE_METASTORE_VERSION: ConfigEntry[String]
val HIVE_METASTORE_JARS: ConfigEntry[String]
val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
// Client creation
def newClientForExecution(conf: SparkConf, hadoopConf: Configuration): HiveClientImpl
def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration,
configurations: Map[String, String]): HiveClient
}
```
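These entries surface as spark.sql.hive.* keys on a session. A sketch of pointing a session at an external metastore; the version and jar path are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ExternalMetastore")
  // HIVE_METASTORE_VERSION: version of the target metastore (illustrative)
  .config("spark.sql.hive.metastore.version", "3.1.3")
  // HIVE_METASTORE_JARS: load client jars from a filesystem path
  .config("spark.sql.hive.metastore.jars", "path")
  .config("spark.sql.hive.metastore.jars.path", "/opt/hive/lib/*.jar")
  // CONVERT_METASTORE_PARQUET: read Hive Parquet tables with Spark's native reader
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .enableHiveSupport()
  .getOrCreate()
```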
```scala
// Hive version representation
abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil
) extends Ordered[HiveVersion]
// Raw Hive table interface
trait RawHiveTable {
def rawTable: Object
def toCatalogTable: CatalogTable
def hiveTableProps(): Map[String, String]
}
// UDF wrapper
case class HiveFunctionWrapper(functionClassName: String)
// UDAF buffer
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)

// Supported Hive metastore versions
object hive {
case object v2_0 extends HiveVersion("2.0.1")
case object v2_1 extends HiveVersion("2.1.1")
case object v2_2 extends HiveVersion("2.2.0")
case object v2_3 extends HiveVersion("2.3.10")
case object v3_0 extends HiveVersion("3.0.0")
case object v3_1 extends HiveVersion("3.1.3")
case object v4_0 extends HiveVersion("4.0.1")
val allSupportedHiveVersions: Set[HiveVersion]
}
```
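Because HiveVersion extends Ordered, versions sort and compare naturally. The hive object is package-private in the Spark sources, so this hypothetical check declares itself inside org.apache.spark.sql.hive.client to gain access:

```scala
package org.apache.spark.sql.hive.client

// Hypothetical helper; placed in the client package purely for visibility
object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Ordered[HiveVersion] supplies comparison operators and an Ordering
    assert(hive.v2_3 < hive.v4_0)
    hive.allSupportedHiveVersions.toSeq.sorted
      .foreach(v => println(v.fullVersion))
  }
}
```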