or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration-utilities.mdfile-formats.mdhive-client.mdindex.mdmetastore-integration.mdquery-execution.mdsession-configuration.mdudf-support.md
tile.json

hive-client.mddocs/

Hive Client Interface

Core interface for direct interaction with Hive metastore, providing low-level access to databases, tables, partitions, and functions.

Core Imports

import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.HiveVersion
import org.apache.spark.sql.catalyst.catalog._

Capabilities

HiveClient Interface

Core abstraction for Hive metastore client operations with version compatibility.

/**
 * Interface for communicating with Hive metastore
 */
private[hive] trait HiveClient {
  /** Get Hive version information */
  def version: HiveVersion
  
  /**
   * Get configuration property from Hive
   * @param key Configuration key
   * @param defaultValue Default value if key not found
   * @returns Configuration value
   */
  def getConf(key: String, defaultValue: String): String
  
  /**
   * Execute raw HiveQL statement
   * @param sql HiveQL statement to execute
   * @returns Sequence of result strings
   */
  def runSqlHive(sql: String): Seq[String]
}

Database Operations

Complete database management operations through Hive metastore.

trait HiveClient {
  /**
   * Get database metadata
   * @param name Database name
   * @returns CatalogDatabase with complete metadata
   */
  def getDatabase(name: String): CatalogDatabase
  
  /**
   * List all databases matching pattern
   * @param pattern SQL LIKE pattern for database names
   * @returns Sequence of database names
   */
  def listDatabases(pattern: String): Seq[String]
  
  /**
   * Create new database in metastore
   * @param database Database definition to create
   * @param ignoreIfExists Skip creation if database exists
   */
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  
  /**
   * Drop database from metastore
   * @param name Database name to drop
   * @param ignoreIfNotExists Skip error if database doesn't exist
   * @param cascade Drop all tables in database first
   */
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  
  /**
   * Alter database properties
   * @param name Database name
   * @param database Updated database definition
   */
  def alterDatabase(name: String, database: CatalogDatabase): Unit
}

Table Operations

Complete table management operations with schema and metadata support.

trait HiveClient {
  /**
   * Get table metadata with complete schema information
   * @param dbName Database name
   * @param tableName Table name
   * @returns CatalogTable with full metadata
   */
  def getTable(dbName: String, tableName: String): CatalogTable
  
  /**
   * List tables in database matching pattern
   * @param dbName Database name
   * @param pattern SQL LIKE pattern for table names
   * @returns Sequence of table names
   */
  def listTables(dbName: String, pattern: String): Seq[String]
  
  /**
   * Create new table in metastore
   * @param table Table definition to create
   * @param ignoreIfExists Skip creation if table exists
   */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  
  /**
   * Drop table from metastore
   * @param dbName Database name
   * @param tableName Table name to drop
   * @param ignoreIfNotExists Skip error if table doesn't exist
   * @param purge Delete data files immediately
   */
  def dropTable(
    dbName: String,
    tableName: String,
    ignoreIfNotExists: Boolean,
    purge: Boolean
  ): Unit
  
  /**
   * Alter table schema and properties
   * @param dbName Database name
   * @param tableName Table name
   * @param table Updated table definition
   */
  def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
  
  /**
   * Rename table
   * @param dbName Database name
   * @param oldName Current table name
   * @param newName New table name
   */
  def renameTable(dbName: String, oldName: String, newName: String): Unit
}

Partition Operations

Operations for managing table partitions with dynamic partition support.

trait HiveClient {
  /**
   * Get partitions for partitioned table
   * @param table Table to get partitions for
   * @param spec Optional partition specification to filter
   * @returns Sequence of matching table partitions
   */
  def getPartitions(
    table: CatalogTable,
    spec: Option[TablePartitionSpec]
  ): Seq[CatalogTablePartition]
  
  /**
   * Get partition names matching specification
   * @param table Table to get partition names for
   * @param spec Optional partition specification to filter
   * @returns Sequence of partition names
   */
  def getPartitionNames(
    table: CatalogTable,
    spec: Option[TablePartitionSpec]
  ): Seq[String]
  
  /**
   * Create partitions in partitioned table
   * @param table Table to create partitions in
   * @param parts Partition definitions to create
   * @param ignoreIfExists Skip creation if partitions exist
   */
  def createPartitions(
    table: CatalogTable,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean
  ): Unit
  
  /**
   * Drop partitions from partitioned table
   * @param db Database name
   * @param table Table name
   * @param specs Partition specifications to drop
   * @param ignoreIfNotExists Skip error if partitions don't exist
   * @param purge Delete partition data immediately
   * @param retainData Keep partition data files
   */
  def dropPartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean
  ): Unit
  
  /**
   * Alter partition properties
   * @param db Database name
   * @param table Table name
   * @param spec Partition specification
   * @param partition Updated partition definition
   */
  def alterPartitions(
    db: String,
    table: String,
    spec: TablePartitionSpec,
    partition: CatalogTablePartition
  ): Unit
}

Function Operations

Operations for managing user-defined functions in Hive.

