Comprehensive configuration constants and utilities for controlling Hive integration behavior, metastore connections, and performance optimization.
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.internal.SQLConf

Core configuration constants for controlling Hive integration behavior.
object HiveUtils {
  /**
   * Version of Hive metastore to connect to
   * Default: Built-in Hive version bundled with Spark
   */
  val HIVE_METASTORE_VERSION: ConfigEntry[String]

  /**
   * Strategy for loading Hive metastore jars
   * Options: "builtin", "maven", "path"
   * Default: "builtin"
   */
  val HIVE_METASTORE_JARS: ConfigEntry[String]

  /**
   * Classpath locations when using the "path" strategy for metastore jars
   */
  val HIVE_METASTORE_JARS_PATH: ConfigEntry[Seq[String]]

  /**
   * Class prefixes shared between Spark and Hive metastore classloaders
   */
  val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]

  /**
   * Class prefixes isolated in Hive metastore classloader
   */
  val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
}
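These entries resolve to the spark.sql.hive.* keys used in the configuration examples later in this section. As a minimal sketch (assuming an active, Hive-enabled SparkSession named spark), the effective values can be inspected at runtime:

// Sketch: inspect the effective metastore settings on a running, Hive-enabled session
spark.conf.getOption("spark.sql.hive.metastore.version").foreach(println)
spark.conf.getOption("spark.sql.hive.metastore.jars").foreach(println)  // "builtin" unless overridden
spark.conf.getOption("spark.sql.hive.metastore.sharedPrefixes").foreach(println)
spark.conf.getOption("spark.sql.hive.metastore.barrierPrefixes").foreach(println)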
Control automatic conversion of Hive tables to Spark-native formats for better performance.

object HiveUtils {
  /**
   * Convert Hive Parquet tables to use Spark's native Parquet reader
   * Provides better performance and feature support
   * Default: true
   */
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]

  /**
   * Convert Hive ORC tables to use Spark's native ORC reader
   * Enables vectorized reading and better performance
   * Default: true
   */
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

  /**
   * Convert CREATE TABLE AS SELECT operations to use the data source API
   * Improves performance for table creation from queries
   * Default: true
   */
  val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]

  /**
   * Convert partitioned table inserts to use the data source API
   * Enables better partition handling and performance
   * Default: true
   */
  val CONVERT_INSERTING_PARTITIONED_TABLE: ConfigEntry[Boolean]

  /**
   * Convert INSERT DIRECTORY operations to use the data source API
   * Default: true
   */
  val CONVERT_METASTORE_INSERT_DIR: ConfigEntry[Boolean]
}
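The CTAS and INSERT DIRECTORY conversions are not exercised by the examples later in this section; a minimal sketch, again assuming an active session named spark (the sales table and output path are illustrative):

// Sketch: CTAS and INSERT OVERWRITE DIRECTORY with data source conversion enabled
// (the `sales` table and the output path are illustrative)
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")
spark.conf.set("spark.sql.hive.convertMetastoreInsertDir", "true")
spark.sql(
  """CREATE TABLE sales_summary STORED AS PARQUET AS
    |SELECT region, SUM(amount) AS total_amount FROM sales GROUP BY region""".stripMargin)
spark.sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/sales_export' STORED AS ORC
    |SELECT * FROM sales_summary""".stripMargin)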
Configuration for Hive Thrift Server integration.

object HiveUtils {
  /**
   * Enable asynchronous execution in Hive Thrift Server
   * Improves concurrency and resource utilization
   * Default: true
   */
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]

  /**
   * Authentication type for Hive Thrift Server
   * Options: "NONE", "LDAP", "KERBEROS", "CUSTOM"
   */
  val HIVE_THRIFT_SERVER_AUTHENTICATION: ConfigEntry[String]

  /**
   * Principal for Kerberos authentication
   */
  val HIVE_THRIFT_SERVER_KERBEROS_PRINCIPAL: ConfigEntry[String]

  /**
   * Keytab file location for Kerberos authentication
   */
  val HIVE_THRIFT_SERVER_KERBEROS_KEYTAB: ConfigEntry[String]
}
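One common programmatic use of these settings is starting the Thrift server inside an existing Hive-enabled session named spark; a sketch, assuming the spark-hive-thriftserver module is on the classpath:

import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// Sketch: expose the current session over JDBC/ODBC via the Thrift server
spark.conf.set("spark.sql.hive.thriftServer.async", "true")
HiveThriftServer2.startWithContext(spark.sqlContext)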
Utilities for configuring Hive client connections and behavior.

object HiveUtils {
  /**
   * Create Hive client configuration from Spark configuration
   * @param sparkConf Spark configuration
   * @return Map of Hive client configuration properties
   */
  def formatHiveConfigs(sparkConf: SparkConf): Map[String, String]

  /**
   * Get the default warehouse directory for Hive tables
   * @param conf Spark configuration
   * @return Warehouse directory path
   */
  def hiveWarehouseLocation(conf: SparkConf): String

  /**
   * Check if Hive support is available and properly configured
   * @param sparkConf Spark configuration
   * @return true if Hive support can be enabled
   */
  def isHiveAvailable(sparkConf: SparkConf): Boolean

  /**
   * Get the Hive version from configuration or detect it from the classpath
   * @param sparkConf Spark configuration
   * @return HiveVersion instance
   */
  def hiveVersion(sparkConf: SparkConf): HiveVersion

  /**
   * Create an isolated classloader for the Hive client
   * @param version Hive version to load
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return Isolated classloader for Hive
   */
  def createHiveClassLoader(
    version: HiveVersion,
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): ClassLoader
}
Utilities for managing Hive metastore connections and lifecycle.

object HiveUtils {
  /**
   * Initialize a Hive metastore client with proper configuration
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return Configured metastore client
   */
  def newClientForMetadata(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): HiveClient

  /**
   * Initialize a Hive client for execution with isolation
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return Isolated Hive client for execution
   */
  def newClientForExecution(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): HiveClient

  /**
   * Close a Hive client and clean up resources
   * @param client Hive client to close
   */
  def closeHiveClient(client: HiveClient): Unit
}
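A minimal sketch of that lifecycle, assuming an active Hive-enabled session named spark; HiveUtils is internal API, so the calls below mirror the documented signatures rather than a stable public contract, and listDatabases stands in for whatever metadata operations the client is needed for:

// Sketch: obtain a metastore client, use it, then release it
// (listDatabases is an illustrative HiveClient call)
val client = HiveUtils.newClientForMetadata(
  spark.sparkContext.getConf,
  spark.sparkContext.hadoopConfiguration)
try {
  client.listDatabases("*").foreach(println)
} finally {
  HiveUtils.closeHiveClient(client)
}

The examples that follow combine these constants and utilities into end-to-end session configurations.

// Configure Hive support with the built-in metastore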
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Configuration Example")
  .config("spark.sql.catalogImplementation", "hive")
  // Metastore version configuration
  .config("spark.sql.hive.metastore.version", "2.3.9")
  .config("spark.sql.hive.metastore.jars", "builtin")
  // Performance optimizations
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .config("spark.sql.hive.convertInsertingPartitionedTable", "true")
  .enableHiveSupport()
  .getOrCreate()
// Connect to external Hive metastore
val spark = SparkSession.builder()
  .appName("External Hive Metastore")
  .config("spark.sql.catalogImplementation", "hive")
  // External metastore configuration
  .config("spark.sql.hive.metastore.version", "3.1.2")
  .config("spark.sql.hive.metastore.jars", "maven")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  // Security configuration
  .config("hive.metastore.sasl.enabled", "true")
  .config("hive.metastore.kerberos.principal", "hive/_HOST@REALM.COM")
  .enableHiveSupport()
  .getOrCreate()
// Use custom Hive jars from a specific path
val customHiveJars = Seq(
  "/opt/hive/lib/hive-metastore-3.1.2.jar",
  "/opt/hive/lib/hive-exec-3.1.2.jar",
  "/opt/hive/lib/hive-common-3.1.2.jar"
)

val spark = SparkSession.builder()
  .appName("Custom Hive Jars")
  .config("spark.sql.catalogImplementation", "hive")
  .config("spark.sql.hive.metastore.version", "3.1.2")
  .config("spark.sql.hive.metastore.jars", "path")
  .config("spark.sql.hive.metastore.jars.path", customHiveJars.mkString(","))
  .enableHiveSupport()
  .getOrCreate()
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.hive.convertInsertingPartitionedTable", "true")
// ORC-specific optimizations
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096")
// Parquet optimizations
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "4096")// Configure Hive Thrift Server with authentication
// Configure Hive Thrift Server with authentication
val spark = SparkSession.builder()
  .appName("Hive Thrift Server")
  .config("spark.sql.catalogImplementation", "hive")
  // Thrift server settings
  .config("spark.sql.hive.thriftServer.async", "true")
  .config("hive.server2.authentication", "KERBEROS")
  .config("hive.server2.authentication.kerberos.principal", "hive/_HOST@REALM.COM")
  .config("hive.server2.authentication.kerberos.keytab", "/etc/hive/hive.keytab")
  // Connection limits
  .config("hive.server2.thrift.max.worker.threads", "500")
  .config("hive.server2.session.check.interval", "60000")
  .enableHiveSupport()
  .getOrCreate()
import org.apache.spark.sql.hive.HiveUtils

// Check if Hive is available
if (HiveUtils.isHiveAvailable(spark.sparkContext.getConf)) {
  println("Hive support is available")

  // Get current Hive version
  val hiveVersion = HiveUtils.hiveVersion(spark.sparkContext.getConf)
  println(s"Using Hive version: ${hiveVersion.fullVersion}")

  // Get warehouse location
  val warehouseLocation = HiveUtils.hiveWarehouseLocation(spark.sparkContext.getConf)
  println(s"Hive warehouse: $warehouseLocation")

  // Format Hive configurations
  val hiveConfigs = HiveUtils.formatHiveConfigs(spark.sparkContext.getConf)
  hiveConfigs.foreach { case (key, value) =>
    println(s"$key = $value")
  }
} else {
  println("Hive support is not available")
}
spark.conf.set("spark.sql.hive.metastore.sharedPrefixes",
"com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc")
spark.conf.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.hive.service.rpc.thrift,org.apache.hadoop.hive.metastore.api")
// Custom class loading strategy
val customClassLoader = HiveUtils.createHiveClassLoader(
hiveVersion = HiveVersion("3.1.2", 3, 1),
sparkConf = spark.sparkContext.getConf,
hadoopConf = spark.sparkContext.hadoopConfiguration
)Common configuration-related errors and their solutions:
Common configuration-related errors and their solutions:

import org.apache.spark.sql.AnalysisException

try {
  val spark = SparkSession.builder()
    .config("spark.sql.hive.metastore.version", "invalid-version")
    .enableHiveSupport()
    .getOrCreate()
} catch {
  case e: IllegalArgumentException if e.getMessage.contains("Unsupported Hive version") =>
    println("Invalid Hive version specified")
  case e: ClassNotFoundException =>
    println("Hive jars not found in classpath")
  case e: AnalysisException if e.getMessage.contains("Hive support is required") =>
    println("Hive support not properly configured")
}
case class HiveClientConfig(
  version: String,
  jarsStrategy: String,
  jarsPath: Seq[String],
  sharedPrefixes: Seq[String],
  barrierPrefixes: Seq[String]
)

case class MetastoreConfig(
  uris: String,
  principal: Option[String],
  keytab: Option[String],
  saslEnabled: Boolean
)

case class ThriftServerConfig(
  async: Boolean,
  authentication: String,
  maxWorkerThreads: Int,
  sessionCheckInterval: Long
)
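As a usage sketch, these case classes can be populated from an active session's configuration; the key names are the standard spark.sql.hive.* and hive.* settings, while the splitCsv helper and the fallback values are illustrative rather than part of any API:

// Sketch: assemble the configuration case classes from a running session
// (the splitCsv helper and the fallback defaults are illustrative)
def splitCsv(value: Option[String]): Seq[String] =
  value.map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq).getOrElse(Nil)

val clientConfig = HiveClientConfig(
  version = spark.conf.getOption("spark.sql.hive.metastore.version").getOrElse("2.3.9"),
  jarsStrategy = spark.conf.getOption("spark.sql.hive.metastore.jars").getOrElse("builtin"),
  jarsPath = splitCsv(spark.conf.getOption("spark.sql.hive.metastore.jars.path")),
  sharedPrefixes = splitCsv(spark.conf.getOption("spark.sql.hive.metastore.sharedPrefixes")),
  barrierPrefixes = splitCsv(spark.conf.getOption("spark.sql.hive.metastore.barrierPrefixes"))
)

val metastoreConfig = MetastoreConfig(
  uris = spark.conf.getOption("hive.metastore.uris").getOrElse(""),
  principal = spark.conf.getOption("hive.metastore.kerberos.principal"),
  keytab = spark.conf.getOption("hive.metastore.kerberos.keytab.file"),
  saslEnabled = spark.conf.getOption("hive.metastore.sasl.enabled").exists(_.toBoolean)
)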