tessl/maven-org-apache-spark--spark-hive_2-10

Apache Spark SQL Hive Integration providing a compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-hive_2.10@2.2.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive_2-10@2.2.0


Apache Spark Hive Integration

Apache Spark SQL Hive Integration provides a comprehensive compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine. This module enables organizations to leverage Spark's high-performance capabilities while maintaining compatibility with existing Hive-based data warehousing infrastructure.

Package Information

  • Package Name: spark-hive_2.10
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-hive_2.10
  • Version: 2.2.3
  • Maven Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>2.2.3</version>
    </dependency>
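
For sbt builds, the equivalent dependency can be declared as below (a sketch; the provided scope is typical when the Spark runtime is supplied by the cluster):

libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "2.2.3" % "provided"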

Core Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext  // Deprecated
import org.apache.spark.sql.hive.HiveUtils

Basic Usage

Modern Approach (Recommended)

import org.apache.spark.sql.SparkSession

// Create SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Use HiveQL
val df = spark.sql("SELECT * FROM hive_table")
df.show()

// Access Hive tables
val table = spark.table("my_database.my_table")
table.createOrReplaceTempView("temp_table")

Legacy Approach (Deprecated)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext

val sc = new SparkContext()
val hiveContext = new HiveContext(sc)

// Run HiveQL queries
val results = hiveContext.sql("SELECT * FROM hive_table")
results.show()

Architecture

The Spark Hive integration is built around several key components:

  • Hive Compatibility Layer: Provides seamless integration with existing Hive metastore and table formats
  • Query Engine: Translates HiveQL queries into Spark execution plans using Catalyst optimizer
  • File Format Support: Native support for ORC, Parquet, and other Hive-compatible formats
  • UDF Integration: Execute existing Hive UDFs, UDAFs, and UDTFs within Spark
  • Metastore Integration: Read/write table metadata from Hive metastore with version compatibility
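
To see this pipeline in action, explain() prints the logical and physical plans Catalyst produces for a HiveQL query; against a metastore-backed table the physical plan includes a Hive table scan operator (a minimal sketch, assuming a session with Hive support and an existing hive_table):

// Inspect how a HiveQL query is planned by the Catalyst optimizer.
// For a metastore-backed table the physical plan contains a Hive table scan node.
val grouped = spark.sql("SELECT key, count(*) FROM hive_table GROUP BY key")
grouped.explain(extended = true)  // prints parsed, analyzed, optimized, and physical plans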

Capabilities

Core Hive Integration

Primary entry points and configuration utilities for Hive integration.

// Modern entry point (recommended)
object SparkSession {
  def builder(): Builder
}

class Builder {
  def enableHiveSupport(): Builder
}

// Legacy entry point (deprecated since 2.0.0)
class HiveContext(sparkSession: SparkSession) extends SQLContext(sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}

// Configuration utilities
object HiveUtils {
  val hiveExecutionVersion: String
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  
  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
}
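
A minimal sketch of the SparkSession counterparts of the deprecated HiveContext methods (the table name is hypothetical):

// An isolated session with its own temporary views and SQL configuration,
// the SparkSession counterpart of HiveContext.newSession().
val isolated = spark.newSession()

// Invalidate cached metadata after an external tool rewrites a table,
// the SparkSession counterpart of HiveContext.refreshTable(...).
spark.catalog.refreshTable("my_database.my_table")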

Full reference: docs/core-hive-integration.md

File Format Support

Native support for ORC and Hive-compatible file formats with optimization features.

class OrcFileFormat extends FileFormat with DataSourceRegister {
  def shortName(): String
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String], 
    files: Seq[FileStatus]
  ): Option[StructType]
  
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
    sparkSession: SparkSession, 
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
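
Because OrcFileFormat registers the short name "orc", ORC data can be read and written through the standard DataFrame reader and writer; a sketch with hypothetical table and path names:

// Write a Hive table out as ORC, then read it back via the registered "orc" format.
val events = spark.table("my_database.my_table")
events.write.format("orc").mode("overwrite").save("/tmp/events_orc")

val reloaded = spark.read.format("orc").load("/tmp/events_orc")
reloaded.printSchema()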

Full reference: docs/file-formats.md

UDF Integration

Comprehensive support for executing Hive user-defined functions within Spark.

// Simple UDFs
case class HiveSimpleUDF(
  name: String, 
  funcWrapper: HiveFunctionWrapper, 
  children: Seq[Expression]
) extends Expression {
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generic UDFs
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper, 
  children: Seq[Expression]
) extends Expression

// Aggregate functions (UDAFs)
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends TypedImperativeAggregate[AggregationBuffer]

// Table-generating functions (UDTFs)
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator
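
Existing Hive UDFs are usually registered with HiveQL DDL and are wrapped by the classes above when queries are planned; a sketch with a hypothetical JAR path and UDF class:

// Register a Hive UDF packaged in an external JAR, then call it from HiveQL.
// At planning time the call is wrapped by HiveSimpleUDF or HiveGenericUDF.
spark.sql("ADD JAR /path/to/my-hive-udfs.jar")
spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.hive.MyUpperUDF'")

spark.sql("SELECT my_upper(name) FROM my_database.my_table").show()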

Full reference: docs/udf-integration.md

Metastore Operations

Interface for interacting with Hive metastore for database, table, and partition management.

trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  
  // SQL execution
  def runSqlHive(sql: String): Seq[String]
  
  // Database operations
  def listTables(dbName: String): Seq[String]
  def setCurrentDatabase(databaseName: String): Unit
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean
  
  // Table operations
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
  
  // Partition operations
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
  
  // Function operations
  def createFunction(db: String, func: CatalogFunction): Unit
  def listFunctions(db: String, pattern: String): Seq[String]
}
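
HiveClient is an internal interface; application code normally uses the public spark.catalog API, which delegates to the Hive metastore when Hive support is enabled (a minimal sketch with hypothetical database and table names):

// Browse metastore contents through the public catalog API.
spark.catalog.listDatabases().show()
spark.catalog.setCurrentDatabase("my_database")
spark.catalog.listTables("my_database").show()

// Check that a table exists before querying it.
if (spark.catalog.tableExists("my_database", "my_table")) {
  spark.table("my_database.my_table").show(5)
}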

Full reference: docs/metastore-operations.md

Execution Engine

Physical execution plans and strategies for Hive table operations.

case class HiveTableScanExec(
  requestedAttributes: Seq[Attribute],
  relation: HiveTableRelation,
  partitionPruningPred: Seq[Expression]
) extends LeafExecNode {
  def doExecute(): RDD[InternalRow]
}

case class InsertIntoHiveTable(
  table: CatalogTable,
  partition: Map[String, Option[String]],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean
) extends UnaryCommand {
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}

case class CreateHiveTableAsSelectCommand(
  tableDesc: CatalogTable,
  query: LogicalPlan,
  ignoreIfExists: Boolean
) extends DataWritingCommand
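
These plans are produced from ordinary SQL statements against Hive tables; a sketch with hypothetical table names:

// CREATE TABLE ... AS SELECT is planned as CreateHiveTableAsSelectCommand.
spark.sql(
  """CREATE TABLE my_database.daily_summary
    |AS SELECT key, count(*) AS cnt FROM my_database.my_table GROUP BY key""".stripMargin)

// INSERT OVERWRITE into an existing Hive table is planned as InsertIntoHiveTable.
spark.sql(
  """INSERT OVERWRITE TABLE my_database.daily_summary
    |SELECT key, count(*) FROM my_database.my_table GROUP BY key""".stripMargin)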

Full reference: docs/execution-engine.md

Configuration

Key Hive Configuration Properties

  • spark.sql.hive.metastore.version: Hive metastore version (default: 1.2.1)
  • spark.sql.hive.metastore.jars: Location of Hive metastore JARs ("builtin", "maven", or classpath)
  • spark.sql.hive.convertMetastoreParquet: Use Spark's Parquet reader for Hive tables (default: true)
  • spark.sql.hive.convertMetastoreOrc: Use Spark's ORC reader for Hive tables (default: false)
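
A sketch of setting these properties when building the session (the metastore URI is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive configuration example")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")  // external metastore
  .config("spark.sql.hive.metastore.version", "1.2.1")            // match your metastore
  .config("spark.sql.hive.metastore.jars", "builtin")             // use the bundled Hive client
  .config("spark.sql.hive.convertMetastoreOrc", "true")           // opt in to Spark's ORC reader
  .enableHiveSupport()
  .getOrCreate()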

Supported Hive Versions

  • Hive 0.12.0 through 2.1.1
  • Default execution version: 1.2.1
  • Configurable metastore version for compatibility

Migration from HiveContext

The HiveContext class is deprecated as of Spark 2.0.0. Migration steps:

  1. Replace HiveContext with SparkSession.builder().enableHiveSupport()
  2. Update configuration from Hive-specific settings to Spark SQL settings
  3. Use spark.sql() instead of hiveContext.sql() for queries
  4. Access catalog through spark.catalog instead of direct metastore calls
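
A before/after sketch of the migration (table names are hypothetical):

import org.apache.spark.sql.SparkSession

// Before (Spark 1.x):
//   val hiveContext = new HiveContext(sc)
//   val df = hiveContext.sql("SELECT * FROM my_database.my_table")

// After (Spark 2.x):
val spark = SparkSession.builder()
  .appName("Migrated job")
  .enableHiveSupport()
  .getOrCreate()

val df = spark.sql("SELECT * FROM my_database.my_table")
spark.catalog.listTables("my_database").show()  // catalog API instead of direct metastore calls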

Error Handling

Common exceptions and error patterns:

  • AnalysisException: Thrown for invalid table references or schema mismatches
  • HiveException: Wrapper for underlying Hive metastore errors
  • UnsupportedOperationException: For unsupported Hive features or version incompatibilities
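
A sketch of handling a missing-table error; AnalysisException lives in org.apache.spark.sql:

import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM my_database.missing_table").show()
} catch {
  case e: AnalysisException =>
    // Raised during analysis for unresolved tables, columns, or schema mismatches.
    println(s"Query failed during analysis: ${e.getMessage}")
}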

Types

// Configuration entries
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
}

// Hive version representation
abstract class HiveVersion {
  def fullVersion: String
  def extraDeps: Seq[String]
  def exclusions: Seq[String]
}

// Function wrapper for Hive UDFs
case class HiveFunctionWrapper(
  className: String,
  instance: AnyRef
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// Catalog types (from Spark SQL)
case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None
)

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty
)

case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource]
)