Integration with Hive metastore for table metadata, database operations, and catalog management.
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl, HiveVersion}
import org.apache.spark.sql.hive.HiveMetastoreCatalog
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.LogicalRelation

Access to Hive metastore through the external catalog interface.
class HiveExternalCatalog(
    conf: SparkConf,
    hadoopConf: Configuration
) extends ExternalCatalog {

  /** Create database in Hive metastore */
  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean
  ): Unit

  /** Drop database from Hive metastore */
  override def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean
  ): Unit

  /** Create table in Hive metastore */
  override def createTable(
      tableDefinition: CatalogTable,
      ignoreIfExists: Boolean
  ): Unit

  /** Drop table from Hive metastore */
  override def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean
  ): Unit

  /** Get table metadata from Hive metastore */
  override def getTable(db: String, table: String): CatalogTable

  /** List all tables in database */
  override def listTables(db: String, pattern: Option[String]): Seq[String]
}
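The external catalog is an internal API, but it can be reached through the session's shared state when direct metastore access is needed. A minimal sketch (unstable internals that may change between Spark versions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// The shared state exposes the Hive-backed ExternalCatalog (wrapped with a listener in Spark 3.x)
val externalCatalog = spark.sharedState.externalCatalog

externalCatalog.listDatabases().foreach(println)           // every database in the metastore
if (externalCatalog.tableExists("default", "users")) {
  val table = externalCatalog.getTable("default", "users")  // CatalogTable with schema and storage
  println(table.schema.treeString)
  println(table.storage)
}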
Low-level interface for Hive metastore client operations.

private[hive] trait HiveClient {

  /** Get Hive version information */
  def version: HiveVersion

  /** Get database metadata */
  def getDatabase(name: String): CatalogDatabase

  /** List all databases */
  def listDatabases(pattern: String): Seq[String]

  /** Get table metadata with detailed schema information */
  def getTable(dbName: String, tableName: String): CatalogTable

  /** List tables in database matching pattern */
  def listTables(dbName: String, pattern: String): Seq[String]

  /** Create new table in metastore */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

  /** Drop table from metastore */
  def dropTable(
      dbName: String,
      tableName: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean
  ): Unit

  /** Get partitions for a partitioned table */
  def getPartitions(
      table: CatalogTable,
      spec: Option[TablePartitionSpec]
  ): Seq[CatalogTablePartition]

  /** Create partitions in a partitioned table */
  def createPartitions(
      table: CatalogTable,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean
  ): Unit
}
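Much of the metadata these client methods return is also visible from the public SQL surface without touching internal APIs; a small sketch, assuming the Hive-enabled session from the sketch above and an existing table my_database.users:

spark.sql("SHOW TABLES IN my_database").show()
spark.sql("DESCRIBE FORMATTED my_database.users").show(100, truncate = false)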
Converts Hive table relations for optimized Spark processing.

class HiveMetastoreCatalog(sparkSession: SparkSession) {

  /**
   * Convert a Hive table relation to an optimized Spark relation.
   * @param relation Hive table relation to convert
   * @param isWrite whether this conversion is for a write operation
   * @return converted logical relation
   */
  def convert(relation: HiveTableRelation, isWrite: Boolean): LogicalRelation

  /**
   * Convert a storage format for data source operations.
   * @param storage catalog storage format to convert
   * @return converted storage format
   */
  def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat
}
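The conversion above is driven by public configuration: when spark.sql.hive.convertMetastoreParquet (or the ORC equivalent) is enabled, which is the default, Parquet/ORC Hive tables are read through Spark's native data sources. A sketch, assuming a Parquet-backed Hive table my_database.users:

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

val usersDf = spark.table("my_database.users")
usersDf.explain(true)  // the plan shows a converted LogicalRelation rather than a HiveTableRelation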
Concrete implementation of HiveClient that talks to a specific Hive metastore version.

private[hive] class HiveClientImpl(
    version: HiveVersion,
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    extraClassPath: Seq[URL],
    classLoader: ClassLoader,
    config: Map[String, String]
) extends HiveClient {

  /** Initialize the connection to the Hive metastore */
  def initialize(): Unit

  /** Close the connection to the Hive metastore */
  def close(): Unit

  /** Execute a raw Hive SQL statement and return its output lines */
  def runSqlHive(sql: String): Seq[String]
}
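The client's connection target comes from standard Hive/Hadoop configuration rather than Spark-specific settings; a sketch with a hypothetical metastore host:

import org.apache.spark.sql.SparkSession

val sparkWithRemoteMetastore = SparkSession.builder()
  // spark.hadoop.* entries are forwarded into the Hadoop configuration passed to the client
  .config("spark.hadoop.hive.metastore.uris", "thrift://metastore-host:9083")  // hypothetical host
  .enableHiveSupport()
  .getOrCreate()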
Manages class loading isolation for different Hive versions.

private[hive] class IsolatedClientLoader(
    version: HiveVersion,
    sparkConf: SparkConf,
    execJars: Seq[URL],
    hadoopConf: Configuration,
    config: Map[String, String]
) extends Logging {

  /** Create isolated Hive client instance */
  def createClient(): HiveClient

  /** Get underlying class loader for this Hive version */
  def classLoader: ClassLoader
}
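Which Hive client version the loader isolates is controlled by public configuration; a minimal sketch (the version number and jar source are illustrative):

import org.apache.spark.sql.SparkSession

val sparkWithPinnedMetastore = SparkSession.builder()
  .config("spark.sql.hive.metastore.version", "2.3.9")  // version of the target metastore
  .config("spark.sql.hive.metastore.jars", "maven")     // resolve matching client jars from Maven
  .enableHiveSupport()
  .getOrCreate()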
Basic usage with a Hive-enabled SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
// List databases
spark.sql("SHOW DATABASES").show()
// Create database
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
// Use database
spark.sql("USE my_database")
// Create external table pointing to Hive data
spark.sql("""
CREATE TABLE IF NOT EXISTS users (
id INT,
name STRING,
age INT
)
USING HIVE
LOCATION '/user/hive/warehouse/users'
""")
// Access table metadata
val tableMetadata = spark.catalog.getTable("my_database", "users")
println(s"Table: ${tableMetadata.name}, type: ${tableMetadata.tableType}")
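Column-level metadata can be inspected through the same public catalog API, for example:

spark.catalog.listColumns("my_database", "users").show()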
// Create partitioned table
spark.sql("""
CREATE TABLE IF NOT EXISTS sales (
product_id INT,
quantity INT,
revenue DOUBLE
)
PARTITIONED BY (year INT, month INT)
USING HIVE
""")
// Add partition
spark.sql("""
ALTER TABLE sales
ADD PARTITION (year=2023, month=12)
LOCATION '/user/hive/warehouse/sales/year=2023/month=12'
""")
// Query specific partition
val decemberSales = spark.sql("""
SELECT * FROM sales
WHERE year = 2023 AND month = 12
""")Common metastore integration exceptions:
Common metastore integration exceptions:

import org.apache.spark.sql.AnalysisException
import org.apache.hadoop.hive.metastore.api.MetaException

try {
  spark.sql("DROP TABLE non_existent_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
    println("Table does not exist")
  case e: MetaException =>
    println(s"Metastore error: ${e.getMessage}")
}

Core data types used by the catalog APIs above:

case class HiveVersion(
    fullVersion: String,
    majorVersion: Int,
    minorVersion: Int
) {
  def supportsFeature(feature: String): Boolean
}

case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String]
)

case class CatalogTable(
    identifier: TableIdentifier,
    tableType: CatalogTableType,
    storage: CatalogStorageFormat,
    schema: StructType,
    provider: Option[String],
    partitionColumnNames: Seq[String],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String]
)

case class CatalogTablePartition(
    spec: TablePartitionSpec,
    storage: CatalogStorageFormat,
    parameters: Map[String, String]
) {
  def location: Option[URI]
  def toRow: InternalRow
}

case class CatalogFunction(
    identifier: FunctionIdentifier,
    className: String,
    resources: Seq[FunctionResource]
)
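These are the types the internal catalog APIs above return. As a closing sketch (again relying on unstable internals), a CatalogTable fetched through the external catalog exposes identifier, partitioning, and storage details:

import org.apache.spark.sql.catalyst.catalog.CatalogTable

val salesTable: CatalogTable = spark.sharedState.externalCatalog.getTable("my_database", "sales")
println(salesTable.identifier)            // e.g. `my_database`.`sales`
println(salesTable.partitionColumnNames)  // Seq(year, month)
println(salesTable.storage.locationUri)   // warehouse location, if set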