Apache Spark Hive integration module that provides compatibility with Apache Hive for Spark SQL operations
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive@3.5.0
```

The Apache Spark Hive module provides comprehensive integration between Apache Spark and Apache Hive, enabling Spark SQL to interact seamlessly with Hive tables, the metastore, and Hive storage formats. It serves as the bridge between Spark SQL and the Hive ecosystem for backward compatibility and hybrid environments.
Maven coordinates: `org.apache.spark:spark-hive_2.13:3.5.6`
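For builds that pull the module from Maven directly, an equivalent sbt dependency is sketched below; the `provided` scope is an assumption that fits cluster deployments where the Spark distribution already ships these jars.

```scala
// sbt: same artifact as the Maven coordinate above. "provided" assumes the
// runtime Spark distribution supplies spark-hive; drop it for local runs.
libraryDependencies += "org.apache.spark" %% "spark-hive" % "3.5.6" % "provided"
```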
Core imports:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.catalyst.catalog._
```

For UDF support:

```scala
import org.apache.spark.sql.hive.HiveSimpleUDF
import org.apache.spark.sql.hive.HiveGenericUDF
import org.apache.spark.sql.hive.HiveUDAFFunction
```

The primary way to enable Hive support in Spark is through SparkSession:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .config("spark.sql.catalogImplementation", "hive") // also set implicitly by enableHiveSupport()
  .enableHiveSupport()
  .getOrCreate()

// Access Hive tables
val df = spark.sql("SELECT * FROM hive_table")
df.show()

// Create a Hive table
spark.sql("""
  CREATE TABLE user_data (
    id INT,
    name STRING,
    age INT
  ) USING HIVE
""")
```

The Spark Hive module is organized around several key architectural components:
- HiveSessionStateBuilder and HiveSessionCatalog provide Hive-enabled session state
- HiveExternalCatalog and HiveClient interface with the Hive metastore for metadata operations
- HiveStrategies and analysis rules convert Hive operations into Spark physical plans
- OrcFileFormat and related classes handle Hive file formats
- HiveTableScanExec executes Hive table scans
- HiveDelegationTokenProvider handles authentication in secure clusters
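A quick way to see these pieces wired together is to inspect a Hive-enabled session. The sketch below assumes an existing `spark` session with Hive support and uses only public accessors (`sharedState` is marked unstable in Spark's API):

```scala
// With enableHiveSupport(), the session's external catalog is backed by
// HiveExternalCatalog, which talks to the metastore through a HiveClient.
val catalogImpl = spark.conf.get("spark.sql.catalogImplementation") // "hive"
val externalCatalog = spark.sharedState.externalCatalog
println(s"catalog=$catalogImpl, databases=${externalCatalog.listDatabases()}")
```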
Core session management and configuration for enabling Hive support in Spark SQL sessions.

```scala
object SparkSession {
  def builder(): Builder
}

class Builder {
  def enableHiveSupport(): Builder
  def config(key: String, value: String): Builder
  def getOrCreate(): SparkSession
}
```

Integration with the Hive metastore for table metadata, database operations, and catalog management.
```scala
// Configuration constants for Hive metastore integration
object HiveUtils {
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  val CONVERT_INSERTING_PARTITIONED_TABLE: ConfigEntry[Boolean]
}
```
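These constants back the SQL config keys `spark.sql.hive.convertMetastoreParquet` and `spark.sql.hive.convertMetastoreOrc`. A sketch of setting them explicitly (the values shown are the usual defaults):

```scala
import org.apache.spark.sql.SparkSession

// Control whether Hive Parquet/ORC tables are read through Spark's native
// readers instead of Hive SerDes (both default to true; tune per workload).
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .getOrCreate()
```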
Support for Hive-compatible file formats, particularly ORC files with Hive metadata integration.

```scala
class OrcFileFormat extends FileFormat {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory
}
```
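As a rough usage sketch (the table name `orc_demo` and the path are illustrative), writing and reading ORC data exercises this format's write and schema-inference paths:

```scala
// Write a Hive-compatible ORC table; prepareWrite() above configures the
// ORC output for the Hadoop job.
spark.range(10).write.format("orc").saveAsTable("orc_demo")

// Reading ORC files directly by path (rather than via the metastore) goes
// through inferSchema() to recover the schema from ORC file footers.
val orcDf = spark.read.format("orc").load("/warehouse/orc_demo")
orcDf.printSchema()
```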
Specialized execution plans and strategies for Hive table operations and query processing.

```scala
case class HiveTableScanExec(
    requestedAttributes: Seq[Attribute],
    relation: HiveTableRelation,
    partitionPruningPred: Seq[Expression])(
    @transient private val sparkSession: SparkSession)
  extends LeafExecNode
```
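For intuition, a filter on a partition column surfaces as `partitionPruningPred` on this node. A sketch, assuming a Hive table `events` partitioned by `dt`:

```scala
// The physical plan contains a HiveTableScan node carrying the partition
// predicate, so partitions are pruned at the metastore level instead of
// scanning every file.
spark.sql("SELECT * FROM events WHERE dt = '2024-01-01'").explain()
```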
Direct interface for interacting with the Hive metastore, providing database, table, partition, and function operations.

```scala
private[hive] trait HiveClient {
  def version: HiveVersion
  def getDatabase(name: String): CatalogDatabase
  def listDatabases(pattern: String): Seq[String]
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
}
```
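Because HiveClient is `private[hive]`, application code cannot call it directly; the sketch below is an internal-style illustration (compilable only inside the `org.apache.spark.sql.hive` package) of a typical metastore round trip, with the `client` assumed to come from HiveExternalCatalog:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.hive.client.HiveClient

// Internal sketch: list databases by pattern, then fetch one table's metadata.
def describeUserData(client: HiveClient): Unit = {
  val dbs: Seq[String] = client.listDatabases("*")
  val table: CatalogTable = client.getTable("default", "user_data")
  println(s"metastore v${client.version.fullVersion}: ${dbs.size} dbs, schema=${table.schema}")
}
```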
Comprehensive configuration constants and utilities for Hive integration behavior and the metastore connection.

```scala
object HiveUtils {
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val HIVE_METASTORE_JARS: ConfigEntry[String]
  val HIVE_METASTORE_JARS_PATH: ConfigEntry[Seq[String]]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]
}
```
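A sketch of pointing a session at an external metastore: the keys mirror the constants above (`spark.sql.hive.metastore.version`, `spark.sql.hive.metastore.jars`, `spark.sql.hive.metastore.jars.path`), while the version and path values are illustrative and should match your Hive deployment:

```scala
// Use a specific metastore client version, resolving its jars from a path.
// "jars" also accepts "builtin" or "maven" instead of "path".
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "2.3.9")
  .config("spark.sql.hive.metastore.jars", "path")
  .config("spark.sql.hive.metastore.jars.path", "/opt/hive/lib/*")
  .getOrCreate()
```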
Integration support for Hive User Defined Functions (UDFs), User Defined Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs).

```scala
case class HiveSimpleUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]) extends Expression

case class HiveGenericUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]) extends Expression

case class HiveUDAFFunction(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression],
    isDistinct: Boolean) extends AggregateFunction
```
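In practice you register a Hive UDF by class name and Spark resolves calls to it through these wrapper expressions during analysis. A sketch, where the class name `com.example.hive.MyUpperUDF` is hypothetical:

```scala
// Register a Hive UDF (class name is illustrative); the jar providing it
// must be on the classpath, e.g. via ADD JAR or --jars.
spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.hive.MyUpperUDF'")

// Invocations are planned as HiveSimpleUDF/HiveGenericUDF expressions.
spark.sql("SELECT my_upper(name) FROM user_data").show()
```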
```scala
// Common type aliases used throughout the API
type TablePartitionSpec = Map[String, String]
type HiveTable = org.apache.hadoop.hive.ql.metadata.Table

// Version information for Hive compatibility
case class HiveVersion(
    fullVersion: String,
    majorVersion: Int,
    minorVersion: Int) {
  def supportsFeature(feature: String): Boolean
}
```
```scala
class HiveOptions(parameters: Map[String, String]) {
  def fileFormat: Option[String]
  def inputFormat: Option[String]
  def outputFormat: Option[String]
  def serde: Option[String]
  def serdeProperties: Map[String, String]
}
```
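These options surface in DDL for Hive-serde tables. A sketch using the `fileFormat` option (the table name is illustrative):

```scala
// fileFormat/serde options on a USING HIVE table are parsed into HiveOptions.
spark.sql("""
  CREATE TABLE parquet_hive_tbl (id INT, value STRING)
  USING HIVE
  OPTIONS (fileFormat 'parquet')
""")
```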
```scala
case class HiveTableRelation(
    tableMeta: CatalogTable,
    dataCols: Seq[AttributeReference],
    partitionCols: Seq[AttributeReference],
    tableStats: Option[Statistics],
    prunedPartitions: Option[Seq[CatalogTablePartition]]) extends LeafNode
```

The module provides specific exceptions for Hive integration errors: