Describes: mavenpkg:maven/org.apache.spark/spark-hive_2.11@1.6.x


tessl/maven-org-apache-spark--spark-hive

tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0

Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility

docs/hive-context.md

HiveContext

HiveContext is the primary entry point for Hive integration in Spark SQL. It extends SQLContext with comprehensive Hive capabilities including metastore integration, HiveQL parsing, and Hive-specific configuration management.

Core Functionality

Context Creation

class HiveContext private[hive](
  sc: SparkContext,
  cacheManager: CacheManager,
  listener: SQLListener,
  execHive: ClientWrapper,
  metaHive: ClientInterface,
  isRootContext: Boolean
) extends SQLContext(sc, cacheManager, listener, isRootContext)

// Public constructors
def this(sc: SparkContext): HiveContext
def this(sc: JavaSparkContext): HiveContext

Session Management

def newSession(): HiveContext

Creates a new HiveContext session with separate SQLConf, UDF/UDAF registrations, temporary tables, and SessionState, while sharing the same CacheManager, IsolatedClientLoader, and Hive clients (both execution and metadata).

Usage Example:

val originalContext = new HiveContext(sc)
val newSessionContext = originalContext.newSession()
// newSessionContext has isolated configuration and temporary tables

Table Management

def refreshTable(tableName: String): Unit
def analyze(tableName: String): Unit

refreshTable invalidates and refreshes cached metadata for a table. Use when table structure or data changes outside of Spark SQL.

analyze scans the table and generates statistics used for query optimization; in this version it supports only non-temporary Hive tables.

Usage Examples:

// Refresh table after external changes
hiveContext.refreshTable("sales.customer_data")

// Generate statistics for better query planning
hiveContext.analyze("sales.large_fact_table")

Configuration Properties

Metastore Configuration

protected[hive] def hiveMetastoreVersion: String
protected[hive] def hiveMetastoreJars: String
protected[hive] def hiveMetastoreSharedPrefixes: Seq[String]
protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String]

hiveMetastoreVersion - Version of Hive client for metastore communication (default: "1.2.1")

hiveMetastoreJars - Location of Hive JAR files:

  • "builtin" - Use JARs from Spark classpath
  • "maven" - Download JARs from Maven automatically
  • "path" - Specific filesystem path to JAR files

hiveMetastoreSharedPrefixes - Class prefixes shared between Spark and Hive (e.g., JDBC drivers)

hiveMetastoreBarrierPrefixes - Class prefixes that should be reloaded for each Hive version
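
These settings are read when the metadata client is first created, so supply them before constructing the HiveContext. A minimal sketch (the version, JAR source, and JDBC-driver prefixes shown are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("MetastoreConfig")
  // Talk to an older metastore while still executing with the built-in Hive 1.2.1
  .set("spark.sql.hive.metastore.version", "0.13.1")
  .set("spark.sql.hive.metastore.jars", "maven")
  // Share the metastore's JDBC driver classes between Spark and the Hive client
  .set("spark.sql.hive.metastore.sharedPrefixes", "com.mysql.jdbc,org.postgresql")

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)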

Format Conversion Settings

protected[sql] def convertMetastoreParquet: Boolean
protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean
protected[sql] def convertCTAS: Boolean

convertMetastoreParquet - Automatically convert Parquet SerDe tables to native Spark SQL Parquet scans

convertMetastoreParquetWithSchemaMerging - Merge compatible Parquet schemas across files (requires convertMetastoreParquet = true)

convertCTAS - Convert Hive CTAS statements to data source tables using spark.sql.sources.default
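
These flags correspond to the spark.sql.hive.convertMetastoreParquet, spark.sql.hive.convertMetastoreParquet.mergeSchema, and spark.sql.hive.convertCTAS configuration keys (assumed naming, matching the entries listed under Configuration Constants below) and can be toggled at runtime:

// Scan Parquet SerDe tables with Spark's native Parquet support
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// Merge compatible schemas across files; adds planning overhead
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")

// Write CTAS results as data source tables
hiveContext.setConf("spark.sql.hive.convertCTAS", "true")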

Server Configuration

protected[hive] def hiveThriftServerAsync: Boolean

hiveThriftServerAsync - Enable background thread pool for Thrift server SQL execution
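
This maps to the spark.sql.hive.thriftServer.async key (assumed naming, consistent with the other spark.sql.hive.* entries):

// Run Thrift server SQL statements on a background thread pool
hiveContext.setConf("spark.sql.hive.thriftServer.async", "true")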

Configuration Constants

val hiveExecutionVersion: String = "1.2.1"

// Configuration entries
val HIVE_METASTORE_VERSION: SQLConfEntry[String]
val HIVE_EXECUTION_VERSION: SQLConfEntry[String]
val HIVE_METASTORE_JARS: SQLConfEntry[String]
val CONVERT_METASTORE_PARQUET: SQLConfEntry[Boolean]
val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING: SQLConfEntry[Boolean]
val CONVERT_CTAS: SQLConfEntry[Boolean]
val HIVE_METASTORE_SHARED_PREFIXES: SQLConfEntry[Seq[String]]
val HIVE_METASTORE_BARRIER_PREFIXES: SQLConfEntry[Seq[String]]
val HIVE_THRIFT_SERVER_ASYNC: SQLConfEntry[Boolean]
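
hiveExecutionVersion and the SQLConfEntry values are defined on the HiveContext companion object. A quick check of the pinned execution version:

import org.apache.spark.sql.hive.HiveContext

// Always "1.2.1", independent of spark.sql.hive.metastore.version
println(HiveContext.hiveExecutionVersion)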

Utility Methods

Configuration Management

def newTemporaryConfiguration(): Map[String, String]
def setConf(key: String, value: String): Unit
def setConf[T](entry: SQLConfEntry[T], value: T): Unit
def getConf(key: String): String
def getConf(key: String, defaultValue: String): String

newTemporaryConfiguration - Creates a temporary Hive configuration with safe defaults (for example, a scratch local metastore) for the execution client

setConf - Sets a configuration property by key-value pair or typed SQLConfEntry

getConf - Retrieves a configuration value, with an optional default
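
A short sketch of the get/set round trip (the property names are illustrative, and newTemporaryConfiguration is assumed to be reachable on the HiveContext companion object, as in Spark 1.6):

// Key-value form; the second getConf argument is the fallback default
hiveContext.setConf("spark.sql.shuffle.partitions", "64")
val partitions = hiveContext.getConf("spark.sql.shuffle.partitions", "200")

// Throwaway configuration suitable for a local execution client
val tempConf: Map[String, String] = HiveContext.newTemporaryConfiguration()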

Type Conversion

def toHiveString(a: (Any, DataType)): String

Converts Catalyst values to Hive-compatible string representations.
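
A hypothetical stand-in illustrating the kind of mapping this performs; the real implementation also handles structs, maps, dates, decimals, and binary:

import org.apache.spark.sql.types._

def toHiveStringSketch(a: (Any, DataType)): String = a match {
  case (null, _)               => "NULL"          // Hive prints nulls as NULL
  case (s: String, StringType) => s
  case (seq: Seq[_], ArrayType(elementType, _)) =>
    // Render arrays in Hive's bracketed form, e.g. [1,2,3]
    seq.map(v => toHiveStringSketch((v, elementType))).mkString("[", ",", "]")
  case (other, _)              => other.toString  // numbers and other primitives
}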

Internal Components

Execution and Metadata Clients

protected[hive] lazy val executionHive: ClientWrapper
protected[hive] lazy val metadataHive: ClientInterface

executionHive - Hive client for execution-related tasks (always Hive 1.2.1)

metadataHive - Hive client for metastore operations (version configurable)

SQL Parsing

protected[sql] override def parseSql(sql: String): LogicalPlan

Parses SQL with Hive variable substitution and delegates to HiveQl parser.
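
Variable substitution expands ${...} references before the statement reaches the parser. A sketch, assuming variables set through SET are visible to the substitutor as in Hive itself:

// Define a Hive variable, then reference it in a later statement
hiveContext.sql("SET hivevar:target_db=sales")
val customers = hiveContext.sql("SELECT * FROM ${hivevar:target_db}.customers")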

Usage Patterns

Basic Setup

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("HiveIntegration")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// Execute HiveQL
val df = hiveContext.sql("SELECT * FROM hive_table")
df.show()

Advanced Configuration

val hiveContext = new HiveContext(sc)

// Configure Hive metastore
hiveContext.setConf("spark.sql.hive.metastore.version", "1.2.1")
hiveContext.setConf("spark.sql.hive.metastore.jars", "maven")
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// Refresh external table after changes
hiveContext.refreshTable("external_db.updated_table")

// Generate statistics for optimization
hiveContext.analyze("warehouse.fact_sales")

Multi-Session Usage

val mainContext = new HiveContext(sc)
val analyticsSession = mainContext.newSession()
val etlSession = mainContext.newSession()

// Each session has isolated temporary tables and configuration
mainContext.sql("CREATE TEMPORARY VIEW main_view AS SELECT * FROM table1")
analyticsSession.sql("CREATE TEMPORARY VIEW analytics_view AS SELECT * FROM table2")
// main_view is not visible in analyticsSession

Error Handling

Common exceptions and their handling:

  • IllegalArgumentException - Invalid configuration (e.g., mismatched Hive versions)
  • AnalysisException - Table not found or schema issues
  • HiveException - Hive metastore connectivity or operation failures

import org.apache.spark.sql.AnalysisException
import org.apache.hadoop.hive.ql.metadata.HiveException

try {
  hiveContext.refreshTable("non_existent_table")
} catch {
  case e: AnalysisException =>
    println(s"Table not found: ${e.getMessage}")
  case e: HiveException =>
    println(s"Hive operation failed: ${e.getMessage}")
}