tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0

Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility
The Apache Spark SQL Hive integration module provides comprehensive compatibility with Apache Hive, enabling Spark applications to seamlessly access and query Hive tables, metadata, and data formats. This module implements HiveContext as an extension of SQLContext, offering full metastore operations, HiveQL parsing, SerDe support, and advanced Hive ecosystem integration.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.HiveQl
import org.apache.spark.sql.hive.HiveInspectors
// For Java applications
import org.apache.spark.api.java.JavaSparkContext
// Client types
import org.apache.spark.sql.hive.client.{HiveTable, HiveColumn, HiveDatabase}
import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable, VirtualView, IndexTable}
// Function support
import org.apache.spark.sql.hive.HiveFunctionRegistry

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
// Create Spark context and HiveContext
val conf = new SparkConf().setAppName("HiveApp")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
// Execute HiveQL queries
val result = hiveContext.sql("SELECT * FROM my_hive_table")
result.show()
// Access Hive metastore
hiveContext.refreshTable("my_database.my_table")
// Register a temporary table from a Hive query
// (CREATE TEMPORARY VIEW syntax is not available in Spark 1.6; use registerTempTable)
hiveContext.sql("SELECT * FROM hive_table WHERE year = 2023").registerTempTable("temp_view")

The Spark Hive integration is built around several key components:
Primary entry point providing full Hive integration capabilities including context management, configuration, and session handling.
class HiveContext(sc: SparkContext) extends SQLContext
// Constructors
def this(sc: SparkContext): HiveContext
def this(sc: JavaSparkContext): HiveContext
// Core methods
def newSession(): HiveContext
def refreshTable(tableName: String): Unit
def analyze(tableName: String): Unit
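A brief usage sketch of the session and metastore maintenance methods (assuming an existing SparkContext `sc`; table names are placeholders):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Derive an isolated session: separate SQL configuration, UDFs, and temporary
// tables, but a shared SparkContext and cached data
val session = hiveContext.newSession()

// Invalidate cached metadata when a table changes outside of Spark
hiveContext.refreshTable("my_database.my_table")

// Gather table statistics so the optimizer can, e.g., choose broadcast joins
hiveContext.analyze("my_hive_table")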
Abstracted interface for interacting with different versions of the Hive metastore, providing database and table operations with version compatibility.

trait ClientInterface {
def version: HiveVersion
def currentDatabase: String
def getTable(dbName: String, tableName: String): HiveTable
def createTable(table: HiveTable): Unit
def runSqlHive(sql: String): Seq[String]
}
case class HiveTable(
specifiedDatabase: Option[String],
name: String,
schema: Seq[HiveColumn],
partitionColumns: Seq[HiveColumn],
properties: Map[String, String],
tableType: TableType
)
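ClientInterface is an internal API, so obtaining an instance is version-specific; the following hedged sketch assumes a `client` reference is already available and uses the simplified HiveTable signature shown above:

import org.apache.spark.sql.hive.client._

def createEventsTable(client: ClientInterface): Unit = {
  val table = HiveTable(
    specifiedDatabase = Some("analytics"),
    name = "events",
    schema = Seq(HiveColumn("id", "bigint", ""), HiveColumn("payload", "string", "")),
    partitionColumns = Seq(HiveColumn("dt", "string", "")), // partitioned by date
    properties = Map.empty,
    tableType = ManagedTable)
  client.createTable(table)

  // Read the definition back and issue a raw Hive statement
  val fetched = client.getTable("analytics", "events")
  println(s"created ${fetched.name} (type: ${fetched.tableType.name})")
  client.runSqlHive("SHOW TABLES IN analytics")
}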
Converts HiveQL syntax to Catalyst logical plans, supporting full HiveQL language features including DDL, DML, and complex expressions.

object HiveQl {
def parseSql(sql: String): LogicalPlan
}
case class CreateTableAsSelect(
tableDesc: HiveTable,
child: LogicalPlan,
allowExisting: Boolean
) extends LogicalPlan
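HiveQl is likewise internal; as an illustration, parsing a statement yields an unresolved Catalyst plan:

import org.apache.spark.sql.hive.HiveQl
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Parse HiveQL text into an unresolved logical plan (no analysis/resolution yet)
val plan: LogicalPlan = HiveQl.parseSql("SELECT key, count(*) FROM src GROUP BY key")
println(plan.treeString)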
Physical operators for Hive-specific operations including table scans, data insertion, script transformations, and native command execution.

case class HiveTableScan(
attributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Option[Expression]
) extends LeafNode
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
child: SparkPlan,
overwrite: Boolean
) extends UnaryNode
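These operators are produced by the query planner rather than constructed by hand. A query such as the following (table and column names are placeholders) plans an InsertIntoHiveTable over a HiveTableScan, which explain() makes visible:

// Write one partition of a Hive table; partition pruning limits the scan
hiveContext.sql(
  """INSERT OVERWRITE TABLE events_archive PARTITION (year = 2015)
    |SELECT id, payload FROM events WHERE year = 2015""".stripMargin)

// Show the physical plan, including HiveTableScan with the pruning predicate
hiveContext.sql("SELECT id FROM events WHERE year = 2015").explain()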
Native ORC file format integration with predicate pushdown, column pruning, and schema evolution support.

class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
def shortName(): String = "orc"
}
case class OrcRelation(
location: String,
parameters: Map[String, String]
)(sqlContext: SQLContext) extends HadoopFsRelation
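ORC is used through the standard data source API via the "orc" short name (paths are placeholders; in Spark 1.6 predicate pushdown is gated by spark.sql.orc.filterPushdown):

// Write a DataFrame as ORC files and read them back
val events = hiveContext.table("events")
events.write.format("orc").save("/tmp/events_orc")

val orcDf = hiveContext.read.format("orc").load("/tmp/events_orc")

// Enable ORC predicate pushdown (off by default in 1.6) so this filter
// can be evaluated inside the ORC reader
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
orcDf.filter(orcDf("id") > 100).show()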
Bidirectional type conversion utilities between Spark SQL Catalyst types and Hive ObjectInspectors for seamless data exchange.

trait HiveInspectors {
def toInspector(dataType: DataType): ObjectInspector
def unwrap(data: Any, oi: ObjectInspector): Any
def wrap(a: Any, oi: ObjectInspector): AnyRef
}
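HiveInspectors is a mix-in trait; a minimal sketch, assuming values use Catalyst's internal representation, mixes it into a helper object:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types._

object InspectorDemo extends HiveInspectors {
  // Round-trip a Catalyst value through Hive's object model
  def roundTrip(value: Any, dataType: DataType): Any = {
    val oi: ObjectInspector = toInspector(dataType) // Catalyst type -> ObjectInspector
    unwrap(wrap(value, oi), oi)                     // Catalyst -> Hive -> Catalyst
  }
}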
Support for User-Defined Functions (UDFs), User-Defined Aggregate Functions (UDAFs), and User-Defined Table Functions (UDTFs) with comprehensive Hive compatibility.

class HiveFunctionRegistry(underlying: FunctionRegistry, executionHive: ClientWrapper)
extends FunctionRegistry with HiveInspectors {
def getFunctionInfo(name: String): FunctionInfo
def lookupFunction(name: String, children: Seq[Expression]): Expression
}
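Hive UDFs are usually registered and invoked through HiveQL rather than via the registry directly (the jar path and UDF class here are hypothetical):

// Make the jar containing the UDF available, register it, and call it
hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'")
hiveContext.sql("SELECT my_upper(name) FROM people").show()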
Key configuration properties for Hive integration:

// Metastore configuration
val HIVE_METASTORE_VERSION: SQLConfEntry[String]
val HIVE_METASTORE_JARS: SQLConfEntry[String]
// Format conversion settings
val CONVERT_METASTORE_PARQUET: SQLConfEntry[Boolean]
val CONVERT_CTAS: SQLConfEntry[Boolean]
// Class loading configuration
val HIVE_METASTORE_SHARED_PREFIXES: SQLConfEntry[Seq[String]]
val HIVE_METASTORE_BARRIER_PREFIXES: SQLConfEntry[Seq[String]]

spark.sql.hive.metastore.version - Hive metastore version to connect to (default: "1.2.1")
spark.sql.hive.metastore.jars - where to load metastore client jars from ("builtin", "maven", or a classpath)
spark.sql.hive.convertMetastoreParquet - use Spark's native Parquet reader for metastore Parquet tables (default: true)
spark.sql.hive.convertCTAS - treat CREATE TABLE AS SELECT as a data source table write (default: false)
spark.sql.hive.thriftServer.async - execute Thrift server queries asynchronously (default: true)
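These properties are normally set on the SparkConf before the context is created, or via setConf afterwards (values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Talk to an external Hive 0.13 metastore, resolving client jars from Maven
val conf = new SparkConf()
  .setAppName("HiveApp")
  .set("spark.sql.hive.metastore.version", "0.13.1")
  .set("spark.sql.hive.metastore.jars", "maven")

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// Runtime settings can be adjusted on an existing context
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")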
Hive metastore entities are represented by the following data types:

case class HiveDatabase(
name: String,
location: String
)
case class HiveColumn(
name: String,
hiveType: String,
comment: String
)
case class HiveStorageDescriptor(
location: String,
inputFormat: String,
outputFormat: String,
serde: String,
serdeProperties: Map[String, String]
)
case class HivePartition(
values: Seq[String],
storage: HiveStorageDescriptor
)
abstract class TableType { val name: String }
case object ExternalTable extends TableType { val name = "EXTERNAL_TABLE" }
case object ManagedTable extends TableType { val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { val name = "VIRTUAL_VIEW" }
case object IndexTable extends TableType { val name = "INDEX_TABLE" }
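A small sketch assembling these metadata types (locations and formats are placeholders, using Hive's ORC SerDe classes):

import org.apache.spark.sql.hive.client.{HivePartition, HiveStorageDescriptor}

// Describe where and how one partition's data is stored
val storage = HiveStorageDescriptor(
  location = "hdfs:///warehouse/events/dt=2015-12-01",
  inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
  outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
  serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde",
  serdeProperties = Map.empty)

// One value per partition column, in order
val partition = HivePartition(values = Seq("2015-12-01"), storage = storage)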
Hive metastore versions are modeled by HiveVersion:

abstract class HiveVersion(
fullVersion: String,
extraDeps: Seq[String],
exclusions: Seq[String]
)
// Supported versions: v12, v13, v14, v1_0, v1_1, v1_2

Runnable commands implementing Hive-specific statements:

case class AnalyzeTable(tableName: String) extends RunnableCommand
case class DropTable(tableName: String, ifExists: Boolean) extends RunnableCommand
case class AddJar(path: String) extends RunnableCommand
case class CreateMetastoreDataSource(...) extends RunnableCommand
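Each command corresponds to a HiveQL statement; for example (paths and table names are placeholders):

hiveContext.sql("ADD JAR /path/to/extra-serde.jar")               // AddJar
hiveContext.sql("ANALYZE TABLE events COMPUTE STATISTICS noscan") // AnalyzeTable
hiveContext.sql("DROP TABLE IF EXISTS events_tmp")                // DropTable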