Configuration Management

Comprehensive configuration system for Hive integration, covering metastore connection settings, format conversion, and JAR management. The HiveUtils object centralizes these configuration entries and provides client factory methods.

Capabilities

HiveUtils Object

Central configuration and utility object for Hive integration.

/**
 * Central configuration and utility object for Hive integration
 * Provides configuration constants and client factory methods
 */
object HiveUtils {
  /**
   * Built-in Hive version used by Spark
   */
  val builtinHiveVersion: String
}
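Since HIVE_METASTORE_VERSION defaults to this built-in version, it can be useful to inspect it at runtime. A minimal sketch (assuming HiveUtils, which lives in org.apache.spark.sql.hive, is accessible from your code):

import org.apache.spark.sql.hive.HiveUtils

// Print the Hive version Spark was compiled against
println(s"Built-in Hive version: ${HiveUtils.builtinHiveVersion}")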

Core Configuration Entries

Essential configuration options for Hive integration.

/**
 * Built-in Hive version configuration
 */
val BUILTIN_HIVE_VERSION: ConfigEntry[String]

/**
 * Hive metastore version to use
 * Default: matches builtin version
 */
val HIVE_METASTORE_VERSION: ConfigEntry[String]

/**
 * Hive metastore JAR location strategy
 * Options: "builtin", "maven", "path" (used with HIVE_METASTORE_JARS_PATH), or a classpath string
 */
val HIVE_METASTORE_JARS: ConfigEntry[String]

/**
 * Custom paths for Hive metastore JARs
 * Used when HIVE_METASTORE_JARS is set to "path"
 */
val HIVE_METASTORE_JARS_PATH: ConfigEntry[Seq[String]]

Usage Example:

import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveUtils

val conf = new SparkConf()

// Use built-in Hive JARs
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")

// Or use Maven to download specific version
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.10")

// Or specify custom JAR paths
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
conf.set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, "/path/to/hive/lib/*")
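These JAR-related entries are treated as static configuration: they are read when the first metastore client is created, so set them before building the SparkSession; changing them afterwards has no effect. A short sketch:

import org.apache.spark.sql.SparkSession

// Build the session only after the JAR options are in place
val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()

// Confirm the effective strategy
println(spark.conf.get(HiveUtils.HIVE_METASTORE_JARS.key))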

Format Conversion Configuration

Control automatic conversion of Hive SerDe tables to Spark's native data source implementations.

/**
 * Enable automatic conversion of Hive SerDe Parquet tables to Spark native format
 * Default: true
 */
val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]

/**
 * Enable schema merging when converting Parquet tables
 * Default: false
 */
val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING: ConfigEntry[Boolean]

/**
 * Enable automatic conversion of Hive SerDe ORC tables to Spark native format
 * Default: true
 */
val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT operations on partitioned tables
 * Default: true
 */
val CONVERT_INSERTING_PARTITIONED_TABLE: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT operations on unpartitioned tables
 * Default: true
 */
val CONVERT_INSERTING_UNPARTITIONED_TABLE: ConfigEntry[Boolean]

/**
 * Enable conversion for CREATE TABLE AS SELECT operations
 * Default: true
 */
val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT DIRECTORY operations
 * Default: true
 */
val CONVERT_METASTORE_INSERT_DIR: ConfigEntry[Boolean]

Usage Example:

val conf = new SparkConf()

// Disable Parquet conversion for compatibility
conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "false")

// Enable schema merging for Parquet
conf.set(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING.key, "true")

// Disable ORC conversion
conf.set(HiveUtils.CONVERT_METASTORE_ORC.key, "false")

// Fine-tune insertion behavior
conf.set(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE.key, "false")
conf.set(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key, "true")
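The conversion flags are ordinary SQL configurations, so they can also be toggled per session at runtime. A sketch, assuming an existing SparkSession spark and a Hive SerDe Parquet table named hive_parquet_table (a hypothetical name):

// Toggle conversion for the current session only
spark.conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "false")

// With conversion disabled, the physical plan should show a Hive table scan
// rather than Spark's native Parquet scan
spark.sql("SELECT * FROM hive_parquet_table").explain()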

Classloader Configuration

Control classloader isolation for Hive integration.

/**
 * Class prefixes that should be shared between Spark and Hive classloaders
 * Default: JDBC driver prefixes (e.g., "com.mysql.jdbc", "org.postgresql")
 * needed to talk to the metastore database
 */
val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]

/**
 * Class prefixes that should explicitly be reloaded for each version of Hive,
 * even if they would otherwise be shared
 * Default: empty
 */
val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]

Usage Example:

val conf = new SparkConf()

// Add custom shared prefixes
val sharedPrefixes = Seq(
  "java.",
  "javax.",
  "org.apache.hadoop.",
  "com.mycompany.shared."
)
conf.set(HiveUtils.HIVE_METASTORE_SHARED_PREFIXES.key, sharedPrefixes.mkString(","))

// Add custom barrier prefixes
val barrierPrefixes = Seq(
  "org.apache.hive.",
  "com.mycompany.hive."
)
conf.set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, barrierPrefixes.mkString(","))

Thrift Server Configuration

Configuration for Hive Thrift Server integration.

/**
 * Enable async processing in Hive Thrift Server
 * Default: true
 */
val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]

Usage Example:

val conf = new SparkConf()

// Disable async processing for debugging
conf.set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")
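The flag is read by the Thrift server at startup, but its effective value can be confirmed from any session. A sketch, assuming an existing SparkSession spark:

// Inspect the effective async setting
println(spark.conf.get(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key))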

Client Factory Methods

Create Hive clients for different use cases.

/**
 * Create Hive client for SQL execution context
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @return HiveClientImpl instance for execution
 */
def newClientForExecution(conf: SparkConf, hadoopConf: Configuration): HiveClientImpl

/**
 * Create Hive client for metadata operations
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @param configurations Additional Hive configurations
 * @return HiveClient instance for metadata operations
 */
def newClientForMetadata(
  conf: SparkConf, 
  hadoopConf: Configuration,
  configurations: Map[String, String]
): HiveClient

Usage Example:

import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.hive.HiveUtils

val sparkConf = new SparkConf()
val hadoopConf = new Configuration()

// Client for executing Hive SQL
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)
val results = executionClient.runSqlHive("SHOW DATABASES")

// Client for metadata operations with custom settings
val metadataConf = Map(
  "hive.metastore.uris" -> "thrift://metastore:9083",
  "hive.metastore.connect.retries" -> "3"
)
val metadataClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf, metadataConf)
val databases = metadataClient.listDatabases("*")

Utility Methods

Helper methods for common operations.

/**
 * Check if using CLI session state
 * @return True if CLI session state is active
 */
def isCliSessionState(): Boolean

/**
 * Create temporary configuration for testing
 * @param useInMemoryDerby Whether to use in-memory Derby database
 * @return Map of temporary configuration settings
 */
def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]

/**
 * Infer schema for a Hive table
 * @param table Catalog table definition
 * @return Table with inferred schema
 */
def inferSchema(table: CatalogTable): CatalogTable

/**
 * Parse partition name into values
 * @param name Partition name (e.g., "year=2023/month=01")
 * @return Array of partition values
 */
def partitionNameToValues(name: String): Array[String]

Usage Examples:

// Check session state
if (HiveUtils.isCliSessionState()) {
  println("Running in CLI mode")
}

// Create temporary configuration for testing
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
tempConfig.foreach { case (key, value) =>
  println(s"$key = $value")
}

// Infer schema for table
val tableWithInferredSchema = HiveUtils.inferSchema(catalogTable)
println(s"Inferred schema: ${tableWithInferredSchema.schema}")

// Parse partition values
val partitionValues = HiveUtils.partitionNameToValues("year=2023/month=01/day=15")
// Result: Array("2023", "01", "15")
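A common follow-up is pairing the parsed values with their partition column names. A small sketch; the column order is assumed to match the partition spec:

val partitionColumns = Seq("year", "month", "day")
val values = HiveUtils.partitionNameToValues("year=2023/month=01/day=15")

// Build a partition spec: Map("year" -> "2023", "month" -> "01", "day" -> "15")
val partitionSpec = partitionColumns.zip(values).toMap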

Configuration Best Practices

Recommended configuration patterns for different scenarios.

// Production configuration
def productionHiveConfig(): SparkConf = {
  new SparkConf()
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
    .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.10")
    .set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
    .set(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
    .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")
}

// Development configuration with custom JARs
def developmentHiveConfig(hivePath: String): SparkConf = {
  new SparkConf()
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
    .set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, s"$hivePath/lib/*")
    .set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "false") // For debugging
    .set(HiveUtils.CONVERT_METASTORE_ORC.key, "false")
}

// Testing configuration with in-memory database
def testingHiveConfig(): SparkConf = {
  val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
  val conf = new SparkConf()
  
  tempConfig.foreach { case (key, value) =>
    conf.set(key, value)
  }
  
  // The trailing .set returns the configured SparkConf
  conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")
}

Usage Example:

import org.apache.spark.sql.SparkSession

// Production setup
val prodConf = productionHiveConfig()
val prodSpark = SparkSession.builder()
  .config(prodConf)
  .enableHiveSupport()
  .getOrCreate()

// Development setup
val devConf = developmentHiveConfig("/opt/hive")
val devSpark = SparkSession.builder()
  .config(devConf)
  .enableHiveSupport()
  .getOrCreate()

// Testing setup
val testConf = testingHiveConfig()
val testSpark = SparkSession.builder()
  .config(testConf)
  .enableHiveSupport()
  .getOrCreate()

Advanced Configuration

Advanced configuration scenarios and troubleshooting.

// Handle version conflicts by downloading matching metastore JARs from Maven
def resolveHiveVersionConflict(
  sparkConf: SparkConf,
  targetHiveVersion: String
): SparkConf = {

  sparkConf
    .set(HiveUtils.HIVE_METASTORE_VERSION.key, targetHiveVersion)
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
}

// Custom SerDe configuration
def configureCustomSerDe(
  sparkConf: SparkConf,
  serDeJars: Seq[String]
): SparkConf = {

  // SparkConf.get returns a String, so split the existing comma-separated paths
  val currentJars = sparkConf
    .get(HiveUtils.HIVE_METASTORE_JARS_PATH.key, "")
    .split(",").filter(_.nonEmpty).toSeq
  val allJars = currentJars ++ serDeJars

  sparkConf
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
    .set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, allJars.mkString(","))
}

Usage Example:

// Resolve version conflicts
val conflictResolvedConf = resolveHiveVersionConflict(sparkConf, "3.1.3")

// Add custom SerDe JARs
val customSerDeJars = Seq(
  "/path/to/custom-serde.jar",
  "/path/to/another-serde.jar"
)
val serDeConf = configureCustomSerDe(sparkConf, customSerDeJars)

// Create SparkSession with resolved configuration
val spark = SparkSession.builder()
  .config(serDeConf)
  .enableHiveSupport()
  .getOrCreate()

Monitoring and Debugging

Helpers for debugging Hive integration and monitoring metastore connectivity.

// Make Hive-related query plans easier to inspect
// (log levels themselves are controlled via log4j, not SparkConf)
def enableHiveDebugLogging(sparkConf: SparkConf): SparkConf = {
  sparkConf
    .set("spark.sql.debug.maxToStringFields", "1000")
    .set("spark.sql.adaptive.enabled", "false") // Keep plans stable across runs for easier comparison
}

// Monitor metastore connection health
def checkMetastoreHealth(client: HiveClient): Boolean = {
  try {
    client.listDatabases("*")
    true
  } catch {
    case _: Exception => false
  }
}

Usage Example:

// Enable debugging
val debugConf = enableHiveDebugLogging(sparkConf)

val spark = SparkSession.builder()
  .config(debugConf)
  .enableHiveSupport()
  .getOrCreate()

// Check metastore health
val client = HiveUtils.newClientForMetadata(sparkConf, hadoopConf, Map.empty)
if (checkMetastoreHealth(client)) {
  println("Metastore connection is healthy")
} else {
  println("Metastore connection failed")
}
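
When the metastore may still be starting up, a retry loop around the health check avoids failing on the first attempt. A sketch; the attempt count and delay are illustrative:

// Retry the health check a few times before giving up
def awaitMetastore(client: HiveClient, attempts: Int = 3, delayMs: Long = 1000L): Boolean =
  (1 to attempts).exists { attempt =>
    val healthy = checkMetastoreHealth(client)
    if (!healthy && attempt < attempts) Thread.sleep(delayMs)
    healthy
  }

if (awaitMetastore(client)) {
  println("Metastore became available")
}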