Direct access to the Hive metastore for programmatic database, table, partition, and function management through the HiveClient interface. This provides low-level control over metastore operations beyond what is available through SQL.
The core interface for interacting with the Hive metastore programmatically is the HiveClient trait, shown below in thematic excerpts: session-level operations first, followed by database, table, partition, data-loading, and function operations.
/**
 * Interface to Hive client for metastore operations
 * Shared across internal and external classloaders
 */
private[hive] trait HiveClient {
  /** Returns the Hive version of this client */
  def version: HiveVersion
  /** Returns configuration value for the given key */
  def getConf(key: String, defaultValue: String): String
  /** Returns the associated Hive SessionState */
  def getState: Any
  /** Execute HiveQL command and return results as strings */
  def runSqlHive(sql: String): Seq[String]
  /** Set output streams for Hive operations */
  def setOut(stream: PrintStream): Unit
  def setInfo(stream: PrintStream): Unit
  def setError(stream: PrintStream): Unit
  /** Add JAR to class loader */
  def addJar(path: String): Unit
  /** Create new client session sharing class loader and Hive client */
  def newSession(): HiveClient
  /** Run function within Hive state (SessionState, HiveConf, Hive client and class loader) */
  def withHiveState[A](f: => A): A
  /** Remove all metadata - for testing only */
  def reset(): Unit
}
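Usage Examples:
// A minimal sketch of the session-level operations, assuming a HiveClient
// instance named `client` (see the database examples below for how one is
// typically obtained)
println(s"Hive version: ${client.version.fullVersion}")

// Run raw HiveQL through the client and capture the output lines
val output: Seq[String] = client.runSqlHive("SHOW DATABASES")
output.foreach(println)

// Execute a block with the Hive SessionState, conf, and classloader in place
val warehouseDir = client.withHiveState {
  client.getConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
}

// Create an independent session sharing the classloader and Hive client
val session2: HiveClient = client.newSession()

Comprehensive database management operations.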
/**
 * Database management operations
 */
trait HiveClient {
  /** List database names matching pattern */
  def listDatabases(pattern: String): Seq[String]
  /** Get database metadata - throws exception if not found */
  def getDatabase(name: String): CatalogDatabase
  /** Check if database exists */
  def databaseExists(dbName: String): Boolean
  /** Set current database */
  def setCurrentDatabase(databaseName: String): Unit
  /** Create new database */
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  /**
   * Drop database
   * @param name - Database name to drop
   * @param ignoreIfNotExists - Don't throw error if database doesn't exist
   * @param cascade - Remove all associated objects (tables, functions)
   */
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  /** Alter existing database */
  def alterDatabase(database: CatalogDatabase): Unit
}

Usage Examples:
// Assuming you have access to a HiveClient instance, obtained from
// HiveExternalCatalog (an internal API) or similar
val client: HiveClient = ???

// List all databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Create new database
// Note: in recent Spark versions CatalogDatabase.locationUri is a java.net.URI
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  locationUri = new java.net.URI("hdfs://cluster/user/hive/warehouse/analytics_db.db"),
  properties = Map("owner" -> "analytics_team")
)
client.createDatabase(newDb, ignoreIfExists = true)

// Switch to database
client.setCurrentDatabase("analytics_db")

// Get database info
val dbInfo = client.getDatabase("analytics_db")
println(s"Database location: ${dbInfo.locationUri}")
/**
 * Table management operations
 */
trait HiveClient {
  /** List all tables in database */
  def listTables(dbName: String): Seq[String]
  /** List tables matching pattern in database */
  def listTables(dbName: String, pattern: String): Seq[String]
  /** Check if table exists */
  def tableExists(dbName: String, tableName: String): Boolean
  /** Get table metadata - throws NoSuchTableException if not found */
  def getTable(dbName: String, tableName: String): CatalogTable
  /** Get table metadata - returns None if not found */
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
  /** Create new table */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  /** Drop table */
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
  /** Alter existing table */
  def alterTable(table: CatalogTable): Unit
  def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
  /** Update table schema and properties */
  def alterTableDataSchema(
      dbName: String,
      tableName: String,
      newDataSchema: StructType,
      schemaProps: Map[String, String]
  ): Unit
}

Usage Examples:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.types._

// List tables
val tables = client.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")

// Create table
val tableSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val newTable = CatalogTable(
  identifier = TableIdentifier("users", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = tableSchema,
  partitionColumnNames = Seq.empty,
  properties = Map("comment" -> "User information table")
)
client.createTable(newTable, ignoreIfExists = true)

// Get table info
val tableInfo = client.getTable("default", "users")
println(s"Table schema: ${tableInfo.schema}")
println(s"Table type: ${tableInfo.tableType}")

// Check if table exists
if (client.tableExists("default", "users")) {
  println("Table exists")
}
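// Evolve the table's data schema with alterTableDataSchema (a sketch:
// `email` is a hypothetical new column added to the table created above)
val widenedSchema = tableSchema.add(StructField("email", StringType, nullable = true))
client.alterTableDataSchema("default", "users", widenedSchema, schemaProps = Map.empty)

Comprehensive partition management for partitioned tables.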
/**
 * Partition management operations
 */
trait HiveClient {
  /** Create one or more partitions */
  def createPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean
  ): Unit
  /** Drop one or more partitions */
  def dropPartitions(
      db: String,
      table: String,
      specs: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      retainData: Boolean
  ): Unit
  /** Rename existing partitions */
  def renamePartitions(
      db: String,
      table: String,
      specs: Seq[TablePartitionSpec],
      newSpecs: Seq[TablePartitionSpec]
  ): Unit
  /** Alter existing partitions */
  def alterPartitions(
      db: String,
      table: String,
      newParts: Seq[CatalogTablePartition]
  ): Unit
  /** Get partition - throws NoSuchPartitionException if not found */
  def getPartition(
      dbName: String,
      tableName: String,
      spec: TablePartitionSpec
  ): CatalogTablePartition
  /** Get partition - returns None if not found */
  def getPartitionOption(
      table: CatalogTable,
      spec: TablePartitionSpec
  ): Option[CatalogTablePartition]
  /** Get partitions matching partial spec */
  def getPartitions(
      catalogTable: CatalogTable,
      partialSpec: Option[TablePartitionSpec] = None
  ): Seq[CatalogTablePartition]
  /** Get partition names matching partial spec */
  def getPartitionNames(
      table: CatalogTable,
      partialSpec: Option[TablePartitionSpec] = None
  ): Seq[String]
  /** Get partitions filtered by predicates */
  def getPartitionsByFilter(
      catalogTable: CatalogTable,
      predicates: Seq[Expression]
  ): Seq[CatalogTablePartition]
}

Usage Examples:
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// Get table for partition operations
val table = client.getTable("sales_db", "monthly_sales")

// List all partitions
val allPartitions = client.getPartitions(table)
println(s"Total partitions: ${allPartitions.length}")

// Get specific partition
val partSpec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12")
val partition = client.getPartitionOption(table, partSpec)
partition match {
  case Some(part) => println(s"Partition location: ${part.storage.locationUri}")
  case None => println("Partition not found")
}

// Create new partition
// Note: CatalogStorageFormat.empty.copy avoids spelling out every field, and
// in recent Spark versions locationUri is a java.net.URI
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2024", "month" -> "01"),
  storage = CatalogStorageFormat.empty.copy(
    locationUri = Some(new java.net.URI(
      "hdfs://cluster/user/hive/warehouse/sales_db.db/monthly_sales/year=2024/month=01")),
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  )
)
client.createPartitions("sales_db", "monthly_sales", Seq(newPartition), ignoreIfExists = true)
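// Push partition predicates down to the metastore via getPartitionsByFilter
// (a sketch: builds a year = "2023" filter by hand; Hive stores partition
// values as strings)
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
val yearAttr = AttributeReference("year", StringType)()
val recent = client.getPartitionsByFilter(table, Seq(EqualTo(yearAttr, Literal("2023"))))
println(s"Partitions with year = 2023: ${recent.length}")

Operations for loading data into tables and partitions.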
/**
 * Data loading operations
 */
trait HiveClient {
  /** Load data into static partition */
  def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
      replace: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean
  ): Unit
  /** Load data into existing table */
  def loadTable(
      loadPath: String,
      tableName: String,
      replace: Boolean,
      isSrcLocal: Boolean
  ): Unit
  /** Load data creating dynamic partitions */
  def loadDynamicPartitions(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
      replace: Boolean,
      numDP: Int
  ): Unit
}
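Usage Examples:
// A sketch of loading pre-written files into the tables above; the HDFS
// staging paths are hypothetical
client.loadTable(
  loadPath = "hdfs://cluster/staging/users",
  tableName = "users",
  replace = true,
  isSrcLocal = false
)

// Static partition load: Hive expects partition keys in declaration order,
// hence the LinkedHashMap
val loadSpec = new java.util.LinkedHashMap[String, String]()
loadSpec.put("year", "2024")
loadSpec.put("month", "01")
client.loadPartition(
  loadPath = "hdfs://cluster/staging/sales/2024-01",
  dbName = "sales_db",
  tableName = "monthly_sales",
  partSpec = loadSpec,
  replace = true,
  inheritTableSpecs = true,
  isSrcLocal = false
)

Management of user-defined functions in Hive metastore.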
/**
 * Function management operations
 */
trait HiveClient {
  /** Create function in database */
  def createFunction(db: String, func: CatalogFunction): Unit
  /** Drop existing function */
  def dropFunction(db: String, name: String): Unit
  /** Rename existing function */
  def renameFunction(db: String, oldName: String, newName: String): Unit
  /** Alter existing function */
  def alterFunction(db: String, func: CatalogFunction): Unit
  /** Get function - throws NoSuchPermanentFunctionException if not found */
  def getFunction(db: String, name: String): CatalogFunction
  /** Get function - returns None if not found */
  def getFunctionOption(db: String, name: String): Option[CatalogFunction]
  /** Check if function exists */
  def functionExists(db: String, name: String): Boolean
  /** List functions matching pattern */
  def listFunctions(db: String, pattern: String): Seq[String]
}

Usage Examples:
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// List functions
val functions = client.listFunctions("default", "*")
println(s"Available functions: ${functions.mkString(", ")}")

// Create custom function
val customFunction = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("default")),
  className = "com.example.MyUpperUDF",
  resources = Seq(FunctionResource(
    resourceType = JarResource, // case object, not FunctionResourceType.jar
    uri = "hdfs://cluster/jars/my-udfs.jar"
  ))
)
client.createFunction("default", customFunction)

// Check if function exists
if (client.functionExists("default", "my_upper")) {
  println("Custom function created successfully")
}

Supported Hive versions and their dependency metadata.

/**
 * Abstract class representing Hive version with dependencies
 */
private[hive] abstract class HiveVersion(
    val fullVersion: String,
    val extraDeps: Seq[String] = Nil,
    val exclusions: Seq[String] = Nil
)
// Supported Hive versions
val allSupportedHiveVersions: Set[HiveVersion] = Set(
  v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3
)
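// The active client's version can be checked at runtime, e.g. to gate
// version-specific behavior (a sketch)
if (client.version.fullVersion.startsWith("2.")) {
  println(s"Hive 2.x metastore: ${client.version.fullVersion}")
}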