Metastore Integration

Integration with Hive metastore for table metadata, database operations, and catalog management.

Core Imports

import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl, HiveVersion}
import org.apache.spark.sql.hive.HiveMetastoreCatalog
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.LogicalRelation

Capabilities

External Catalog Operations

Access to Hive metastore through the external catalog interface.

class HiveExternalCatalog(
  conf: SparkConf,
  hadoopConf: Configuration
) extends ExternalCatalog {
  
  /** Create database in Hive metastore */
  override def createDatabase(
    dbDefinition: CatalogDatabase,
    ignoreIfExists: Boolean
  ): Unit
  
  /** Drop database from Hive metastore */
  override def dropDatabase(
    db: String,
    ignoreIfNotExists: Boolean,
    cascade: Boolean
  ): Unit
  
  /** Create table in Hive metastore */
  override def createTable(
    tableDefinition: CatalogTable,
    ignoreIfExists: Boolean
  ): Unit
  
  /** Drop table from Hive metastore */
  override def dropTable(
    db: String,
    table: String,
    ignoreIfNotExists: Boolean,
    purge: Boolean
  ): Unit
  
  /** Get table metadata from Hive metastore */
  override def getTable(db: String, table: String): CatalogTable
  
  /** List tables in database matching pattern */
  override def listTables(db: String, pattern: String): Seq[String]
}
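
In practice the catalog is obtained from the session's shared state rather than constructed directly. A minimal sketch using the standard `spark.sharedState.externalCatalog` accessor (with Hive support enabled, the returned catalog delegates to `HiveExternalCatalog`):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// SharedState exposes the ExternalCatalog interface; with Hive support
// enabled it is backed by HiveExternalCatalog.
val catalog = spark.sharedState.externalCatalog

// Read metadata through the interface outlined above
val table = catalog.getTable("default", "users")
println(s"Table type: ${table.tableType}, location: ${table.storage.locationUri}")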

Hive Client Interface

Low-level interface for Hive metastore client operations.

private[hive] trait HiveClient {
  /** Get Hive version information */
  def version: HiveVersion
  
  /** Get database metadata */
  def getDatabase(name: String): CatalogDatabase
  
  /** List all databases */
  def listDatabases(pattern: String): Seq[String]
  
  /** Get table metadata with detailed schema information */
  def getTable(dbName: String, tableName: String): CatalogTable
  
  /** List tables in database matching pattern */
  def listTables(dbName: String, pattern: String): Seq[String]
  
  /** Create new table in metastore */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  
  /** Drop table from metastore */
  def dropTable(
    dbName: String,
    tableName: String,
    ignoreIfNotExists: Boolean,
    purge: Boolean
  ): Unit
  
  /** Get partitions for partitioned table */
  def getPartitions(
    table: CatalogTable,
    spec: Option[TablePartitionSpec]
  ): Seq[CatalogTablePartition]
  
  /** Create partition in partitioned table */
  def createPartitions(
    table: CatalogTable,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean
  ): Unit
}
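
Because `HiveClient` is `private[hive]`, it is normally reachable only from code compiled in the `org.apache.spark.sql.hive` package; Spark itself constructs one through `HiveUtils.newClientForMetadata`. A sketch, assuming the calling code lives inside that package and `spark` is a Hive-enabled session:

import org.apache.spark.sql.hive.HiveUtils

// Builds a client for the metastore version configured on the session
// (private[hive]; visible only inside org.apache.spark.sql.hive).
val client = HiveUtils.newClientForMetadata(
  spark.sparkContext.getConf,
  spark.sparkContext.hadoopConfiguration)

// Hive-style glob pattern
client.listDatabases("*").foreach(println)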

Metastore Catalog Conversion

Converts Hive table relations into Spark's native data source relations for optimized processing.

class HiveMetastoreCatalog(sparkSession: SparkSession) {
  /**
   * Convert Hive table relation to optimized Spark relation
   * @param relation Hive table relation to convert
   * @param isWrite Whether this conversion is for write operations
   * @return Converted logical relation
   */
  def convert(relation: HiveTableRelation, isWrite: Boolean): LogicalRelation
  
  /**
   * Convert storage format for data source operations
   * @param storage Catalog storage format to convert
   * @return Converted storage format
   */
  def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat
}
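
Whether `convert` is applied is governed by session configuration: Parquet- and ORC-backed Hive tables are rewritten to native relations only when the corresponding flags are enabled. These are standard Spark settings, shown here on a Hive-enabled session `spark`:

// Enable conversion of Hive Parquet/ORC tables to Spark's native readers
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")

// Reads of matching Hive tables now produce a LogicalRelation scan
// instead of going through the Hive SerDe path.
val df = spark.table("my_database.users")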

Configuration and Connection

Client Implementation

private[hive] class HiveClientImpl(
  version: HiveVersion,
  sparkConf: SparkConf,
  hadoopConf: Configuration,
  extraClassPath: Seq[URL],
  classLoader: ClassLoader,
  config: Map[String, String]
) extends HiveClient {
  
  /** Initialize connection to Hive metastore */
  def initialize(): Unit
  
  /** Close connection to Hive metastore */
  def close(): Unit
  
  /** Run a raw Hive SQL statement through the embedded Hive driver */
  def runSqlHive(sql: String): Seq[String]
}
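
Which client implementation and Hive jars get loaded is controlled by standard metastore settings supplied when the session is built:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  // Version of the Hive metastore the client should speak
  .config("spark.sql.hive.metastore.version", "2.3.9")
  // Source of Hive client jars: "builtin", "maven", or a classpath
  .config("spark.sql.hive.metastore.jars", "builtin")
  .getOrCreate()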

Isolated Client Loader

Manages class loading isolation for different Hive versions.

private[hive] class IsolatedClientLoader(
  version: HiveVersion,
  sparkConf: SparkConf,
  execJars: Seq[URL],
  hadoopConf: Configuration,
  config: Map[String, String]
) extends Logging {
  
  /** Create isolated Hive client instance */
  def createClient(): HiveClient
  
  /** Get underlying class loader for this Hive version */
  def classLoader: ClassLoader
}

Usage Examples

Basic Metastore Operations

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// List databases
spark.sql("SHOW DATABASES").show()

// Create database
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")

// Use database
spark.sql("USE my_database")

// Create external table pointing to Hive data
spark.sql("""
  CREATE TABLE IF NOT EXISTS users (
    id INT,
    name STRING,
    age INT
  )
  USING HIVE
  LOCATION '/user/hive/warehouse/users'
""")

// Access table metadata
val tableMetadata = spark.catalog.getTable("my_database", "users")
println(s"Table location: ${tableMetadata}")

Working with Partitioned Tables

// Create partitioned table
spark.sql("""
  CREATE TABLE IF NOT EXISTS sales (
    product_id INT,
    quantity INT,
    revenue DOUBLE
  )
  PARTITIONED BY (year INT, month INT)
  USING HIVE
""")

// Add partition
spark.sql("""
  ALTER TABLE sales 
  ADD PARTITION (year=2023, month=12)
  LOCATION '/user/hive/warehouse/sales/year=2023/month=12'
""")

// Query specific partition
val decemberSales = spark.sql("""
  SELECT * FROM sales 
  WHERE year = 2023 AND month = 12
""")

Error Handling

Common metastore integration exceptions:

  • MetaException: General Hive metastore errors
  • NoSuchObjectException: When database or table doesn't exist
  • AlreadyExistsException: When trying to create existing database/table
  • InvalidOperationException: For invalid metastore operations

import org.apache.spark.sql.AnalysisException
import org.apache.hadoop.hive.metastore.api.MetaException

try {
  spark.sql("DROP TABLE non_existent_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
    println("Table does not exist")
  case e: MetaException =>
    println(s"Metastore error: ${e.getMessage}")
}

Types

Hive Version Support

case class HiveVersion(
  fullVersion: String,
  majorVersion: Int,
  minorVersion: Int
) {
  def supportsFeature(feature: String): Boolean
}

Table and Database Types

case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: URI,
  properties: Map[String, String]
)

case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  provider: Option[String],
  partitionColumnNames: Seq[String],
  bucketSpec: Option[BucketSpec],
  properties: Map[String, String]
)

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String]
) {
  def location: Option[URI]
  def toRow: InternalRow
}

case class CatalogFunction(
  identifier: FunctionIdentifier,  
  className: String,
  resources: Seq[FunctionResource]
)
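
Code that calls the external catalog or client interfaces directly needs a `CatalogTable` value. A minimal sketch of assembling one by hand (in Spark's actual case class, fields beyond these four carry defaults):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// A managed table with an empty storage descriptor; the metastore
// assigns the warehouse location on create.
val tableDef = CatalogTable(
  identifier = TableIdentifier("events", Some("my_database")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("id", "int").add("payload", "string")
)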