trait HiveClient {
  /**
   * Create user-defined function
   * @param db Database name
   * @param func Function definition to create
   */
  def createFunction(db: String, func: CatalogFunction): Unit
  
  /**
   * Drop user-defined function
   * @param db Database name
   * @param name Function name to drop
   */
  def dropFunction(db: String, name: String): Unit
  
  /**
   * List functions in database matching pattern
   * @param db Database name
   * @param pattern SQL LIKE pattern for function names
   * @returns Sequence of function names
   */
  def listFunctions(db: String, pattern: String): Seq[String]
  
  /**
   * Get function metadata
   * @param db Database name
   * @param name Function name
   * @returns CatalogFunction with complete metadata
   */
  def getFunction(db: String, name: String): CatalogFunction
  
  /**
   * Check if function exists
   * @param db Database name
   * @param name Function name
   * @returns true if function exists
   */
  def functionExists(db: String, name: String): Boolean
}

Implementation Classes

HiveClientImpl

Concrete implementation of HiveClient interface.

/**
 * Implementation of HiveClient using Hive metastore APIs
 */
private[hive] class HiveClientImpl(
  version: HiveVersion,
  sparkConf: SparkConf,
  hadoopConf: Configuration,
  extraClassPath: Seq[URL],
  classLoader: ClassLoader,
  config: Map[String, String]
) extends HiveClient {
  
  /** Initialize connection to Hive metastore */
  def initialize(): Unit
  
  /** Close connection to Hive metastore */
  def close(): Unit
  
  /** Reset connection state */
  def reset(): Unit
  
  /** Get underlying Hive metastore client */
  def client: IMetaStoreClient
}

Version Management

/**
 * Hive version information and compatibility
 */
case class HiveVersion(
  fullVersion: String,
  majorVersion: Int,
  minorVersion: Int
) {
  /**
   * Check if this version supports a specific feature
   * @param feature Feature name to check
   * @returns true if feature is supported
   */
  def supportsFeature(feature: String): Boolean
  
  /** Get version as comparable string */
  def versionString: String = s"$majorVersion.$minorVersion"
}

Usage Examples

Basic Database Operations

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Access HiveClient through internal APIs (advanced usage)
val hiveClient = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
  .metastoreCatalog.client

// List all databases
val databases = hiveClient.listDatabases("*")
println(s"Databases: ${databases.mkString(", ")}")

// Create database
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  locationUri = "/user/hive/warehouse/analytics_db.db",
  properties = Map("created_by" -> "spark_user")
)
hiveClient.createDatabase(newDb, ignoreIfExists = true)

// Get database metadata
val dbMetadata = hiveClient.getDatabase("analytics_db")
println(s"Database location: ${dbMetadata.locationUri}")

Table Management

// List tables in database
val tables = hiveClient.listTables("analytics_db", "*")
println(s"Tables: ${tables.mkString(", ")}")

// Get table metadata
try {
  val tableMetadata = hiveClient.getTable("analytics_db", "user_activity")
  println(s"Table type: ${tableMetadata.tableType}")
  println(s"Schema: ${tableMetadata.schema}")
  println(s"Partitions: ${tableMetadata.partitionColumnNames}")
} catch {
  case _: NoSuchTableException =>
    println("Table does not exist")
}

// Check table statistics
val table = hiveClient.getTable("analytics_db", "large_table")
println(s"Table storage: ${table.storage}")

Partition Management

import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Get partitions for partitioned table
val partitionedTable = hiveClient.getTable("analytics_db", "daily_events")
val partitions = hiveClient.getPartitions(partitionedTable, None)

println(s"Found ${partitions.length} partitions")
partitions.foreach { partition =>
  println(s"Partition: ${partition.spec}, Location: ${partition.storage.locationUri}")
}

// Get specific partition
val todayPartition = Map("year" -> "2023", "month" -> "12", "day" -> "15")
val specificPartitions = hiveClient.getPartitions(
  partitionedTable, 
  Some(todayPartition)
)

// Create new partition
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2023", "month" -> "12", "day" -> "16"),
  storage = partitionedTable.storage.copy(
    locationUri = Some("/user/hive/warehouse/daily_events/year=2023/month=12/day=16")
  ),
  parameters = Map("created_by" -> "spark_job")
)

hiveClient.createPartitions(
  partitionedTable,
  Seq(newPartition),
  ignoreIfExists = true
)

Error Handling

Common exceptions when working with HiveClient:

  • NoSuchDatabaseException: When database doesn't exist
  • NoSuchTableException: When table doesn't exist
  • AlreadyExistsException: When creating existing objects
  • MetaException: General metastore errors
import org.apache.hadoop.hive.metastore.api._

try {
  val table = hiveClient.getTable("nonexistent_db", "some_table")
} catch {
  case _: NoSuchDatabaseException =>
    println("Database does not exist")
  case _: NoSuchTableException =>
    println("Table does not exist")
  case e: MetaException =>
    println(s"Metastore error: ${e.getMessage}")
}

Types

Client Configuration Types

type TablePartitionSpec = Map[String, String]

case class HiveClientConfig(
  version: HiveVersion,
  extraClassPath: Seq[URL],
  config: Map[String, String]
)