This document covers the primary entry points and configuration utilities for Apache Spark Hive integration, including the legacy HiveContext and modern SparkSession approaches.
The modern approach uses SparkSession with Hive support enabled instead of the deprecated HiveContext.
```scala
object SparkSession {
  def builder(): Builder
}

class Builder {
  def appName(name: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```

Usage Example:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Integration Application")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Run HiveQL
val df = spark.sql("SELECT * FROM my_hive_table WHERE year = 2023")
df.show()

// Access the catalog
spark.catalog.listTables("default").show()

// Create tables
spark.sql("""
  CREATE TABLE IF NOT EXISTS employee (
    id INT,
    name STRING,
    department STRING
  ) USING HIVE
  PARTITIONED BY (year INT)
""")Note: HiveContext is deprecated as of Spark 2.0.0. Use SparkSession with enableHiveSupport() instead.
Note: HiveContext is deprecated as of Spark 2.0.0. Use SparkSession with enableHiveSupport() instead.

```scala
class HiveContext private[hive](_sparkSession: SparkSession) extends SQLContext(_sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}
```

Legacy Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("Legacy Hive App"))
val hiveContext = new HiveContext(sc)

// Execute HiveQL
val results = hiveContext.sql("SELECT COUNT(*) FROM my_table")
results.show()

// Refresh cached metadata for a table
hiveContext.refreshTable("my_table")

// Create an isolated session that shares the same SparkContext
val newSession = hiveContext.newSession()
```

The HiveUtils object provides configuration constants and utility functions for Hive integration setup.
```scala
object HiveUtils extends Logging {
  // Version constants
  val hiveExecutionVersion: String // "1.2.1"

  // Configuration entries
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val HIVE_EXECUTION_VERSION: ConfigEntry[String]
  val HIVE_METASTORE_JARS: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]

  // Utility methods
  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
  def inferSchema(table: CatalogTable): CatalogTable
}
```

Access configuration constants through HiveUtils:
```scala
import org.apache.spark.sql.hive.HiveUtils

// Set the metastore version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")

// Configure where the metastore JARs come from
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

// Enable ORC conversion
spark.conf.set(HiveUtils.CONVERT_METASTORE_ORC.key, "true")

// Enable Parquet conversion
spark.conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
```

- HIVE_METASTORE_VERSION: specifies the Hive metastore version for compatibility
- HIVE_METASTORE_JARS: location of the Hive metastore JARs
- CONVERT_METASTORE_PARQUET: use Spark's native Parquet reader for Hive Parquet tables
- CONVERT_METASTORE_ORC: use Spark's native ORC reader for Hive ORC tables
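The same keys can be read back to verify the effective settings. A minimal sketch, assuming `spark` is an active Hive-enabled session:

```scala
import org.apache.spark.sql.hive.HiveUtils

// Read back the effective values for this session
val metastoreVersion = spark.conf.get(HiveUtils.HIVE_METASTORE_VERSION.key)
val convertParquet = spark.conf.get(HiveUtils.CONVERT_METASTORE_PARQUET.key)
println(s"metastore version: $metastoreVersion, convert Parquet: $convertParquet")
```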
Builder for creating Hive-enabled Spark SQL session state with proper catalog and analyzer setup.
```scala
class HiveSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  override protected def catalog: HiveSessionCatalog
  override protected def externalCatalog: ExternalCatalog
  override protected def analyzer: Analyzer
  override protected def optimizer: Optimizer
  override protected def planner: SparkPlanner
}
```

This class is typically used internally by Spark when creating Hive-enabled sessions, but it can be extended for custom session state requirements.
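A rough sketch of such an extension is shown below. `CustomHiveSessionStateBuilder` is a hypothetical name, and because these session-state types are internal to Spark, code like this would normally live inside Spark's own org.apache.spark.sql.hive package:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionState

// Hypothetical subclass: inherits the Hive catalog, analyzer, optimizer,
// and planner wiring, overriding only what a custom deployment needs
class CustomHiveSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends HiveSessionStateBuilder(session, parentState) {
  // Override any protected member listed above, for example:
  // override protected def optimizer: Optimizer = ...
}
```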
HiveUtils.withHiveExternalCatalog configures a SparkContext to use Hive as the external catalog:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveUtils

val sc = new SparkContext(new SparkConf().setAppName("Hive Catalog App"))
val hiveEnabledSc = HiveUtils.withHiveExternalCatalog(sc)
```

HiveUtils.newTemporaryConfiguration creates a temporary configuration for testing with an in-memory Derby database:
```scala
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
// Returns a Map[String, String] with Derby-specific Hive settings
```
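A minimal sketch of how such a map might be applied when building a throwaway test session; the local master and application name here are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Feed the Derby-backed settings into a local, Hive-enabled test session
val builder = SparkSession.builder()
  .master("local[2]")
  .appName("hive-derby-test")
  .enableHiveSupport()
tempConfig.foreach { case (key, value) => builder.config(key, value) }
val testSpark = builder.getOrCreate()
```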
HiveUtils.inferSchema infers and validates schema information for Hive tables:

```scala
// catalogTable is an existing CatalogTable, e.g. obtained from the external catalog
val inferredTable = HiveUtils.inferSchema(catalogTable)
// Returns a CatalogTable with inferred schema information
```

The Hive integration supports multiple Hive versions through configurable metastore and execution versions:
```scala
// Set a metastore version different from the execution version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")
spark.conf.set(HiveUtils.HIVE_EXECUTION_VERSION.key, "1.2.1")

// Use Maven to download the JARs for the requested version
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
```

Common errors and their resolutions:
Missing Hive classes, which occur when the Hive JARs are not available on the classpath:

```scala
// Solution: configure where the metastore JARs come from
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
```

Version mismatch errors, when the metastore and execution versions conflict:

```scala
// Solution: set compatible versions
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "1.2.1")
```

Connection failures, when Spark cannot reach the Hive metastore:

```scala
// Solution: configure the metastore URI
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")
```

Migrating from HiveContext to SparkSession:

Before (Deprecated):
```scala
val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM table")
```

After (Recommended):
```scala
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
val df = spark.sql("SELECT * FROM table")
```

HiveContext-specific configurations should be moved to SparkSession:
Before:

```scala
hiveContext.setConf("hive.exec.dynamic.partition", "true")
```

After:

```scala
spark.conf.set("hive.exec.dynamic.partition", "true")
```

Supporting types referenced in this document:

```scala
// Configuration entry for Hive settings
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
  def valueConverter: String => T
  def stringConverter: T => String
}

// Session state builder base class
abstract class BaseSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState]) {
  protected def catalog: SessionCatalog
  protected def analyzer: Analyzer
  protected def optimizer: Optimizer
  def build(): SessionState
}

// Hive-specific session catalog
class HiveSessionCatalog(
    externalCatalogBuilder: () => ExternalCatalog,
    globalTempViewManagerBuilder: () => GlobalTempViewManager,
    functionRegistry: FunctionRegistry,
    conf: SQLConf,
    hadoopConf: Configuration,
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)
  extends SessionCatalog
```
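As a quick illustration of how the ConfigEntry members fit together, the sketch below reads metadata from one of the HiveUtils entries; the actual key, default, and doc strings depend on the Spark build:

```scala
import org.apache.spark.sql.hive.HiveUtils

val entry = HiveUtils.CONVERT_METASTORE_PARQUET
println(entry.key)          // the configuration key string
println(entry.defaultValue) // Option[Boolean] default, if one is defined
println(entry.doc)          // human-readable description of the setting
```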