Core interface for direct interaction with the Hive metastore, providing low-level access to databases, tables, partitions, and functions.
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.HiveVersion
import org.apache.spark.sql.catalyst.catalog._

Core abstraction for Hive metastore client operations with version compatibility.
/**
* Interface for communicating with Hive metastore
*/
private[hive] trait HiveClient {
/** Get Hive version information */
def version: HiveVersion
/**
* Get configuration property from Hive
* @param key Configuration key
* @param defaultValue Default value if key not found
* @return Configuration value
*/
def getConf(key: String, defaultValue: String): String
/**
* Execute raw HiveQL statement
* @param sql HiveQL statement to execute
* @return Sequence of result strings
*/
def runSqlHive(sql: String): Seq[String]
}
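A minimal sketch of these low-level calls, assuming a hiveClient handle has already been obtained as shown in the usage examples near the end of this section:

// Inspect the client's version and read a metastore setting.
println(s"Hive version: ${hiveClient.version.fullVersion}")
val uris = hiveClient.getConf("hive.metastore.uris", "<unset>")
println(s"Metastore URIs: $uris")
// Run a raw HiveQL statement; results come back as plain strings.
val rows: Seq[String] = hiveClient.runSqlHive("SHOW DATABASES")
rows.foreach(println)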
Complete database management operations through the Hive metastore.

trait HiveClient {
/**
* Get database metadata
* @param name Database name
* @return CatalogDatabase with complete metadata
*/
def getDatabase(name: String): CatalogDatabase
/**
* List all databases matching pattern
* @param pattern SQL LIKE pattern for database names
* @return Sequence of database names
*/
def listDatabases(pattern: String): Seq[String]
/**
* Create new database in metastore
* @param database Database definition to create
* @param ignoreIfExists Skip creation if database exists
*/
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
/**
* Drop database from metastore
* @param name Database name to drop
* @param ignoreIfNotExists Skip error if database doesn't exist
* @param cascade Drop all tables in database first
*/
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Alter database properties
* @param name Database name
* @param database Updated database definition
*/
def alterDatabase(name: String, database: CatalogDatabase): Unit
}
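The usage examples later in this section cover createDatabase and getDatabase; here is a complementary sketch for the remaining operations, again assuming an existing hiveClient handle and a hypothetical database name:

// Alter properties by submitting a modified database definition.
val db = hiveClient.getDatabase("analytics_db")
hiveClient.alterDatabase(
  "analytics_db",
  db.copy(properties = db.properties + ("owner_team" -> "data-eng"))
)
// Drop the database; cascade = true drops contained tables as well.
hiveClient.dropDatabase("analytics_db", ignoreIfNotExists = true, cascade = true)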
Complete table management operations with schema and metadata support.

trait HiveClient {
/**
* Get table metadata with complete schema information
* @param dbName Database name
* @param tableName Table name
* @return CatalogTable with full metadata
*/
def getTable(dbName: String, tableName: String): CatalogTable
/**
* List tables in database matching pattern
* @param dbName Database name
* @param pattern SQL LIKE pattern for table names
* @return Sequence of table names
*/
def listTables(dbName: String, pattern: String): Seq[String]
/**
* Create new table in metastore
* @param table Table definition to create
* @param ignoreIfExists Skip creation if table exists
*/
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
/**
* Drop table from metastore
* @param dbName Database name
* @param tableName Table name to drop
* @param ignoreIfNotExists Skip error if table doesn't exist
* @param purge Delete data files immediately
*/
def dropTable(
dbName: String,
tableName: String,
ignoreIfNotExists: Boolean,
purge: Boolean
): Unit
/**
* Alter table schema and properties
* @param dbName Database name
* @param tableName Table name
* @param table Updated table definition
*/
def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
/**
* Rename table
* @param dbName Database name
* @param oldName Current table name
* @param newName New table name
*/
def renameTable(dbName: String, oldName: String, newName: String): Unit
}
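A short sketch of the destructive table operations, which the usage examples later in this section do not cover (names hypothetical, hiveClient assumed to be in scope):

// Rename a table, then drop it through the metastore.
hiveClient.renameTable("analytics_db", oldName = "user_activity", newName = "user_activity_v2")
hiveClient.dropTable(
  "analytics_db",
  "user_activity_v2",
  ignoreIfNotExists = true,
  purge = false // false moves data to trash rather than deleting immediately
)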
Operations for managing table partitions with dynamic partition support.

trait HiveClient {
/**
* Get partitions for partitioned table
* @param table Table to get partitions for
* @param spec Optional partition specification to filter
* @return Sequence of matching table partitions
*/
def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]
): Seq[CatalogTablePartition]
/**
* Get partition names matching specification
* @param table Table to get partition names for
* @param spec Optional partition specification to filter
* @return Sequence of partition names
*/
def getPartitionNames(
table: CatalogTable,
spec: Option[TablePartitionSpec]
): Seq[String]
/**
* Create partitions in partitioned table
* @param table Table to create partitions in
* @param parts Partition definitions to create
* @param ignoreIfExists Skip creation if partitions exist
*/
def createPartitions(
table: CatalogTable,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean
): Unit
/**
* Drop partitions from partitioned table
* @param db Database name
* @param table Table name
* @param specs Partition specifications to drop
* @param ignoreIfNotExists Skip error if partitions don't exist
* @param purge Delete partition data immediately
* @param retainData Keep partition data files
*/
def dropPartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean
): Unit
/**
 * Alter existing partitions
 * @param db Database name
 * @param table Table name
 * @param newParts Updated partition definitions; each spec must match an existing partition
 */
def alterPartitions(
  db: String,
  table: String,
  newParts: Seq[CatalogTablePartition]
): Unit
}
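A sketch of dropping and altering partitions, complementing the getPartitions and createPartitions examples later in this section (hiveClient, database, and table names assumed):

// Drop an old partition but keep its data files (retainData = true).
hiveClient.dropPartitions(
  db = "analytics_db",
  table = "daily_events",
  specs = Seq(Map("year" -> "2022", "month" -> "01", "day" -> "01")),
  ignoreIfNotExists = true,
  purge = false,
  retainData = true
)
// Update partition parameters by writing back modified definitions.
val eventsTable = hiveClient.getTable("analytics_db", "daily_events")
val updatedParts = hiveClient
  .getPartitions(eventsTable, Some(Map("year" -> "2023")))
  .map(p => p.copy(parameters = p.parameters + ("compacted" -> "true")))
hiveClient.alterPartitions("analytics_db", "daily_events", updatedParts)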
Operations for managing user-defined functions in Hive.

trait HiveClient {
/**
* Create user-defined function
* @param db Database name
* @param func Function definition to create
*/
def createFunction(db: String, func: CatalogFunction): Unit
/**
* Drop user-defined function
* @param db Database name
* @param name Function name to drop
*/
def dropFunction(db: String, name: String): Unit
/**
* List functions in database matching pattern
* @param db Database name
* @param pattern SQL LIKE pattern for function names
* @return Sequence of function names
*/
def listFunctions(db: String, pattern: String): Seq[String]
/**
* Get function metadata
* @param db Database name
* @param name Function name
* @return CatalogFunction with complete metadata
*/
def getFunction(db: String, name: String): CatalogFunction
/**
* Check if function exists
* @param db Database name
* @param name Function name
* @return true if function exists
*/
def functionExists(db: String, name: String): Boolean
}
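A sketch of the function lifecycle; the jar path and class name are hypothetical, and hiveClient is assumed to be in scope:

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
// Register a permanent UDF backed by a jar.
val func = CatalogFunction(
  identifier = FunctionIdentifier("normalize_email", Some("analytics_db")),
  className = "com.example.udf.NormalizeEmail",
  resources = Seq(FunctionResource(JarResource, "/libs/example-udfs.jar"))
)
hiveClient.createFunction("analytics_db", func)
// Look it up, list, and clean up.
if (hiveClient.functionExists("analytics_db", "normalize_email")) {
  println(hiveClient.getFunction("analytics_db", "normalize_email").className)
}
println(hiveClient.listFunctions("analytics_db", "*").mkString(", "))
hiveClient.dropFunction("analytics_db", "normalize_email")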
Concrete implementation of the HiveClient interface.

import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.spark.SparkConf

/**
* Implementation of HiveClient using Hive metastore APIs
*/
private[hive] class HiveClientImpl(
version: HiveVersion,
sparkConf: SparkConf,
hadoopConf: Configuration,
extraClassPath: Seq[URL],
classLoader: ClassLoader,
config: Map[String, String]
) extends HiveClient {
/** Initialize connection to Hive metastore */
def initialize(): Unit
/** Close connection to Hive metastore */
def close(): Unit
/** Reset connection state */
def reset(): Unit
/** Get underlying Hive metastore client */
def client: IMetaStoreClient
}
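HiveClientImpl is not normally constructed by hand; Spark builds it behind an isolated classloader so Hive's dependencies don't clash with Spark's. A minimal sketch against the simplified constructor shown above, with all values hypothetical:

import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
val client = new HiveClientImpl(
  version = HiveVersion("2.3.9", 2, 3),
  sparkConf = new SparkConf(),
  hadoopConf = new Configuration(),
  extraClassPath = Seq.empty[URL],
  classLoader = Thread.currentThread().getContextClassLoader,
  config = Map("hive.metastore.uris" -> "thrift://metastore-host:9083")
)
client.initialize()
try println(client.version.versionString)
finally client.close()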
/**
 * Hive version information and compatibility
*/
case class HiveVersion(
fullVersion: String,
majorVersion: Int,
minorVersion: Int
) {
/**
* Check if this version supports a specific feature
* @param feature Feature name to check
* @return true if feature is supported
*/
def supportsFeature(feature: String): Boolean
/** Get version as comparable string */
def versionString: String = s"$majorVersion.$minorVersion"
}
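A brief sketch of version gating; supportsFeature takes implementation-defined feature names, so the string below is purely illustrative:

val v = HiveVersion(fullVersion = "2.3.9", majorVersion = 2, minorVersion = 3)
println(v.versionString) // "2.3"
// Gate version-specific behavior (feature name hypothetical).
if (v.supportsFeature("purge")) {
  println(s"Hive ${v.versionString} supports PURGE on drop")
}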
import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.HiveExternalCatalog
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Access HiveClient through internal APIs (advanced, version-dependent usage).
// On Spark 2.4+ the external catalog is wrapped in ExternalCatalogWithListener,
// hence the unwrapped call; on older versions cast externalCatalog directly.
val hiveClient = spark.sharedState.externalCatalog.unwrapped
  .asInstanceOf[HiveExternalCatalog].client
// List all databases
val databases = hiveClient.listDatabases("*")
println(s"Databases: ${databases.mkString(", ")}")
// Create database
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  // Recent Spark versions expect a java.net.URI here; older ones took a String.
  locationUri = new URI("/user/hive/warehouse/analytics_db.db"),
  properties = Map("created_by" -> "spark_user")
)
hiveClient.createDatabase(newDb, ignoreIfExists = true)
// Get database metadata
val dbMetadata = hiveClient.getDatabase("analytics_db")
println(s"Database location: ${dbMetadata.locationUri}")// List tables in database
val tables = hiveClient.listTables("analytics_db", "*")
println(s"Tables: ${tables.mkString(", ")}")
// Get table metadata; NoSuchTableException lives in org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
try {
val tableMetadata = hiveClient.getTable("analytics_db", "user_activity")
println(s"Table type: ${tableMetadata.tableType}")
println(s"Schema: ${tableMetadata.schema}")
println(s"Partitions: ${tableMetadata.partitionColumnNames}")
} catch {
case _: NoSuchTableException =>
println("Table does not exist")
}
// Check table statistics
val table = hiveClient.getTable("analytics_db", "large_table")
println(s"Table storage: ${table.storage}")import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
// Get partitions for partitioned table
val partitionedTable = hiveClient.getTable("analytics_db", "daily_events")
val partitions = hiveClient.getPartitions(partitionedTable, None)
println(s"Found ${partitions.length} partitions")
partitions.foreach { partition =>
println(s"Partition: ${partition.spec}, Location: ${partition.storage.locationUri}")
}
// Get specific partition
val todayPartition = Map("year" -> "2023", "month" -> "12", "day" -> "15")
val specificPartitions = hiveClient.getPartitions(
partitionedTable,
Some(todayPartition)
)
// Create new partition
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2023", "month" -> "12", "day" -> "16"),
  storage = partitionedTable.storage.copy(
    // Recent Spark versions expect a java.net.URI for storage locations.
    locationUri = Some(new URI("/user/hive/warehouse/daily_events/year=2023/month=12/day=16"))
  ),
  parameters = Map("created_by" -> "spark_job")
)
hiveClient.createPartitions(
partitionedTable,
Seq(newPartition),
ignoreIfExists = true
)

Common exceptions when working with HiveClient:
import org.apache.hadoop.hive.metastore.api.MetaException
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
try {
val table = hiveClient.getTable("nonexistent_db", "some_table")
} catch {
case _: NoSuchDatabaseException =>
println("Database does not exist")
case _: NoSuchTableException =>
println("Table does not exist")
case e: MetaException =>
println(s"Metastore error: ${e.getMessage}")
}

type TablePartitionSpec = Map[String, String] // defined in org.apache.spark.sql.catalyst.catalog.CatalogTypes

import java.net.URL
case class HiveClientConfig(
version: HiveVersion,
extraClassPath: Seq[URL],
config: Map[String, String]
)