Apache Spark SQL Hive Integration provides a compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine.
Install with the tessl CLI:

```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive_2-10@2.2.0
```

Apache Spark SQL Hive Integration provides a comprehensive compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine. This module enables organizations to leverage Spark's high-performance capabilities while maintaining compatibility with existing Hive-based data warehousing infrastructure.

Or declare the Maven dependency directly:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>2.2.3</version>
</dependency>
```

Core imports:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext // Deprecated
import org.apache.spark.sql.hive.HiveUtils
```

Basic usage with a Hive-enabled SparkSession:

```scala
import org.apache.spark.sql.SparkSession
// Create SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()
// Use HiveQL
val df = spark.sql("SELECT * FROM hive_table")
df.show()
// Access Hive tables
val table = spark.table("my_database.my_table")
table.createOrReplaceTempView("temp_table")
```

Legacy usage with the deprecated HiveContext:

```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext
val sc = new SparkContext()
val hiveContext = new HiveContext(sc)
// Run HiveQL queries
val results = hiveContext.sql("SELECT * FROM hive_table")
results.show()
```

The Spark Hive integration is built around several key components:
Primary entry points and configuration utilities for Hive integration.
```scala
// Modern entry point (recommended)
object SparkSession {
  def builder(): Builder
}

class Builder {
  def enableHiveSupport(): Builder
}

// Legacy entry point (deprecated since 2.0.0)
class HiveContext(sparkSession: SparkSession) extends SQLContext(sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}

// Configuration utilities
object HiveUtils {
  val hiveExecutionVersion: String
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
}
```
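A minimal sketch of how these configuration entries are typically applied when building a Hive-enabled session; the metastore version, jars mode, and conversion flag shown here are illustrative values, not requirements.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: wiring the configuration entries listed above into a session.
// "1.2.1" and "builtin" are illustrative values for the metastore version
// and client jars; adjust them to match the target deployment.
val spark = SparkSession.builder()
  .appName("Hive Metastore Configuration")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .config("spark.sql.hive.metastore.jars", "builtin")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .enableHiveSupport()
  .getOrCreate()
```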
Native support for ORC and Hive-compatible file formats with optimization features.

```scala
class OrcFileFormat extends FileFormat with DataSourceRegister {
  def shortName(): String

  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```
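A brief sketch of the user-facing side of this support, assuming the Hive-enabled `spark` session from the basic usage example; the path and table name are placeholders.

```scala
// Sketch: "orc" is the short name registered by OrcFileFormat, so it can be
// used directly with the DataFrame reader and writer. The path and table
// name below are placeholders.
val events = spark.read.format("orc").load("/data/warehouse/events_orc")

events.write
  .format("orc")
  .mode("overwrite")
  .saveAsTable("my_database.events_orc_copy")
```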
Comprehensive support for executing Hive user-defined functions within Spark.

```scala
// Simple UDFs
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression {
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generic UDFs
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression

// Aggregate functions (UDAFs)
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends TypedImperativeAggregate[AggregationBuffer]

// Table-generating functions (UDTFs)
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator
```
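A sketch of how an existing Hive UDF is typically registered and invoked through a Hive-enabled session; the jar path and class name (`com.example.udf.UpperCaseUDF`) are hypothetical.

```scala
// Sketch: registering a Hive UDF class and calling it from HiveQL. Spark wraps
// the function in HiveSimpleUDF or HiveGenericUDF depending on which Hive
// interface the class implements. The jar path and class name are hypothetical.
spark.sql("ADD JAR /path/to/hive-udfs.jar")
spark.sql("CREATE TEMPORARY FUNCTION upper_udf AS 'com.example.udf.UpperCaseUDF'")

spark.sql("SELECT upper_udf(name) FROM my_database.my_table").show()
```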
Interface for interacting with the Hive metastore for database, table, and partition management.

```scala
trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String

  // SQL execution
  def runSqlHive(sql: String): Seq[String]

  // Database operations
  def listTables(dbName: String): Seq[String]
  def setCurrentDatabase(databaseName: String): Unit
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean

  // Table operations
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

  // Partition operations
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

  // Function operations
  def createFunction(db: String, func: CatalogFunction): Unit
  def listFunctions(db: String, pattern: String): Seq[String]
}
```
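HiveClient is an internal interface; application code usually reaches the metastore through `spark.catalog` or HiveQL statements, as in this sketch (database and table names are placeholders).

```scala
// Sketch: metastore access from user code goes through the catalog API or SQL.
// Database and table names are placeholders.
spark.catalog.setCurrentDatabase("my_database")
spark.catalog.listTables("my_database").show()
spark.sql("SHOW PARTITIONS my_database.my_partitioned_table").show()
```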
Physical execution plans and strategies for Hive table operations.

```scala
case class HiveTableScanExec(
  requestedAttributes: Seq[Attribute],
  relation: HiveTableRelation,
  partitionPruningPred: Seq[Expression]
) extends LeafExecNode {
  def doExecute(): RDD[InternalRow]
}

case class InsertIntoHiveTable(
  table: CatalogTable,
  partition: Map[String, Option[String]],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean
) extends UnaryCommand {
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}

case class CreateHiveTableAsSelectCommand(
  tableDesc: CatalogTable,
  query: LogicalPlan,
  ignoreIfExists: Boolean
) extends DataWritingCommand
```
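These operators are produced when Hive tables are scanned or written. A rough sketch of the kind of statements that exercise them (table names are placeholders), with `explain()` used to inspect the resulting physical plan.

```scala
// Sketch: a CTAS statement and a table scan against Hive tables; table names
// are placeholders. explain() prints the physical plan, in which the read of
// a Hive table appears as a Hive table scan operator.
spark.sql(
  "CREATE TABLE my_database.summary AS " +
  "SELECT key, count(*) AS cnt FROM my_database.my_table GROUP BY key")

spark.table("my_database.my_table").explain()
```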
The HiveContext class is deprecated as of Spark 2.0.0. Migration steps:

- Replace `HiveContext` with `SparkSession.builder().enableHiveSupport()`
- Use `spark.sql()` instead of `hiveContext.sql()` for queries
- Use `spark.catalog` instead of direct metastore calls

Common exceptions and error patterns:
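A hedged sketch of one common failure mode: queries against missing metastore objects surface as `org.apache.spark.sql.AnalysisException` during analysis (the table name below is a placeholder).

```scala
import org.apache.spark.sql.AnalysisException

// Sketch: missing databases, tables, or columns resolved against the Hive
// metastore typically fail analysis with AnalysisException. The table name
// is a placeholder.
try {
  spark.sql("SELECT * FROM my_database.missing_table").show()
} catch {
  case e: AnalysisException =>
    println(s"Analysis failed: ${e.getMessage}")
}
```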
Supporting types referenced throughout the APIs above:

```scala
// Configuration entries
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
}

// Hive version representation
abstract class HiveVersion {
  def fullVersion: String
  def extraDeps: Seq[String]
  def exclusions: Seq[String]
}

// Function wrapper for Hive UDFs
case class HiveFunctionWrapper(
  className: String,
  instance: AnyRef
)

// Table partition specification
type TablePartitionSpec = Map[String, String]
// Catalog types (from Spark SQL)
case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None
)

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty
)

case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource]
)
```