Core Hive Integration

This document covers the primary entry points and configuration utilities for Apache Spark Hive integration, including the legacy HiveContext and modern SparkSession approaches.

SparkSession with Hive Support (Recommended)

The modern approach uses SparkSession with Hive support enabled instead of the deprecated HiveContext.

Creating a Hive-Enabled SparkSession

object SparkSession {
  def builder(): Builder
}

class Builder {
  def appName(name: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}

Usage Example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Integration Application")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Use HiveQL
val df = spark.sql("SELECT * FROM my_hive_table WHERE year = 2023")
df.show()

// Access catalog
spark.catalog.listTables("default").show()

// Create tables
spark.sql("""
  CREATE TABLE IF NOT EXISTS employee (
    id INT,
    name STRING,
    department STRING
  ) USING HIVE
  PARTITIONED BY (year INT)
""")

HiveContext (Deprecated)

Note: HiveContext is deprecated as of Spark 2.0.0. Use SparkSession with enableHiveSupport() instead.

class HiveContext private[hive](_sparkSession: SparkSession) extends SQLContext(_sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}

Legacy Usage Example:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Legacy Hive Application"))
val hiveContext = new HiveContext(sc)

// Execute HiveQL
val results = hiveContext.sql("SELECT COUNT(*) FROM my_table")
results.show()

// Refresh table metadata
hiveContext.refreshTable("my_table")

// Create new session
val newSession = hiveContext.newSession()

HiveUtils Configuration

The HiveUtils object provides configuration constants and utility functions for Hive integration setup.

object HiveUtils extends Logging {
  // Version constants
  val hiveExecutionVersion: String  // "1.2.1"
  
  // Configuration entries
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val HIVE_EXECUTION_VERSION: ConfigEntry[String]
  val HIVE_METASTORE_JARS: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]
  
  // Utility methods
  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
  def inferSchema(table: CatalogTable): CatalogTable
}

Configuration Properties

Access configuration constants through HiveUtils. Note that the metastore version and JAR settings are read when the Hive client is first initialized, so set them before the first Hive access, ideally while building the SparkSession:

import org.apache.spark.sql.hive.HiveUtils

// Set metastore version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")

// Configure metastore JARs location
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

// Enable ORC conversion
spark.conf.set(HiveUtils.CONVERT_METASTORE_ORC.key, "true")

// Enable Parquet conversion  
spark.conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")

Key Configuration Options

  • HIVE_METASTORE_VERSION: Specify the Hive metastore version for compatibility

    • Default: the Hive version bundled with the current Spark release
    • Example values: "1.2.1", "2.1.1", "2.3.0"
  • HIVE_METASTORE_JARS: Location of the Hive metastore JARs

    • "builtin": Use Spark's built-in Hive JARs
    • "maven": Download JARs from a Maven repository
    • A classpath string: Use JARs from the specified classpath
  • CONVERT_METASTORE_PARQUET: Use Spark's native Parquet reader

    • Default: true
    • When enabled, provides better performance and predicate pushdown
  • CONVERT_METASTORE_ORC: Use Spark's native ORC reader

    • Default: true
    • Enables vectorized ORC reading and advanced optimizations
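
Because the metastore version and JAR location are only consulted when the Hive client is first created, a reasonable pattern is to supply all of these options while building the session. A minimal sketch, with an illustrative metastore URI and version numbers:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Illustrative values; adjust the URI and versions for your deployment
val spark = SparkSession.builder()
  .appName("Configured Hive Application")
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
  .config(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
  .config(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()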

HiveSessionStateBuilder

Builder for creating Hive-enabled Spark SQL session state with proper catalog and analyzer setup.

class HiveSessionStateBuilder(
  session: SparkSession, 
  parentState: Option[SessionState] = None
) extends BaseSessionStateBuilder(session, parentState) {

  override protected def catalog: HiveSessionCatalog
  override protected def externalCatalog: ExternalCatalog  
  override protected def analyzer: Analyzer
  override protected def optimizer: Optimizer
  override protected def planner: SparkPlanner
}

This class is typically used internally by Spark when creating Hive-enabled sessions, but can be extended for custom session state requirements.
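
As a rough sketch of what such an extension could look like: the rule and builder names below are hypothetical, and these builder APIs are internal and unstable, so the exact members available (and their visibility) may vary across Spark versions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveSessionStateBuilder
import org.apache.spark.sql.internal.SessionState

// Hypothetical no-op analyzer rule, standing in for real custom logic
object MyResolutionRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical builder that layers an extra resolution rule on the Hive defaults
class CustomHiveSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends HiveSessionStateBuilder(session, parentState) {

  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    super.customResolutionRules :+ MyResolutionRule

  // Ensures cloned sessions keep using this builder
  override protected def newBuilder = new CustomHiveSessionStateBuilder(_, _)
}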

Utility Methods

withHiveExternalCatalog

Configures a SparkContext to use Hive as the external catalog:

import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Hive Catalog Application"))
val hiveEnabledSc = HiveUtils.withHiveExternalCatalog(sc)

newTemporaryConfiguration

Creates temporary configuration for testing with in-memory Derby database:

val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
// Returns Map[String, String] with Derby-specific Hive settings
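
The returned map can be folded into a session builder when spinning up a throwaway metastore for tests. A small sketch, assuming a local test session:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

val builder = SparkSession.builder()
  .appName("Hive Test Session")
  .master("local[2]")
  .enableHiveSupport()

// Apply each Derby-backed metastore setting to the session config
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
  .foreach { case (k, v) => builder.config(k, v) }

val spark = builder.getOrCreate()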

inferSchema

Infers and validates schema information for Hive tables:

val inferredTable = HiveUtils.inferSchema(catalogTable)
// Returns CatalogTable with inferred schema information

Version Compatibility

The Hive integration supports multiple Hive versions through configurable metastore and execution versions:

Supported Hive Versions

  • 0.12.0 (v12)
  • 0.13.1 (v13)
  • 0.14.0 (v14)
  • 1.0.0 (v1_0)
  • 1.1.0 (v1_1)
  • 1.2.1 (v1_2) - Default execution version
  • 2.0.1 (v2_0)
  • 2.1.1 (v2_1)

Setting Specific Versions

// Set metastore version different from execution version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")
spark.conf.set(HiveUtils.HIVE_EXECUTION_VERSION.key, "1.2.1") 

// Use Maven to download specific version JARs
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

Error Handling

Common errors and their resolutions:

NoClassDefFoundError

Occurs when the required Hive classes are not available on the classpath. Since the JAR location is read at client initialization, configure it before the session first touches Hive:

// Solution: Configure metastore JARs
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

Version Incompatibility

When metastore and execution versions conflict:

// Solution: Set compatible versions
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "1.2.1")

Metastore Connection Issues

When unable to connect to the Hive metastore. The metastore URI is read when the Hive client is first initialized, so configure it before the first Hive access, ideally while building the SparkSession:

// Solution: Configure the metastore URI before the first Hive access
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")

Migration Guide

From HiveContext to SparkSession

Before (Deprecated):

val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM table")

After (Recommended):

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
val df = spark.sql("SELECT * FROM table")

Configuration Migration

HiveContext-specific configurations should be moved to SparkSession:

Before:

hiveContext.setConf("hive.exec.dynamic.partition", "true")

After:

spark.conf.set("hive.exec.dynamic.partition", "true")

Types

// Configuration entry for Hive settings
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
  def valueConverter: String => T
  def stringConverter: T => String
}

// Session state builder base class
abstract class BaseSessionStateBuilder(
  session: SparkSession,
  parentState: Option[SessionState]
) {
  protected def catalog: SessionCatalog
  protected def analyzer: Analyzer
  protected def optimizer: Optimizer
  def build(): SessionState
}

// Hive-specific session catalog
class HiveSessionCatalog(
  externalCatalogBuilder: () => ExternalCatalog,
  globalTempViewManagerBuilder: () => GlobalTempViewManager,
  functionRegistry: FunctionRegistry,
  conf: SQLConf,
  hadoopConf: Configuration,
  parser: ParserInterface,
  functionResourceLoader: FunctionResourceLoader
) extends SessionCatalog
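
As a quick illustration of the ConfigEntry interface above, the entries exposed by HiveUtils can be inspected programmatically; the printed values reflect typical defaults and may differ across Spark versions:

import org.apache.spark.sql.hive.HiveUtils

val entry = HiveUtils.CONVERT_METASTORE_PARQUET
println(entry.key)          // e.g. "spark.sql.hive.convertMetastoreParquet"
println(entry.defaultValue) // e.g. Some(true)
println(entry.doc)          // human-readable description of the setting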