Configuration options, utilities, and constants for customizing Hive integration behavior, including metastore settings, file format conversion, and compatibility options.
Core configuration entries for Hive integration behavior.
object HiveUtils {
/** Built-in Hive version used by Spark */
val builtinHiveVersion: String = "1.2.1"
/** Hive metastore version configuration */
val HIVE_METASTORE_VERSION: ConfigEntry[String]
/** Deprecated Hive version configuration (use HIVE_METASTORE_VERSION instead) */
val FAKE_HIVE_VERSION: ConfigEntry[String]
/** Location of Hive metastore JARs */
val HIVE_METASTORE_JARS: ConfigEntry[String]
/** Enable automatic conversion of Hive Parquet tables */
val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
/** Enable schema merging for converted Parquet tables */
val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING: ConfigEntry[Boolean]
/** Enable automatic conversion of Hive ORC tables */
val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
/** Enable conversion for CREATE TABLE AS SELECT operations */
val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]
/** Class name prefixes shared between the Spark and Hive metastore classloaders */
val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]
/** Class name prefixes loaded separately for the Hive client (never shared with Spark) */
val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
/** Enable asynchronous Hive Thrift Server */
val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]
}
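Each ConfigEntry exposes its underlying string key through .key, so the constants above and the raw "spark.sql.hive.*" keys used later on this page refer to the same settings:
// ConfigEntry constants resolve to plain configuration keys
println(HiveUtils.HIVE_METASTORE_VERSION.key)    // spark.sql.hive.metastore.version
println(HiveUtils.CONVERT_METASTORE_PARQUET.key) // spark.sql.hive.convertMetastoreParquet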
Basic Hive Configuration:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils
val spark = SparkSession.builder()
.appName("Hive Configuration Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
.config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
.enableHiveSupport()
.getOrCreate()
Advanced Configuration:
val spark = SparkSession.builder()
.appName("Advanced Hive Config")
// Metastore configuration
.config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
.config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
// File format conversion
.config(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
.config(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
.config(HiveUtils.CONVERT_METASTORE_CTAS.key, "true")
.config(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING.key, "false")
// Class loading configuration
.config(HiveUtils.HIVE_METASTORE_SHARED_PREFIXES.key,
"com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc")
.config(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
"javax.jdo,org.datanucleus")
// Thrift server configuration
.config(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")
.enableHiveSupport()
.getOrCreate()
Utility methods for configuration management.
object HiveUtils {
/**
* Configure SparkContext with Hive external catalog support
* @param sc - SparkContext to configure
* @return Configured SparkContext with Hive catalog
*/
def withHiveExternalCatalog(sc: SparkContext): SparkContext
/**
* Check if using CLI session state
* @return true if CLI session state is active
*/
def isCliSessionState(): Boolean
/**
* Create temporary configuration for testing
* @param useInMemoryDerby - Whether to use in-memory Derby database
* @return Configuration map for temporary Hive setup
*/
def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
/**
* Infer schema for Hive table from metastore information
* @param table - Catalog table to infer schema for
* @return Table with inferred schema
*/
def inferSchema(table: CatalogTable): CatalogTable
}
Usage Examples:
// Configure a SparkContext to use the Hive external catalog
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("Hive Catalog Example")
val sc = new SparkContext(conf)
val hiveEnabledSc = HiveUtils.withHiveExternalCatalog(sc)
// Create temporary configuration for testing
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
tempConfig.foreach { case (key, value) =>
spark.conf.set(key, value)
}
// Check session state
if (HiveUtils.isCliSessionState()) {
println("Running in CLI mode")
}
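The examples above do not cover inferSchema. A minimal sketch, assuming a Hive-enabled session named spark and a hypothetical Hive table db.events whose column types are not recorded in the metastore:
// Load the raw metastore entry and let HiveUtils infer the missing schema
import org.apache.spark.sql.catalyst.TableIdentifier
val rawTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("events", Some("db")))
val withSchema = HiveUtils.inferSchema(rawTable)
println(withSchema.schema.treeString)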
Detailed configuration options for Hive metastore connectivity.
Metastore Version Configuration:
// Specify Hive metastore version
.config("spark.sql.hive.metastore.version", "2.3.0")
// Supported versions: 0.12.0, 0.13.0, 0.14.0, 1.0.0, 1.1.0, 1.2.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0
Metastore JAR Configuration:
// Option 1: Use specific path
.config("spark.sql.hive.metastore.jars", "/opt/hive/lib/*")
// Option 2: Use builtin (default)
.config("spark.sql.hive.metastore.jars", "builtin")
// Option 3: Use Maven coordinates
.config("spark.sql.hive.metastore.jars", "maven")Database Connection Configuration:
// MySQL metastore
.config("javax.jdo.option.ConnectionURL",
"jdbc:mysql://localhost:3306/hive_metastore")
.config("javax.jdo.option.ConnectionDriverName",
"com.mysql.jdbc.Driver")
.config("javax.jdo.option.ConnectionUserName", "hive")
.config("javax.jdo.option.ConnectionPassword", "password")
// PostgreSQL metastore
.config("javax.jdo.option.ConnectionURL",
"jdbc:postgresql://localhost:5432/hive_metastore")
.config("javax.jdo.option.ConnectionDriverName",
"org.postgresql.Driver")Options for automatic conversion of Hive tables to optimized formats.
Parquet Conversion:
// Enable Parquet conversion (default: true)
.config("spark.sql.hive.convertMetastoreParquet", "true")
// Enable schema merging for Parquet (default: false)
.config("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")
// Example: Query automatically converts Hive Parquet table
spark.sql("SELECT * FROM hive_parquet_table").explain()
// Shows: HiveTableRelation converted to a Parquet file scan
ORC Conversion:
// Enable ORC conversion (default: true)
.config("spark.sql.hive.convertMetastoreOrc", "true")
// Example: ORC table automatically uses Spark's native ORC reader
spark.sql("SELECT * FROM hive_orc_table").explain()
// Shows: Optimized ORC scan
CTAS Conversion:
// Enable conversion for CREATE TABLE AS SELECT (default: true)
.config("spark.sql.hive.convertMetastoreCtas", "true")
// Example: a Hive-serde CTAS is written with Spark's built-in Parquet writer
spark.sql("""
CREATE TABLE optimized_table STORED AS PARQUET
AS SELECT * FROM source_table
""")
Configuration for managing class loading between Spark and Hive.
Shared Prefixes:
// Classes shared between Spark and Hive classloaders
.config("spark.sql.hive.metastore.sharedPrefixes",
"com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc")Barrier Prefixes:
// Classes that should not be shared (create barriers)
.config("spark.sql.hive.metastore.barrierPrefixes",
"javax.jdo,org.datanucleus")Configuration class for Hive-specific data source options.
/**
* Configuration options for Hive data source operations
*/
class HiveOptions(parameters: Map[String, String]) {
/** File format specification (e.g., "textfile", "sequencefile") */
val fileFormat: Option[String]
/** Input format class name */
val inputFormat: Option[String]
/** Output format class name */
val outputFormat: Option[String]
/** SerDe class name */
val serde: Option[String]
/** Check if input/output formats are specified */
def hasInputOutputFormat: Boolean
/** Get SerDe properties */
def serdeProperties: Map[String, String]
}
object HiveOptions {
// Option key constants
val FILE_FORMAT = "fileFormat"
val INPUT_FORMAT = "inputFormat"
val OUTPUT_FORMAT = "outputFormat"
val SERDE = "serde"
// Delimiter option mappings
val delimiterOptions: Map[String, String]
/**
* Resolve the compression property (key, value) for a Hive write, based on
* the table's output format and the current SQL configuration
* @param tableInfo - Hive TableDesc for the target table
* @param sqlConf - Active SQLConf used to pick the compression codec
* @return compression property to set, if the output format supports one
*/
def getHiveWriteCompression(
tableInfo: TableDesc,
sqlConf: SQLConf
): Option[(String, String)]
}
Usage Examples:
// Configure Hive data source options
val options = Map(
"fileFormat" -> "textfile",
"inputFormat" -> "org.apache.hadoop.mapred.TextInputFormat",
"outputFormat" -> "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"field.delim" -> "\t",
"line.delim" -> "\n"
)
val hiveOptions = new HiveOptions(options)
println(s"Using SerDe: ${hiveOptions.serde}")
println(s"SerDe properties: ${hiveOptions.serdeProperties}")Configuration options for optimizing Hive integration performance.
Configuration options for optimizing Hive integration performance.
Execution Configuration:
// Use Spark's native (vectorized) ORC reader for converted tables
.config("spark.sql.orc.impl", "native")
.config("spark.sql.hive.convertMetastoreOrc", "true")
// Prune partitions in the metastore query instead of fetching all partitions
.config("spark.sql.hive.metastorePartitionPruning", "true")
// Cache size (in bytes) for partition file metadata
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "262144000")
Metastore Performance:
// Connection pool settings
.config("datanucleus.connectionPool.maxPoolSize", "20")
.config("datanucleus.connectionPool.minPoolSize", "5")
// Cache settings
.config("datanucleus.cache.level2.type", "none")
.config("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")Configuration patterns for different deployment environments.
Development Configuration:
val devSpark = SparkSession.builder()
.appName("Development")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
.config(HiveUtils.HIVE_METASTORE_VERSION.key, "1.2.1")
.config(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")
.enableHiveSupport()
.getOrCreate()
Production Configuration:
val prodSpark = SparkSession.builder()
.appName("Production")
.config("spark.sql.warehouse.dir", "hdfs://cluster/user/hive/warehouse")
.config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
.config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://metastore-db:3306/hive")
.config(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
.config(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
.enableHiveSupport()
.getOrCreate()
Methods for validating and troubleshooting configuration:
// Check current configuration
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Display Hive-related configuration
spark.conf.getAll.filter(_._1.contains("hive")).foreach {
case (key, value) => println(s"$key = $value")
}
// Verify metastore connectivity
try {
spark.catalog.listDatabases().show()
println("Metastore connection successful")
} catch {
case e: Exception =>
println(s"Metastore connection failed: ${e.getMessage}")
}
// Check conversion settings
val parquetConversion = spark.conf.get("spark.sql.hive.convertMetastoreParquet")
val orcConversion = spark.conf.get("spark.sql.hive.convertMetastoreOrc")
println(s"Parquet conversion: $parquetConversion")
println(s"ORC conversion: $orcConversion")